From 7a7dfcb2c2e54cdd6d689f91f82c6b62523b908f Mon Sep 17 00:00:00 2001 From: didfet Date: Tue, 17 Mar 2015 18:24:28 +0100 Subject: [PATCH] Modified loop. --- .../fetter/logstashforwarder/FileWatcher.java | 12 ++-- .../fetter/logstashforwarder/Forwarder.java | 70 +++++++++++++++---- .../protocol/LumberjackClient.java | 2 +- 3 files changed, 67 insertions(+), 17 deletions(-) diff --git a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java index 1c04998..af7ac26 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java +++ b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java @@ -66,10 +66,10 @@ public class FileWatcher { printWatchMap(); } - public void readFiles(FileReader reader) throws IOException { + public int readFiles(FileReader reader) throws IOException { logger.debug("Reading files"); logger.trace("=============="); - reader.readFiles(watchMap.values()); + return reader.readFiles(watchMap.values()); } private void processModifications() throws IOException { @@ -194,6 +194,10 @@ public class FileWatcher { } private void initializeWatchMap(File directory, IOFileFilter fileFilter, Event fields) throws Exception { + if(!directory.isDirectory()) { + logger.warn("Directory " + directory + " does not exist"); + return; + } FileAlterationObserver observer = new FileAlterationObserver(directory, fileFilter); FileModificationListener listener = new FileModificationListener(this, fields); observer.addListener(listener); @@ -246,12 +250,12 @@ public class FileWatcher { } } - private void printWatchMap() { + private void printWatchMap() throws IOException { if(logger.isTraceEnabled()) { logger.trace("WatchMap contents : "); for(File file : watchMap.keySet()) { FileState state = watchMap.get(file); - logger.trace("\tFile : " + state.getDirectory() + " marked for deletion : " + state.isDeleted()); + logger.trace("\tFile : " + file.getCanonicalPath() + " marked for deletion : " + state.isDeleted()); } } } diff --git a/src/main/java/info/fetter/logstashforwarder/Forwarder.java b/src/main/java/info/fetter/logstashforwarder/Forwarder.java index 0db9748..38fe379 100644 --- a/src/main/java/info/fetter/logstashforwarder/Forwarder.java +++ b/src/main/java/info/fetter/logstashforwarder/Forwarder.java @@ -1,17 +1,26 @@ package info.fetter.logstashforwarder; +import static org.apache.log4j.Level.*; + +import java.io.IOException; + import info.fetter.logstashforwarder.config.ConfigurationManager; import info.fetter.logstashforwarder.config.FilesSection; +import info.fetter.logstashforwarder.protocol.LumberjackClient; -import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; -import org.apache.commons.cli.PosixParser; +import org.apache.log4j.Appender; +import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.RootLogger; /* * Copyright 2015 Didier Fetter @@ -37,10 +46,17 @@ public class Forwarder { private static ConfigurationManager configManager; private static FileWatcher watcher = new FileWatcher(); private static FileReader reader; + private static Level logLevel = INFO; + private static ProtocolAdapter adapter; - static void main(String[] args) { + public static void main(String[] args) { try { parseOptions(args); + BasicConfigurator.configure(); + RootLogger.getRootLogger().setLevel(logLevel); +// Logger.getLogger(FileReader.class).addAppender((Appender)RootLogger.getRootLogger().getAllAppenders().nextElement()); +// Logger.getLogger(FileReader.class).setLevel(TRACE); +// Logger.getLogger(FileReader.class).setAdditivity(false); configManager = new ConfigurationManager(config); configManager.readConfiguration(); for(FilesSection files : configManager.getConfig().getFiles()) { @@ -49,44 +65,74 @@ public class Forwarder { } } reader = new FileReader(spoolSize); + String[] serverAndPort = configManager.getConfig().getNetwork().getServers().get(0).split(":"); + adapter = new LumberjackClient(configManager.getConfig().getNetwork().getSslCA(),serverAndPort[0],Integer.parseInt(serverAndPort[1])); + reader.setAdapter(adapter); + infiniteLoop(); } catch(Exception e) { - System.err.println(e.getMessage()); + e.printStackTrace(); System.exit(3); } } + private static void infiniteLoop() throws IOException, InterruptedException { + while(true) { + watcher.checkFiles(); + while(watcher.readFiles(reader) == spoolSize); + Thread.sleep(idleTimeout); + } + } + @SuppressWarnings("static-access") static void parseOptions(String[] args) { Options options = new Options(); Option helpOption = new Option("help", "print this message"); + Option quiet = new Option("quiet", "operate in quiet mode - only emit errors to log"); + Option debug = new Option("debug", "operate in debug mode"); + Option trace = new Option("trace", "operate in trace mode"); Option spoolSizeOption = OptionBuilder.withArgName("number of events") .hasArg() .withDescription("event count spool threshold - forces network flush") - .create("spool-size"); + .create("spoolsize"); Option idleTimeoutOption = OptionBuilder.withArgName("") .hasArg() .withDescription("time between file reads in seconds") - .create("idle-timeout"); + .create("idletimeout"); Option configOption = OptionBuilder.withArgName("config file") .hasArg() .isRequired() .withDescription("path to logstash-forwarder configuration file") .create("config"); - options.addOption(helpOption).addOption(idleTimeoutOption).addOption(spoolSizeOption).addOption(configOption); - CommandLineParser parser = new BasicParser(); + options.addOption(helpOption) + .addOption(idleTimeoutOption) + .addOption(spoolSizeOption) + .addOption(quiet) + .addOption(debug) + .addOption(trace) + .addOption(configOption); + CommandLineParser parser = new GnuParser(); try { CommandLine line = parser.parse(options, args); - if(line.hasOption("spool-size")) { - spoolSize = Integer.parseInt(line.getOptionValue("spool-size")); + if(line.hasOption("spoolsize")) { + spoolSize = Integer.parseInt(line.getOptionValue("spoolsize")); } - if(line.hasOption("idle-timeout")) { - idleTimeout = Integer.parseInt(line.getOptionValue("idle-timeout")); + if(line.hasOption("idletimeout")) { + idleTimeout = Integer.parseInt(line.getOptionValue("idletimeout")); } if(line.hasOption("config")) { config = line.getOptionValue("config"); } + if(line.hasOption("quiet")) { + logLevel = ERROR; + } + if(line.hasOption("debug")) { + logLevel = DEBUG; + } + if(line.hasOption("trace")) { + logLevel = TRACE; + } } catch(ParseException e) { printHelp(options); System.exit(1);; diff --git a/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java b/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java index 542969e..6acc01a 100644 --- a/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java +++ b/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java @@ -129,7 +129,7 @@ public class LumberjackClient implements ProtocolAdapter { ByteArrayOutputStream uncompressedBytes = new ByteArrayOutputStream(); DataOutputStream uncompressedOutput = new DataOutputStream(uncompressedBytes); for(Map keyValues : keyValuesList) { - logger.debug("Adding data frame"); + logger.trace("Adding data frame"); sendDataFrame(uncompressedOutput, keyValues); } uncompressedOutput.close();