Modified loop.

This commit is contained in:
didfet
2015-03-17 18:24:28 +01:00
parent d358f43b6f
commit 7a7dfcb2c2
3 changed files with 67 additions and 17 deletions

View File

@@ -66,10 +66,10 @@ public class FileWatcher {
printWatchMap();
}
public void readFiles(FileReader reader) throws IOException {
public int readFiles(FileReader reader) throws IOException {
logger.debug("Reading files");
logger.trace("==============");
reader.readFiles(watchMap.values());
return reader.readFiles(watchMap.values());
}
private void processModifications() throws IOException {
@@ -194,6 +194,10 @@ public class FileWatcher {
}
private void initializeWatchMap(File directory, IOFileFilter fileFilter, Event fields) throws Exception {
if(!directory.isDirectory()) {
logger.warn("Directory " + directory + " does not exist");
return;
}
FileAlterationObserver observer = new FileAlterationObserver(directory, fileFilter);
FileModificationListener listener = new FileModificationListener(this, fields);
observer.addListener(listener);
@@ -246,12 +250,12 @@ public class FileWatcher {
}
}
private void printWatchMap() {
private void printWatchMap() throws IOException {
if(logger.isTraceEnabled()) {
logger.trace("WatchMap contents : ");
for(File file : watchMap.keySet()) {
FileState state = watchMap.get(file);
logger.trace("\tFile : " + state.getDirectory() + " marked for deletion : " + state.isDeleted());
logger.trace("\tFile : " + file.getCanonicalPath() + " marked for deletion : " + state.isDeleted());
}
}
}

View File

@@ -1,17 +1,26 @@
package info.fetter.logstashforwarder;
import static org.apache.log4j.Level.*;
import java.io.IOException;
import info.fetter.logstashforwarder.config.ConfigurationManager;
import info.fetter.logstashforwarder.config.FilesSection;
import info.fetter.logstashforwarder.protocol.LumberjackClient;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.log4j.Appender;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.RootLogger;
/*
* Copyright 2015 Didier Fetter
@@ -37,10 +46,17 @@ public class Forwarder {
private static ConfigurationManager configManager;
private static FileWatcher watcher = new FileWatcher();
private static FileReader reader;
private static Level logLevel = INFO;
private static ProtocolAdapter adapter;
static void main(String[] args) {
public static void main(String[] args) {
try {
parseOptions(args);
BasicConfigurator.configure();
RootLogger.getRootLogger().setLevel(logLevel);
// Logger.getLogger(FileReader.class).addAppender((Appender)RootLogger.getRootLogger().getAllAppenders().nextElement());
// Logger.getLogger(FileReader.class).setLevel(TRACE);
// Logger.getLogger(FileReader.class).setAdditivity(false);
configManager = new ConfigurationManager(config);
configManager.readConfiguration();
for(FilesSection files : configManager.getConfig().getFiles()) {
@@ -49,44 +65,74 @@ public class Forwarder {
}
}
reader = new FileReader(spoolSize);
String[] serverAndPort = configManager.getConfig().getNetwork().getServers().get(0).split(":");
adapter = new LumberjackClient(configManager.getConfig().getNetwork().getSslCA(),serverAndPort[0],Integer.parseInt(serverAndPort[1]));
reader.setAdapter(adapter);
infiniteLoop();
} catch(Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
System.exit(3);
}
}
private static void infiniteLoop() throws IOException, InterruptedException {
while(true) {
watcher.checkFiles();
while(watcher.readFiles(reader) == spoolSize);
Thread.sleep(idleTimeout);
}
}
@SuppressWarnings("static-access")
static void parseOptions(String[] args) {
Options options = new Options();
Option helpOption = new Option("help", "print this message");
Option quiet = new Option("quiet", "operate in quiet mode - only emit errors to log");
Option debug = new Option("debug", "operate in debug mode");
Option trace = new Option("trace", "operate in trace mode");
Option spoolSizeOption = OptionBuilder.withArgName("number of events")
.hasArg()
.withDescription("event count spool threshold - forces network flush")
.create("spool-size");
.create("spoolsize");
Option idleTimeoutOption = OptionBuilder.withArgName("")
.hasArg()
.withDescription("time between file reads in seconds")
.create("idle-timeout");
.create("idletimeout");
Option configOption = OptionBuilder.withArgName("config file")
.hasArg()
.isRequired()
.withDescription("path to logstash-forwarder configuration file")
.create("config");
options.addOption(helpOption).addOption(idleTimeoutOption).addOption(spoolSizeOption).addOption(configOption);
CommandLineParser parser = new BasicParser();
options.addOption(helpOption)
.addOption(idleTimeoutOption)
.addOption(spoolSizeOption)
.addOption(quiet)
.addOption(debug)
.addOption(trace)
.addOption(configOption);
CommandLineParser parser = new GnuParser();
try {
CommandLine line = parser.parse(options, args);
if(line.hasOption("spool-size")) {
spoolSize = Integer.parseInt(line.getOptionValue("spool-size"));
if(line.hasOption("spoolsize")) {
spoolSize = Integer.parseInt(line.getOptionValue("spoolsize"));
}
if(line.hasOption("idle-timeout")) {
idleTimeout = Integer.parseInt(line.getOptionValue("idle-timeout"));
if(line.hasOption("idletimeout")) {
idleTimeout = Integer.parseInt(line.getOptionValue("idletimeout"));
}
if(line.hasOption("config")) {
config = line.getOptionValue("config");
}
if(line.hasOption("quiet")) {
logLevel = ERROR;
}
if(line.hasOption("debug")) {
logLevel = DEBUG;
}
if(line.hasOption("trace")) {
logLevel = TRACE;
}
} catch(ParseException e) {
printHelp(options);
System.exit(1);;

View File

@@ -129,7 +129,7 @@ public class LumberjackClient implements ProtocolAdapter {
ByteArrayOutputStream uncompressedBytes = new ByteArrayOutputStream();
DataOutputStream uncompressedOutput = new DataOutputStream(uncompressedBytes);
for(Map<String,byte[]> keyValues : keyValuesList) {
logger.debug("Adding data frame");
logger.trace("Adding data frame");
sendDataFrame(uncompressedOutput, keyValues);
}
uncompressedOutput.close();