Implemented Forwarder.

This commit is contained in:
didfet
2015-03-17 00:27:04 +01:00
parent 3caea16be7
commit 616f883f49
6 changed files with 145 additions and 19 deletions

View File

@@ -65,6 +65,11 @@
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
</dependencies>
</project>

View File

@@ -32,6 +32,12 @@ public class Event {
}
}
public Event(Map<String,String> fields) {
for(String key : fields.keySet()) {
addField(key, fields.get(key));
}
}
public Event addField(String key, byte[] value) {
keyValues.put(key, value);
return this;

View File

@@ -6,6 +6,7 @@ import java.io.RandomAccessFile;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -49,7 +50,7 @@ public class FileReader {
eventList = new ArrayList<Event>(spoolSize);
}
public int readFiles(List<FileState> fileList) throws IOException {
public int readFiles(Collection<FileState> fileList) throws IOException {
int eventCount = 0;
if(logger.isTraceEnabled()) {
logger.trace("Reading " + fileList.size() + " file(s)");

View File

@@ -37,28 +37,19 @@ import org.apache.log4j.Logger;
public class FileWatcher {
private static final Logger logger = Logger.getLogger(FileWatcher.class);
private List<FileAlterationObserver> observerList = new ArrayList<FileAlterationObserver>();
private static final int ONE_DAY = 24 * 3600 * 1000;
private long deadTime;
public static final int ONE_DAY = 24 * 3600 * 1000;
private Map<File,FileState> watchMap = new HashMap<File,FileState>();
private Map<File,FileState> changedWatchMap = new HashMap<File,FileState>();
private static int MAX_SIGNATURE_LENGTH = 1024;
public FileWatcher(long deadTime) {
this.deadTime = deadTime;
}
public FileWatcher() {
this(ONE_DAY);
}
public void addFilesToWatch(String fileToWatch, Event fields) {
public void addFilesToWatch(String fileToWatch, Event fields, int deadTime) {
try {
if(fileToWatch.equals("-")) {
addStdIn(fields);
} else if(fileToWatch.contains("*")) {
addWildCardFiles(fileToWatch, fields);
addWildCardFiles(fileToWatch, fields, deadTime);
} else {
addSingleFile(fileToWatch, fields);
addSingleFile(fileToWatch, fields, deadTime);
}
} catch(Exception e) {
throw new RuntimeException(e);
@@ -75,9 +66,10 @@ public class FileWatcher {
printWatchMap();
}
public void readFiles() {
public void readFiles(FileReader reader) throws IOException {
logger.debug("Reading files");
logger.trace("==============");
reader.readFiles(watchMap.values());
}
private void processModifications() throws IOException {
@@ -174,7 +166,7 @@ public class FileWatcher {
removeMarkedFilesFromWatchMap();
}
private void addSingleFile(String fileToWatch, Event fields) throws Exception {
private void addSingleFile(String fileToWatch, Event fields, int deadTime) throws Exception {
logger.info("Watching file : " + new File(fileToWatch).getCanonicalPath());
String directory = FilenameUtils.getFullPath(fileToWatch);
String fileName = FilenameUtils.getName(fileToWatch);
@@ -185,7 +177,7 @@ public class FileWatcher {
initializeWatchMap(new File(directory), fileFilter, fields);
}
private void addWildCardFiles(String filesToWatch, Event fields) throws Exception {
private void addWildCardFiles(String filesToWatch, Event fields, int deadTime) throws Exception {
logger.info("Watching wildcard files : " + filesToWatch);
String directory = FilenameUtils.getFullPath(filesToWatch);
String wildcard = FilenameUtils.getName(filesToWatch);

View File

@@ -0,0 +1,105 @@
package info.fetter.logstashforwarder;
import info.fetter.logstashforwarder.config.ConfigurationManager;
import info.fetter.logstashforwarder.config.FilesSection;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
/*
* Copyright 2015 Didier Fetter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
public class Forwarder {
private static int spoolSize = 1024;
private static int idleTimeout = 5000;
private static String config;
private static ConfigurationManager configManager;
private static FileWatcher watcher = new FileWatcher();
private static FileReader reader;
static void main(String[] args) {
try {
parseOptions(args);
configManager = new ConfigurationManager(config);
configManager.readConfiguration();
for(FilesSection files : configManager.getConfig().getFiles()) {
for(String path : files.getPaths()) {
watcher.addFilesToWatch(path, new Event(files.getFields()), FileWatcher.ONE_DAY);
}
}
reader = new FileReader(spoolSize);
} catch(Exception e) {
System.err.println(e.getMessage());
System.exit(3);
}
}
@SuppressWarnings("static-access")
static void parseOptions(String[] args) {
Options options = new Options();
Option helpOption = new Option("help", "print this message");
Option spoolSizeOption = OptionBuilder.withArgName("number of events")
.hasArg()
.withDescription("event count spool threshold - forces network flush")
.create("spool-size");
Option idleTimeoutOption = OptionBuilder.withArgName("")
.hasArg()
.withDescription("time between file reads in seconds")
.create("idle-timeout");
Option configOption = OptionBuilder.withArgName("config file")
.hasArg()
.isRequired()
.withDescription("path to logstash-forwarder configuration file")
.create("config");
options.addOption(helpOption).addOption(idleTimeoutOption).addOption(spoolSizeOption).addOption(configOption);
CommandLineParser parser = new BasicParser();
try {
CommandLine line = parser.parse(options, args);
if(line.hasOption("spool-size")) {
spoolSize = Integer.parseInt(line.getOptionValue("spool-size"));
}
if(line.hasOption("idle-timeout")) {
idleTimeout = Integer.parseInt(line.getOptionValue("idle-timeout"));
}
if(line.hasOption("config")) {
config = line.getOptionValue("config");
}
} catch(ParseException e) {
printHelp(options);
System.exit(1);;
} catch(NumberFormatException e) {
System.err.println("Value must be an integer");
printHelp(options);
System.exit(2);;
}
}
private static void printHelp(Options options) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("logstash-forwarder", options);
}
}

View File

@@ -1,5 +1,22 @@
package info.fetter.logstashforwarder;
/*
* Copyright 2015 Didier Fetter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import static org.apache.log4j.Level.*;
import java.io.File;
@@ -30,7 +47,7 @@ public class FileWatcherTest {
//@Test
public void testFileWatch() throws InterruptedException, IOException {
FileWatcher watcher = new FileWatcher();
watcher.addFilesToWatch("./test.txt", new Event().addField("test", "test"));
watcher.addFilesToWatch("./test.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY);
for(int i = 0; i < 100; i++) {
Thread.sleep(1000);
watcher.checkFiles();
@@ -40,7 +57,7 @@ public class FileWatcherTest {
//@Test
public void testWildcardWatch() throws InterruptedException, IOException {
FileWatcher watcher = new FileWatcher();
watcher.addFilesToWatch("./test*.txt", new Event().addField("test", "test"));
watcher.addFilesToWatch("./test*.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY);
File file1 = new File("test1.txt");
File file2 = new File("test2.txt");