From fbc6bc352ef6a8ab96d9482b69eea711bd46aab2 Mon Sep 17 00:00:00 2001 From: didfet Date: Thu, 19 Mar 2015 18:25:15 +0100 Subject: [PATCH] Added -signaturelength option Bug corrections on saved state loading Bug corrections on file reading --- pom.xml | 2 +- .../fetter/logstashforwarder/FileReader.java | 35 ++--- .../fetter/logstashforwarder/FileState.java | 58 ++++---- .../fetter/logstashforwarder/FileWatcher.java | 124 ++++++++---------- .../fetter/logstashforwarder/Forwarder.java | 11 ++ .../logstashforwarder/FileWatcherTest.java | 42 +++--- 6 files changed, 137 insertions(+), 135 deletions(-) diff --git a/pom.xml b/pom.xml index 8141ec5..5445c6f 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 logstash-forwarder-java logstash-forwarder-java - 0.1.1 + 0.1.2-SNAPSHOT logstash-forwarder-java Java version of logstash forwarder https://github.com/didfet/logstash-forwarder-java diff --git a/src/main/java/info/fetter/logstashforwarder/FileReader.java b/src/main/java/info/fetter/logstashforwarder/FileReader.java index 8623237..de93faa 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileReader.java +++ b/src/main/java/info/fetter/logstashforwarder/FileReader.java @@ -52,7 +52,7 @@ public class FileReader { eventList = new ArrayList(spoolSize); } - public int readFiles(Collection fileList) throws IOException, AdapterException { + public int readFiles(Collection fileList) throws AdapterException { int eventCount = 0; if(logger.isTraceEnabled()) { logger.trace("Reading " + fileList.size() + " file(s)"); @@ -71,12 +71,12 @@ public class FileReader { return eventCount; // Return number of events sent to adapter } - private int readFile(FileState state, int spaceLeftInSpool) throws IOException { + private int readFile(FileState state, int spaceLeftInSpool) { int eventListSizeBefore = eventList.size(); File file = state.getFile(); long pointer = state.getPointer(); if(logger.isTraceEnabled()) { - logger.trace("File : " + file.getCanonicalPath() + " pointer : " + pointer); + logger.trace("File : " + file + " pointer : " + pointer); logger.trace("Space left in spool : " + spaceLeftInSpool); } pointer = readLines(state, spaceLeftInSpool); @@ -84,22 +84,27 @@ public class FileReader { return eventList.size() - eventListSizeBefore; // Return number of events read } - private long readLines(FileState state, int spaceLeftInSpool) throws IOException { + private long readLines(FileState state, int spaceLeftInSpool) { RandomAccessFile reader = state.getRandomAccessFile(); long pos = state.getPointer(); - reader.seek(pos); - String line = readLine(reader); - while (line != null && spaceLeftInSpool > 0) { - if(logger.isTraceEnabled()) { - logger.trace("-- Read line : " + line); - logger.trace("-- Space left in spool : " + spaceLeftInSpool); + try { + reader.seek(pos); + String line = readLine(reader); + while (line != null && spaceLeftInSpool > 0) { + if(logger.isTraceEnabled()) { + logger.trace("-- Read line : " + line); + logger.trace("-- Space left in spool : " + spaceLeftInSpool); + } + pos = reader.getFilePointer(); + addEvent(state, pos, line); + line = readLine(reader); + spaceLeftInSpool--; } - pos = reader.getFilePointer(); - addEvent(state, pos, line); - line = readLine(reader); - spaceLeftInSpool--; + reader.seek(pos); // Ensure we can re-read if necessary + } catch(IOException e) { + logger.warn("Exception raised while reading file : " + state.getFile()); + e.printStackTrace(); } - reader.seek(pos); // Ensure we can re-read if necessary return pos; } diff --git a/src/main/java/info/fetter/logstashforwarder/FileState.java b/src/main/java/info/fetter/logstashforwarder/FileState.java index 832676a..53e38da 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileState.java +++ b/src/main/java/info/fetter/logstashforwarder/FileState.java @@ -18,6 +18,7 @@ package info.fetter.logstashforwarder; */ import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; @@ -47,10 +48,10 @@ public class FileState { private FileState oldFileState; @JsonIgnore private Event fields; - + public FileState() { } - + public FileState(File file) throws IOException { this.file = file; directory = file.getCanonicalFile().getParent(); @@ -59,9 +60,16 @@ public class FileState { lastModified = file.lastModified(); size = file.length(); } - - private void setFileFromDirectoryAndName() { - this.file = new File(directory + File.separator + fileName); + + private void setFileFromDirectoryAndName() throws FileNotFoundException { + file = new File(directory + File.separator + fileName); + if(file.exists()) { + randomAccessFile = new RandomAccessFile(file, "r"); + lastModified = file.lastModified(); + size = file.length(); + } else { + deleted = true; + } } public File getFile() { @@ -75,41 +83,41 @@ public class FileState { public long getSize() { return size; } - + public String getDirectory() { return directory; } - - public void setDirectory(String directory) { + + public void setDirectory(String directory) throws FileNotFoundException { this.directory = directory; if(fileName != null && directory != null) { setFileFromDirectoryAndName(); } } - + public String getFileName() { return fileName; } - - public void setFileName(String fileName) { + + public void setFileName(String fileName) throws FileNotFoundException { this.fileName = fileName; if(fileName != null && directory != null) { setFileFromDirectoryAndName(); } } - + public boolean isDeleted() { return deleted; } - + public void setDeleted() { deleted = true; } - + public boolean hasChanged() { return changed; } - + public void setChanged(boolean changed) { this.changed = changed; } @@ -129,7 +137,7 @@ public class FileState { public long getPointer() { return pointer; } - + public void setPointer(long pointer) { this.pointer = pointer; } @@ -157,16 +165,16 @@ public class FileState { public void setFields(Event fields) { this.fields = fields; } - + @Override public String toString() { - return new ToStringBuilder(this). - append("fileName", fileName). - append("directory", directory). - append("pointer", pointer). - append("signature", signature). - append("signatureLength", signatureLength). - toString(); + return new ToStringBuilder(this). + append("fileName", fileName). + append("directory", directory). + append("pointer", pointer). + append("signature", signature). + append("signatureLength", signatureLength). + toString(); } - + } diff --git a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java index 48c4917..db60ce9 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java +++ b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java @@ -39,10 +39,10 @@ public class FileWatcher { private static final Logger logger = Logger.getLogger(FileWatcher.class); private List observerList = new ArrayList(); public static final int ONE_DAY = 24 * 3600 * 1000; - private Map watchMap = new HashMap(); - private Map changedWatchMap = new HashMap(); + private Map oldWatchMap = new HashMap(); + private Map newWatchMap = new HashMap(); private FileState[] savedStates; - private static int MAX_SIGNATURE_LENGTH = 1024; + private int maxSignatureLength; public FileWatcher() { try { @@ -52,6 +52,17 @@ public class FileWatcher { } } + public void initialize() throws IOException { + logger.debug("Initializing FileWatcher"); + if(savedStates != null) { + for(FileState state : savedStates) { + oldWatchMap.put(state.getFile(), state); + } + } + processModifications(); + printWatchMap(); + } + public void addFilesToWatch(String fileToWatch, Event fields, int deadTime) { try { if(fileToWatch.equals("-")) { @@ -79,15 +90,15 @@ public class FileWatcher { public int readFiles(FileReader reader) throws IOException, AdapterException { logger.debug("Reading files"); logger.trace("=============="); - int numberOfLinesRead = reader.readFiles(watchMap.values()); - Registrar.writeStateToJson(watchMap.values()); + int numberOfLinesRead = reader.readFiles(oldWatchMap.values()); + Registrar.writeStateToJson(oldWatchMap.values()); return numberOfLinesRead; } private void processModifications() throws IOException { - for(File file : changedWatchMap.keySet()) { - FileState state = changedWatchMap.get(file); + for(File file : newWatchMap.keySet()) { + FileState state = newWatchMap.get(file); if(logger.isTraceEnabled()) { logger.trace("Checking file : " + file.getCanonicalPath()); logger.trace("-- Last modified : " + state.getLastModified()); @@ -97,7 +108,7 @@ public class FileWatcher { } logger.trace("Determine if file has just been written to"); - FileState oldState = watchMap.get(file); + FileState oldState = oldWatchMap.get(file); if(oldState != null) { if(oldState.getSize() > state.getSize()) { logger.trace("File shorter : file can't be the same"); @@ -123,8 +134,8 @@ public class FileWatcher { if(state.getOldFileState() == null) { logger.trace("Determine if file has been renamed and/or written to"); - for(File otherFile : watchMap.keySet()) { - FileState otherState = watchMap.get(otherFile); + for(File otherFile : oldWatchMap.keySet()) { + FileState otherState = oldWatchMap.get(otherFile); if(otherState != null && state.getSize() >= otherState.getSize() && state.getDirectory().equals(otherState.getDirectory())) { if(logger.isTraceEnabled()) { logger.trace("Comparing to : " + otherFile.getCanonicalPath()); @@ -151,29 +162,31 @@ public class FileWatcher { } logger.trace("Refreshing file state"); - for(File file : changedWatchMap.keySet()) { + for(File file : newWatchMap.keySet()) { if(logger.isTraceEnabled()) { logger.trace("Refreshing file : " + file.getCanonicalPath()); } - FileState state = changedWatchMap.get(file); + FileState state = newWatchMap.get(file); FileState oldState = state.getOldFileState(); if(oldState == null) { logger.trace("File has been truncated or created, not retrieving pointer"); } else { logger.trace("File has not been truncated or created, retrieving pointer"); state.setPointer(oldState.getPointer()); - oldState.getRandomAccessFile().close(); + try { + oldState.getRandomAccessFile().close(); + } catch(Exception e) {} } } logger.trace("Replacing old state"); - for(File file : changedWatchMap.keySet()) { - FileState state = changedWatchMap.get(file); - watchMap.put(file, state); + for(File file : newWatchMap.keySet()) { + FileState state = newWatchMap.get(file); + oldWatchMap.put(file, state); } // Truncating changedWatchMap - changedWatchMap.clear(); + newWatchMap.clear(); removeMarkedFilesFromWatchMap(); } @@ -216,46 +229,7 @@ public class FileWatcher { observerList.add(observer); observer.initialize(); for(File file : FileUtils.listFiles(directory, fileFilter, null)) { - FileState savedState = null; - if(savedStates != null) { - for(FileState state : savedStates) { - logger.trace("Comparing file : " + file + " with saved file : " + state.getFile()); - if(file.equals(state.getFile())) { - savedState = state; - logger.debug("Match found with saved file " + state.getFile()); - } - } - } - if(savedState == null) { - addFileToWatchMap(watchMap, file, fields); - } else { - addSavedFileToWatchMap(savedState, fields); - } - } - } - - private void addSavedFileToWatchMap(FileState savedFileState, Event fields) { - try { - File file = savedFileState.getFile(); - FileState state = new FileState(file); - state.setFields(fields); - int savedSignatureLength = savedFileState.getSignatureLength(); - state.setSignatureLength(savedSignatureLength); - long savedSignature = savedFileState.getSignature(); - int signatureLength = (int) (state.getSize() > MAX_SIGNATURE_LENGTH ? MAX_SIGNATURE_LENGTH : state.getSize()); - long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength); - if(signature == savedSignature) { - state.setPointer(savedFileState.getPointer()); - logger.debug("Restoring signature of size : " + savedSignatureLength + " on file : " + file + " : " + savedSignature); - } else { - logger.debug("File " + file + " signature has changed"); - logger.trace("Setting signature of size : " + signatureLength + " on file : " + file + " : " + signature); - } - state.setSignatureLength(signatureLength); - state.setSignature(signature); - watchMap.put(file, state); - } catch (IOException e) { - logger.error("Caught IOException : " + e.getMessage()); + addFileToWatchMap(newWatchMap, file, fields); } } @@ -263,7 +237,7 @@ public class FileWatcher { try { FileState state = new FileState(file); state.setFields(fields); - int signatureLength = (int) (state.getSize() > MAX_SIGNATURE_LENGTH ? MAX_SIGNATURE_LENGTH : state.getSize()); + int signatureLength = (int) (state.getSize() > maxSignatureLength ? maxSignatureLength : state.getSize()); state.setSignatureLength(signatureLength); long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength); state.setSignature(signature); @@ -277,7 +251,7 @@ public class FileWatcher { public void onFileChange(File file, Event fields) { try { logger.debug("Change detected on file : " + file.getCanonicalPath()); - addFileToWatchMap(changedWatchMap, file, fields); + addFileToWatchMap(newWatchMap, file, fields); } catch (IOException e) { logger.error("Caught IOException : " + e.getMessage()); } @@ -286,7 +260,7 @@ public class FileWatcher { public void onFileCreate(File file, Event fields) { try { logger.debug("Create detected on file : " + file.getCanonicalPath()); - addFileToWatchMap(changedWatchMap, file, fields); + addFileToWatchMap(newWatchMap, file, fields); } catch (IOException e) { logger.error("Caught IOException : " + e.getMessage()); } @@ -295,7 +269,7 @@ public class FileWatcher { public void onFileDelete(File file) { try { logger.debug("Delete detected on file : " + file.getCanonicalPath()); - watchMap.get(file).setDeleted(); + oldWatchMap.get(file).setDeleted(); } catch (IOException e) { logger.error("Caught IOException : " + e.getMessage()); } @@ -304,8 +278,8 @@ public class FileWatcher { private void printWatchMap() throws IOException { if(logger.isTraceEnabled()) { logger.trace("WatchMap contents : "); - for(File file : watchMap.keySet()) { - FileState state = watchMap.get(file); + for(File file : oldWatchMap.keySet()) { + FileState state = oldWatchMap.get(file); logger.trace("\tFile : " + file.getCanonicalPath() + " marked for deletion : " + state.isDeleted()); } } @@ -314,8 +288,8 @@ public class FileWatcher { private void removeMarkedFilesFromWatchMap() throws IOException { logger.trace("Removing deleted files from watchMap"); List markedList = null; - for(File file : watchMap.keySet()) { - FileState state = watchMap.get(file); + for(File file : oldWatchMap.keySet()) { + FileState state = oldWatchMap.get(file); if(state.isDeleted()) { if(markedList == null) { markedList = new ArrayList(); @@ -325,19 +299,29 @@ public class FileWatcher { } if(markedList != null) { for(File file : markedList) { - FileState state = watchMap.remove(file); - state.getRandomAccessFile().close(); - logger.trace("\tFile : " + file.getCanonicalFile() + " removed"); + FileState state = oldWatchMap.remove(file); + try { + state.getRandomAccessFile().close(); + } catch(Exception e) {} + logger.trace("\tFile : " + file + " removed"); } } } public void close() throws IOException { logger.debug("Closing all files"); - for(File file : watchMap.keySet()) { - FileState state = watchMap.get(file); + for(File file : oldWatchMap.keySet()) { + FileState state = oldWatchMap.get(file); state.getRandomAccessFile().close(); } } + public int getMaxSignatureLength() { + return maxSignatureLength; + } + + public void setMaxSignatureLength(int maxSignatureLength) { + this.maxSignatureLength = maxSignatureLength; + } + } diff --git a/src/main/java/info/fetter/logstashforwarder/Forwarder.java b/src/main/java/info/fetter/logstashforwarder/Forwarder.java index 46dc469..faebcd7 100644 --- a/src/main/java/info/fetter/logstashforwarder/Forwarder.java +++ b/src/main/java/info/fetter/logstashforwarder/Forwarder.java @@ -52,6 +52,7 @@ public class Forwarder { private static Level logLevel = INFO; private static ProtocolAdapter adapter; private static Random random = new Random(); + private static int signatureLength = 4096; public static void main(String[] args) { try { @@ -62,6 +63,7 @@ public class Forwarder { // Logger.getLogger(FileReader.class).setLevel(TRACE); // Logger.getLogger(FileReader.class).setAdditivity(false); watcher = new FileWatcher(); + watcher.setMaxSignatureLength(signatureLength); configManager = new ConfigurationManager(config); configManager.readConfiguration(); for(FilesSection files : configManager.getConfig().getFiles()) { @@ -69,6 +71,7 @@ public class Forwarder { watcher.addFilesToWatch(path, new Event(files.getFields()), FileWatcher.ONE_DAY); } } + watcher.initialize(); reader = new FileReader(spoolSize); connectToServer(); infiniteLoop(); @@ -128,6 +131,10 @@ public class Forwarder { .isRequired() .withDescription("path to logstash-forwarder configuration file") .create("config"); + Option signatureLengthOption = OptionBuilder.withArgName("signature length") + .hasArg() + .withDescription("Maximum length of file signature") + .create("signaturelength"); options.addOption(helpOption) .addOption(idleTimeoutOption) @@ -135,6 +142,7 @@ public class Forwarder { .addOption(quiet) .addOption(debug) .addOption(trace) + .addOption(signatureLengthOption) .addOption(configOption); CommandLineParser parser = new GnuParser(); try { @@ -148,6 +156,9 @@ public class Forwarder { if(line.hasOption("config")) { config = line.getOptionValue("config"); } + if(line.hasOption("signaturelength")) { + signatureLength = Integer.parseInt(line.getOptionValue("signaturelength")); + } if(line.hasOption("quiet")) { logLevel = ERROR; } diff --git a/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java b/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java index 5b433e6..04f9a08 100644 --- a/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java +++ b/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java @@ -54,17 +54,18 @@ public class FileWatcherTest { } } - //@Test + @Test public void testWildcardWatch() throws InterruptedException, IOException { FileWatcher watcher = new FileWatcher(); - watcher.addFilesToWatch("./test*.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY); + watcher.addFilesToWatch("./testFileWatcher*.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY); + watcher.initialize(); - File file1 = new File("test1.txt"); - File file2 = new File("test2.txt"); + File file1 = new File("testFileWatcher1.txt"); + File file2 = new File("testFileWatcher2.txt"); //File file3 = new File("test3.txt"); //File file4 = new File("test4.txt"); - File testDir = new File("test"); + //File testDir = new File("testFileWatcher"); //FileUtils.forceMkdir(new File("test")); watcher.checkFiles(); @@ -76,29 +77,22 @@ public class FileWatcherTest { FileUtils.write(file2, "file 2 line 1\n", true); Thread.sleep(1000); watcher.checkFiles(); - FileUtils.moveFileToDirectory(file1, testDir, true); - FileUtils.write(file2, "file 2 line 2\n", true); - FileUtils.moveFile(file2, file1); - FileUtils.write(file2, "file 3 line 1\n", true); -// FileUtils.touch(file3); +// FileUtils.moveFileToDirectory(file1, testDir, true); +// FileUtils.write(file2, "file 2 line 2\n", true); +// FileUtils.moveFile(file2, file1); +// FileUtils.write(file2, "file 3 line 1\n", true); +// +// Thread.sleep(1000); +// watcher.checkFiles(); +// +// +// watcher.close(); // FileUtils.forceDelete(file1); // FileUtils.forceDelete(file2); - Thread.sleep(1000); - watcher.checkFiles(); +// FileUtils.forceDelete(testDir); - watcher.close(); - FileUtils.forceDelete(file1); - FileUtils.forceDelete(file2); - FileUtils.forceDelete(testDir); - - -// FileUtils.moveFile(file3, file4); -// FileUtils.touch(file3); -// Thread.sleep(500); -// watcher.checkFiles(); -// FileUtils.forceDelete(file3); -// FileUtils.forceDelete(file4); + } @Test