From b54e3d9f25f2a01602f637e818e8db9d5bfd8ffc Mon Sep 17 00:00:00 2001 From: didfet Date: Mon, 9 Mar 2015 22:50:54 +0100 Subject: [PATCH] Added sendEvents method. --- .../info/fetter/logstashforwarder/Event.java | 20 ++++ .../protocol/LumberjackClient.java | 91 +++++++++++-------- 2 files changed, 74 insertions(+), 37 deletions(-) create mode 100644 src/main/java/info/fetter/logstashforwarder/Event.java diff --git a/src/main/java/info/fetter/logstashforwarder/Event.java b/src/main/java/info/fetter/logstashforwarder/Event.java new file mode 100644 index 0000000..af26084 --- /dev/null +++ b/src/main/java/info/fetter/logstashforwarder/Event.java @@ -0,0 +1,20 @@ +package info.fetter.logstashforwarder; + +import java.util.HashMap; +import java.util.Map; + +public class Event { + private Map keyValues = new HashMap(10); + + public void addField(String key, byte[] value) { + keyValues.put(key, value); + } + + public void addField(String key, String value) { + keyValues.put(key, value.getBytes()); + } + + public Map getKeyValues() { + return keyValues; + } +} diff --git a/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java b/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java index 024d4fa..4d257a2 100644 --- a/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java +++ b/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java @@ -1,17 +1,16 @@ package info.fetter.logstashforwarder.protocol; +import info.fetter.logstashforwarder.Event; + import java.io.BufferedOutputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; -import java.security.KeyManagementException; +import java.net.ProtocolException; import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.cert.CertificateException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.zip.Deflater; @@ -27,7 +26,7 @@ public class LumberjackClient { private final static byte FRAME_WINDOW_SIZE = 0x57; private final static byte FRAME_DATA = 0x44; private final static byte FRAME_COMPRESSED = 0x43; - + private SSLSocket socket; private KeyStore keyStore; private String server; @@ -35,29 +34,35 @@ public class LumberjackClient { private DataOutputStream output; private DataInputStream input; private int sequence = 1; - - public LumberjackClient(String keyStorePath, String server, int port) throws NoSuchAlgorithmException, CertificateException, FileNotFoundException, IOException, KeyStoreException, KeyManagementException { + + public LumberjackClient(String keyStorePath, String server, int port) throws IOException { this.server = server; this.port = port; - - keyStore = KeyStore.getInstance("JKS"); - keyStore.load(new FileInputStream(keyStorePath), null); - - TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX"); - tmf.init(keyStore); - SSLContext context = SSLContext.getInstance("TLS"); - context.init(null, tmf.getTrustManagers(), null); - - SocketFactory socketFactory = context.getSocketFactory(); - socket = (SSLSocket)socketFactory.createSocket(this.server, this.port); - socket.setUseClientMode(true); - socket.startHandshake(); - - output = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream())); - input = new DataInputStream(socket.getInputStream()); + try { + keyStore = KeyStore.getInstance("JKS"); + keyStore.load(new FileInputStream(keyStorePath), null); + + TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX"); + tmf.init(keyStore); + + SSLContext context = SSLContext.getInstance("TLS"); + context.init(null, tmf.getTrustManagers(), null); + + SocketFactory socketFactory = context.getSocketFactory(); + socket = (SSLSocket)socketFactory.createSocket(this.server, this.port); + socket.setUseClientMode(true); + socket.startHandshake(); + + output = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream())); + input = new DataInputStream(socket.getInputStream()); + } catch(IOException e) { + throw e; + } catch(Exception e) { + throw new RuntimeException(e); + } } - + public int sendWindowSizeFrame(int size) throws IOException { output.writeByte(PROTOCOL_VERSION); output.writeByte(FRAME_WINDOW_SIZE); @@ -65,7 +70,7 @@ public class LumberjackClient { output.flush(); return 6; } - + private int sendDataFrame(DataOutputStream output, Map keyValues) throws IOException { output.writeByte(PROTOCOL_VERSION); output.writeByte(FRAME_DATA); @@ -87,15 +92,15 @@ public class LumberjackClient { output.flush(); return bytesSent; } - + public int sendDataFrameInSocket(Map keyValues) throws IOException { return sendDataFrame(output, keyValues); } - + public int sendCompressedFrame(List> keyValuesList) throws IOException { output.writeByte(PROTOCOL_VERSION); output.writeByte(FRAME_COMPRESSED); - + ByteArrayOutputStream uncompressedBytes = new ByteArrayOutputStream(); DataOutputStream uncompressedOutput = new DataOutputStream(uncompressedBytes); for(Map keyValues : keyValuesList) { @@ -105,7 +110,7 @@ public class LumberjackClient { Deflater compressor = new Deflater(); compressor.setInput(uncompressedBytes.toByteArray()); compressor.finish(); - + ByteArrayOutputStream compressedBytes = new ByteArrayOutputStream(); byte[] buffer = new byte[1024]; while(!compressor.finished()) { @@ -113,26 +118,38 @@ public class LumberjackClient { compressedBytes.write(buffer, 0, count); } compressedBytes.close(); - + output.writeInt(compressor.getTotalOut()); output.write(compressedBytes.toByteArray()); - + return 6 + compressor.getTotalOut(); } - - public int readAckFrame() throws IOException { + + public int readAckFrame() throws ProtocolException, IOException { byte protocolVersion = input.readByte(); if(protocolVersion != PROTOCOL_VERSION) { - throw new IOException("Protocol version should be 1, received " + protocolVersion); + throw new ProtocolException("Protocol version should be 1, received " + protocolVersion); } byte frameType = input.readByte(); if(frameType != FRAME_ACK) { - throw new IOException("Frame type should be Ack, received " + frameType); + throw new ProtocolException("Frame type should be Ack, received " + frameType); } int sequenceNumber = input.readInt(); return sequenceNumber; } - + + public int sendEvents(List eventList) throws IOException { + int beginSequence = sequence; + int numberOfEvents = eventList.size(); + List> keyValuesList = new ArrayList>(numberOfEvents); + for(Event event : eventList) { + keyValuesList.add(event.getKeyValues()); + } + sendCompressedFrame(keyValuesList); + while(readAckFrame() < sequence) {} + return sequence - beginSequence; + } + public void close() throws IOException { socket.close(); }