Added FileModificationListener.

This commit is contained in:
didfet
2015-03-15 17:17:53 +01:00
parent d6d4897acb
commit 33f50488a1
4 changed files with 78 additions and 45 deletions

View File

@@ -32,16 +32,19 @@ public class Event {
} }
} }
public void addField(String key, byte[] value) { public Event addField(String key, byte[] value) {
keyValues.put(key, value); keyValues.put(key, value);
return this;
} }
public void addField(String key, String value) { public Event addField(String key, String value) {
keyValues.put(key, value.getBytes()); keyValues.put(key, value.getBytes());
return this;
} }
public void addField(String key, long value) { public Event addField(String key, long value) {
keyValues.put(key, String.valueOf(value).getBytes()); keyValues.put(key, String.valueOf(value).getBytes());
return this;
} }
public Map<String,byte[]> getKeyValues() { public Map<String,byte[]> getKeyValues() {

View File

@@ -0,0 +1,49 @@
package info.fetter.logstashforwarder;
import java.io.File;
import org.apache.commons.io.monitor.FileAlterationListener;
import org.apache.commons.io.monitor.FileAlterationObserver;
public class FileModificationListener implements FileAlterationListener {
private Event fields;
private FileWatcher watcher;
public FileModificationListener(FileWatcher watcher, Event fields) {
this.watcher = watcher;
this.fields = fields;
}
public void onDirectoryChange(File file) {
// Not implemented
}
public void onDirectoryCreate(File file) {
// Not implemented
}
public void onDirectoryDelete(File file) {
// Not implemented
}
public void onFileChange(File file) {
watcher.onFileChange(file, fields);
}
public void onFileCreate(File file) {
watcher.onFileCreate(file, fields);
}
public void onFileDelete(File file) {
watcher.onFileDelete(file);
}
public void onStart(FileAlterationObserver file) {
// Not implemented
}
public void onStop(FileAlterationObserver file) {
// Not implemented
}
}

View File

@@ -31,11 +31,10 @@ import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.filefilter.FileFilterUtils; import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.io.filefilter.IOFileFilter; import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.commons.io.filefilter.WildcardFileFilter; import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.commons.io.monitor.FileAlterationListener;
import org.apache.commons.io.monitor.FileAlterationObserver; import org.apache.commons.io.monitor.FileAlterationObserver;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
public class FileWatcher implements FileAlterationListener { public class FileWatcher {
private static final Logger logger = Logger.getLogger(FileWatcher.class); private static final Logger logger = Logger.getLogger(FileWatcher.class);
private List<FileAlterationObserver> observerList = new ArrayList<FileAlterationObserver>(); private List<FileAlterationObserver> observerList = new ArrayList<FileAlterationObserver>();
private static final int ONE_DAY = 24 * 3600 * 1000; private static final int ONE_DAY = 24 * 3600 * 1000;
@@ -52,14 +51,14 @@ public class FileWatcher implements FileAlterationListener {
this(ONE_DAY); this(ONE_DAY);
} }
public void addFilesToWatch(String fileToWatch) { public void addFilesToWatch(String fileToWatch, Event fields) {
try { try {
if(fileToWatch.equals("-")) { if(fileToWatch.equals("-")) {
addStdIn(); addStdIn(fields);
} else if(fileToWatch.contains("*")) { } else if(fileToWatch.contains("*")) {
addWildCardFiles(fileToWatch); addWildCardFiles(fileToWatch, fields);
} else { } else {
addSingleFile(fileToWatch); addSingleFile(fileToWatch, fields);
} }
} catch(Exception e) { } catch(Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
@@ -175,7 +174,7 @@ public class FileWatcher implements FileAlterationListener {
removeMarkedFilesFromWatchMap(); removeMarkedFilesFromWatchMap();
} }
private void addSingleFile(String fileToWatch) throws Exception { private void addSingleFile(String fileToWatch, Event fields) throws Exception {
logger.info("Watching file : " + new File(fileToWatch).getCanonicalPath()); logger.info("Watching file : " + new File(fileToWatch).getCanonicalPath());
String directory = FilenameUtils.getFullPath(fileToWatch); String directory = FilenameUtils.getFullPath(fileToWatch);
String fileName = FilenameUtils.getName(fileToWatch); String fileName = FilenameUtils.getName(fileToWatch);
@@ -183,10 +182,10 @@ public class FileWatcher implements FileAlterationListener {
FileFilterUtils.fileFileFilter(), FileFilterUtils.fileFileFilter(),
FileFilterUtils.nameFileFilter(fileName), FileFilterUtils.nameFileFilter(fileName),
new LastModifiedFileFilter(deadTime)); new LastModifiedFileFilter(deadTime));
initializeWatchMap(new File(directory), fileFilter); initializeWatchMap(new File(directory), fileFilter, fields);
} }
private void addWildCardFiles(String filesToWatch) throws Exception { private void addWildCardFiles(String filesToWatch, Event fields) throws Exception {
logger.info("Watching wildcard files : " + filesToWatch); logger.info("Watching wildcard files : " + filesToWatch);
String directory = FilenameUtils.getFullPath(filesToWatch); String directory = FilenameUtils.getFullPath(filesToWatch);
String wildcard = FilenameUtils.getName(filesToWatch); String wildcard = FilenameUtils.getName(filesToWatch);
@@ -195,26 +194,28 @@ public class FileWatcher implements FileAlterationListener {
FileFilterUtils.fileFileFilter(), FileFilterUtils.fileFileFilter(),
new WildcardFileFilter(wildcard), new WildcardFileFilter(wildcard),
new LastModifiedFileFilter(deadTime)); new LastModifiedFileFilter(deadTime));
initializeWatchMap(new File(directory), fileFilter); initializeWatchMap(new File(directory), fileFilter, fields);
} }
private void addStdIn() { private void addStdIn(Event fields) {
logger.info("Watching stdin : not implemented yet"); logger.error("Watching stdin : not implemented yet");
} }
private void initializeWatchMap(File directory, IOFileFilter fileFilter) throws Exception { private void initializeWatchMap(File directory, IOFileFilter fileFilter, Event fields) throws Exception {
FileAlterationObserver observer = new FileAlterationObserver(directory, fileFilter); FileAlterationObserver observer = new FileAlterationObserver(directory, fileFilter);
observer.addListener(this); FileModificationListener listener = new FileModificationListener(this, fields);
observer.addListener(listener);
observerList.add(observer); observerList.add(observer);
observer.initialize(); observer.initialize();
for(File file : FileUtils.listFiles(directory, fileFilter, null)) { for(File file : FileUtils.listFiles(directory, fileFilter, null)) {
addFileToWatchMap(watchMap, file); addFileToWatchMap(watchMap, file, fields);
} }
} }
private void addFileToWatchMap(Map<File,FileState> map, File file) { private void addFileToWatchMap(Map<File,FileState> map, File file, Event fields) {
try { try {
FileState state = new FileState(file); FileState state = new FileState(file);
state.setFields(fields);
int signatureLength = (int) (state.getSize() > MAX_SIGNATURE_LENGTH ? MAX_SIGNATURE_LENGTH : state.getSize()); int signatureLength = (int) (state.getSize() > MAX_SIGNATURE_LENGTH ? MAX_SIGNATURE_LENGTH : state.getSize());
state.setSignatureLength(signatureLength); state.setSignatureLength(signatureLength);
long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength); long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength);
@@ -226,19 +227,19 @@ public class FileWatcher implements FileAlterationListener {
} }
} }
public void onFileChange(File file) { public void onFileChange(File file, Event fields) {
try { try {
logger.debug("Change detected on file : " + file.getCanonicalPath()); logger.debug("Change detected on file : " + file.getCanonicalPath());
addFileToWatchMap(changedWatchMap, file); addFileToWatchMap(changedWatchMap, file, fields);
} catch (IOException e) { } catch (IOException e) {
logger.error("Caught IOException : " + e.getMessage()); logger.error("Caught IOException : " + e.getMessage());
} }
} }
public void onFileCreate(File file) { public void onFileCreate(File file, Event fields) {
try { try {
logger.debug("Create detected on file : " + file.getCanonicalPath()); logger.debug("Create detected on file : " + file.getCanonicalPath());
addFileToWatchMap(changedWatchMap, file); addFileToWatchMap(changedWatchMap, file, fields);
} catch (IOException e) { } catch (IOException e) {
logger.error("Caught IOException : " + e.getMessage()); logger.error("Caught IOException : " + e.getMessage());
} }
@@ -292,24 +293,4 @@ public class FileWatcher implements FileAlterationListener {
} }
} }
public void onDirectoryChange(File directory) {
// Do nothing
}
public void onDirectoryCreate(File directory) {
// Do nothing
}
public void onDirectoryDelete(File directory) {
// Do nothing
}
public void onStart(FileAlterationObserver observer) {
// TODO Auto-generated method stub
}
public void onStop(FileAlterationObserver observer) {
// TODO Auto-generated method stub
}
} }

View File

@@ -30,7 +30,7 @@ public class FileWatcherTest {
//@Test //@Test
public void testFileWatch() throws InterruptedException, IOException { public void testFileWatch() throws InterruptedException, IOException {
FileWatcher watcher = new FileWatcher(); FileWatcher watcher = new FileWatcher();
watcher.addFilesToWatch("./test.txt"); watcher.addFilesToWatch("./test.txt", new Event().addField("test", "test"));
for(int i = 0; i < 100; i++) { for(int i = 0; i < 100; i++) {
Thread.sleep(1000); Thread.sleep(1000);
watcher.checkFiles(); watcher.checkFiles();
@@ -40,7 +40,7 @@ public class FileWatcherTest {
@Test @Test
public void testWildcardWatch() throws InterruptedException, IOException { public void testWildcardWatch() throws InterruptedException, IOException {
FileWatcher watcher = new FileWatcher(); FileWatcher watcher = new FileWatcher();
watcher.addFilesToWatch("./test*.txt"); watcher.addFilesToWatch("./test*.txt", new Event().addField("test", "test"));
File file1 = new File("test1.txt"); File file1 = new File("test1.txt");
File file2 = new File("test2.txt"); File file2 = new File("test2.txt");