diff --git a/pom.xml b/pom.xml index 429de54..320de5d 100644 --- a/pom.xml +++ b/pom.xml @@ -65,6 +65,11 @@ commons-lang 2.6 + + commons-cli + commons-cli + 1.2 + \ No newline at end of file diff --git a/src/main/java/info/fetter/logstashforwarder/Event.java b/src/main/java/info/fetter/logstashforwarder/Event.java index 45b1136..0746722 100644 --- a/src/main/java/info/fetter/logstashforwarder/Event.java +++ b/src/main/java/info/fetter/logstashforwarder/Event.java @@ -32,6 +32,12 @@ public class Event { } } + public Event(Map fields) { + for(String key : fields.keySet()) { + addField(key, fields.get(key)); + } + } + public Event addField(String key, byte[] value) { keyValues.put(key, value); return this; diff --git a/src/main/java/info/fetter/logstashforwarder/FileReader.java b/src/main/java/info/fetter/logstashforwarder/FileReader.java index 3bba986..5ba30fb 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileReader.java +++ b/src/main/java/info/fetter/logstashforwarder/FileReader.java @@ -6,6 +6,7 @@ import java.io.RandomAccessFile; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -49,7 +50,7 @@ public class FileReader { eventList = new ArrayList(spoolSize); } - public int readFiles(List fileList) throws IOException { + public int readFiles(Collection fileList) throws IOException { int eventCount = 0; if(logger.isTraceEnabled()) { logger.trace("Reading " + fileList.size() + " file(s)"); diff --git a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java index 59f740c..1c04998 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java +++ b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java @@ -37,28 +37,19 @@ import org.apache.log4j.Logger; public class FileWatcher { private static final Logger logger = Logger.getLogger(FileWatcher.class); private List observerList = new ArrayList(); - private static final int ONE_DAY = 24 * 3600 * 1000; - private long deadTime; + public static final int ONE_DAY = 24 * 3600 * 1000; private Map watchMap = new HashMap(); private Map changedWatchMap = new HashMap(); private static int MAX_SIGNATURE_LENGTH = 1024; - public FileWatcher(long deadTime) { - this.deadTime = deadTime; - } - - public FileWatcher() { - this(ONE_DAY); - } - - public void addFilesToWatch(String fileToWatch, Event fields) { + public void addFilesToWatch(String fileToWatch, Event fields, int deadTime) { try { if(fileToWatch.equals("-")) { addStdIn(fields); } else if(fileToWatch.contains("*")) { - addWildCardFiles(fileToWatch, fields); + addWildCardFiles(fileToWatch, fields, deadTime); } else { - addSingleFile(fileToWatch, fields); + addSingleFile(fileToWatch, fields, deadTime); } } catch(Exception e) { throw new RuntimeException(e); @@ -75,9 +66,10 @@ public class FileWatcher { printWatchMap(); } - public void readFiles() { + public void readFiles(FileReader reader) throws IOException { logger.debug("Reading files"); logger.trace("=============="); + reader.readFiles(watchMap.values()); } private void processModifications() throws IOException { @@ -174,7 +166,7 @@ public class FileWatcher { removeMarkedFilesFromWatchMap(); } - private void addSingleFile(String fileToWatch, Event fields) throws Exception { + private void addSingleFile(String fileToWatch, Event fields, int deadTime) throws Exception { logger.info("Watching file : " + new File(fileToWatch).getCanonicalPath()); String directory = FilenameUtils.getFullPath(fileToWatch); String fileName = FilenameUtils.getName(fileToWatch); @@ -185,7 +177,7 @@ public class FileWatcher { initializeWatchMap(new File(directory), fileFilter, fields); } - private void addWildCardFiles(String filesToWatch, Event fields) throws Exception { + private void addWildCardFiles(String filesToWatch, Event fields, int deadTime) throws Exception { logger.info("Watching wildcard files : " + filesToWatch); String directory = FilenameUtils.getFullPath(filesToWatch); String wildcard = FilenameUtils.getName(filesToWatch); diff --git a/src/main/java/info/fetter/logstashforwarder/Forwarder.java b/src/main/java/info/fetter/logstashforwarder/Forwarder.java new file mode 100644 index 0000000..0db9748 --- /dev/null +++ b/src/main/java/info/fetter/logstashforwarder/Forwarder.java @@ -0,0 +1,105 @@ +package info.fetter.logstashforwarder; + +import info.fetter.logstashforwarder.config.ConfigurationManager; +import info.fetter.logstashforwarder.config.FilesSection; + +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; + +/* + * Copyright 2015 Didier Fetter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +public class Forwarder { + private static int spoolSize = 1024; + private static int idleTimeout = 5000; + private static String config; + private static ConfigurationManager configManager; + private static FileWatcher watcher = new FileWatcher(); + private static FileReader reader; + + static void main(String[] args) { + try { + parseOptions(args); + configManager = new ConfigurationManager(config); + configManager.readConfiguration(); + for(FilesSection files : configManager.getConfig().getFiles()) { + for(String path : files.getPaths()) { + watcher.addFilesToWatch(path, new Event(files.getFields()), FileWatcher.ONE_DAY); + } + } + reader = new FileReader(spoolSize); + } catch(Exception e) { + System.err.println(e.getMessage()); + System.exit(3); + } + } + + @SuppressWarnings("static-access") + static void parseOptions(String[] args) { + Options options = new Options(); + Option helpOption = new Option("help", "print this message"); + + Option spoolSizeOption = OptionBuilder.withArgName("number of events") + .hasArg() + .withDescription("event count spool threshold - forces network flush") + .create("spool-size"); + Option idleTimeoutOption = OptionBuilder.withArgName("") + .hasArg() + .withDescription("time between file reads in seconds") + .create("idle-timeout"); + Option configOption = OptionBuilder.withArgName("config file") + .hasArg() + .isRequired() + .withDescription("path to logstash-forwarder configuration file") + .create("config"); + + options.addOption(helpOption).addOption(idleTimeoutOption).addOption(spoolSizeOption).addOption(configOption); + CommandLineParser parser = new BasicParser(); + try { + CommandLine line = parser.parse(options, args); + if(line.hasOption("spool-size")) { + spoolSize = Integer.parseInt(line.getOptionValue("spool-size")); + } + if(line.hasOption("idle-timeout")) { + idleTimeout = Integer.parseInt(line.getOptionValue("idle-timeout")); + } + if(line.hasOption("config")) { + config = line.getOptionValue("config"); + } + } catch(ParseException e) { + printHelp(options); + System.exit(1);; + } catch(NumberFormatException e) { + System.err.println("Value must be an integer"); + printHelp(options); + System.exit(2);; + } + } + + private static void printHelp(Options options) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("logstash-forwarder", options); + } + +} diff --git a/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java b/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java index fd836e6..5b433e6 100644 --- a/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java +++ b/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java @@ -1,5 +1,22 @@ package info.fetter.logstashforwarder; +/* + * Copyright 2015 Didier Fetter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + import static org.apache.log4j.Level.*; import java.io.File; @@ -30,7 +47,7 @@ public class FileWatcherTest { //@Test public void testFileWatch() throws InterruptedException, IOException { FileWatcher watcher = new FileWatcher(); - watcher.addFilesToWatch("./test.txt", new Event().addField("test", "test")); + watcher.addFilesToWatch("./test.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY); for(int i = 0; i < 100; i++) { Thread.sleep(1000); watcher.checkFiles(); @@ -40,7 +57,7 @@ public class FileWatcherTest { //@Test public void testWildcardWatch() throws InterruptedException, IOException { FileWatcher watcher = new FileWatcher(); - watcher.addFilesToWatch("./test*.txt", new Event().addField("test", "test")); + watcher.addFilesToWatch("./test*.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY); File file1 = new File("test1.txt"); File file2 = new File("test2.txt");