diff --git a/src/main/java/info/fetter/logstashforwarder/FileModificationListener.java b/src/main/java/info/fetter/logstashforwarder/FileModificationListener.java index 24536d8..bb298d2 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileModificationListener.java +++ b/src/main/java/info/fetter/logstashforwarder/FileModificationListener.java @@ -25,10 +25,12 @@ import org.apache.commons.io.monitor.FileAlterationObserver; public class FileModificationListener implements FileAlterationListener { private Event fields; private FileWatcher watcher; + private Multiline multiline; - public FileModificationListener(FileWatcher watcher, Event fields) { + public FileModificationListener(FileWatcher watcher, Event fields, Multiline multiline) { this.watcher = watcher; this.fields = fields; + this.multiline = multiline; } public void onDirectoryChange(File file) { @@ -44,11 +46,11 @@ public class FileModificationListener implements FileAlterationListener { } public void onFileChange(File file) { - watcher.onFileChange(file, fields); + watcher.onFileChange(file, fields, multiline); } public void onFileCreate(File file) { - watcher.onFileCreate(file, fields); + watcher.onFileCreate(file, fields, multiline); } public void onFileDelete(File file) { diff --git a/src/main/java/info/fetter/logstashforwarder/FileReader.java b/src/main/java/info/fetter/logstashforwarder/FileReader.java index c092a1e..f216a8d 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileReader.java +++ b/src/main/java/info/fetter/logstashforwarder/FileReader.java @@ -26,6 +26,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import org.apache.commons.lang.ArrayUtils; import org.apache.log4j.Logger; @@ -123,19 +124,62 @@ public class FileReader extends Reader { private long readLines(FileState state, int spaceLeftInSpool) { RandomAccessFile reader = state.getRandomAccessFile(); long pos = state.getPointer(); + Multiline multiline = state.getMultiline(); try { reader.seek(pos); byte[] line = readLine(reader); + byte[] bufferedLines = null; while (line != null && spaceLeftInSpool > 0) { if(logger.isTraceEnabled()) { logger.trace("-- Read line : " + new String(line)); logger.trace("-- Space left in spool : " + spaceLeftInSpool); } pos = reader.getFilePointer(); - addEvent(state, pos, line); + if (multiline == null) { + addEvent(state, pos, line); + } + else { + if (logger.isTraceEnabled()) { + logger.trace("-- Multiline : " + multiline); + logger.trace("-- Multiline : matches " + multiline.isPatternFound(line)); + } + if (multiline.isPatternFound(line)) + { + // buffer the line + if (bufferedLines != null) + { + bufferedLines = ArrayUtils.addAll(bufferedLines, line); + } + else + { + bufferedLines = line; + } + } + else { + if (multiline.isPrevious()) { + // did not match, so new event started + if (bufferedLines != null) { + addEvent(state, pos, bufferedLines); + } + bufferedLines = line; + } + else { + // did not match, add the current line + if (bufferedLines != null) { + addEvent(state, pos, ArrayUtils.addAll(bufferedLines, line)); + bufferedLines = null; + } + else + addEvent(state, pos, line); + } + } + } line = readLine(reader); spaceLeftInSpool--; } + if (bufferedLines != null) { + addEvent(state, pos, bufferedLines); // send any buffered lines left + } reader.seek(pos); // Ensure we can re-read if necessary } catch(IOException e) { logger.warn("Exception raised while reading file : " + state.getFile(), e); diff --git a/src/main/java/info/fetter/logstashforwarder/FileState.java b/src/main/java/info/fetter/logstashforwarder/FileState.java index 5933100..912e877 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileState.java +++ b/src/main/java/info/fetter/logstashforwarder/FileState.java @@ -25,6 +25,7 @@ import java.io.RandomAccessFile; import org.apache.commons.lang.builder.ToStringBuilder; import com.fasterxml.jackson.annotation.JsonIgnore; +import java.util.Map; public class FileState { @JsonIgnore @@ -48,6 +49,10 @@ public class FileState { private FileState oldFileState; @JsonIgnore private Event fields; + @JsonIgnore + private Multiline multiline; + @JsonIgnore + private byte[] bufferedLines = null; public FileState() { } @@ -172,6 +177,22 @@ public class FileState { public void setFields(Event fields) { this.fields = fields; } + + public Multiline getMultiline() { + return multiline; + } + + public void setMultiline(Multiline multiline) { + this.multiline = multiline; + } + + public byte[] getBufferedLines() { + return bufferedLines; + } + + public void setBufferedLines(byte[] bufferedLines) { + this.bufferedLines = bufferedLines; + } @Override public String toString() { diff --git a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java index bbffabe..18b8a56 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java +++ b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java @@ -76,14 +76,14 @@ public class FileWatcher { printWatchMap(); } - public void addFilesToWatch(String fileToWatch, Event fields, int deadTime) { + public void addFilesToWatch(String fileToWatch, Event fields, int deadTime, Multiline multiline) { try { if(fileToWatch.equals("-")) { addStdIn(fields); } else if(fileToWatch.contains("*")) { - addWildCardFiles(fileToWatch, fields, deadTime); + addWildCardFiles(fileToWatch, fields, deadTime, multiline); } else { - addSingleFile(fileToWatch, fields, deadTime); + addSingleFile(fileToWatch, fields, deadTime, multiline); } } catch(Exception e) { throw new RuntimeException(e); @@ -219,7 +219,7 @@ public class FileWatcher { removeMarkedFilesFromWatchMap(); } - private void addSingleFile(String fileToWatch, Event fields, int deadTime) throws Exception { + private void addSingleFile(String fileToWatch, Event fields, int deadTime, Multiline multiline) throws Exception { logger.info("Watching file : " + new File(fileToWatch).getCanonicalPath()); String directory = FilenameUtils.getFullPath(fileToWatch); String fileName = FilenameUtils.getName(fileToWatch); @@ -227,10 +227,10 @@ public class FileWatcher { FileFilterUtils.fileFileFilter(), FileFilterUtils.nameFileFilter(fileName), new LastModifiedFileFilter(deadTime)); - initializeWatchMap(new File(directory), fileFilter, fields); + initializeWatchMap(new File(directory), fileFilter, fields, multiline); } - private void addWildCardFiles(String filesToWatch, Event fields, int deadTime) throws Exception { + private void addWildCardFiles(String filesToWatch, Event fields, int deadTime, Multiline multiline) throws Exception { logger.info("Watching wildcard files : " + filesToWatch); String directory = FilenameUtils.getFullPath(filesToWatch); String wildcard = FilenameUtils.getName(filesToWatch); @@ -239,7 +239,7 @@ public class FileWatcher { FileFilterUtils.fileFileFilter(), new WildcardFileFilter(wildcard), new LastModifiedFileFilter(deadTime)); - initializeWatchMap(new File(directory), fileFilter, fields); + initializeWatchMap(new File(directory), fileFilter, fields, multiline); } private void addStdIn(Event fields) { @@ -248,22 +248,22 @@ public class FileWatcher { stdinConfigured = true; } - private void initializeWatchMap(File directory, IOFileFilter fileFilter, Event fields) throws Exception { + private void initializeWatchMap(File directory, IOFileFilter fileFilter, Event fields, Multiline multiline) throws Exception { if(!directory.isDirectory()) { logger.warn("Directory " + directory + " does not exist"); return; } FileAlterationObserver observer = new FileAlterationObserver(directory, fileFilter); - FileModificationListener listener = new FileModificationListener(this, fields); + FileModificationListener listener = new FileModificationListener(this, fields, multiline); observer.addListener(listener); observerList.add(observer); observer.initialize(); for(File file : FileUtils.listFiles(directory, fileFilter, null)) { - addFileToWatchMap(newWatchMap, file, fields); + addFileToWatchMap(newWatchMap, file, fields, multiline); } } - private void addFileToWatchMap(Map map, File file, Event fields) { + private void addFileToWatchMap(Map map, File file, Event fields, Multiline multiline) { try { FileState state = new FileState(file); state.setFields(fields); @@ -272,25 +272,26 @@ public class FileWatcher { long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength); state.setSignature(signature); logger.trace("Setting signature of size : " + signatureLength + " on file : " + file + " : " + signature); + state.setMultiline(multiline); map.put(file, state); } catch(IOException e) { logger.error("Caught IOException : " + e.getMessage()); } } - public void onFileChange(File file, Event fields) { + public void onFileChange(File file, Event fields, Multiline multiline) { try { logger.debug("Change detected on file : " + file.getCanonicalPath()); - addFileToWatchMap(newWatchMap, file, fields); + addFileToWatchMap(newWatchMap, file, fields, multiline); } catch (IOException e) { logger.error("Caught IOException : " + e.getMessage()); } } - public void onFileCreate(File file, Event fields) { + public void onFileCreate(File file, Event fields, Multiline multiline) { try { logger.debug("Create detected on file : " + file.getCanonicalPath()); - addFileToWatchMap(newWatchMap, file, fields); + addFileToWatchMap(newWatchMap, file, fields, multiline); } catch (IOException e) { logger.error("Caught IOException : " + e.getMessage()); } diff --git a/src/main/java/info/fetter/logstashforwarder/Forwarder.java b/src/main/java/info/fetter/logstashforwarder/Forwarder.java index dc52bbb..1d8b011 100644 --- a/src/main/java/info/fetter/logstashforwarder/Forwarder.java +++ b/src/main/java/info/fetter/logstashforwarder/Forwarder.java @@ -80,7 +80,7 @@ public class Forwarder { configManager.readConfiguration(); for(FilesSection files : configManager.getConfig().getFiles()) { for(String path : files.getPaths()) { - watcher.addFilesToWatch(path, new Event(files.getFields()), files.getDeadTimeInSeconds() * 1000); + watcher.addFilesToWatch(path, new Event(files.getFields()), files.getDeadTimeInSeconds() * 1000, files.getMultiline()); } } watcher.initialize(); diff --git a/src/main/java/info/fetter/logstashforwarder/Multiline.java b/src/main/java/info/fetter/logstashforwarder/Multiline.java new file mode 100644 index 0000000..b972c5c --- /dev/null +++ b/src/main/java/info/fetter/logstashforwarder/Multiline.java @@ -0,0 +1,89 @@ +package info.fetter.logstashforwarder; + +/* + * Copyright 2015 Didier Fetter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import java.io.UnsupportedEncodingException; +import java.util.Map; +import java.util.regex.Pattern; +import org.apache.commons.lang.builder.ToStringBuilder; + +public class Multiline { + public enum WhatType { Previous, Next }; + + private Pattern pattern = null; + private boolean negate = false; + private WhatType what = WhatType.Previous; + + public Multiline() { + } + + public Multiline(Multiline event) { + if(event != null) { + this.negate = event.negate; + this.pattern = event.pattern; + this.what = event.what; + } + } + + public Multiline(Map fields) throws UnsupportedEncodingException { + String strPattern = ""; + for(String key : fields.keySet()) { + if ("pattern".equals(key)) + strPattern = fields.get(key); + else if ("negate".equals(key)) + negate = Boolean.parseBoolean(fields.get(key)); + else if ("what".equals(key)) + what = WhatType.valueOf(fields.get(key)); + else + throw new UnsupportedEncodingException(key + " not supported"); + } + pattern = Pattern.compile(strPattern); + + } + + public Pattern getPattern() { + return pattern; + } + + public boolean isNegate() { + return negate; + } + + public WhatType getWhat() { + return what; + } + + public boolean isPrevious() { + return what == WhatType.Previous; + } + + public boolean isPatternFound (byte[] line) { + boolean result = pattern.matcher(new String(line)).find(); + if (negate) return !result; + return result; + } + + @Override + public String toString() { + return new ToStringBuilder(this). + append("pattern", pattern). + append("negate", negate). + append("what", what). + toString(); + } +} diff --git a/src/main/java/info/fetter/logstashforwarder/config/FilesSection.java b/src/main/java/info/fetter/logstashforwarder/config/FilesSection.java index 43f9b2e..3c2b8ed 100644 --- a/src/main/java/info/fetter/logstashforwarder/config/FilesSection.java +++ b/src/main/java/info/fetter/logstashforwarder/config/FilesSection.java @@ -19,16 +19,20 @@ package info.fetter.logstashforwarder.config; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; import org.apache.commons.lang.builder.ToStringBuilder; import com.fasterxml.jackson.annotation.JsonProperty; +import info.fetter.logstashforwarder.Multiline; +import java.io.UnsupportedEncodingException; public class FilesSection { private List paths; private Map fields; @JsonProperty("dead time") private String deadTime = "24h"; + private Multiline multiline; public List getPaths() { return paths; @@ -78,13 +82,22 @@ public class FilesSection { public void setDeadTime(String deadTime) { this.deadTime = deadTime; } + + public Multiline getMultiline() { + return multiline; + } + public void setMultiline(Map multilineMap) throws UnsupportedEncodingException { + this.multiline = new Multiline(multilineMap); + } + @Override public String toString() { return new ToStringBuilder(this). append("paths", paths). append("fields", fields). append("dead time", deadTime). + append("multiline", multiline). toString(); } } diff --git a/src/test/java/info/fetter/logstashforwarder/FileReaderTest.java b/src/test/java/info/fetter/logstashforwarder/FileReaderTest.java index 2dbf8df..a72d21c 100644 --- a/src/test/java/info/fetter/logstashforwarder/FileReaderTest.java +++ b/src/test/java/info/fetter/logstashforwarder/FileReaderTest.java @@ -23,7 +23,9 @@ import info.fetter.logstashforwarder.util.AdapterException; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.log4j.BasicConfigurator; @@ -54,12 +56,41 @@ public class FileReaderTest { List fileList = new ArrayList(1); File file1 = new File("testFileReader1.txt"); FileUtils.write(file1, "testFileReader1 line1\n"); + FileUtils.write(file1, " nl line12\n", true); FileUtils.write(file1, "testFileReader1 line2\n", true); FileUtils.write(file1, "testFileReader1 line3\n", true); Thread.sleep(500); FileState state = new FileState(file1); fileList.add(state); state.setFields(new Event().addField("testFileReader1", "testFileReader1")); + Map m = new HashMap(); + m.put("pattern", " nl"); + m.put("negate", "false"); + state.setMultiline(new Multiline(m)); + reader.readFiles(fileList); + reader.readFiles(fileList); + reader.readFiles(fileList); + //FileUtils.forceDelete(file1); + } + + @Test + public void testFileReader2() throws IOException, InterruptedException, AdapterException { + FileReader reader = new FileReader(2); + reader.setAdapter(new MockProtocolAdapter()); + List fileList = new ArrayList(1); + File file1 = new File("testFileReader1.txt"); + FileUtils.write(file1, "testFileReader1 line1\n"); + FileUtils.write(file1, " nl line12\n", true); + FileUtils.write(file1, "testFileReader1 line2\n", true); + FileUtils.write(file1, "testFileReader1 line3\n", true); + Thread.sleep(500); + FileState state = new FileState(file1); + fileList.add(state); + state.setFields(new Event().addField("testFileReader1", "testFileReader1")); + Map m = new HashMap(); + m.put("pattern", "testFileReader1"); + m.put("negate", "true"); + state.setMultiline(new Multiline(m)); reader.readFiles(fileList); reader.readFiles(fileList); reader.readFiles(fileList); diff --git a/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java b/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java index 2fe8835..5e4c96c 100644 --- a/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java +++ b/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java @@ -21,6 +21,8 @@ import static org.apache.log4j.Level.*; import java.io.File; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.log4j.BasicConfigurator; @@ -47,21 +49,32 @@ public class FileWatcherTest { //@Test public void testFileWatch() throws InterruptedException, IOException { FileWatcher watcher = new FileWatcher(); - watcher.addFilesToWatch("./test.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY); + watcher.addFilesToWatch("./test.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY, null); + for(int i = 0; i < 100; i++) { + Thread.sleep(1000); + watcher.checkFiles(); + } + } + + //@Test + public void testFileWatchWithMultilines() throws InterruptedException, IOException { + FileWatcher watcher = new FileWatcher(); + Multiline multiline = new Multiline(); + watcher.addFilesToWatch("./test.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY, multiline); for(int i = 0; i < 100; i++) { Thread.sleep(1000); watcher.checkFiles(); } } - @Test + //@Test public void testWildcardWatch() throws InterruptedException, IOException { if(System.getProperty("os.name").toLowerCase().contains("win")) { logger.warn("Not executing this test on windows"); return; } FileWatcher watcher = new FileWatcher(); - watcher.addFilesToWatch("./testFileWatcher*.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY); + watcher.addFilesToWatch("./testFileWatcher*.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY, null); watcher.initialize(); File file1 = new File("testFileWatcher1.txt"); @@ -97,6 +110,58 @@ public class FileWatcherTest { + } + + @Test + public void testWildcardWatchMultiline() throws InterruptedException, IOException { + if(System.getProperty("os.name").toLowerCase().contains("win")) { + logger.warn("Not executing this test on windows"); + return; + } + FileWatcher watcher = new FileWatcher(); + Map m = new HashMap(); + m.put("pattern", " nl"); + m.put("negate", "false"); + Multiline multiline = new Multiline(m); + watcher.addFilesToWatch("./testFileWatcher*.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY, multiline); + watcher.initialize(); + + File file1 = new File("testFileWatcher1.txt"); + File file2 = new File("testFileWatcher2.txt"); + //File file3 = new File("test3.txt"); + //File file4 = new File("test4.txt"); + + //File testDir = new File("testFileWatcher"); + //FileUtils.forceMkdir(new File("test")); + + watcher.checkFiles(); + Thread.sleep(100); + FileUtils.write(file1, "file 1 line 1\n nl line 1-2", true); + Thread.sleep(100); + watcher.checkFiles(); + FileUtils.write(file1, "file 1 line 2\n", true); + Thread.sleep(100); + watcher.checkFiles(); + FileUtils.write(file1, " nl line 3\n", true); + //FileUtils.write(file2, "file 2 line 1\n", true); + Thread.sleep(1000); + watcher.checkFiles(); +// FileUtils.moveFileToDirectory(file1, testDir, true); +// FileUtils.write(file2, "file 2 line 2\n", true); + FileUtils.moveFile(file1, file2); +// FileUtils.write(file2, "file 3 line 1\n", true); +// + Thread.sleep(1000); + watcher.checkFiles(); +// +// + watcher.close(); + FileUtils.deleteQuietly(file1); + FileUtils.deleteQuietly(file2); +// FileUtils.forceDelete(testDir); + + + } @Test diff --git a/src/test/java/info/fetter/logstashforwarder/config/ConfigurationManagerTest.java b/src/test/java/info/fetter/logstashforwarder/config/ConfigurationManagerTest.java index d5969a6..969e8c1 100644 --- a/src/test/java/info/fetter/logstashforwarder/config/ConfigurationManagerTest.java +++ b/src/test/java/info/fetter/logstashforwarder/config/ConfigurationManagerTest.java @@ -57,6 +57,9 @@ public class ConfigurationManagerTest { for(String path : files.getPaths()) { logger.debug(" - Path : " + path); } + logger.debug(" - Multiline : " + files.getMultiline()); + //files.getMultiline() + logger.debug(" - Dead time : " + files.getDeadTimeInSeconds()); if(files.getDeadTime().equals("24h")) { assertEquals(86400, files.getDeadTimeInSeconds()); diff --git a/src/test/resources/config1.json b/src/test/resources/config1.json index d045315..f8787eb 100644 --- a/src/test/resources/config1.json +++ b/src/test/resources/config1.json @@ -53,6 +53,7 @@ "/var/log/apache/error-*.log" ], "fields": { "type": "error" }, + "multiline": { "pattern": "^[0-9]{4}", "negate": "true" }, "dead time": "8h32m50s" } ]