From 20366c982e9fe78b9b77d7a0aa5f5dea8b93a2fe Mon Sep 17 00:00:00 2001 From: didfet Date: Thu, 19 Mar 2015 13:19:50 +0100 Subject: [PATCH] Create new ProtocolAdapter if connection is lost. --- pom.xml | 2 +- .../fetter/logstashforwarder/FileReader.java | 4 +- .../fetter/logstashforwarder/FileWatcher.java | 3 +- .../fetter/logstashforwarder/Forwarder.java | 35 ++++++++++++---- .../logstashforwarder/ProtocolAdapter.java | 7 ++-- .../protocol/LumberjackClient.java | 41 +++++++++++-------- .../logstashforwarder/FileReaderTest.java | 3 +- .../MockProtocolAdapter.java | 5 +-- 8 files changed, 65 insertions(+), 35 deletions(-) diff --git a/pom.xml b/pom.xml index dae5bae..81054a3 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 logstash-forwarder-java logstash-forwarder-java - 0.1.0 + 0.1.1-SNAPSHOT logstash-forwarder-java Java version of logstash forwarder https://github.com/didfet/logstash-forwarder-java diff --git a/src/main/java/info/fetter/logstashforwarder/FileReader.java b/src/main/java/info/fetter/logstashforwarder/FileReader.java index dc9ad40..8623237 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileReader.java +++ b/src/main/java/info/fetter/logstashforwarder/FileReader.java @@ -1,5 +1,7 @@ package info.fetter.logstashforwarder; +import info.fetter.logstashforwarder.util.AdapterException; + import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; @@ -50,7 +52,7 @@ public class FileReader { eventList = new ArrayList(spoolSize); } - public int readFiles(Collection fileList) throws IOException { + public int readFiles(Collection fileList) throws IOException, AdapterException { int eventCount = 0; if(logger.isTraceEnabled()) { logger.trace("Reading " + fileList.size() + " file(s)"); diff --git a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java index 5ec0f04..48c4917 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java +++ b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java @@ -17,6 +17,7 @@ package info.fetter.logstashforwarder; * */ +import info.fetter.logstashforwarder.util.AdapterException; import info.fetter.logstashforwarder.util.LastModifiedFileFilter; import java.io.File; @@ -75,7 +76,7 @@ public class FileWatcher { printWatchMap(); } - public int readFiles(FileReader reader) throws IOException { + public int readFiles(FileReader reader) throws IOException, AdapterException { logger.debug("Reading files"); logger.trace("=============="); int numberOfLinesRead = reader.readFiles(watchMap.values()); diff --git a/src/main/java/info/fetter/logstashforwarder/Forwarder.java b/src/main/java/info/fetter/logstashforwarder/Forwarder.java index f69eac9..5548488 100644 --- a/src/main/java/info/fetter/logstashforwarder/Forwarder.java +++ b/src/main/java/info/fetter/logstashforwarder/Forwarder.java @@ -7,6 +7,7 @@ import java.io.IOException; import info.fetter.logstashforwarder.config.ConfigurationManager; import info.fetter.logstashforwarder.config.FilesSection; import info.fetter.logstashforwarder.protocol.LumberjackClient; +import info.fetter.logstashforwarder.util.AdapterException; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -18,6 +19,7 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.log4j.BasicConfigurator; import org.apache.log4j.Level; +import org.apache.log4j.Logger; import org.apache.log4j.spi.RootLogger; /* @@ -38,6 +40,7 @@ import org.apache.log4j.spi.RootLogger; */ public class Forwarder { + private static Logger logger = Logger.getLogger(Forwarder.class); private static int spoolSize = 1024; private static int idleTimeout = 5000; private static String config; @@ -52,9 +55,9 @@ public class Forwarder { parseOptions(args); BasicConfigurator.configure(); RootLogger.getRootLogger().setLevel(logLevel); -// Logger.getLogger(FileReader.class).addAppender((Appender)RootLogger.getRootLogger().getAllAppenders().nextElement()); -// Logger.getLogger(FileReader.class).setLevel(TRACE); -// Logger.getLogger(FileReader.class).setAdditivity(false); + // Logger.getLogger(FileReader.class).addAppender((Appender)RootLogger.getRootLogger().getAllAppenders().nextElement()); + // Logger.getLogger(FileReader.class).setLevel(TRACE); + // Logger.getLogger(FileReader.class).setAdditivity(false); watcher = new FileWatcher(); configManager = new ConfigurationManager(config); configManager.readConfiguration(); @@ -64,9 +67,7 @@ public class Forwarder { } } reader = new FileReader(spoolSize); - String[] serverAndPort = configManager.getConfig().getNetwork().getServers().get(0).split(":"); - adapter = new LumberjackClient(configManager.getConfig().getNetwork().getSslCA(),serverAndPort[0],Integer.parseInt(serverAndPort[1])); - reader.setAdapter(adapter); + connectToServer(); infiniteLoop(); } catch(Exception e) { e.printStackTrace(); @@ -76,11 +77,27 @@ public class Forwarder { private static void infiniteLoop() throws IOException, InterruptedException { while(true) { - watcher.checkFiles(); - while(watcher.readFiles(reader) == spoolSize); - Thread.sleep(idleTimeout); + try { + watcher.checkFiles(); + while(watcher.readFiles(reader) == spoolSize); + Thread.sleep(idleTimeout); + } catch(AdapterException e) { + try { + logger.error("Lost server connection"); + Thread.sleep(configManager.getConfig().getNetwork().getTimeout() * 1000); + connectToServer(); + } catch(Exception ex) { + logger.error("Failed to reconnect to server : " + ex.getMessage()); + } + } } } + + private static void connectToServer() throws NumberFormatException, IOException { + String[] serverAndPort = configManager.getConfig().getNetwork().getServers().get(0).split(":"); + adapter = new LumberjackClient(configManager.getConfig().getNetwork().getSslCA(),serverAndPort[0],Integer.parseInt(serverAndPort[1])); + reader.setAdapter(adapter); + } @SuppressWarnings("static-access") static void parseOptions(String[] args) { diff --git a/src/main/java/info/fetter/logstashforwarder/ProtocolAdapter.java b/src/main/java/info/fetter/logstashforwarder/ProtocolAdapter.java index 52d0f2a..5eeb288 100644 --- a/src/main/java/info/fetter/logstashforwarder/ProtocolAdapter.java +++ b/src/main/java/info/fetter/logstashforwarder/ProtocolAdapter.java @@ -17,10 +17,11 @@ package info.fetter.logstashforwarder; * */ -import java.io.IOException; +import info.fetter.logstashforwarder.util.AdapterException; + import java.util.List; public interface ProtocolAdapter { - public int sendEvents(List eventList) throws IOException; - public void close() throws IOException; + public int sendEvents(List eventList) throws AdapterException; + public void close() throws AdapterException; } diff --git a/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java b/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java index 6acc01a..48ff5b0 100644 --- a/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java +++ b/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java @@ -19,6 +19,7 @@ package info.fetter.logstashforwarder.protocol; import info.fetter.logstashforwarder.Event; import info.fetter.logstashforwarder.ProtocolAdapter; +import info.fetter.logstashforwarder.util.AdapterException; import java.io.BufferedOutputStream; import java.io.ByteArrayOutputStream; @@ -78,7 +79,7 @@ public class LumberjackClient implements ProtocolAdapter { output = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream())); input = new DataInputStream(socket.getInputStream()); - + logger.info("Connected to " + server + ":" + port); } catch(IOException e) { throw e; @@ -154,11 +155,11 @@ public class LumberjackClient implements ProtocolAdapter { if(logger.isTraceEnabled()) { HexDump.dump(compressedData, 0, System.out, 0); } - + output.writeInt(compressor.getTotalOut()); output.write(compressedData); output.flush(); - + logger.debug("Sending compressed frame : " + keyValuesList.size() + " frames"); return 6 + compressor.getTotalOut(); } @@ -177,22 +178,30 @@ public class LumberjackClient implements ProtocolAdapter { return sequenceNumber; } - public int sendEvents(List eventList) throws IOException { - int beginSequence = sequence; - int numberOfEvents = eventList.size(); - logger.info("Sending " + numberOfEvents + " events"); - sendWindowSizeFrame(numberOfEvents); - List> keyValuesList = new ArrayList>(numberOfEvents); - for(Event event : eventList) { - keyValuesList.add(event.getKeyValues()); + public int sendEvents(List eventList) throws AdapterException { + try { + int beginSequence = sequence; + int numberOfEvents = eventList.size(); + logger.info("Sending " + numberOfEvents + " events"); + sendWindowSizeFrame(numberOfEvents); + List> keyValuesList = new ArrayList>(numberOfEvents); + for(Event event : eventList) { + keyValuesList.add(event.getKeyValues()); + } + sendCompressedFrame(keyValuesList); + while(readAckFrame() < (sequence - 1) ) {} + return sequence - beginSequence; + } catch(Exception e) { + throw new AdapterException(e); } - sendCompressedFrame(keyValuesList); - while(readAckFrame() < (sequence - 1) ) {} - return sequence - beginSequence; } - public void close() throws IOException { - socket.close(); + public void close() throws AdapterException { + try { + socket.close(); + } catch(Exception e) { + throw new AdapterException(e); + } logger.info("Connection to " + server + ":" + port + " closed"); } } diff --git a/src/test/java/info/fetter/logstashforwarder/FileReaderTest.java b/src/test/java/info/fetter/logstashforwarder/FileReaderTest.java index 0e516f2..2dbf8df 100644 --- a/src/test/java/info/fetter/logstashforwarder/FileReaderTest.java +++ b/src/test/java/info/fetter/logstashforwarder/FileReaderTest.java @@ -18,6 +18,7 @@ package info.fetter.logstashforwarder; */ import static org.apache.log4j.Level.*; +import info.fetter.logstashforwarder.util.AdapterException; import java.io.File; import java.io.IOException; @@ -47,7 +48,7 @@ public class FileReaderTest { } @Test - public void testFileReader1() throws IOException, InterruptedException { + public void testFileReader1() throws IOException, InterruptedException, AdapterException { FileReader reader = new FileReader(2); reader.setAdapter(new MockProtocolAdapter()); List fileList = new ArrayList(1); diff --git a/src/test/java/info/fetter/logstashforwarder/MockProtocolAdapter.java b/src/test/java/info/fetter/logstashforwarder/MockProtocolAdapter.java index 7611576..8373fb9 100644 --- a/src/test/java/info/fetter/logstashforwarder/MockProtocolAdapter.java +++ b/src/test/java/info/fetter/logstashforwarder/MockProtocolAdapter.java @@ -17,7 +17,6 @@ package info.fetter.logstashforwarder; * */ -import java.io.IOException; import java.util.List; import org.apache.log4j.Logger; @@ -25,7 +24,7 @@ import org.apache.log4j.Logger; public class MockProtocolAdapter implements ProtocolAdapter { private static Logger logger = Logger.getLogger(MockProtocolAdapter.class); - public int sendEvents(List eventList) throws IOException { + public int sendEvents(List eventList) { for(Event event : eventList) { logger.trace("Event :"); for(String key : event.getKeyValues().keySet()) { @@ -35,7 +34,7 @@ public class MockProtocolAdapter implements ProtocolAdapter { return eventList.size(); } - public void close() throws IOException { + public void close() { // not implemented }