diff --git a/pom.xml b/pom.xml
index dae5bae..81054a3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
4.0.0
logstash-forwarder-java
logstash-forwarder-java
- 0.1.0
+ 0.1.1-SNAPSHOT
logstash-forwarder-java
Java version of logstash forwarder
https://github.com/didfet/logstash-forwarder-java
diff --git a/src/main/java/info/fetter/logstashforwarder/FileReader.java b/src/main/java/info/fetter/logstashforwarder/FileReader.java
index dc9ad40..8623237 100644
--- a/src/main/java/info/fetter/logstashforwarder/FileReader.java
+++ b/src/main/java/info/fetter/logstashforwarder/FileReader.java
@@ -1,5 +1,7 @@
package info.fetter.logstashforwarder;
+import info.fetter.logstashforwarder.util.AdapterException;
+
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -50,7 +52,7 @@ public class FileReader {
eventList = new ArrayList(spoolSize);
}
- public int readFiles(Collection fileList) throws IOException {
+ public int readFiles(Collection fileList) throws IOException, AdapterException {
int eventCount = 0;
if(logger.isTraceEnabled()) {
logger.trace("Reading " + fileList.size() + " file(s)");
diff --git a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java
index 5ec0f04..48c4917 100644
--- a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java
+++ b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java
@@ -17,6 +17,7 @@ package info.fetter.logstashforwarder;
*
*/
+import info.fetter.logstashforwarder.util.AdapterException;
import info.fetter.logstashforwarder.util.LastModifiedFileFilter;
import java.io.File;
@@ -75,7 +76,7 @@ public class FileWatcher {
printWatchMap();
}
- public int readFiles(FileReader reader) throws IOException {
+ public int readFiles(FileReader reader) throws IOException, AdapterException {
logger.debug("Reading files");
logger.trace("==============");
int numberOfLinesRead = reader.readFiles(watchMap.values());
diff --git a/src/main/java/info/fetter/logstashforwarder/Forwarder.java b/src/main/java/info/fetter/logstashforwarder/Forwarder.java
index f69eac9..5548488 100644
--- a/src/main/java/info/fetter/logstashforwarder/Forwarder.java
+++ b/src/main/java/info/fetter/logstashforwarder/Forwarder.java
@@ -7,6 +7,7 @@ import java.io.IOException;
import info.fetter.logstashforwarder.config.ConfigurationManager;
import info.fetter.logstashforwarder.config.FilesSection;
import info.fetter.logstashforwarder.protocol.LumberjackClient;
+import info.fetter.logstashforwarder.util.AdapterException;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -18,6 +19,7 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
import org.apache.log4j.spi.RootLogger;
/*
@@ -38,6 +40,7 @@ import org.apache.log4j.spi.RootLogger;
*/
public class Forwarder {
+ private static Logger logger = Logger.getLogger(Forwarder.class);
private static int spoolSize = 1024;
private static int idleTimeout = 5000;
private static String config;
@@ -52,9 +55,9 @@ public class Forwarder {
parseOptions(args);
BasicConfigurator.configure();
RootLogger.getRootLogger().setLevel(logLevel);
-// Logger.getLogger(FileReader.class).addAppender((Appender)RootLogger.getRootLogger().getAllAppenders().nextElement());
-// Logger.getLogger(FileReader.class).setLevel(TRACE);
-// Logger.getLogger(FileReader.class).setAdditivity(false);
+ // Logger.getLogger(FileReader.class).addAppender((Appender)RootLogger.getRootLogger().getAllAppenders().nextElement());
+ // Logger.getLogger(FileReader.class).setLevel(TRACE);
+ // Logger.getLogger(FileReader.class).setAdditivity(false);
watcher = new FileWatcher();
configManager = new ConfigurationManager(config);
configManager.readConfiguration();
@@ -64,9 +67,7 @@ public class Forwarder {
}
}
reader = new FileReader(spoolSize);
- String[] serverAndPort = configManager.getConfig().getNetwork().getServers().get(0).split(":");
- adapter = new LumberjackClient(configManager.getConfig().getNetwork().getSslCA(),serverAndPort[0],Integer.parseInt(serverAndPort[1]));
- reader.setAdapter(adapter);
+ connectToServer();
infiniteLoop();
} catch(Exception e) {
e.printStackTrace();
@@ -76,11 +77,27 @@ public class Forwarder {
private static void infiniteLoop() throws IOException, InterruptedException {
while(true) {
- watcher.checkFiles();
- while(watcher.readFiles(reader) == spoolSize);
- Thread.sleep(idleTimeout);
+ try {
+ watcher.checkFiles();
+ while(watcher.readFiles(reader) == spoolSize);
+ Thread.sleep(idleTimeout);
+ } catch(AdapterException e) {
+ try {
+ logger.error("Lost server connection");
+ Thread.sleep(configManager.getConfig().getNetwork().getTimeout() * 1000);
+ connectToServer();
+ } catch(Exception ex) {
+ logger.error("Failed to reconnect to server : " + ex.getMessage());
+ }
+ }
}
}
+
+ private static void connectToServer() throws NumberFormatException, IOException {
+ String[] serverAndPort = configManager.getConfig().getNetwork().getServers().get(0).split(":");
+ adapter = new LumberjackClient(configManager.getConfig().getNetwork().getSslCA(),serverAndPort[0],Integer.parseInt(serverAndPort[1]));
+ reader.setAdapter(adapter);
+ }
@SuppressWarnings("static-access")
static void parseOptions(String[] args) {
diff --git a/src/main/java/info/fetter/logstashforwarder/ProtocolAdapter.java b/src/main/java/info/fetter/logstashforwarder/ProtocolAdapter.java
index 52d0f2a..5eeb288 100644
--- a/src/main/java/info/fetter/logstashforwarder/ProtocolAdapter.java
+++ b/src/main/java/info/fetter/logstashforwarder/ProtocolAdapter.java
@@ -17,10 +17,11 @@ package info.fetter.logstashforwarder;
*
*/
-import java.io.IOException;
+import info.fetter.logstashforwarder.util.AdapterException;
+
import java.util.List;
public interface ProtocolAdapter {
- public int sendEvents(List eventList) throws IOException;
- public void close() throws IOException;
+ public int sendEvents(List eventList) throws AdapterException;
+ public void close() throws AdapterException;
}
diff --git a/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java b/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java
index 6acc01a..48ff5b0 100644
--- a/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java
+++ b/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java
@@ -19,6 +19,7 @@ package info.fetter.logstashforwarder.protocol;
import info.fetter.logstashforwarder.Event;
import info.fetter.logstashforwarder.ProtocolAdapter;
+import info.fetter.logstashforwarder.util.AdapterException;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
@@ -78,7 +79,7 @@ public class LumberjackClient implements ProtocolAdapter {
output = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
input = new DataInputStream(socket.getInputStream());
-
+
logger.info("Connected to " + server + ":" + port);
} catch(IOException e) {
throw e;
@@ -154,11 +155,11 @@ public class LumberjackClient implements ProtocolAdapter {
if(logger.isTraceEnabled()) {
HexDump.dump(compressedData, 0, System.out, 0);
}
-
+
output.writeInt(compressor.getTotalOut());
output.write(compressedData);
output.flush();
-
+
logger.debug("Sending compressed frame : " + keyValuesList.size() + " frames");
return 6 + compressor.getTotalOut();
}
@@ -177,22 +178,30 @@ public class LumberjackClient implements ProtocolAdapter {
return sequenceNumber;
}
- public int sendEvents(List eventList) throws IOException {
- int beginSequence = sequence;
- int numberOfEvents = eventList.size();
- logger.info("Sending " + numberOfEvents + " events");
- sendWindowSizeFrame(numberOfEvents);
- List