From de012bb84acde2d0fb2513a066dee8885b4dadea Mon Sep 17 00:00:00 2001 From: didfet Date: Tue, 10 Mar 2015 16:20:56 +0100 Subject: [PATCH] Added debug information. --- .../protocol/LumberjackClient.java | 33 ++++++++++++++++--- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java b/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java index a271341..810e140 100644 --- a/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java +++ b/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java @@ -37,7 +37,11 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocket; import javax.net.ssl.TrustManagerFactory; +import org.apache.commons.io.HexDump; +import org.apache.log4j.Logger; + public class LumberjackClient { + private final static Logger logger = Logger.getLogger(LumberjackClient.class); private final static byte PROTOCOL_VERSION = 0x31; private final static byte FRAME_ACK = 0x41; private final static byte FRAME_WINDOW_SIZE = 0x57; @@ -73,6 +77,8 @@ public class LumberjackClient { output = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream())); input = new DataInputStream(socket.getInputStream()); + + logger.info("Connected to " + server + ":" + port); } catch(IOException e) { throw e; } catch(Exception e) { @@ -85,6 +91,7 @@ public class LumberjackClient { output.writeByte(FRAME_WINDOW_SIZE); output.writeInt(size); output.flush(); + logger.debug("Sending window size frame : " + size + " frames"); return 6; } @@ -121,11 +128,17 @@ public class LumberjackClient { ByteArrayOutputStream uncompressedBytes = new ByteArrayOutputStream(); DataOutputStream uncompressedOutput = new DataOutputStream(uncompressedBytes); for(Map keyValues : keyValuesList) { + logger.debug("Adding data frame"); sendDataFrame(uncompressedOutput, keyValues); } uncompressedOutput.close(); Deflater compressor = new Deflater(); - compressor.setInput(uncompressedBytes.toByteArray()); + byte[] uncompressedData = uncompressedBytes.toByteArray(); + logger.debug("Deflating data : " + uncompressedData.length + " bytes"); + if(logger.isTraceEnabled()) { + HexDump.dump(uncompressedData, 0, System.out, 0); + } + compressor.setInput(uncompressedData); compressor.finish(); ByteArrayOutputStream compressedBytes = new ByteArrayOutputStream(); @@ -135,10 +148,17 @@ public class LumberjackClient { compressedBytes.write(buffer, 0, count); } compressedBytes.close(); - + byte[] compressedData = compressedBytes.toByteArray(); + logger.debug("Deflated data : " + compressor.getTotalOut() + " bytes"); + if(logger.isTraceEnabled()) { + HexDump.dump(compressedData, 0, System.out, 0); + } + output.writeInt(compressor.getTotalOut()); - output.write(compressedBytes.toByteArray()); - + output.write(compressedData); + output.flush(); + + logger.debug("Sending compressed frame : " + keyValuesList.size() + " frames"); return 6 + compressor.getTotalOut(); } @@ -152,23 +172,26 @@ public class LumberjackClient { throw new ProtocolException("Frame type should be Ack, received " + frameType); } int sequenceNumber = input.readInt(); + logger.debug("Received ack sequence : " + sequenceNumber); return sequenceNumber; } public int sendEvents(List eventList) throws IOException { int beginSequence = sequence; int numberOfEvents = eventList.size(); + logger.info("Sending " + numberOfEvents + " events"); sendWindowSizeFrame(numberOfEvents); List> keyValuesList = new ArrayList>(numberOfEvents); for(Event event : eventList) { keyValuesList.add(event.getKeyValues()); } sendCompressedFrame(keyValuesList); - while(readAckFrame() < sequence) {} + while(readAckFrame() < (sequence - 1) ) {} return sequence - beginSequence; } public void close() throws IOException { socket.close(); + logger.info("Connection to " + server + ":" + port + " closed"); } }