mirror of
https://github.com/Febbweiss/logstash-forwarder-java.git
synced 2026-03-04 22:25:39 +00:00
Implemented FileState serialization/deserialization.
This commit is contained in:
@@ -61,7 +61,7 @@ public class FileState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void setFileFromDirectoryAndName() {
|
private void setFileFromDirectoryAndName() {
|
||||||
this.file = new File(directory + File.pathSeparator + fileName);
|
this.file = new File(directory + File.separator + fileName);
|
||||||
}
|
}
|
||||||
|
|
||||||
public File getFile() {
|
public File getFile() {
|
||||||
|
|||||||
@@ -40,8 +40,17 @@ public class FileWatcher {
|
|||||||
public static final int ONE_DAY = 24 * 3600 * 1000;
|
public static final int ONE_DAY = 24 * 3600 * 1000;
|
||||||
private Map<File,FileState> watchMap = new HashMap<File,FileState>();
|
private Map<File,FileState> watchMap = new HashMap<File,FileState>();
|
||||||
private Map<File,FileState> changedWatchMap = new HashMap<File,FileState>();
|
private Map<File,FileState> changedWatchMap = new HashMap<File,FileState>();
|
||||||
|
private FileState[] savedStates;
|
||||||
private static int MAX_SIGNATURE_LENGTH = 1024;
|
private static int MAX_SIGNATURE_LENGTH = 1024;
|
||||||
|
|
||||||
|
public FileWatcher() {
|
||||||
|
try {
|
||||||
|
savedStates = Registrar.readStateFromJson();
|
||||||
|
} catch(Exception e) {
|
||||||
|
logger.warn("Could not load saved states : " + e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void addFilesToWatch(String fileToWatch, Event fields, int deadTime) {
|
public void addFilesToWatch(String fileToWatch, Event fields, int deadTime) {
|
||||||
try {
|
try {
|
||||||
if(fileToWatch.equals("-")) {
|
if(fileToWatch.equals("-")) {
|
||||||
@@ -69,7 +78,9 @@ public class FileWatcher {
|
|||||||
public int readFiles(FileReader reader) throws IOException {
|
public int readFiles(FileReader reader) throws IOException {
|
||||||
logger.debug("Reading files");
|
logger.debug("Reading files");
|
||||||
logger.trace("==============");
|
logger.trace("==============");
|
||||||
return reader.readFiles(watchMap.values());
|
int numberOfLinesRead = reader.readFiles(watchMap.values());
|
||||||
|
Registrar.writeStateToJson(watchMap.values());
|
||||||
|
return numberOfLinesRead;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processModifications() throws IOException {
|
private void processModifications() throws IOException {
|
||||||
@@ -204,7 +215,46 @@ public class FileWatcher {
|
|||||||
observerList.add(observer);
|
observerList.add(observer);
|
||||||
observer.initialize();
|
observer.initialize();
|
||||||
for(File file : FileUtils.listFiles(directory, fileFilter, null)) {
|
for(File file : FileUtils.listFiles(directory, fileFilter, null)) {
|
||||||
addFileToWatchMap(watchMap, file, fields);
|
FileState savedState = null;
|
||||||
|
if(savedStates != null) {
|
||||||
|
for(FileState state : savedStates) {
|
||||||
|
logger.trace("Comparing file : " + file + " with saved file : " + state.getFile());
|
||||||
|
if(file.equals(state.getFile())) {
|
||||||
|
savedState = state;
|
||||||
|
logger.debug("Match found with saved file " + state.getFile());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(savedState == null) {
|
||||||
|
addFileToWatchMap(watchMap, file, fields);
|
||||||
|
} else {
|
||||||
|
addSavedFileToWatchMap(savedState, fields);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addSavedFileToWatchMap(FileState savedFileState, Event fields) {
|
||||||
|
try {
|
||||||
|
File file = savedFileState.getFile();
|
||||||
|
FileState state = new FileState(file);
|
||||||
|
state.setFields(fields);
|
||||||
|
int savedSignatureLength = savedFileState.getSignatureLength();
|
||||||
|
state.setSignatureLength(savedSignatureLength);
|
||||||
|
long savedSignature = savedFileState.getSignature();
|
||||||
|
int signatureLength = (int) (state.getSize() > MAX_SIGNATURE_LENGTH ? MAX_SIGNATURE_LENGTH : state.getSize());
|
||||||
|
long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength);
|
||||||
|
if(signature == savedSignature) {
|
||||||
|
state.setPointer(savedFileState.getPointer());
|
||||||
|
logger.debug("Restoring signature of size : " + savedSignatureLength + " on file : " + file + " : " + savedSignature);
|
||||||
|
} else {
|
||||||
|
logger.debug("File " + file + " signature has changed");
|
||||||
|
logger.trace("Setting signature of size : " + signatureLength + " on file : " + file + " : " + signature);
|
||||||
|
}
|
||||||
|
state.setSignatureLength(signatureLength);
|
||||||
|
state.setSignature(signature);
|
||||||
|
watchMap.put(file, state);
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.error("Caught IOException : " + e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -42,7 +42,7 @@ public class Forwarder {
|
|||||||
private static int idleTimeout = 5000;
|
private static int idleTimeout = 5000;
|
||||||
private static String config;
|
private static String config;
|
||||||
private static ConfigurationManager configManager;
|
private static ConfigurationManager configManager;
|
||||||
private static FileWatcher watcher = new FileWatcher();
|
private static FileWatcher watcher;
|
||||||
private static FileReader reader;
|
private static FileReader reader;
|
||||||
private static Level logLevel = INFO;
|
private static Level logLevel = INFO;
|
||||||
private static ProtocolAdapter adapter;
|
private static ProtocolAdapter adapter;
|
||||||
@@ -55,6 +55,7 @@ public class Forwarder {
|
|||||||
// Logger.getLogger(FileReader.class).addAppender((Appender)RootLogger.getRootLogger().getAllAppenders().nextElement());
|
// Logger.getLogger(FileReader.class).addAppender((Appender)RootLogger.getRootLogger().getAllAppenders().nextElement());
|
||||||
// Logger.getLogger(FileReader.class).setLevel(TRACE);
|
// Logger.getLogger(FileReader.class).setLevel(TRACE);
|
||||||
// Logger.getLogger(FileReader.class).setAdditivity(false);
|
// Logger.getLogger(FileReader.class).setAdditivity(false);
|
||||||
|
watcher = new FileWatcher();
|
||||||
configManager = new ConfigurationManager(config);
|
configManager = new ConfigurationManager(config);
|
||||||
configManager.readConfiguration();
|
configManager.readConfiguration();
|
||||||
for(FilesSection files : configManager.getConfig().getFiles()) {
|
for(FilesSection files : configManager.getConfig().getFiles()) {
|
||||||
|
|||||||
58
src/main/java/info/fetter/logstashforwarder/Registrar.java
Normal file
58
src/main/java/info/fetter/logstashforwarder/Registrar.java
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
package info.fetter.logstashforwarder;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Copyright 2015 Didier Fetter
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonGenerationException;
|
||||||
|
import com.fasterxml.jackson.core.JsonParseException;
|
||||||
|
import com.fasterxml.jackson.databind.JsonMappingException;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
public class Registrar {
|
||||||
|
private static final String SINCEDB = ".logstash-forwarder-java";
|
||||||
|
|
||||||
|
private static ObjectMapper mapper = new ObjectMapper();
|
||||||
|
|
||||||
|
public static FileState[] readStateFromJson(File file) throws JsonParseException, JsonMappingException, IOException {
|
||||||
|
FileState[] stateArray = mapper.readValue(file, FileState[].class);
|
||||||
|
return stateArray;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static FileState[] readStateFromJson(String file) throws JsonParseException, JsonMappingException, IOException {
|
||||||
|
return readStateFromJson(new File(file));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static FileState[] readStateFromJson() throws JsonParseException, JsonMappingException, IOException {
|
||||||
|
return readStateFromJson(SINCEDB);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void writeStateToJson(File file, Collection<FileState> stateList) throws JsonGenerationException, JsonMappingException, IOException {
|
||||||
|
mapper.writeValue(file, stateList);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void writeStateToJson(String file, Collection<FileState> stateList) throws JsonGenerationException, JsonMappingException, IOException {
|
||||||
|
writeStateToJson(new File(file), stateList);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void writeStateToJson(Collection<FileState> stateList) throws JsonGenerationException, JsonMappingException, IOException {
|
||||||
|
writeStateToJson(SINCEDB, stateList);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,81 @@
|
|||||||
|
package info.fetter.logstashforwarder;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Copyright 2015 Didier Fetter
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
import static org.apache.log4j.Level.DEBUG;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.log4j.BasicConfigurator;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.apache.log4j.spi.RootLogger;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonGenerationException;
|
||||||
|
import com.fasterxml.jackson.core.JsonParseException;
|
||||||
|
import com.fasterxml.jackson.databind.JsonMappingException;
|
||||||
|
|
||||||
|
public class RegistrarTest {
|
||||||
|
Logger logger = Logger.getLogger(RegistrarTest.class);
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
BasicConfigurator.configure();
|
||||||
|
RootLogger.getRootLogger().setLevel(DEBUG);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
BasicConfigurator.resetConfiguration();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadState1() throws JsonParseException, JsonMappingException, IOException {
|
||||||
|
FileState[] states = Registrar.readStateFromJson(new File(RegistrarTest.class.getClassLoader().getResource("state1.json").getFile()));
|
||||||
|
for(FileState state : states) {
|
||||||
|
logger.debug("Loaded state : " + state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWriteState2() throws JsonGenerationException, JsonMappingException, IOException {
|
||||||
|
FileState state1 = new FileState();
|
||||||
|
state1.setDirectory("/directory1");
|
||||||
|
state1.setFileName("file1");
|
||||||
|
state1.setPointer(1234);
|
||||||
|
state1.setSignature(123456);
|
||||||
|
state1.setSignatureLength(135);
|
||||||
|
FileState state2 = new FileState();
|
||||||
|
state2.setDirectory("/directory2");
|
||||||
|
state2.setFileName("file2");
|
||||||
|
state2.setPointer(4321);
|
||||||
|
state2.setSignature(654321);
|
||||||
|
state2.setSignatureLength(531);
|
||||||
|
List<FileState> stateList = new ArrayList<FileState>(2);
|
||||||
|
stateList.add(state1);
|
||||||
|
stateList.add(state2);
|
||||||
|
File file = new File("state2.json");
|
||||||
|
logger.debug("Writing to file : " + file.getCanonicalPath());
|
||||||
|
Registrar.writeStateToJson(file, stateList);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user