From f6cb23c09c8debd4c9399c78260cb6deecd55796 Mon Sep 17 00:00:00 2001 From: didfet Date: Wed, 18 Mar 2015 17:42:03 +0100 Subject: [PATCH] Implemented FileState serialization/deserialization. --- .../fetter/logstashforwarder/FileState.java | 2 +- .../fetter/logstashforwarder/FileWatcher.java | 56 ++++++++++++- .../fetter/logstashforwarder/Forwarder.java | 3 +- .../fetter/logstashforwarder/Registrar.java | 58 +++++++++++++ .../logstashforwarder/RegistrarTest.java | 81 +++++++++++++++++++ 5 files changed, 195 insertions(+), 5 deletions(-) create mode 100644 src/main/java/info/fetter/logstashforwarder/Registrar.java create mode 100644 src/test/java/info/fetter/logstashforwarder/RegistrarTest.java diff --git a/src/main/java/info/fetter/logstashforwarder/FileState.java b/src/main/java/info/fetter/logstashforwarder/FileState.java index 6c489a8..832676a 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileState.java +++ b/src/main/java/info/fetter/logstashforwarder/FileState.java @@ -61,7 +61,7 @@ public class FileState { } private void setFileFromDirectoryAndName() { - this.file = new File(directory + File.pathSeparator + fileName); + this.file = new File(directory + File.separator + fileName); } public File getFile() { diff --git a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java index af7ac26..5ec0f04 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java +++ b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java @@ -40,8 +40,17 @@ public class FileWatcher { public static final int ONE_DAY = 24 * 3600 * 1000; private Map watchMap = new HashMap(); private Map changedWatchMap = new HashMap(); + private FileState[] savedStates; private static int MAX_SIGNATURE_LENGTH = 1024; + public FileWatcher() { + try { + savedStates = Registrar.readStateFromJson(); + } catch(Exception e) { + logger.warn("Could not load saved states : " + e.getMessage()); + } + } + public void addFilesToWatch(String fileToWatch, Event fields, int deadTime) { try { if(fileToWatch.equals("-")) { @@ -65,11 +74,13 @@ public class FileWatcher { processModifications(); printWatchMap(); } - + public int readFiles(FileReader reader) throws IOException { logger.debug("Reading files"); logger.trace("=============="); - return reader.readFiles(watchMap.values()); + int numberOfLinesRead = reader.readFiles(watchMap.values()); + Registrar.writeStateToJson(watchMap.values()); + return numberOfLinesRead; } private void processModifications() throws IOException { @@ -204,7 +215,46 @@ public class FileWatcher { observerList.add(observer); observer.initialize(); for(File file : FileUtils.listFiles(directory, fileFilter, null)) { - addFileToWatchMap(watchMap, file, fields); + FileState savedState = null; + if(savedStates != null) { + for(FileState state : savedStates) { + logger.trace("Comparing file : " + file + " with saved file : " + state.getFile()); + if(file.equals(state.getFile())) { + savedState = state; + logger.debug("Match found with saved file " + state.getFile()); + } + } + } + if(savedState == null) { + addFileToWatchMap(watchMap, file, fields); + } else { + addSavedFileToWatchMap(savedState, fields); + } + } + } + + private void addSavedFileToWatchMap(FileState savedFileState, Event fields) { + try { + File file = savedFileState.getFile(); + FileState state = new FileState(file); + state.setFields(fields); + int savedSignatureLength = savedFileState.getSignatureLength(); + state.setSignatureLength(savedSignatureLength); + long savedSignature = savedFileState.getSignature(); + int signatureLength = (int) (state.getSize() > MAX_SIGNATURE_LENGTH ? MAX_SIGNATURE_LENGTH : state.getSize()); + long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength); + if(signature == savedSignature) { + state.setPointer(savedFileState.getPointer()); + logger.debug("Restoring signature of size : " + savedSignatureLength + " on file : " + file + " : " + savedSignature); + } else { + logger.debug("File " + file + " signature has changed"); + logger.trace("Setting signature of size : " + signatureLength + " on file : " + file + " : " + signature); + } + state.setSignatureLength(signatureLength); + state.setSignature(signature); + watchMap.put(file, state); + } catch (IOException e) { + logger.error("Caught IOException : " + e.getMessage()); } } diff --git a/src/main/java/info/fetter/logstashforwarder/Forwarder.java b/src/main/java/info/fetter/logstashforwarder/Forwarder.java index a1eb39e..f69eac9 100644 --- a/src/main/java/info/fetter/logstashforwarder/Forwarder.java +++ b/src/main/java/info/fetter/logstashforwarder/Forwarder.java @@ -42,7 +42,7 @@ public class Forwarder { private static int idleTimeout = 5000; private static String config; private static ConfigurationManager configManager; - private static FileWatcher watcher = new FileWatcher(); + private static FileWatcher watcher; private static FileReader reader; private static Level logLevel = INFO; private static ProtocolAdapter adapter; @@ -55,6 +55,7 @@ public class Forwarder { // Logger.getLogger(FileReader.class).addAppender((Appender)RootLogger.getRootLogger().getAllAppenders().nextElement()); // Logger.getLogger(FileReader.class).setLevel(TRACE); // Logger.getLogger(FileReader.class).setAdditivity(false); + watcher = new FileWatcher(); configManager = new ConfigurationManager(config); configManager.readConfiguration(); for(FilesSection files : configManager.getConfig().getFiles()) { diff --git a/src/main/java/info/fetter/logstashforwarder/Registrar.java b/src/main/java/info/fetter/logstashforwarder/Registrar.java new file mode 100644 index 0000000..b64d352 --- /dev/null +++ b/src/main/java/info/fetter/logstashforwarder/Registrar.java @@ -0,0 +1,58 @@ +package info.fetter.logstashforwarder; + +/* + * Copyright 2015 Didier Fetter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import java.io.File; +import java.io.IOException; +import java.util.Collection; + +import com.fasterxml.jackson.core.JsonGenerationException; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class Registrar { + private static final String SINCEDB = ".logstash-forwarder-java"; + + private static ObjectMapper mapper = new ObjectMapper(); + + public static FileState[] readStateFromJson(File file) throws JsonParseException, JsonMappingException, IOException { + FileState[] stateArray = mapper.readValue(file, FileState[].class); + return stateArray; + } + + public static FileState[] readStateFromJson(String file) throws JsonParseException, JsonMappingException, IOException { + return readStateFromJson(new File(file)); + } + + public static FileState[] readStateFromJson() throws JsonParseException, JsonMappingException, IOException { + return readStateFromJson(SINCEDB); + } + + public static void writeStateToJson(File file, Collection stateList) throws JsonGenerationException, JsonMappingException, IOException { + mapper.writeValue(file, stateList); + } + + public static void writeStateToJson(String file, Collection stateList) throws JsonGenerationException, JsonMappingException, IOException { + writeStateToJson(new File(file), stateList); + } + + public static void writeStateToJson(Collection stateList) throws JsonGenerationException, JsonMappingException, IOException { + writeStateToJson(SINCEDB, stateList); + } +} diff --git a/src/test/java/info/fetter/logstashforwarder/RegistrarTest.java b/src/test/java/info/fetter/logstashforwarder/RegistrarTest.java new file mode 100644 index 0000000..68838d7 --- /dev/null +++ b/src/test/java/info/fetter/logstashforwarder/RegistrarTest.java @@ -0,0 +1,81 @@ +package info.fetter.logstashforwarder; + +/* + * Copyright 2015 Didier Fetter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import static org.apache.log4j.Level.DEBUG; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.RootLogger; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.fasterxml.jackson.core.JsonGenerationException; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonMappingException; + +public class RegistrarTest { + Logger logger = Logger.getLogger(RegistrarTest.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + BasicConfigurator.configure(); + RootLogger.getRootLogger().setLevel(DEBUG); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + BasicConfigurator.resetConfiguration(); + } + + @Test + public void testReadState1() throws JsonParseException, JsonMappingException, IOException { + FileState[] states = Registrar.readStateFromJson(new File(RegistrarTest.class.getClassLoader().getResource("state1.json").getFile())); + for(FileState state : states) { + logger.debug("Loaded state : " + state); + } + } + + @Test + public void testWriteState2() throws JsonGenerationException, JsonMappingException, IOException { + FileState state1 = new FileState(); + state1.setDirectory("/directory1"); + state1.setFileName("file1"); + state1.setPointer(1234); + state1.setSignature(123456); + state1.setSignatureLength(135); + FileState state2 = new FileState(); + state2.setDirectory("/directory2"); + state2.setFileName("file2"); + state2.setPointer(4321); + state2.setSignature(654321); + state2.setSignatureLength(531); + List stateList = new ArrayList(2); + stateList.add(state1); + stateList.add(state2); + File file = new File("state2.json"); + logger.debug("Writing to file : " + file.getCanonicalPath()); + Registrar.writeStateToJson(file, stateList); + } +}