diff --git a/pom.xml b/pom.xml index 7809709..cc2bbe9 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 logstash-forwarder-java logstash-forwarder-java - 0.2.4 + 0.2.5-SNAPSHOT logstash-forwarder-java Java version of logstash forwarder https://github.com/didfet/logstash-forwarder-java diff --git a/src/main/java/info/fetter/logstashforwarder/Event.java b/src/main/java/info/fetter/logstashforwarder/Event.java index a4be5b0..1ae1c50 100644 --- a/src/main/java/info/fetter/logstashforwarder/Event.java +++ b/src/main/java/info/fetter/logstashforwarder/Event.java @@ -17,7 +17,6 @@ package info.fetter.logstashforwarder; * */ -import java.io.UnsupportedEncodingException; import java.util.HashMap; import java.util.Map; @@ -33,7 +32,7 @@ public class Event { } } - public Event(Map fields) throws UnsupportedEncodingException { + public Event(Map fields) { for(String key : fields.keySet()) { addField(key, fields.get(key)); } @@ -44,12 +43,12 @@ public class Event { return this; } - public Event addField(String key, String value) throws UnsupportedEncodingException { + public Event addField(String key, String value) { keyValues.put(key, value.getBytes()); return this; } - public Event addField(String key, long value) throws UnsupportedEncodingException { + public Event addField(String key, long value) { keyValues.put(key, String.valueOf(value).getBytes()); return this; } diff --git a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java index b50b7b4..be2301a 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java +++ b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java @@ -17,6 +17,8 @@ package info.fetter.logstashforwarder; * */ +import info.fetter.logstashforwarder.config.FilesSection; +import info.fetter.logstashforwarder.config.Parameters; import info.fetter.logstashforwarder.util.AdapterException; import info.fetter.logstashforwarder.util.LastModifiedFileFilter; @@ -35,18 +37,14 @@ import org.apache.commons.io.filefilter.WildcardFileFilter; import org.apache.commons.io.monitor.FileAlterationObserver; import org.apache.log4j.Logger; -public class FileWatcher { +public class FileWatcher implements Watcher { private static final Logger logger = Logger.getLogger(FileWatcher.class); private List observerList = new ArrayList(); public static final int ONE_DAY = 24 * 3600 * 1000; private Map oldWatchMap = new HashMap(); private Map newWatchMap = new HashMap(); private FileState[] savedStates; - private int maxSignatureLength; - private boolean tail = false; - private Event stdinFields; - private boolean stdinConfigured = false; - private String sincedbFile = null; + private Parameters parameters; public FileWatcher() { } @@ -59,8 +57,8 @@ public class FileWatcher { oldWatchMap.put(state.getFile(), state); } } - processModifications(); - if(tail) { + detectModifications(); + if(parameters.isTailSelected()) { for(FileState state : oldWatchMap.values()) { if(state.getPointer() == 0) { state.setPointer(state.getSize()); @@ -70,14 +68,14 @@ public class FileWatcher { printWatchMap(); } - public void addFilesToWatch(String fileToWatch, Event fields, int deadTime) { + public void addFilesToWatch(FilesSection files) { try { - if(fileToWatch.equals("-")) { - addStdIn(fields); - } else if(fileToWatch.contains("*")) { - addWildCardFiles(fileToWatch, fields, deadTime); - } else { - addSingleFile(fileToWatch, fields, deadTime); + for(String path : files.getPaths()) { + if(path.contains("*")) { + addWildCardFiles(path, new Event(files.getFields()), files.getDeadTimeInSeconds() * 1000); + } else { + addSingleFile(path, new Event(files.getFields()), files.getDeadTimeInSeconds() * 1000); + } } } catch(Exception e) { throw new RuntimeException(e); @@ -90,7 +88,7 @@ public class FileWatcher { for(FileAlterationObserver observer : observerList) { observer.checkAndNotify(); } - processModifications(); + detectModifications(); printWatchMap(); } @@ -98,22 +96,11 @@ public class FileWatcher { logger.trace("Reading files"); logger.trace("=============="); int numberOfLinesRead = reader.readFiles(oldWatchMap.values()); - Registrar.writeStateToJson(sincedbFile,oldWatchMap.values()); + Registrar.writeStateToJson(parameters.getSincedbFile(),oldWatchMap.values()); return numberOfLinesRead; } - public int readStdin(InputReader reader) throws AdapterException, IOException { - if(stdinConfigured) { - logger.debug("Reading stdin"); - reader.setFields(stdinFields); - int numberOfLinesRead = reader.readInput(); - return numberOfLinesRead; - } else { - return 0; - } - } - - private void processModifications() throws IOException { + private void detectModifications() throws IOException { for(File file : newWatchMap.keySet()) { FileState state = newWatchMap.get(file); @@ -246,12 +233,6 @@ public class FileWatcher { initializeWatchMap(new File(directory), fileFilter, fields); } - private void addStdIn(Event fields) { - logger.error("Watching stdin"); - stdinFields = fields; - stdinConfigured = true; - } - private void initializeWatchMap(File directory, IOFileFilter fileFilter, Event fields) throws Exception { if(!directory.isDirectory()) { logger.warn("Directory " + directory + " does not exist"); @@ -271,6 +252,7 @@ public class FileWatcher { try { FileState state = new FileState(file); state.setFields(fields); + int maxSignatureLength = parameters.getSignatureLength(); int signatureLength = (int) (state.getSize() > maxSignatureLength ? maxSignatureLength : state.getSize()); state.setSignatureLength(signatureLength); long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength); @@ -356,23 +338,21 @@ public class FileWatcher { } } - public int getMaxSignatureLength() { - return maxSignatureLength; + /** + * @return the parameters + */ + public Parameters getParameters() { + return parameters; } - public void setMaxSignatureLength(int maxSignatureLength) { - this.maxSignatureLength = maxSignatureLength; - } - - public void setTail(boolean tail) { - this.tail = tail; - } - - public void setSincedb(String sincedbFile) { - this.sincedbFile = sincedbFile; + /** + * @param parameters the parameters to set + */ + public void setParameters(Parameters parameters) { + this.parameters = parameters; try { logger.debug("Loading saved states"); - savedStates = Registrar.readStateFromJson(sincedbFile); + savedStates = Registrar.readStateFromJson(parameters.getSincedbFile()); } catch(Exception e) { logger.warn("Could not load saved states : " + e.getMessage(), e); } diff --git a/src/main/java/info/fetter/logstashforwarder/Forwarder.java b/src/main/java/info/fetter/logstashforwarder/Forwarder.java index dc52bbb..8e32d40 100644 --- a/src/main/java/info/fetter/logstashforwarder/Forwarder.java +++ b/src/main/java/info/fetter/logstashforwarder/Forwarder.java @@ -25,67 +25,48 @@ import java.util.Random; import info.fetter.logstashforwarder.config.ConfigurationManager; import info.fetter.logstashforwarder.config.FilesSection; +import info.fetter.logstashforwarder.config.Parameters; +import info.fetter.logstashforwarder.config.ParametersManager; import info.fetter.logstashforwarder.protocol.LumberjackClient; import info.fetter.logstashforwarder.util.AdapterException; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; import org.apache.log4j.Appender; import org.apache.log4j.BasicConfigurator; import org.apache.log4j.ConsoleAppender; import org.apache.log4j.Layout; -import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.PatternLayout; import org.apache.log4j.RollingFileAppender; import org.apache.log4j.spi.RootLogger; public class Forwarder { - private static final String SINCEDB = ".logstash-forwarder-java"; private static Logger logger = Logger.getLogger(Forwarder.class); - private static int spoolSize = 1024; - private static int idleTimeout = 5000; - private static int networkTimeout = 15000; - private static String config; private static ConfigurationManager configManager; - private static FileWatcher watcher; + private static FileWatcher fileWatcher; private static FileReader fileReader; + private static InputWatcher inputWatcher; private static InputReader inputReader; - private static Level logLevel = INFO; - private static boolean debugWatcherSelected = false; private static ProtocolAdapter adapter; private static Random random = new Random(); - private static int signatureLength = 4096; - private static boolean tailSelected = false; - private static String logfile = null; - private static String logfileSize = "10MB"; - private static int logfileNumber = 5; - private static String sincedbFile = SINCEDB; + private static Parameters parameters; + private static int networkTimeout = 15000; public static void main(String[] args) { try { - parseOptions(args); + parameters = ParametersManager.parseOptions(args); setupLogging(); - watcher = new FileWatcher(); - watcher.setMaxSignatureLength(signatureLength); - watcher.setTail(tailSelected); - watcher.setSincedb(sincedbFile); - configManager = new ConfigurationManager(config); + fileWatcher = new FileWatcher(); + fileWatcher.setParameters(parameters); + inputWatcher = new InputWatcher(); + configManager = new ConfigurationManager(parameters.getConfigFile()); configManager.readConfiguration(); for(FilesSection files : configManager.getConfig().getFiles()) { - for(String path : files.getPaths()) { - watcher.addFilesToWatch(path, new Event(files.getFields()), files.getDeadTimeInSeconds() * 1000); - } + inputWatcher.addFilesToWatch(files); + fileWatcher.addFilesToWatch(files); } - watcher.initialize(); - fileReader = new FileReader(spoolSize); - inputReader = new InputReader(spoolSize, System.in); + fileWatcher.initialize(); + fileReader = new FileReader(parameters.getSpoolSize()); + inputReader = new InputReader(parameters.getSpoolSize(), System.in); connectToServer(); infiniteLoop(); } catch(Exception e) { @@ -97,10 +78,10 @@ public class Forwarder { private static void infiniteLoop() throws IOException, InterruptedException { while(true) { try { - watcher.checkFiles(); - while(watcher.readFiles(fileReader) == spoolSize); - while(watcher.readStdin(inputReader) == spoolSize); - Thread.sleep(idleTimeout); + fileWatcher.checkFiles(); + while(fileWatcher.readFiles(fileReader) == parameters.getSpoolSize()); + while(inputWatcher.readStdin(inputReader) == parameters.getSpoolSize()); + Thread.sleep(parameters.getIdleTimeout()); } catch(AdapterException e) { logger.error("Lost server connection"); Thread.sleep(networkTimeout); @@ -145,136 +126,22 @@ public class Forwarder { } } - @SuppressWarnings("static-access") - static void parseOptions(String[] args) { - Options options = new Options(); - Option helpOption = new Option("help", "print this message"); - Option quietOption = new Option("quiet", "operate in quiet mode - only emit errors to log"); - Option debugOption = new Option("debug", "operate in debug mode"); - Option debugWatcherOption = new Option("debugwatcher", "operate watcher in debug mode"); - Option traceOption = new Option("trace", "operate in trace mode"); - Option tailOption = new Option("tail", "read new files from the end"); - Option spoolSizeOption = OptionBuilder.withArgName("number of events") - .hasArg() - .withDescription("event count spool threshold - forces network flush") - .create("spoolsize"); - Option idleTimeoutOption = OptionBuilder.withArgName("") - .hasArg() - .withDescription("time between file reads in seconds") - .create("idletimeout"); - Option configOption = OptionBuilder.withArgName("config file") - .hasArg() - .isRequired() - .withDescription("path to logstash-forwarder configuration file") - .create("config"); - Option signatureLengthOption = OptionBuilder.withArgName("signature length") - .hasArg() - .withDescription("Maximum length of file signature") - .create("signaturelength"); - Option logfileOption = OptionBuilder.withArgName("logfile name") - .hasArg() - .withDescription("Logfile name") - .create("logfile"); - Option logfileSizeOption = OptionBuilder.withArgName("logfile size") - .hasArg() - .withDescription("Logfile size (default 10M)") - .create("logfilesize"); - Option logfileNumberOption = OptionBuilder.withArgName("number of logfiles") - .hasArg() - .withDescription("Number of logfiles (default 5)") - .create("logfilenumber"); - Option sincedbOption = OptionBuilder.withArgName("sincedb file") - .hasArg() - .withDescription("Sincedb file name") - .create("sincedb"); - - options.addOption(helpOption) - .addOption(idleTimeoutOption) - .addOption(spoolSizeOption) - .addOption(quietOption) - .addOption(debugOption) - .addOption(debugWatcherOption) - .addOption(traceOption) - .addOption(tailOption) - .addOption(signatureLengthOption) - .addOption(configOption) - .addOption(logfileOption) - .addOption(logfileNumberOption) - .addOption(logfileSizeOption) - .addOption(sincedbOption); - - CommandLineParser parser = new GnuParser(); - try { - CommandLine line = parser.parse(options, args); - if(line.hasOption("spoolsize")) { - spoolSize = Integer.parseInt(line.getOptionValue("spoolsize")); - } - if(line.hasOption("idletimeout")) { - idleTimeout = Integer.parseInt(line.getOptionValue("idletimeout")); - } - if(line.hasOption("config")) { - config = line.getOptionValue("config"); - } - if(line.hasOption("signaturelength")) { - signatureLength = Integer.parseInt(line.getOptionValue("signaturelength")); - } - if(line.hasOption("quiet")) { - logLevel = ERROR; - } - if(line.hasOption("debug")) { - logLevel = DEBUG; - } - if(line.hasOption("trace")) { - logLevel = TRACE; - } - if(line.hasOption("debugwatcher")) { - debugWatcherSelected = true; - } - if(line.hasOption("tail")) { - tailSelected = true; - } - if(line.hasOption("logfile")) { - logfile = line.getOptionValue("logfile"); - } - if(line.hasOption("logfilesize")) { - logfileSize = line.getOptionValue("logfilesize"); - } - if(line.hasOption("logfilenumber")) { - logfileNumber = Integer.parseInt(line.getOptionValue("logfilenumber")); - } - if(line.hasOption("sincedb")) { - sincedbFile = line.getOptionValue("sincedb"); - } - } catch(ParseException e) { - printHelp(options); - System.exit(1);; - } catch(NumberFormatException e) { - System.err.println("Value must be an integer"); - printHelp(options); - System.exit(2);; - } - } - - private static void printHelp(Options options) { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp("logstash-forwarder", options); - } private static void setupLogging() throws IOException { Appender appender; Layout layout = new PatternLayout("%d %p %c{1} - %m%n"); - if(logfile == null) { + if(parameters.getLogfile() == null) { appender = new ConsoleAppender(layout); } else { - RollingFileAppender rolling = new RollingFileAppender(layout, logfile, true); - rolling.setMaxFileSize(logfileSize); - rolling.setMaxBackupIndex(logfileNumber); + RollingFileAppender rolling = new RollingFileAppender(layout, parameters.getLogfile(), true); + rolling.setMaxFileSize(parameters.getLogfileSize()); + rolling.setMaxBackupIndex(parameters.getLogfileNumber()); appender = rolling; } BasicConfigurator.configure(appender); - RootLogger.getRootLogger().setLevel(logLevel); - if(debugWatcherSelected) { + RootLogger.getRootLogger().setLevel(parameters.getLogLevel()); + if(parameters.isDebugWatcherSelected()) { Logger.getLogger(FileWatcher.class).addAppender(appender); Logger.getLogger(FileWatcher.class).setLevel(DEBUG); Logger.getLogger(FileWatcher.class).setAdditivity(false); diff --git a/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java b/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java index 2fe8835..d835824 100644 --- a/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java +++ b/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java @@ -18,9 +18,15 @@ package info.fetter.logstashforwarder; */ import static org.apache.log4j.Level.*; +import info.fetter.logstashforwarder.config.FilesSection; +import info.fetter.logstashforwarder.config.Parameters; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.log4j.BasicConfigurator; @@ -47,7 +53,15 @@ public class FileWatcherTest { //@Test public void testFileWatch() throws InterruptedException, IOException { FileWatcher watcher = new FileWatcher(); - watcher.addFilesToWatch("./test.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY); + FilesSection files = new FilesSection(); + Map fields = new HashMap(); + List paths = new ArrayList(); + fields.put("test", "test"); + paths.add("./test.txt"); + files.setFields(fields); + files.setPaths(paths); + files.setDeadTime("24h"); + watcher.addFilesToWatch(files); for(int i = 0; i < 100; i++) { Thread.sleep(1000); watcher.checkFiles(); @@ -61,7 +75,16 @@ public class FileWatcherTest { return; } FileWatcher watcher = new FileWatcher(); - watcher.addFilesToWatch("./testFileWatcher*.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY); + FilesSection files = new FilesSection(); + Map fields = new HashMap(); + List paths = new ArrayList(); + fields.put("test", "test"); + paths.add("./testFileWatcher*.txt"); + files.setFields(fields); + files.setPaths(paths); + files.setDeadTime("24h"); + watcher.addFilesToWatch(files); + watcher.setParameters(new Parameters()); watcher.initialize(); File file1 = new File("testFileWatcher1.txt");