Added -signaturelength option

Bug corrections on saved state loading
Bug corrections on file reading
This commit is contained in:
didfet
2015-03-19 18:25:15 +01:00
parent 850a92cfe1
commit fbc6bc352e
6 changed files with 137 additions and 135 deletions

View File

@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>logstash-forwarder-java</groupId> <groupId>logstash-forwarder-java</groupId>
<artifactId>logstash-forwarder-java</artifactId> <artifactId>logstash-forwarder-java</artifactId>
<version>0.1.1</version> <version>0.1.2-SNAPSHOT</version>
<name>logstash-forwarder-java</name> <name>logstash-forwarder-java</name>
<description>Java version of logstash forwarder</description> <description>Java version of logstash forwarder</description>
<url>https://github.com/didfet/logstash-forwarder-java</url> <url>https://github.com/didfet/logstash-forwarder-java</url>

View File

@@ -52,7 +52,7 @@ public class FileReader {
eventList = new ArrayList<Event>(spoolSize); eventList = new ArrayList<Event>(spoolSize);
} }
public int readFiles(Collection<FileState> fileList) throws IOException, AdapterException { public int readFiles(Collection<FileState> fileList) throws AdapterException {
int eventCount = 0; int eventCount = 0;
if(logger.isTraceEnabled()) { if(logger.isTraceEnabled()) {
logger.trace("Reading " + fileList.size() + " file(s)"); logger.trace("Reading " + fileList.size() + " file(s)");
@@ -71,12 +71,12 @@ public class FileReader {
return eventCount; // Return number of events sent to adapter return eventCount; // Return number of events sent to adapter
} }
private int readFile(FileState state, int spaceLeftInSpool) throws IOException { private int readFile(FileState state, int spaceLeftInSpool) {
int eventListSizeBefore = eventList.size(); int eventListSizeBefore = eventList.size();
File file = state.getFile(); File file = state.getFile();
long pointer = state.getPointer(); long pointer = state.getPointer();
if(logger.isTraceEnabled()) { if(logger.isTraceEnabled()) {
logger.trace("File : " + file.getCanonicalPath() + " pointer : " + pointer); logger.trace("File : " + file + " pointer : " + pointer);
logger.trace("Space left in spool : " + spaceLeftInSpool); logger.trace("Space left in spool : " + spaceLeftInSpool);
} }
pointer = readLines(state, spaceLeftInSpool); pointer = readLines(state, spaceLeftInSpool);
@@ -84,22 +84,27 @@ public class FileReader {
return eventList.size() - eventListSizeBefore; // Return number of events read return eventList.size() - eventListSizeBefore; // Return number of events read
} }
private long readLines(FileState state, int spaceLeftInSpool) throws IOException { private long readLines(FileState state, int spaceLeftInSpool) {
RandomAccessFile reader = state.getRandomAccessFile(); RandomAccessFile reader = state.getRandomAccessFile();
long pos = state.getPointer(); long pos = state.getPointer();
reader.seek(pos); try {
String line = readLine(reader); reader.seek(pos);
while (line != null && spaceLeftInSpool > 0) { String line = readLine(reader);
if(logger.isTraceEnabled()) { while (line != null && spaceLeftInSpool > 0) {
logger.trace("-- Read line : " + line); if(logger.isTraceEnabled()) {
logger.trace("-- Space left in spool : " + spaceLeftInSpool); logger.trace("-- Read line : " + line);
logger.trace("-- Space left in spool : " + spaceLeftInSpool);
}
pos = reader.getFilePointer();
addEvent(state, pos, line);
line = readLine(reader);
spaceLeftInSpool--;
} }
pos = reader.getFilePointer(); reader.seek(pos); // Ensure we can re-read if necessary
addEvent(state, pos, line); } catch(IOException e) {
line = readLine(reader); logger.warn("Exception raised while reading file : " + state.getFile());
spaceLeftInSpool--; e.printStackTrace();
} }
reader.seek(pos); // Ensure we can re-read if necessary
return pos; return pos;
} }

View File

@@ -18,6 +18,7 @@ package info.fetter.logstashforwarder;
*/ */
import java.io.File; import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
@@ -60,8 +61,15 @@ public class FileState {
size = file.length(); size = file.length();
} }
private void setFileFromDirectoryAndName() { private void setFileFromDirectoryAndName() throws FileNotFoundException {
this.file = new File(directory + File.separator + fileName); file = new File(directory + File.separator + fileName);
if(file.exists()) {
randomAccessFile = new RandomAccessFile(file, "r");
lastModified = file.lastModified();
size = file.length();
} else {
deleted = true;
}
} }
public File getFile() { public File getFile() {
@@ -80,7 +88,7 @@ public class FileState {
return directory; return directory;
} }
public void setDirectory(String directory) { public void setDirectory(String directory) throws FileNotFoundException {
this.directory = directory; this.directory = directory;
if(fileName != null && directory != null) { if(fileName != null && directory != null) {
setFileFromDirectoryAndName(); setFileFromDirectoryAndName();
@@ -91,7 +99,7 @@ public class FileState {
return fileName; return fileName;
} }
public void setFileName(String fileName) { public void setFileName(String fileName) throws FileNotFoundException {
this.fileName = fileName; this.fileName = fileName;
if(fileName != null && directory != null) { if(fileName != null && directory != null) {
setFileFromDirectoryAndName(); setFileFromDirectoryAndName();
@@ -160,13 +168,13 @@ public class FileState {
@Override @Override
public String toString() { public String toString() {
return new ToStringBuilder(this). return new ToStringBuilder(this).
append("fileName", fileName). append("fileName", fileName).
append("directory", directory). append("directory", directory).
append("pointer", pointer). append("pointer", pointer).
append("signature", signature). append("signature", signature).
append("signatureLength", signatureLength). append("signatureLength", signatureLength).
toString(); toString();
} }
} }

View File

@@ -39,10 +39,10 @@ public class FileWatcher {
private static final Logger logger = Logger.getLogger(FileWatcher.class); private static final Logger logger = Logger.getLogger(FileWatcher.class);
private List<FileAlterationObserver> observerList = new ArrayList<FileAlterationObserver>(); private List<FileAlterationObserver> observerList = new ArrayList<FileAlterationObserver>();
public static final int ONE_DAY = 24 * 3600 * 1000; public static final int ONE_DAY = 24 * 3600 * 1000;
private Map<File,FileState> watchMap = new HashMap<File,FileState>(); private Map<File,FileState> oldWatchMap = new HashMap<File,FileState>();
private Map<File,FileState> changedWatchMap = new HashMap<File,FileState>(); private Map<File,FileState> newWatchMap = new HashMap<File,FileState>();
private FileState[] savedStates; private FileState[] savedStates;
private static int MAX_SIGNATURE_LENGTH = 1024; private int maxSignatureLength;
public FileWatcher() { public FileWatcher() {
try { try {
@@ -52,6 +52,17 @@ public class FileWatcher {
} }
} }
public void initialize() throws IOException {
logger.debug("Initializing FileWatcher");
if(savedStates != null) {
for(FileState state : savedStates) {
oldWatchMap.put(state.getFile(), state);
}
}
processModifications();
printWatchMap();
}
public void addFilesToWatch(String fileToWatch, Event fields, int deadTime) { public void addFilesToWatch(String fileToWatch, Event fields, int deadTime) {
try { try {
if(fileToWatch.equals("-")) { if(fileToWatch.equals("-")) {
@@ -79,15 +90,15 @@ public class FileWatcher {
public int readFiles(FileReader reader) throws IOException, AdapterException { public int readFiles(FileReader reader) throws IOException, AdapterException {
logger.debug("Reading files"); logger.debug("Reading files");
logger.trace("=============="); logger.trace("==============");
int numberOfLinesRead = reader.readFiles(watchMap.values()); int numberOfLinesRead = reader.readFiles(oldWatchMap.values());
Registrar.writeStateToJson(watchMap.values()); Registrar.writeStateToJson(oldWatchMap.values());
return numberOfLinesRead; return numberOfLinesRead;
} }
private void processModifications() throws IOException { private void processModifications() throws IOException {
for(File file : changedWatchMap.keySet()) { for(File file : newWatchMap.keySet()) {
FileState state = changedWatchMap.get(file); FileState state = newWatchMap.get(file);
if(logger.isTraceEnabled()) { if(logger.isTraceEnabled()) {
logger.trace("Checking file : " + file.getCanonicalPath()); logger.trace("Checking file : " + file.getCanonicalPath());
logger.trace("-- Last modified : " + state.getLastModified()); logger.trace("-- Last modified : " + state.getLastModified());
@@ -97,7 +108,7 @@ public class FileWatcher {
} }
logger.trace("Determine if file has just been written to"); logger.trace("Determine if file has just been written to");
FileState oldState = watchMap.get(file); FileState oldState = oldWatchMap.get(file);
if(oldState != null) { if(oldState != null) {
if(oldState.getSize() > state.getSize()) { if(oldState.getSize() > state.getSize()) {
logger.trace("File shorter : file can't be the same"); logger.trace("File shorter : file can't be the same");
@@ -123,8 +134,8 @@ public class FileWatcher {
if(state.getOldFileState() == null) { if(state.getOldFileState() == null) {
logger.trace("Determine if file has been renamed and/or written to"); logger.trace("Determine if file has been renamed and/or written to");
for(File otherFile : watchMap.keySet()) { for(File otherFile : oldWatchMap.keySet()) {
FileState otherState = watchMap.get(otherFile); FileState otherState = oldWatchMap.get(otherFile);
if(otherState != null && state.getSize() >= otherState.getSize() && state.getDirectory().equals(otherState.getDirectory())) { if(otherState != null && state.getSize() >= otherState.getSize() && state.getDirectory().equals(otherState.getDirectory())) {
if(logger.isTraceEnabled()) { if(logger.isTraceEnabled()) {
logger.trace("Comparing to : " + otherFile.getCanonicalPath()); logger.trace("Comparing to : " + otherFile.getCanonicalPath());
@@ -151,29 +162,31 @@ public class FileWatcher {
} }
logger.trace("Refreshing file state"); logger.trace("Refreshing file state");
for(File file : changedWatchMap.keySet()) { for(File file : newWatchMap.keySet()) {
if(logger.isTraceEnabled()) { if(logger.isTraceEnabled()) {
logger.trace("Refreshing file : " + file.getCanonicalPath()); logger.trace("Refreshing file : " + file.getCanonicalPath());
} }
FileState state = changedWatchMap.get(file); FileState state = newWatchMap.get(file);
FileState oldState = state.getOldFileState(); FileState oldState = state.getOldFileState();
if(oldState == null) { if(oldState == null) {
logger.trace("File has been truncated or created, not retrieving pointer"); logger.trace("File has been truncated or created, not retrieving pointer");
} else { } else {
logger.trace("File has not been truncated or created, retrieving pointer"); logger.trace("File has not been truncated or created, retrieving pointer");
state.setPointer(oldState.getPointer()); state.setPointer(oldState.getPointer());
oldState.getRandomAccessFile().close(); try {
oldState.getRandomAccessFile().close();
} catch(Exception e) {}
} }
} }
logger.trace("Replacing old state"); logger.trace("Replacing old state");
for(File file : changedWatchMap.keySet()) { for(File file : newWatchMap.keySet()) {
FileState state = changedWatchMap.get(file); FileState state = newWatchMap.get(file);
watchMap.put(file, state); oldWatchMap.put(file, state);
} }
// Truncating changedWatchMap // Truncating changedWatchMap
changedWatchMap.clear(); newWatchMap.clear();
removeMarkedFilesFromWatchMap(); removeMarkedFilesFromWatchMap();
} }
@@ -216,46 +229,7 @@ public class FileWatcher {
observerList.add(observer); observerList.add(observer);
observer.initialize(); observer.initialize();
for(File file : FileUtils.listFiles(directory, fileFilter, null)) { for(File file : FileUtils.listFiles(directory, fileFilter, null)) {
FileState savedState = null; addFileToWatchMap(newWatchMap, file, fields);
if(savedStates != null) {
for(FileState state : savedStates) {
logger.trace("Comparing file : " + file + " with saved file : " + state.getFile());
if(file.equals(state.getFile())) {
savedState = state;
logger.debug("Match found with saved file " + state.getFile());
}
}
}
if(savedState == null) {
addFileToWatchMap(watchMap, file, fields);
} else {
addSavedFileToWatchMap(savedState, fields);
}
}
}
private void addSavedFileToWatchMap(FileState savedFileState, Event fields) {
try {
File file = savedFileState.getFile();
FileState state = new FileState(file);
state.setFields(fields);
int savedSignatureLength = savedFileState.getSignatureLength();
state.setSignatureLength(savedSignatureLength);
long savedSignature = savedFileState.getSignature();
int signatureLength = (int) (state.getSize() > MAX_SIGNATURE_LENGTH ? MAX_SIGNATURE_LENGTH : state.getSize());
long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength);
if(signature == savedSignature) {
state.setPointer(savedFileState.getPointer());
logger.debug("Restoring signature of size : " + savedSignatureLength + " on file : " + file + " : " + savedSignature);
} else {
logger.debug("File " + file + " signature has changed");
logger.trace("Setting signature of size : " + signatureLength + " on file : " + file + " : " + signature);
}
state.setSignatureLength(signatureLength);
state.setSignature(signature);
watchMap.put(file, state);
} catch (IOException e) {
logger.error("Caught IOException : " + e.getMessage());
} }
} }
@@ -263,7 +237,7 @@ public class FileWatcher {
try { try {
FileState state = new FileState(file); FileState state = new FileState(file);
state.setFields(fields); state.setFields(fields);
int signatureLength = (int) (state.getSize() > MAX_SIGNATURE_LENGTH ? MAX_SIGNATURE_LENGTH : state.getSize()); int signatureLength = (int) (state.getSize() > maxSignatureLength ? maxSignatureLength : state.getSize());
state.setSignatureLength(signatureLength); state.setSignatureLength(signatureLength);
long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength); long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength);
state.setSignature(signature); state.setSignature(signature);
@@ -277,7 +251,7 @@ public class FileWatcher {
public void onFileChange(File file, Event fields) { public void onFileChange(File file, Event fields) {
try { try {
logger.debug("Change detected on file : " + file.getCanonicalPath()); logger.debug("Change detected on file : " + file.getCanonicalPath());
addFileToWatchMap(changedWatchMap, file, fields); addFileToWatchMap(newWatchMap, file, fields);
} catch (IOException e) { } catch (IOException e) {
logger.error("Caught IOException : " + e.getMessage()); logger.error("Caught IOException : " + e.getMessage());
} }
@@ -286,7 +260,7 @@ public class FileWatcher {
public void onFileCreate(File file, Event fields) { public void onFileCreate(File file, Event fields) {
try { try {
logger.debug("Create detected on file : " + file.getCanonicalPath()); logger.debug("Create detected on file : " + file.getCanonicalPath());
addFileToWatchMap(changedWatchMap, file, fields); addFileToWatchMap(newWatchMap, file, fields);
} catch (IOException e) { } catch (IOException e) {
logger.error("Caught IOException : " + e.getMessage()); logger.error("Caught IOException : " + e.getMessage());
} }
@@ -295,7 +269,7 @@ public class FileWatcher {
public void onFileDelete(File file) { public void onFileDelete(File file) {
try { try {
logger.debug("Delete detected on file : " + file.getCanonicalPath()); logger.debug("Delete detected on file : " + file.getCanonicalPath());
watchMap.get(file).setDeleted(); oldWatchMap.get(file).setDeleted();
} catch (IOException e) { } catch (IOException e) {
logger.error("Caught IOException : " + e.getMessage()); logger.error("Caught IOException : " + e.getMessage());
} }
@@ -304,8 +278,8 @@ public class FileWatcher {
private void printWatchMap() throws IOException { private void printWatchMap() throws IOException {
if(logger.isTraceEnabled()) { if(logger.isTraceEnabled()) {
logger.trace("WatchMap contents : "); logger.trace("WatchMap contents : ");
for(File file : watchMap.keySet()) { for(File file : oldWatchMap.keySet()) {
FileState state = watchMap.get(file); FileState state = oldWatchMap.get(file);
logger.trace("\tFile : " + file.getCanonicalPath() + " marked for deletion : " + state.isDeleted()); logger.trace("\tFile : " + file.getCanonicalPath() + " marked for deletion : " + state.isDeleted());
} }
} }
@@ -314,8 +288,8 @@ public class FileWatcher {
private void removeMarkedFilesFromWatchMap() throws IOException { private void removeMarkedFilesFromWatchMap() throws IOException {
logger.trace("Removing deleted files from watchMap"); logger.trace("Removing deleted files from watchMap");
List<File> markedList = null; List<File> markedList = null;
for(File file : watchMap.keySet()) { for(File file : oldWatchMap.keySet()) {
FileState state = watchMap.get(file); FileState state = oldWatchMap.get(file);
if(state.isDeleted()) { if(state.isDeleted()) {
if(markedList == null) { if(markedList == null) {
markedList = new ArrayList<File>(); markedList = new ArrayList<File>();
@@ -325,19 +299,29 @@ public class FileWatcher {
} }
if(markedList != null) { if(markedList != null) {
for(File file : markedList) { for(File file : markedList) {
FileState state = watchMap.remove(file); FileState state = oldWatchMap.remove(file);
state.getRandomAccessFile().close(); try {
logger.trace("\tFile : " + file.getCanonicalFile() + " removed"); state.getRandomAccessFile().close();
} catch(Exception e) {}
logger.trace("\tFile : " + file + " removed");
} }
} }
} }
public void close() throws IOException { public void close() throws IOException {
logger.debug("Closing all files"); logger.debug("Closing all files");
for(File file : watchMap.keySet()) { for(File file : oldWatchMap.keySet()) {
FileState state = watchMap.get(file); FileState state = oldWatchMap.get(file);
state.getRandomAccessFile().close(); state.getRandomAccessFile().close();
} }
} }
public int getMaxSignatureLength() {
return maxSignatureLength;
}
public void setMaxSignatureLength(int maxSignatureLength) {
this.maxSignatureLength = maxSignatureLength;
}
} }

View File

@@ -52,6 +52,7 @@ public class Forwarder {
private static Level logLevel = INFO; private static Level logLevel = INFO;
private static ProtocolAdapter adapter; private static ProtocolAdapter adapter;
private static Random random = new Random(); private static Random random = new Random();
private static int signatureLength = 4096;
public static void main(String[] args) { public static void main(String[] args) {
try { try {
@@ -62,6 +63,7 @@ public class Forwarder {
// Logger.getLogger(FileReader.class).setLevel(TRACE); // Logger.getLogger(FileReader.class).setLevel(TRACE);
// Logger.getLogger(FileReader.class).setAdditivity(false); // Logger.getLogger(FileReader.class).setAdditivity(false);
watcher = new FileWatcher(); watcher = new FileWatcher();
watcher.setMaxSignatureLength(signatureLength);
configManager = new ConfigurationManager(config); configManager = new ConfigurationManager(config);
configManager.readConfiguration(); configManager.readConfiguration();
for(FilesSection files : configManager.getConfig().getFiles()) { for(FilesSection files : configManager.getConfig().getFiles()) {
@@ -69,6 +71,7 @@ public class Forwarder {
watcher.addFilesToWatch(path, new Event(files.getFields()), FileWatcher.ONE_DAY); watcher.addFilesToWatch(path, new Event(files.getFields()), FileWatcher.ONE_DAY);
} }
} }
watcher.initialize();
reader = new FileReader(spoolSize); reader = new FileReader(spoolSize);
connectToServer(); connectToServer();
infiniteLoop(); infiniteLoop();
@@ -128,6 +131,10 @@ public class Forwarder {
.isRequired() .isRequired()
.withDescription("path to logstash-forwarder configuration file") .withDescription("path to logstash-forwarder configuration file")
.create("config"); .create("config");
Option signatureLengthOption = OptionBuilder.withArgName("signature length")
.hasArg()
.withDescription("Maximum length of file signature")
.create("signaturelength");
options.addOption(helpOption) options.addOption(helpOption)
.addOption(idleTimeoutOption) .addOption(idleTimeoutOption)
@@ -135,6 +142,7 @@ public class Forwarder {
.addOption(quiet) .addOption(quiet)
.addOption(debug) .addOption(debug)
.addOption(trace) .addOption(trace)
.addOption(signatureLengthOption)
.addOption(configOption); .addOption(configOption);
CommandLineParser parser = new GnuParser(); CommandLineParser parser = new GnuParser();
try { try {
@@ -148,6 +156,9 @@ public class Forwarder {
if(line.hasOption("config")) { if(line.hasOption("config")) {
config = line.getOptionValue("config"); config = line.getOptionValue("config");
} }
if(line.hasOption("signaturelength")) {
signatureLength = Integer.parseInt(line.getOptionValue("signaturelength"));
}
if(line.hasOption("quiet")) { if(line.hasOption("quiet")) {
logLevel = ERROR; logLevel = ERROR;
} }

View File

@@ -54,17 +54,18 @@ public class FileWatcherTest {
} }
} }
//@Test @Test
public void testWildcardWatch() throws InterruptedException, IOException { public void testWildcardWatch() throws InterruptedException, IOException {
FileWatcher watcher = new FileWatcher(); FileWatcher watcher = new FileWatcher();
watcher.addFilesToWatch("./test*.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY); watcher.addFilesToWatch("./testFileWatcher*.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY);
watcher.initialize();
File file1 = new File("test1.txt"); File file1 = new File("testFileWatcher1.txt");
File file2 = new File("test2.txt"); File file2 = new File("testFileWatcher2.txt");
//File file3 = new File("test3.txt"); //File file3 = new File("test3.txt");
//File file4 = new File("test4.txt"); //File file4 = new File("test4.txt");
File testDir = new File("test"); //File testDir = new File("testFileWatcher");
//FileUtils.forceMkdir(new File("test")); //FileUtils.forceMkdir(new File("test"));
watcher.checkFiles(); watcher.checkFiles();
@@ -76,29 +77,22 @@ public class FileWatcherTest {
FileUtils.write(file2, "file 2 line 1\n", true); FileUtils.write(file2, "file 2 line 1\n", true);
Thread.sleep(1000); Thread.sleep(1000);
watcher.checkFiles(); watcher.checkFiles();
FileUtils.moveFileToDirectory(file1, testDir, true); // FileUtils.moveFileToDirectory(file1, testDir, true);
FileUtils.write(file2, "file 2 line 2\n", true); // FileUtils.write(file2, "file 2 line 2\n", true);
FileUtils.moveFile(file2, file1); // FileUtils.moveFile(file2, file1);
FileUtils.write(file2, "file 3 line 1\n", true); // FileUtils.write(file2, "file 3 line 1\n", true);
// FileUtils.touch(file3); //
// Thread.sleep(1000);
// watcher.checkFiles();
//
//
// watcher.close();
// FileUtils.forceDelete(file1); // FileUtils.forceDelete(file1);
// FileUtils.forceDelete(file2); // FileUtils.forceDelete(file2);
Thread.sleep(1000); // FileUtils.forceDelete(testDir);
watcher.checkFiles();
watcher.close();
FileUtils.forceDelete(file1);
FileUtils.forceDelete(file2);
FileUtils.forceDelete(testDir);
// FileUtils.moveFile(file3, file4);
// FileUtils.touch(file3);
// Thread.sleep(500);
// watcher.checkFiles();
// FileUtils.forceDelete(file3);
// FileUtils.forceDelete(file4);
} }
@Test @Test