Create new ProtocolAdapter if connection is lost.

This commit is contained in:
didfet
2015-03-19 13:19:50 +01:00
parent c199d00d2d
commit 20366c982e
8 changed files with 65 additions and 35 deletions

View File

@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>logstash-forwarder-java</groupId>
<artifactId>logstash-forwarder-java</artifactId>
<version>0.1.0</version>
<version>0.1.1-SNAPSHOT</version>
<name>logstash-forwarder-java</name>
<description>Java version of logstash forwarder</description>
<url>https://github.com/didfet/logstash-forwarder-java</url>

View File

@@ -1,5 +1,7 @@
package info.fetter.logstashforwarder;
import info.fetter.logstashforwarder.util.AdapterException;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -50,7 +52,7 @@ public class FileReader {
eventList = new ArrayList<Event>(spoolSize);
}
public int readFiles(Collection<FileState> fileList) throws IOException {
public int readFiles(Collection<FileState> fileList) throws IOException, AdapterException {
int eventCount = 0;
if(logger.isTraceEnabled()) {
logger.trace("Reading " + fileList.size() + " file(s)");

View File

@@ -17,6 +17,7 @@ package info.fetter.logstashforwarder;
*
*/
import info.fetter.logstashforwarder.util.AdapterException;
import info.fetter.logstashforwarder.util.LastModifiedFileFilter;
import java.io.File;
@@ -75,7 +76,7 @@ public class FileWatcher {
printWatchMap();
}
public int readFiles(FileReader reader) throws IOException {
public int readFiles(FileReader reader) throws IOException, AdapterException {
logger.debug("Reading files");
logger.trace("==============");
int numberOfLinesRead = reader.readFiles(watchMap.values());

View File

@@ -7,6 +7,7 @@ import java.io.IOException;
import info.fetter.logstashforwarder.config.ConfigurationManager;
import info.fetter.logstashforwarder.config.FilesSection;
import info.fetter.logstashforwarder.protocol.LumberjackClient;
import info.fetter.logstashforwarder.util.AdapterException;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -18,6 +19,7 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.RootLogger;
/*
@@ -38,6 +40,7 @@ import org.apache.log4j.spi.RootLogger;
*/
public class Forwarder {
private static Logger logger = Logger.getLogger(Forwarder.class);
private static int spoolSize = 1024;
private static int idleTimeout = 5000;
private static String config;
@@ -52,9 +55,9 @@ public class Forwarder {
parseOptions(args);
BasicConfigurator.configure();
RootLogger.getRootLogger().setLevel(logLevel);
// Logger.getLogger(FileReader.class).addAppender((Appender)RootLogger.getRootLogger().getAllAppenders().nextElement());
// Logger.getLogger(FileReader.class).setLevel(TRACE);
// Logger.getLogger(FileReader.class).setAdditivity(false);
// Logger.getLogger(FileReader.class).addAppender((Appender)RootLogger.getRootLogger().getAllAppenders().nextElement());
// Logger.getLogger(FileReader.class).setLevel(TRACE);
// Logger.getLogger(FileReader.class).setAdditivity(false);
watcher = new FileWatcher();
configManager = new ConfigurationManager(config);
configManager.readConfiguration();
@@ -64,9 +67,7 @@ public class Forwarder {
}
}
reader = new FileReader(spoolSize);
String[] serverAndPort = configManager.getConfig().getNetwork().getServers().get(0).split(":");
adapter = new LumberjackClient(configManager.getConfig().getNetwork().getSslCA(),serverAndPort[0],Integer.parseInt(serverAndPort[1]));
reader.setAdapter(adapter);
connectToServer();
infiniteLoop();
} catch(Exception e) {
e.printStackTrace();
@@ -76,11 +77,27 @@ public class Forwarder {
private static void infiniteLoop() throws IOException, InterruptedException {
while(true) {
watcher.checkFiles();
while(watcher.readFiles(reader) == spoolSize);
Thread.sleep(idleTimeout);
try {
watcher.checkFiles();
while(watcher.readFiles(reader) == spoolSize);
Thread.sleep(idleTimeout);
} catch(AdapterException e) {
try {
logger.error("Lost server connection");
Thread.sleep(configManager.getConfig().getNetwork().getTimeout() * 1000);
connectToServer();
} catch(Exception ex) {
logger.error("Failed to reconnect to server : " + ex.getMessage());
}
}
}
}
private static void connectToServer() throws NumberFormatException, IOException {
String[] serverAndPort = configManager.getConfig().getNetwork().getServers().get(0).split(":");
adapter = new LumberjackClient(configManager.getConfig().getNetwork().getSslCA(),serverAndPort[0],Integer.parseInt(serverAndPort[1]));
reader.setAdapter(adapter);
}
@SuppressWarnings("static-access")
static void parseOptions(String[] args) {

View File

@@ -17,10 +17,11 @@ package info.fetter.logstashforwarder;
*
*/
import java.io.IOException;
import info.fetter.logstashforwarder.util.AdapterException;
import java.util.List;
public interface ProtocolAdapter {
public int sendEvents(List<Event> eventList) throws IOException;
public void close() throws IOException;
public int sendEvents(List<Event> eventList) throws AdapterException;
public void close() throws AdapterException;
}

View File

@@ -19,6 +19,7 @@ package info.fetter.logstashforwarder.protocol;
import info.fetter.logstashforwarder.Event;
import info.fetter.logstashforwarder.ProtocolAdapter;
import info.fetter.logstashforwarder.util.AdapterException;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
@@ -78,7 +79,7 @@ public class LumberjackClient implements ProtocolAdapter {
output = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
input = new DataInputStream(socket.getInputStream());
logger.info("Connected to " + server + ":" + port);
} catch(IOException e) {
throw e;
@@ -154,11 +155,11 @@ public class LumberjackClient implements ProtocolAdapter {
if(logger.isTraceEnabled()) {
HexDump.dump(compressedData, 0, System.out, 0);
}
output.writeInt(compressor.getTotalOut());
output.write(compressedData);
output.flush();
logger.debug("Sending compressed frame : " + keyValuesList.size() + " frames");
return 6 + compressor.getTotalOut();
}
@@ -177,22 +178,30 @@ public class LumberjackClient implements ProtocolAdapter {
return sequenceNumber;
}
public int sendEvents(List<Event> eventList) throws IOException {
int beginSequence = sequence;
int numberOfEvents = eventList.size();
logger.info("Sending " + numberOfEvents + " events");
sendWindowSizeFrame(numberOfEvents);
List<Map<String,byte[]>> keyValuesList = new ArrayList<Map<String,byte[]>>(numberOfEvents);
for(Event event : eventList) {
keyValuesList.add(event.getKeyValues());
public int sendEvents(List<Event> eventList) throws AdapterException {
try {
int beginSequence = sequence;
int numberOfEvents = eventList.size();
logger.info("Sending " + numberOfEvents + " events");
sendWindowSizeFrame(numberOfEvents);
List<Map<String,byte[]>> keyValuesList = new ArrayList<Map<String,byte[]>>(numberOfEvents);
for(Event event : eventList) {
keyValuesList.add(event.getKeyValues());
}
sendCompressedFrame(keyValuesList);
while(readAckFrame() < (sequence - 1) ) {}
return sequence - beginSequence;
} catch(Exception e) {
throw new AdapterException(e);
}
sendCompressedFrame(keyValuesList);
while(readAckFrame() < (sequence - 1) ) {}
return sequence - beginSequence;
}
public void close() throws IOException {
socket.close();
public void close() throws AdapterException {
try {
socket.close();
} catch(Exception e) {
throw new AdapterException(e);
}
logger.info("Connection to " + server + ":" + port + " closed");
}
}

View File

@@ -18,6 +18,7 @@ package info.fetter.logstashforwarder;
*/
import static org.apache.log4j.Level.*;
import info.fetter.logstashforwarder.util.AdapterException;
import java.io.File;
import java.io.IOException;
@@ -47,7 +48,7 @@ public class FileReaderTest {
}
@Test
public void testFileReader1() throws IOException, InterruptedException {
public void testFileReader1() throws IOException, InterruptedException, AdapterException {
FileReader reader = new FileReader(2);
reader.setAdapter(new MockProtocolAdapter());
List<FileState> fileList = new ArrayList<FileState>(1);

View File

@@ -17,7 +17,6 @@ package info.fetter.logstashforwarder;
*
*/
import java.io.IOException;
import java.util.List;
import org.apache.log4j.Logger;
@@ -25,7 +24,7 @@ import org.apache.log4j.Logger;
public class MockProtocolAdapter implements ProtocolAdapter {
private static Logger logger = Logger.getLogger(MockProtocolAdapter.class);
public int sendEvents(List<Event> eventList) throws IOException {
public int sendEvents(List<Event> eventList) {
for(Event event : eventList) {
logger.trace("Event :");
for(String key : event.getKeyValues().keySet()) {
@@ -35,7 +34,7 @@ public class MockProtocolAdapter implements ProtocolAdapter {
return eventList.size();
}
public void close() throws IOException {
public void close() {
// not implemented
}