diff --git a/pom.xml b/pom.xml
index 8141ec5..5445c6f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
4.0.0
logstash-forwarder-java
logstash-forwarder-java
- 0.1.1
+ 0.1.2-SNAPSHOT
logstash-forwarder-java
Java version of logstash forwarder
https://github.com/didfet/logstash-forwarder-java
diff --git a/src/main/java/info/fetter/logstashforwarder/FileReader.java b/src/main/java/info/fetter/logstashforwarder/FileReader.java
index 8623237..de93faa 100644
--- a/src/main/java/info/fetter/logstashforwarder/FileReader.java
+++ b/src/main/java/info/fetter/logstashforwarder/FileReader.java
@@ -52,7 +52,7 @@ public class FileReader {
eventList = new ArrayList(spoolSize);
}
- public int readFiles(Collection fileList) throws IOException, AdapterException {
+ public int readFiles(Collection fileList) throws AdapterException {
int eventCount = 0;
if(logger.isTraceEnabled()) {
logger.trace("Reading " + fileList.size() + " file(s)");
@@ -71,12 +71,12 @@ public class FileReader {
return eventCount; // Return number of events sent to adapter
}
- private int readFile(FileState state, int spaceLeftInSpool) throws IOException {
+ private int readFile(FileState state, int spaceLeftInSpool) {
int eventListSizeBefore = eventList.size();
File file = state.getFile();
long pointer = state.getPointer();
if(logger.isTraceEnabled()) {
- logger.trace("File : " + file.getCanonicalPath() + " pointer : " + pointer);
+ logger.trace("File : " + file + " pointer : " + pointer);
logger.trace("Space left in spool : " + spaceLeftInSpool);
}
pointer = readLines(state, spaceLeftInSpool);
@@ -84,22 +84,27 @@ public class FileReader {
return eventList.size() - eventListSizeBefore; // Return number of events read
}
- private long readLines(FileState state, int spaceLeftInSpool) throws IOException {
+ private long readLines(FileState state, int spaceLeftInSpool) {
RandomAccessFile reader = state.getRandomAccessFile();
long pos = state.getPointer();
- reader.seek(pos);
- String line = readLine(reader);
- while (line != null && spaceLeftInSpool > 0) {
- if(logger.isTraceEnabled()) {
- logger.trace("-- Read line : " + line);
- logger.trace("-- Space left in spool : " + spaceLeftInSpool);
+ try {
+ reader.seek(pos);
+ String line = readLine(reader);
+ while (line != null && spaceLeftInSpool > 0) {
+ if(logger.isTraceEnabled()) {
+ logger.trace("-- Read line : " + line);
+ logger.trace("-- Space left in spool : " + spaceLeftInSpool);
+ }
+ pos = reader.getFilePointer();
+ addEvent(state, pos, line);
+ line = readLine(reader);
+ spaceLeftInSpool--;
}
- pos = reader.getFilePointer();
- addEvent(state, pos, line);
- line = readLine(reader);
- spaceLeftInSpool--;
+ reader.seek(pos); // Ensure we can re-read if necessary
+ } catch(IOException e) {
+ logger.warn("Exception raised while reading file : " + state.getFile());
+ e.printStackTrace();
}
- reader.seek(pos); // Ensure we can re-read if necessary
return pos;
}
diff --git a/src/main/java/info/fetter/logstashforwarder/FileState.java b/src/main/java/info/fetter/logstashforwarder/FileState.java
index 832676a..53e38da 100644
--- a/src/main/java/info/fetter/logstashforwarder/FileState.java
+++ b/src/main/java/info/fetter/logstashforwarder/FileState.java
@@ -18,6 +18,7 @@ package info.fetter.logstashforwarder;
*/
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -47,10 +48,10 @@ public class FileState {
private FileState oldFileState;
@JsonIgnore
private Event fields;
-
+
public FileState() {
}
-
+
public FileState(File file) throws IOException {
this.file = file;
directory = file.getCanonicalFile().getParent();
@@ -59,9 +60,16 @@ public class FileState {
lastModified = file.lastModified();
size = file.length();
}
-
- private void setFileFromDirectoryAndName() {
- this.file = new File(directory + File.separator + fileName);
+
+ private void setFileFromDirectoryAndName() throws FileNotFoundException {
+ file = new File(directory + File.separator + fileName);
+ if(file.exists()) {
+ randomAccessFile = new RandomAccessFile(file, "r");
+ lastModified = file.lastModified();
+ size = file.length();
+ } else {
+ deleted = true;
+ }
}
public File getFile() {
@@ -75,41 +83,41 @@ public class FileState {
public long getSize() {
return size;
}
-
+
public String getDirectory() {
return directory;
}
-
- public void setDirectory(String directory) {
+
+ public void setDirectory(String directory) throws FileNotFoundException {
this.directory = directory;
if(fileName != null && directory != null) {
setFileFromDirectoryAndName();
}
}
-
+
public String getFileName() {
return fileName;
}
-
- public void setFileName(String fileName) {
+
+ public void setFileName(String fileName) throws FileNotFoundException {
this.fileName = fileName;
if(fileName != null && directory != null) {
setFileFromDirectoryAndName();
}
}
-
+
public boolean isDeleted() {
return deleted;
}
-
+
public void setDeleted() {
deleted = true;
}
-
+
public boolean hasChanged() {
return changed;
}
-
+
public void setChanged(boolean changed) {
this.changed = changed;
}
@@ -129,7 +137,7 @@ public class FileState {
public long getPointer() {
return pointer;
}
-
+
public void setPointer(long pointer) {
this.pointer = pointer;
}
@@ -157,16 +165,16 @@ public class FileState {
public void setFields(Event fields) {
this.fields = fields;
}
-
+
@Override
public String toString() {
- return new ToStringBuilder(this).
- append("fileName", fileName).
- append("directory", directory).
- append("pointer", pointer).
- append("signature", signature).
- append("signatureLength", signatureLength).
- toString();
+ return new ToStringBuilder(this).
+ append("fileName", fileName).
+ append("directory", directory).
+ append("pointer", pointer).
+ append("signature", signature).
+ append("signatureLength", signatureLength).
+ toString();
}
-
+
}
diff --git a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java
index 48c4917..db60ce9 100644
--- a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java
+++ b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java
@@ -39,10 +39,10 @@ public class FileWatcher {
private static final Logger logger = Logger.getLogger(FileWatcher.class);
private List observerList = new ArrayList();
public static final int ONE_DAY = 24 * 3600 * 1000;
- private Map watchMap = new HashMap();
- private Map changedWatchMap = new HashMap();
+ private Map oldWatchMap = new HashMap();
+ private Map newWatchMap = new HashMap();
private FileState[] savedStates;
- private static int MAX_SIGNATURE_LENGTH = 1024;
+ private int maxSignatureLength;
public FileWatcher() {
try {
@@ -52,6 +52,17 @@ public class FileWatcher {
}
}
+ public void initialize() throws IOException {
+ logger.debug("Initializing FileWatcher");
+ if(savedStates != null) {
+ for(FileState state : savedStates) {
+ oldWatchMap.put(state.getFile(), state);
+ }
+ }
+ processModifications();
+ printWatchMap();
+ }
+
public void addFilesToWatch(String fileToWatch, Event fields, int deadTime) {
try {
if(fileToWatch.equals("-")) {
@@ -79,15 +90,15 @@ public class FileWatcher {
public int readFiles(FileReader reader) throws IOException, AdapterException {
logger.debug("Reading files");
logger.trace("==============");
- int numberOfLinesRead = reader.readFiles(watchMap.values());
- Registrar.writeStateToJson(watchMap.values());
+ int numberOfLinesRead = reader.readFiles(oldWatchMap.values());
+ Registrar.writeStateToJson(oldWatchMap.values());
return numberOfLinesRead;
}
private void processModifications() throws IOException {
- for(File file : changedWatchMap.keySet()) {
- FileState state = changedWatchMap.get(file);
+ for(File file : newWatchMap.keySet()) {
+ FileState state = newWatchMap.get(file);
if(logger.isTraceEnabled()) {
logger.trace("Checking file : " + file.getCanonicalPath());
logger.trace("-- Last modified : " + state.getLastModified());
@@ -97,7 +108,7 @@ public class FileWatcher {
}
logger.trace("Determine if file has just been written to");
- FileState oldState = watchMap.get(file);
+ FileState oldState = oldWatchMap.get(file);
if(oldState != null) {
if(oldState.getSize() > state.getSize()) {
logger.trace("File shorter : file can't be the same");
@@ -123,8 +134,8 @@ public class FileWatcher {
if(state.getOldFileState() == null) {
logger.trace("Determine if file has been renamed and/or written to");
- for(File otherFile : watchMap.keySet()) {
- FileState otherState = watchMap.get(otherFile);
+ for(File otherFile : oldWatchMap.keySet()) {
+ FileState otherState = oldWatchMap.get(otherFile);
if(otherState != null && state.getSize() >= otherState.getSize() && state.getDirectory().equals(otherState.getDirectory())) {
if(logger.isTraceEnabled()) {
logger.trace("Comparing to : " + otherFile.getCanonicalPath());
@@ -151,29 +162,31 @@ public class FileWatcher {
}
logger.trace("Refreshing file state");
- for(File file : changedWatchMap.keySet()) {
+ for(File file : newWatchMap.keySet()) {
if(logger.isTraceEnabled()) {
logger.trace("Refreshing file : " + file.getCanonicalPath());
}
- FileState state = changedWatchMap.get(file);
+ FileState state = newWatchMap.get(file);
FileState oldState = state.getOldFileState();
if(oldState == null) {
logger.trace("File has been truncated or created, not retrieving pointer");
} else {
logger.trace("File has not been truncated or created, retrieving pointer");
state.setPointer(oldState.getPointer());
- oldState.getRandomAccessFile().close();
+ try {
+ oldState.getRandomAccessFile().close();
+ } catch(Exception e) {}
}
}
logger.trace("Replacing old state");
- for(File file : changedWatchMap.keySet()) {
- FileState state = changedWatchMap.get(file);
- watchMap.put(file, state);
+ for(File file : newWatchMap.keySet()) {
+ FileState state = newWatchMap.get(file);
+ oldWatchMap.put(file, state);
}
// Truncating changedWatchMap
- changedWatchMap.clear();
+ newWatchMap.clear();
removeMarkedFilesFromWatchMap();
}
@@ -216,46 +229,7 @@ public class FileWatcher {
observerList.add(observer);
observer.initialize();
for(File file : FileUtils.listFiles(directory, fileFilter, null)) {
- FileState savedState = null;
- if(savedStates != null) {
- for(FileState state : savedStates) {
- logger.trace("Comparing file : " + file + " with saved file : " + state.getFile());
- if(file.equals(state.getFile())) {
- savedState = state;
- logger.debug("Match found with saved file " + state.getFile());
- }
- }
- }
- if(savedState == null) {
- addFileToWatchMap(watchMap, file, fields);
- } else {
- addSavedFileToWatchMap(savedState, fields);
- }
- }
- }
-
- private void addSavedFileToWatchMap(FileState savedFileState, Event fields) {
- try {
- File file = savedFileState.getFile();
- FileState state = new FileState(file);
- state.setFields(fields);
- int savedSignatureLength = savedFileState.getSignatureLength();
- state.setSignatureLength(savedSignatureLength);
- long savedSignature = savedFileState.getSignature();
- int signatureLength = (int) (state.getSize() > MAX_SIGNATURE_LENGTH ? MAX_SIGNATURE_LENGTH : state.getSize());
- long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength);
- if(signature == savedSignature) {
- state.setPointer(savedFileState.getPointer());
- logger.debug("Restoring signature of size : " + savedSignatureLength + " on file : " + file + " : " + savedSignature);
- } else {
- logger.debug("File " + file + " signature has changed");
- logger.trace("Setting signature of size : " + signatureLength + " on file : " + file + " : " + signature);
- }
- state.setSignatureLength(signatureLength);
- state.setSignature(signature);
- watchMap.put(file, state);
- } catch (IOException e) {
- logger.error("Caught IOException : " + e.getMessage());
+ addFileToWatchMap(newWatchMap, file, fields);
}
}
@@ -263,7 +237,7 @@ public class FileWatcher {
try {
FileState state = new FileState(file);
state.setFields(fields);
- int signatureLength = (int) (state.getSize() > MAX_SIGNATURE_LENGTH ? MAX_SIGNATURE_LENGTH : state.getSize());
+ int signatureLength = (int) (state.getSize() > maxSignatureLength ? maxSignatureLength : state.getSize());
state.setSignatureLength(signatureLength);
long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength);
state.setSignature(signature);
@@ -277,7 +251,7 @@ public class FileWatcher {
public void onFileChange(File file, Event fields) {
try {
logger.debug("Change detected on file : " + file.getCanonicalPath());
- addFileToWatchMap(changedWatchMap, file, fields);
+ addFileToWatchMap(newWatchMap, file, fields);
} catch (IOException e) {
logger.error("Caught IOException : " + e.getMessage());
}
@@ -286,7 +260,7 @@ public class FileWatcher {
public void onFileCreate(File file, Event fields) {
try {
logger.debug("Create detected on file : " + file.getCanonicalPath());
- addFileToWatchMap(changedWatchMap, file, fields);
+ addFileToWatchMap(newWatchMap, file, fields);
} catch (IOException e) {
logger.error("Caught IOException : " + e.getMessage());
}
@@ -295,7 +269,7 @@ public class FileWatcher {
public void onFileDelete(File file) {
try {
logger.debug("Delete detected on file : " + file.getCanonicalPath());
- watchMap.get(file).setDeleted();
+ oldWatchMap.get(file).setDeleted();
} catch (IOException e) {
logger.error("Caught IOException : " + e.getMessage());
}
@@ -304,8 +278,8 @@ public class FileWatcher {
private void printWatchMap() throws IOException {
if(logger.isTraceEnabled()) {
logger.trace("WatchMap contents : ");
- for(File file : watchMap.keySet()) {
- FileState state = watchMap.get(file);
+ for(File file : oldWatchMap.keySet()) {
+ FileState state = oldWatchMap.get(file);
logger.trace("\tFile : " + file.getCanonicalPath() + " marked for deletion : " + state.isDeleted());
}
}
@@ -314,8 +288,8 @@ public class FileWatcher {
private void removeMarkedFilesFromWatchMap() throws IOException {
logger.trace("Removing deleted files from watchMap");
List markedList = null;
- for(File file : watchMap.keySet()) {
- FileState state = watchMap.get(file);
+ for(File file : oldWatchMap.keySet()) {
+ FileState state = oldWatchMap.get(file);
if(state.isDeleted()) {
if(markedList == null) {
markedList = new ArrayList();
@@ -325,19 +299,29 @@ public class FileWatcher {
}
if(markedList != null) {
for(File file : markedList) {
- FileState state = watchMap.remove(file);
- state.getRandomAccessFile().close();
- logger.trace("\tFile : " + file.getCanonicalFile() + " removed");
+ FileState state = oldWatchMap.remove(file);
+ try {
+ state.getRandomAccessFile().close();
+ } catch(Exception e) {}
+ logger.trace("\tFile : " + file + " removed");
}
}
}
public void close() throws IOException {
logger.debug("Closing all files");
- for(File file : watchMap.keySet()) {
- FileState state = watchMap.get(file);
+ for(File file : oldWatchMap.keySet()) {
+ FileState state = oldWatchMap.get(file);
state.getRandomAccessFile().close();
}
}
+ public int getMaxSignatureLength() {
+ return maxSignatureLength;
+ }
+
+ public void setMaxSignatureLength(int maxSignatureLength) {
+ this.maxSignatureLength = maxSignatureLength;
+ }
+
}
diff --git a/src/main/java/info/fetter/logstashforwarder/Forwarder.java b/src/main/java/info/fetter/logstashforwarder/Forwarder.java
index 46dc469..faebcd7 100644
--- a/src/main/java/info/fetter/logstashforwarder/Forwarder.java
+++ b/src/main/java/info/fetter/logstashforwarder/Forwarder.java
@@ -52,6 +52,7 @@ public class Forwarder {
private static Level logLevel = INFO;
private static ProtocolAdapter adapter;
private static Random random = new Random();
+ private static int signatureLength = 4096;
public static void main(String[] args) {
try {
@@ -62,6 +63,7 @@ public class Forwarder {
// Logger.getLogger(FileReader.class).setLevel(TRACE);
// Logger.getLogger(FileReader.class).setAdditivity(false);
watcher = new FileWatcher();
+ watcher.setMaxSignatureLength(signatureLength);
configManager = new ConfigurationManager(config);
configManager.readConfiguration();
for(FilesSection files : configManager.getConfig().getFiles()) {
@@ -69,6 +71,7 @@ public class Forwarder {
watcher.addFilesToWatch(path, new Event(files.getFields()), FileWatcher.ONE_DAY);
}
}
+ watcher.initialize();
reader = new FileReader(spoolSize);
connectToServer();
infiniteLoop();
@@ -128,6 +131,10 @@ public class Forwarder {
.isRequired()
.withDescription("path to logstash-forwarder configuration file")
.create("config");
+ Option signatureLengthOption = OptionBuilder.withArgName("signature length")
+ .hasArg()
+ .withDescription("Maximum length of file signature")
+ .create("signaturelength");
options.addOption(helpOption)
.addOption(idleTimeoutOption)
@@ -135,6 +142,7 @@ public class Forwarder {
.addOption(quiet)
.addOption(debug)
.addOption(trace)
+ .addOption(signatureLengthOption)
.addOption(configOption);
CommandLineParser parser = new GnuParser();
try {
@@ -148,6 +156,9 @@ public class Forwarder {
if(line.hasOption("config")) {
config = line.getOptionValue("config");
}
+ if(line.hasOption("signaturelength")) {
+ signatureLength = Integer.parseInt(line.getOptionValue("signaturelength"));
+ }
if(line.hasOption("quiet")) {
logLevel = ERROR;
}
diff --git a/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java b/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java
index 5b433e6..04f9a08 100644
--- a/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java
+++ b/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java
@@ -54,17 +54,18 @@ public class FileWatcherTest {
}
}
- //@Test
+ @Test
public void testWildcardWatch() throws InterruptedException, IOException {
FileWatcher watcher = new FileWatcher();
- watcher.addFilesToWatch("./test*.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY);
+ watcher.addFilesToWatch("./testFileWatcher*.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY);
+ watcher.initialize();
- File file1 = new File("test1.txt");
- File file2 = new File("test2.txt");
+ File file1 = new File("testFileWatcher1.txt");
+ File file2 = new File("testFileWatcher2.txt");
//File file3 = new File("test3.txt");
//File file4 = new File("test4.txt");
- File testDir = new File("test");
+ //File testDir = new File("testFileWatcher");
//FileUtils.forceMkdir(new File("test"));
watcher.checkFiles();
@@ -76,29 +77,22 @@ public class FileWatcherTest {
FileUtils.write(file2, "file 2 line 1\n", true);
Thread.sleep(1000);
watcher.checkFiles();
- FileUtils.moveFileToDirectory(file1, testDir, true);
- FileUtils.write(file2, "file 2 line 2\n", true);
- FileUtils.moveFile(file2, file1);
- FileUtils.write(file2, "file 3 line 1\n", true);
-// FileUtils.touch(file3);
+// FileUtils.moveFileToDirectory(file1, testDir, true);
+// FileUtils.write(file2, "file 2 line 2\n", true);
+// FileUtils.moveFile(file2, file1);
+// FileUtils.write(file2, "file 3 line 1\n", true);
+//
+// Thread.sleep(1000);
+// watcher.checkFiles();
+//
+//
+// watcher.close();
// FileUtils.forceDelete(file1);
// FileUtils.forceDelete(file2);
- Thread.sleep(1000);
- watcher.checkFiles();
+// FileUtils.forceDelete(testDir);
- watcher.close();
- FileUtils.forceDelete(file1);
- FileUtils.forceDelete(file2);
- FileUtils.forceDelete(testDir);
-
-
-// FileUtils.moveFile(file3, file4);
-// FileUtils.touch(file3);
-// Thread.sleep(500);
-// watcher.checkFiles();
-// FileUtils.forceDelete(file3);
-// FileUtils.forceDelete(file4);
+
}
@Test