Created Parameters, ParametersManager, InputWatcher, Watcher.

This commit is contained in:
didfet
2016-08-27 01:48:17 +02:00
parent dfe74164e3
commit 118e8efd63
5 changed files with 83 additions and 214 deletions

View File

@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>logstash-forwarder-java</groupId> <groupId>logstash-forwarder-java</groupId>
<artifactId>logstash-forwarder-java</artifactId> <artifactId>logstash-forwarder-java</artifactId>
<version>0.2.4</version> <version>0.2.5-SNAPSHOT</version>
<name>logstash-forwarder-java</name> <name>logstash-forwarder-java</name>
<description>Java version of logstash forwarder</description> <description>Java version of logstash forwarder</description>
<url>https://github.com/didfet/logstash-forwarder-java</url> <url>https://github.com/didfet/logstash-forwarder-java</url>

View File

@@ -17,7 +17,6 @@ package info.fetter.logstashforwarder;
* *
*/ */
import java.io.UnsupportedEncodingException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@@ -33,7 +32,7 @@ public class Event {
} }
} }
public Event(Map<String,String> fields) throws UnsupportedEncodingException { public Event(Map<String,String> fields) {
for(String key : fields.keySet()) { for(String key : fields.keySet()) {
addField(key, fields.get(key)); addField(key, fields.get(key));
} }
@@ -44,12 +43,12 @@ public class Event {
return this; return this;
} }
public Event addField(String key, String value) throws UnsupportedEncodingException { public Event addField(String key, String value) {
keyValues.put(key, value.getBytes()); keyValues.put(key, value.getBytes());
return this; return this;
} }
public Event addField(String key, long value) throws UnsupportedEncodingException { public Event addField(String key, long value) {
keyValues.put(key, String.valueOf(value).getBytes()); keyValues.put(key, String.valueOf(value).getBytes());
return this; return this;
} }

View File

@@ -17,6 +17,8 @@ package info.fetter.logstashforwarder;
* *
*/ */
import info.fetter.logstashforwarder.config.FilesSection;
import info.fetter.logstashforwarder.config.Parameters;
import info.fetter.logstashforwarder.util.AdapterException; import info.fetter.logstashforwarder.util.AdapterException;
import info.fetter.logstashforwarder.util.LastModifiedFileFilter; import info.fetter.logstashforwarder.util.LastModifiedFileFilter;
@@ -35,18 +37,14 @@ import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.commons.io.monitor.FileAlterationObserver; import org.apache.commons.io.monitor.FileAlterationObserver;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
public class FileWatcher { public class FileWatcher implements Watcher {
private static final Logger logger = Logger.getLogger(FileWatcher.class); private static final Logger logger = Logger.getLogger(FileWatcher.class);
private List<FileAlterationObserver> observerList = new ArrayList<FileAlterationObserver>(); private List<FileAlterationObserver> observerList = new ArrayList<FileAlterationObserver>();
public static final int ONE_DAY = 24 * 3600 * 1000; public static final int ONE_DAY = 24 * 3600 * 1000;
private Map<File,FileState> oldWatchMap = new HashMap<File,FileState>(); private Map<File,FileState> oldWatchMap = new HashMap<File,FileState>();
private Map<File,FileState> newWatchMap = new HashMap<File,FileState>(); private Map<File,FileState> newWatchMap = new HashMap<File,FileState>();
private FileState[] savedStates; private FileState[] savedStates;
private int maxSignatureLength; private Parameters parameters;
private boolean tail = false;
private Event stdinFields;
private boolean stdinConfigured = false;
private String sincedbFile = null;
public FileWatcher() { public FileWatcher() {
} }
@@ -59,8 +57,8 @@ public class FileWatcher {
oldWatchMap.put(state.getFile(), state); oldWatchMap.put(state.getFile(), state);
} }
} }
processModifications(); detectModifications();
if(tail) { if(parameters.isTailSelected()) {
for(FileState state : oldWatchMap.values()) { for(FileState state : oldWatchMap.values()) {
if(state.getPointer() == 0) { if(state.getPointer() == 0) {
state.setPointer(state.getSize()); state.setPointer(state.getSize());
@@ -70,14 +68,14 @@ public class FileWatcher {
printWatchMap(); printWatchMap();
} }
public void addFilesToWatch(String fileToWatch, Event fields, int deadTime) { public void addFilesToWatch(FilesSection files) {
try { try {
if(fileToWatch.equals("-")) { for(String path : files.getPaths()) {
addStdIn(fields); if(path.contains("*")) {
} else if(fileToWatch.contains("*")) { addWildCardFiles(path, new Event(files.getFields()), files.getDeadTimeInSeconds() * 1000);
addWildCardFiles(fileToWatch, fields, deadTime);
} else { } else {
addSingleFile(fileToWatch, fields, deadTime); addSingleFile(path, new Event(files.getFields()), files.getDeadTimeInSeconds() * 1000);
}
} }
} catch(Exception e) { } catch(Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
@@ -90,7 +88,7 @@ public class FileWatcher {
for(FileAlterationObserver observer : observerList) { for(FileAlterationObserver observer : observerList) {
observer.checkAndNotify(); observer.checkAndNotify();
} }
processModifications(); detectModifications();
printWatchMap(); printWatchMap();
} }
@@ -98,22 +96,11 @@ public class FileWatcher {
logger.trace("Reading files"); logger.trace("Reading files");
logger.trace("=============="); logger.trace("==============");
int numberOfLinesRead = reader.readFiles(oldWatchMap.values()); int numberOfLinesRead = reader.readFiles(oldWatchMap.values());
Registrar.writeStateToJson(sincedbFile,oldWatchMap.values()); Registrar.writeStateToJson(parameters.getSincedbFile(),oldWatchMap.values());
return numberOfLinesRead; return numberOfLinesRead;
} }
public int readStdin(InputReader reader) throws AdapterException, IOException { private void detectModifications() throws IOException {
if(stdinConfigured) {
logger.debug("Reading stdin");
reader.setFields(stdinFields);
int numberOfLinesRead = reader.readInput();
return numberOfLinesRead;
} else {
return 0;
}
}
private void processModifications() throws IOException {
for(File file : newWatchMap.keySet()) { for(File file : newWatchMap.keySet()) {
FileState state = newWatchMap.get(file); FileState state = newWatchMap.get(file);
@@ -246,12 +233,6 @@ public class FileWatcher {
initializeWatchMap(new File(directory), fileFilter, fields); initializeWatchMap(new File(directory), fileFilter, fields);
} }
private void addStdIn(Event fields) {
logger.error("Watching stdin");
stdinFields = fields;
stdinConfigured = true;
}
private void initializeWatchMap(File directory, IOFileFilter fileFilter, Event fields) throws Exception { private void initializeWatchMap(File directory, IOFileFilter fileFilter, Event fields) throws Exception {
if(!directory.isDirectory()) { if(!directory.isDirectory()) {
logger.warn("Directory " + directory + " does not exist"); logger.warn("Directory " + directory + " does not exist");
@@ -271,6 +252,7 @@ public class FileWatcher {
try { try {
FileState state = new FileState(file); FileState state = new FileState(file);
state.setFields(fields); state.setFields(fields);
int maxSignatureLength = parameters.getSignatureLength();
int signatureLength = (int) (state.getSize() > maxSignatureLength ? maxSignatureLength : state.getSize()); int signatureLength = (int) (state.getSize() > maxSignatureLength ? maxSignatureLength : state.getSize());
state.setSignatureLength(signatureLength); state.setSignatureLength(signatureLength);
long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength); long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength);
@@ -356,23 +338,21 @@ public class FileWatcher {
} }
} }
public int getMaxSignatureLength() { /**
return maxSignatureLength; * @return the parameters
*/
public Parameters getParameters() {
return parameters;
} }
public void setMaxSignatureLength(int maxSignatureLength) { /**
this.maxSignatureLength = maxSignatureLength; * @param parameters the parameters to set
} */
public void setParameters(Parameters parameters) {
public void setTail(boolean tail) { this.parameters = parameters;
this.tail = tail;
}
public void setSincedb(String sincedbFile) {
this.sincedbFile = sincedbFile;
try { try {
logger.debug("Loading saved states"); logger.debug("Loading saved states");
savedStates = Registrar.readStateFromJson(sincedbFile); savedStates = Registrar.readStateFromJson(parameters.getSincedbFile());
} catch(Exception e) { } catch(Exception e) {
logger.warn("Could not load saved states : " + e.getMessage(), e); logger.warn("Could not load saved states : " + e.getMessage(), e);
} }

View File

@@ -25,67 +25,48 @@ import java.util.Random;
import info.fetter.logstashforwarder.config.ConfigurationManager; import info.fetter.logstashforwarder.config.ConfigurationManager;
import info.fetter.logstashforwarder.config.FilesSection; import info.fetter.logstashforwarder.config.FilesSection;
import info.fetter.logstashforwarder.config.Parameters;
import info.fetter.logstashforwarder.config.ParametersManager;
import info.fetter.logstashforwarder.protocol.LumberjackClient; import info.fetter.logstashforwarder.protocol.LumberjackClient;
import info.fetter.logstashforwarder.util.AdapterException; import info.fetter.logstashforwarder.util.AdapterException;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.log4j.Appender; import org.apache.log4j.Appender;
import org.apache.log4j.BasicConfigurator; import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.ConsoleAppender; import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Layout; import org.apache.log4j.Layout;
import org.apache.log4j.Level;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout; import org.apache.log4j.PatternLayout;
import org.apache.log4j.RollingFileAppender; import org.apache.log4j.RollingFileAppender;
import org.apache.log4j.spi.RootLogger; import org.apache.log4j.spi.RootLogger;
public class Forwarder { public class Forwarder {
private static final String SINCEDB = ".logstash-forwarder-java";
private static Logger logger = Logger.getLogger(Forwarder.class); private static Logger logger = Logger.getLogger(Forwarder.class);
private static int spoolSize = 1024;
private static int idleTimeout = 5000;
private static int networkTimeout = 15000;
private static String config;
private static ConfigurationManager configManager; private static ConfigurationManager configManager;
private static FileWatcher watcher; private static FileWatcher fileWatcher;
private static FileReader fileReader; private static FileReader fileReader;
private static InputWatcher inputWatcher;
private static InputReader inputReader; private static InputReader inputReader;
private static Level logLevel = INFO;
private static boolean debugWatcherSelected = false;
private static ProtocolAdapter adapter; private static ProtocolAdapter adapter;
private static Random random = new Random(); private static Random random = new Random();
private static int signatureLength = 4096; private static Parameters parameters;
private static boolean tailSelected = false; private static int networkTimeout = 15000;
private static String logfile = null;
private static String logfileSize = "10MB";
private static int logfileNumber = 5;
private static String sincedbFile = SINCEDB;
public static void main(String[] args) { public static void main(String[] args) {
try { try {
parseOptions(args); parameters = ParametersManager.parseOptions(args);
setupLogging(); setupLogging();
watcher = new FileWatcher(); fileWatcher = new FileWatcher();
watcher.setMaxSignatureLength(signatureLength); fileWatcher.setParameters(parameters);
watcher.setTail(tailSelected); inputWatcher = new InputWatcher();
watcher.setSincedb(sincedbFile); configManager = new ConfigurationManager(parameters.getConfigFile());
configManager = new ConfigurationManager(config);
configManager.readConfiguration(); configManager.readConfiguration();
for(FilesSection files : configManager.getConfig().getFiles()) { for(FilesSection files : configManager.getConfig().getFiles()) {
for(String path : files.getPaths()) { inputWatcher.addFilesToWatch(files);
watcher.addFilesToWatch(path, new Event(files.getFields()), files.getDeadTimeInSeconds() * 1000); fileWatcher.addFilesToWatch(files);
} }
} fileWatcher.initialize();
watcher.initialize(); fileReader = new FileReader(parameters.getSpoolSize());
fileReader = new FileReader(spoolSize); inputReader = new InputReader(parameters.getSpoolSize(), System.in);
inputReader = new InputReader(spoolSize, System.in);
connectToServer(); connectToServer();
infiniteLoop(); infiniteLoop();
} catch(Exception e) { } catch(Exception e) {
@@ -97,10 +78,10 @@ public class Forwarder {
private static void infiniteLoop() throws IOException, InterruptedException { private static void infiniteLoop() throws IOException, InterruptedException {
while(true) { while(true) {
try { try {
watcher.checkFiles(); fileWatcher.checkFiles();
while(watcher.readFiles(fileReader) == spoolSize); while(fileWatcher.readFiles(fileReader) == parameters.getSpoolSize());
while(watcher.readStdin(inputReader) == spoolSize); while(inputWatcher.readStdin(inputReader) == parameters.getSpoolSize());
Thread.sleep(idleTimeout); Thread.sleep(parameters.getIdleTimeout());
} catch(AdapterException e) { } catch(AdapterException e) {
logger.error("Lost server connection"); logger.error("Lost server connection");
Thread.sleep(networkTimeout); Thread.sleep(networkTimeout);
@@ -145,136 +126,22 @@ public class Forwarder {
} }
} }
@SuppressWarnings("static-access")
static void parseOptions(String[] args) {
Options options = new Options();
Option helpOption = new Option("help", "print this message");
Option quietOption = new Option("quiet", "operate in quiet mode - only emit errors to log");
Option debugOption = new Option("debug", "operate in debug mode");
Option debugWatcherOption = new Option("debugwatcher", "operate watcher in debug mode");
Option traceOption = new Option("trace", "operate in trace mode");
Option tailOption = new Option("tail", "read new files from the end");
Option spoolSizeOption = OptionBuilder.withArgName("number of events")
.hasArg()
.withDescription("event count spool threshold - forces network flush")
.create("spoolsize");
Option idleTimeoutOption = OptionBuilder.withArgName("")
.hasArg()
.withDescription("time between file reads in seconds")
.create("idletimeout");
Option configOption = OptionBuilder.withArgName("config file")
.hasArg()
.isRequired()
.withDescription("path to logstash-forwarder configuration file")
.create("config");
Option signatureLengthOption = OptionBuilder.withArgName("signature length")
.hasArg()
.withDescription("Maximum length of file signature")
.create("signaturelength");
Option logfileOption = OptionBuilder.withArgName("logfile name")
.hasArg()
.withDescription("Logfile name")
.create("logfile");
Option logfileSizeOption = OptionBuilder.withArgName("logfile size")
.hasArg()
.withDescription("Logfile size (default 10M)")
.create("logfilesize");
Option logfileNumberOption = OptionBuilder.withArgName("number of logfiles")
.hasArg()
.withDescription("Number of logfiles (default 5)")
.create("logfilenumber");
Option sincedbOption = OptionBuilder.withArgName("sincedb file")
.hasArg()
.withDescription("Sincedb file name")
.create("sincedb");
options.addOption(helpOption)
.addOption(idleTimeoutOption)
.addOption(spoolSizeOption)
.addOption(quietOption)
.addOption(debugOption)
.addOption(debugWatcherOption)
.addOption(traceOption)
.addOption(tailOption)
.addOption(signatureLengthOption)
.addOption(configOption)
.addOption(logfileOption)
.addOption(logfileNumberOption)
.addOption(logfileSizeOption)
.addOption(sincedbOption);
CommandLineParser parser = new GnuParser();
try {
CommandLine line = parser.parse(options, args);
if(line.hasOption("spoolsize")) {
spoolSize = Integer.parseInt(line.getOptionValue("spoolsize"));
}
if(line.hasOption("idletimeout")) {
idleTimeout = Integer.parseInt(line.getOptionValue("idletimeout"));
}
if(line.hasOption("config")) {
config = line.getOptionValue("config");
}
if(line.hasOption("signaturelength")) {
signatureLength = Integer.parseInt(line.getOptionValue("signaturelength"));
}
if(line.hasOption("quiet")) {
logLevel = ERROR;
}
if(line.hasOption("debug")) {
logLevel = DEBUG;
}
if(line.hasOption("trace")) {
logLevel = TRACE;
}
if(line.hasOption("debugwatcher")) {
debugWatcherSelected = true;
}
if(line.hasOption("tail")) {
tailSelected = true;
}
if(line.hasOption("logfile")) {
logfile = line.getOptionValue("logfile");
}
if(line.hasOption("logfilesize")) {
logfileSize = line.getOptionValue("logfilesize");
}
if(line.hasOption("logfilenumber")) {
logfileNumber = Integer.parseInt(line.getOptionValue("logfilenumber"));
}
if(line.hasOption("sincedb")) {
sincedbFile = line.getOptionValue("sincedb");
}
} catch(ParseException e) {
printHelp(options);
System.exit(1);;
} catch(NumberFormatException e) {
System.err.println("Value must be an integer");
printHelp(options);
System.exit(2);;
}
}
private static void printHelp(Options options) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("logstash-forwarder", options);
}
private static void setupLogging() throws IOException { private static void setupLogging() throws IOException {
Appender appender; Appender appender;
Layout layout = new PatternLayout("%d %p %c{1} - %m%n"); Layout layout = new PatternLayout("%d %p %c{1} - %m%n");
if(logfile == null) { if(parameters.getLogfile() == null) {
appender = new ConsoleAppender(layout); appender = new ConsoleAppender(layout);
} else { } else {
RollingFileAppender rolling = new RollingFileAppender(layout, logfile, true); RollingFileAppender rolling = new RollingFileAppender(layout, parameters.getLogfile(), true);
rolling.setMaxFileSize(logfileSize); rolling.setMaxFileSize(parameters.getLogfileSize());
rolling.setMaxBackupIndex(logfileNumber); rolling.setMaxBackupIndex(parameters.getLogfileNumber());
appender = rolling; appender = rolling;
} }
BasicConfigurator.configure(appender); BasicConfigurator.configure(appender);
RootLogger.getRootLogger().setLevel(logLevel); RootLogger.getRootLogger().setLevel(parameters.getLogLevel());
if(debugWatcherSelected) { if(parameters.isDebugWatcherSelected()) {
Logger.getLogger(FileWatcher.class).addAppender(appender); Logger.getLogger(FileWatcher.class).addAppender(appender);
Logger.getLogger(FileWatcher.class).setLevel(DEBUG); Logger.getLogger(FileWatcher.class).setLevel(DEBUG);
Logger.getLogger(FileWatcher.class).setAdditivity(false); Logger.getLogger(FileWatcher.class).setAdditivity(false);

View File

@@ -18,9 +18,15 @@ package info.fetter.logstashforwarder;
*/ */
import static org.apache.log4j.Level.*; import static org.apache.log4j.Level.*;
import info.fetter.logstashforwarder.config.FilesSection;
import info.fetter.logstashforwarder.config.Parameters;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.log4j.BasicConfigurator; import org.apache.log4j.BasicConfigurator;
@@ -47,7 +53,15 @@ public class FileWatcherTest {
//@Test //@Test
public void testFileWatch() throws InterruptedException, IOException { public void testFileWatch() throws InterruptedException, IOException {
FileWatcher watcher = new FileWatcher(); FileWatcher watcher = new FileWatcher();
watcher.addFilesToWatch("./test.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY); FilesSection files = new FilesSection();
Map<String,String> fields = new HashMap<String, String>();
List<String> paths = new ArrayList<String>();
fields.put("test", "test");
paths.add("./test.txt");
files.setFields(fields);
files.setPaths(paths);
files.setDeadTime("24h");
watcher.addFilesToWatch(files);
for(int i = 0; i < 100; i++) { for(int i = 0; i < 100; i++) {
Thread.sleep(1000); Thread.sleep(1000);
watcher.checkFiles(); watcher.checkFiles();
@@ -61,7 +75,16 @@ public class FileWatcherTest {
return; return;
} }
FileWatcher watcher = new FileWatcher(); FileWatcher watcher = new FileWatcher();
watcher.addFilesToWatch("./testFileWatcher*.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY); FilesSection files = new FilesSection();
Map<String,String> fields = new HashMap<String, String>();
List<String> paths = new ArrayList<String>();
fields.put("test", "test");
paths.add("./testFileWatcher*.txt");
files.setFields(fields);
files.setPaths(paths);
files.setDeadTime("24h");
watcher.addFilesToWatch(files);
watcher.setParameters(new Parameters());
watcher.initialize(); watcher.initialize();
File file1 = new File("testFileWatcher1.txt"); File file1 = new File("testFileWatcher1.txt");