From 9b7c7b0760f6d6d520eccbbc550f7c6e00afedce Mon Sep 17 00:00:00 2001 From: didfet Date: Thu, 12 Mar 2015 19:21:52 +0100 Subject: [PATCH] Implemented FileWatcher. --- .../fetter/logstashforwarder/FileState.java | 70 +++++- .../fetter/logstashforwarder/FileWatcher.java | 223 +++++++++++++++--- .../logstashforwarder/FileWatcherTest.java | 71 ++++++ 3 files changed, 330 insertions(+), 34 deletions(-) create mode 100644 src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java diff --git a/src/main/java/info/fetter/logstashforwarder/FileState.java b/src/main/java/info/fetter/logstashforwarder/FileState.java index d99964a..6197dbe 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileState.java +++ b/src/main/java/info/fetter/logstashforwarder/FileState.java @@ -19,19 +19,27 @@ package info.fetter.logstashforwarder; import java.io.File; import java.io.IOException; +import java.io.RandomAccessFile; public class FileState { private File file; private String filePath; private long lastModified; private long size; + private boolean deleted = false; + private long signature; + private int signatureLength; + private boolean changed = false; + private RandomAccessFile randomAccessFile; + private long pointer = 0; + private FileState oldFileState; + private String fileName; public FileState(File file) throws IOException { this.file = file; filePath = file.getCanonicalPath(); - } - - public void refresh() { + fileName = file.getName(); + randomAccessFile = new RandomAccessFile(file, "r"); lastModified = file.lastModified(); size = file.length(); } @@ -52,4 +60,60 @@ public class FileState { return filePath; } + public boolean isDeleted() { + return deleted; + } + + public void setDeleted() { + deleted = true; + } + + public boolean hasChanged() { + return changed; + } + + public void setChanged(boolean changed) { + this.changed = changed; + } + + public long getSignature() { + return signature; + } + + public void setSignature(long signature) { + this.signature = signature; + } + + public RandomAccessFile getRandomAccessFile() { + return randomAccessFile; + } + + public long getPointer() { + return pointer; + } + + public void setPointer(long pointer) { + this.pointer = pointer; + } + + public int getSignatureLength() { + return signatureLength; + } + + public void setSignatureLength(int signatureLength) { + this.signatureLength = signatureLength; + } + + public FileState getOldFileState() { + return oldFileState; + } + + public void setOldFileState(FileState oldFileState) { + this.oldFileState = oldFileState; + } + + public String getFileName() { + return fileName; + } + } diff --git a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java index ee53547..0b7df96 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java +++ b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java @@ -41,6 +41,8 @@ public class FileWatcher implements FileAlterationListener { private static final int ONE_DAY = 24 * 3600 * 1000; private long deadTime; private Map watchMap = new HashMap(); + private Map changedWatchMap = new HashMap(); + private static int MAX_SIGNATURE_LENGTH = 1024; public FileWatcher(long deadTime) { this.deadTime = deadTime; @@ -64,6 +66,129 @@ public class FileWatcher implements FileAlterationListener { } } + public void checkFiles() throws IOException { + logger.debug("Checking files"); + printWatchMap(); + for(FileAlterationObserver observer : observerList) { + observer.checkAndNotify(); + } + processModifications(); + + printWatchMap(); + } + + private void processModifications() throws IOException { + removeMarkedFilesFromWatchMap(); + + for(File file : changedWatchMap.keySet()) { + FileState state = changedWatchMap.get(file); + logger.trace("Checking file : " + file.getCanonicalPath()); + + // Determine if file is still the same + logger.trace("Determine if file name has not changed"); + FileState oldState = watchMap.get(file); + if(oldState != null) { + if(oldState.getSize() > state.getSize()) { + // File is shorter, can't be the same + logger.trace("File shorter : file can't be the same"); + } else { + if(oldState.getSignatureLength() == state.getSignatureLength() && oldState.getSignature() == state.getSignature()) { + // File is the same + state.setOldFileState(oldState); + logger.trace("Same signature size and value : file is the same"); + } else if(oldState.getSignatureLength() < state.getSignatureLength()){ + // Compute the signature on the new file + long signature = FileSigner.computeSignature(state.getRandomAccessFile(), oldState.getSignatureLength()); + if(signature == oldState.getSignature()) { + // File is the same + state.setOldFileState(oldState); + logger.trace("Same signature : file is the same"); + } else { + // File can't be the same + logger.trace("Signature different : file can't be the same"); + } + } else if(oldState.getSignatureLength() > state.getSignatureLength()){ + // File can't be the same + logger.trace("Signature shorter : file can't be the same"); + } + } + } + + // Determine if there was a file with the same size and last modification date in the same directory and with a different name + logger.trace("Determine if file has just been renamed"); + if(state.getOldFileState() == null) { + for(File otherFile : changedWatchMap.keySet()) { + FileState otherState = watchMap.get(otherFile); + if(otherState != null + && state.getLastModified() == otherState.getLastModified() + && state.getSize() == otherState.getSize() + && state.getFilePath().equals(otherState.getFilePath()) + && ! state.getFileName().equals(otherState.getFileName())) { + logger.trace("Comparing to : " + otherFile.getCanonicalPath()); + // Assume file has been renamed + state.setOldFileState(otherState); + logger.trace("Same directory, same size and last modification date : file has been renamed to : " + otherFile.getCanonicalPath()); + } + } + } + + // Determine if file has been renamed and appended + logger.trace("Determine if file has been renamed and appended"); + if(state.getOldFileState() == null) { + for(File otherFile : changedWatchMap.keySet()) { + FileState otherState = watchMap.get(otherFile); + if(otherState != null && state.getSize() > otherState.getSize() && state.getFilePath().equals(otherState.getFilePath())) { + logger.trace("Comparing to : " + otherFile.getCanonicalPath()); + // File in the same directory which was smaller + if(otherState.getSignatureLength() == state.getSignatureLength() && otherState.getSignature() == state.getSignature()) { + // File is the same + state.setOldFileState(otherState); + logger.trace("Same signature size and value : file is the same"); + } else if(otherState.getSignatureLength() < state.getSignatureLength()){ + // Compute the signature on the new file + long signature = FileSigner.computeSignature(state.getRandomAccessFile(), otherState.getSignatureLength()); + if(signature == otherState.getSignature()) { + // File is the same + state.setOldFileState(otherState); + logger.trace("Same signature : file is the same"); + } else { + // File can't be the same + logger.trace("Signature different : file can't be the same"); + } + } else if(otherState.getSignatureLength() > state.getSignatureLength()){ + // File can't be the same + logger.trace("Signature shorter : file can't be the same"); + } + } + } + } + } + + // Refresh file state + logger.trace("refreshing file state"); + for(File file : changedWatchMap.keySet()) { + logger.trace("Refreshing file : " + file.getCanonicalPath()); + FileState state = changedWatchMap.get(file); + FileState oldState = state.getOldFileState(); + if(oldState == null) { + logger.trace("File has been truncated or created, not retrieving pointer"); + } else { + logger.trace("File has not been truncated or created, retrieving pointer"); + state.setPointer(oldState.getPointer()); + oldState.getRandomAccessFile().close(); + } + } + + // Replacing old state + logger.trace("Replacing old state"); + for(File file : changedWatchMap.keySet()) { + logger.trace("Replacing file : " + file.getCanonicalPath()); + FileState state = changedWatchMap.get(file); + watchMap.put(file, state); + } + + } + private void watchFile(String fileToWatch) throws Exception { logger.info("Watching file : " + new File(fileToWatch).getCanonicalPath()); String directory = FilenameUtils.getFullPath(fileToWatch); @@ -72,7 +197,7 @@ public class FileWatcher implements FileAlterationListener { FileFilterUtils.fileFileFilter(), FileFilterUtils.nameFileFilter(fileName), new LastModifiedFileFilter(deadTime)); - updateWatchMap(new File(directory), fileFilter); + initializeWatchMap(new File(directory), fileFilter); } private void watchWildCardFiles(String filesToWatch) throws Exception { @@ -84,35 +209,93 @@ public class FileWatcher implements FileAlterationListener { FileFilterUtils.fileFileFilter(), new WildcardFileFilter(wildcard), new LastModifiedFileFilter(deadTime)); - updateWatchMap(new File(directory), fileFilter); + initializeWatchMap(new File(directory), fileFilter); } private void watchStdIn() { logger.info("Watching stdin : not implemented yet"); } - private void updateWatchMap(File directory, IOFileFilter fileFilter) throws Exception { + private void initializeWatchMap(File directory, IOFileFilter fileFilter) throws Exception { FileAlterationObserver observer = new FileAlterationObserver(directory, fileFilter); observer.addListener(this); observerList.add(observer); observer.initialize(); for(File file : FileUtils.listFiles(directory, fileFilter, null)) { - addFileToWatchMap(file); + addFileToWatchMap(watchMap, file); } } - private void addFileToWatchMap(File file) { + private void addFileToWatchMap(Map map, File file) { try { FileState state = new FileState(file); - state.refresh(); - watchMap.put(file, state); + int signatureLength = (int) (state.getSize() > MAX_SIGNATURE_LENGTH ? MAX_SIGNATURE_LENGTH : state.getSize()); + state.setSignatureLength(signatureLength); + long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength); + state.setSignature(signature); + logger.trace("Setting signature of size : " + signatureLength + " on file : " + file + " : " + signature); + map.put(file, state); } catch(IOException e) { logger.error("Caught IOException : " + e.getMessage()); } } - private void removeFileFromWatchMap(File file) { - watchMap.remove(file); + public void onFileChange(File file) { + try { + logger.debug("Change detected on file : " + file.getCanonicalPath()); + addFileToWatchMap(changedWatchMap, file); + } catch (IOException e) { + logger.error("Caught IOException : " + e.getMessage()); + } + } + + public void onFileCreate(File file) { + try { + logger.debug("Create detected on file : " + file.getCanonicalPath()); + addFileToWatchMap(changedWatchMap, file); + } catch (IOException e) { + logger.error("Caught IOException : " + e.getMessage()); + } + } + + public void onFileDelete(File file) { + try { + logger.debug("Delete detected on file : " + file.getCanonicalPath()); + watchMap.get(file).setDeleted(); + } catch (IOException e) { + logger.error("Caught IOException : " + e.getMessage()); + } + } + + private void printWatchMap() { + if(logger.isTraceEnabled()) { + logger.trace("WatchMap contents : "); + for(File file : watchMap.keySet()) { + FileState state = watchMap.get(file); + logger.trace("\tFile : " + state.getFilePath() + " marked for deletion : " + state.isDeleted()); + } + } + } + + private void removeMarkedFilesFromWatchMap() throws IOException { + logger.trace("Removing deleted files from watchMap"); + List markedList = null; + for(File file : watchMap.keySet()) { + FileState state = watchMap.get(file); + if(state.isDeleted()) { + if(markedList == null) { + markedList = new ArrayList(); + } + markedList.add(file); + } + } + if(markedList != null) { + for(File file : markedList) { + FileState state = watchMap.remove(file); + state.getRandomAccessFile().close(); + logger.trace("\tFile : " + state.getFilePath() + " removed"); + } + } } public void onDirectoryChange(File directory) { @@ -127,20 +310,6 @@ public class FileWatcher implements FileAlterationListener { // Do nothing } - public void onFileChange(File file) { - logger.debug("Change detected on file : " + file.getAbsolutePath()); - } - - public void onFileCreate(File file) { - logger.debug("Create detected on file : " + file.getAbsolutePath()); - addFileToWatchMap(file); - } - - public void onFileDelete(File file) { - logger.debug("Delete detected on file : " + file.getAbsolutePath()); - removeFileFromWatchMap(file); - } - public void onStart(FileAlterationObserver observer) { // TODO Auto-generated method stub } @@ -148,12 +317,4 @@ public class FileWatcher implements FileAlterationListener { public void onStop(FileAlterationObserver observer) { // TODO Auto-generated method stub } - - public void checkFiles() { - logger.debug("Checking files"); - for(FileAlterationObserver observer : observerList) { - observer.checkAndNotify(); - } - } - } diff --git a/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java b/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java new file mode 100644 index 0000000..b985de0 --- /dev/null +++ b/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java @@ -0,0 +1,71 @@ +package info.fetter.logstashforwarder; + +import static org.apache.log4j.Level.*; + +import java.io.File; +import java.io.IOException; + +import org.apache.commons.io.FileUtils; +import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.RootLogger; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class FileWatcherTest { + Logger logger = Logger.getLogger(FileWatcherTest.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + BasicConfigurator.configure(); + RootLogger.getRootLogger().setLevel(TRACE); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + BasicConfigurator.resetConfiguration(); + } + + //@Test + public void testFileWatch() throws InterruptedException, IOException { + FileWatcher watcher = new FileWatcher(); + watcher.addFilesToWatch("./test.txt"); + for(int i = 0; i < 100; i++) { + Thread.sleep(1000); + watcher.checkFiles(); + } + } + + @Test + public void testWildcardWatch() throws InterruptedException, IOException { + FileWatcher watcher = new FileWatcher(); + watcher.addFilesToWatch("./test*.txt"); + + File file1 = new File("test1.txt"); + File file2 = new File("test2.txt"); + File file3 = new File("test3.txt"); + File file4 = new File("test4.txt"); + + watcher.checkFiles(); + Thread.sleep(500); + FileUtils.write(file1, "line 1\n", true); + Thread.sleep(500); + watcher.checkFiles(); + FileUtils.forceDeleteOnExit(file1); +// FileUtils.touch(file2); +// Thread.sleep(500); +// watcher.checkFiles(); +// FileUtils.touch(file3); +// FileUtils.forceDelete(file1); +// FileUtils.forceDelete(file2); +// Thread.sleep(500); +// watcher.checkFiles(); +// FileUtils.moveFile(file3, file4); +// FileUtils.touch(file3); +// Thread.sleep(500); +// watcher.checkFiles(); +// FileUtils.forceDelete(file3); +// FileUtils.forceDelete(file4); + } +}