diff --git a/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java b/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java index 59384f3..024d4fa 100644 --- a/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java +++ b/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java @@ -1,6 +1,7 @@ package info.fetter.logstashforwarder.protocol; import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.FileInputStream; @@ -11,7 +12,9 @@ import java.security.KeyStore; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; +import java.util.List; import java.util.Map; +import java.util.zip.Deflater; import javax.net.SocketFactory; import javax.net.ssl.SSLContext; @@ -23,6 +26,7 @@ public class LumberjackClient { private final static byte FRAME_ACK = 0x41; private final static byte FRAME_WINDOW_SIZE = 0x57; private final static byte FRAME_DATA = 0x44; + private final static byte FRAME_COMPRESSED = 0x43; private SSLSocket socket; private KeyStore keyStore; @@ -88,6 +92,34 @@ public class LumberjackClient { return sendDataFrame(output, keyValues); } + public int sendCompressedFrame(List> keyValuesList) throws IOException { + output.writeByte(PROTOCOL_VERSION); + output.writeByte(FRAME_COMPRESSED); + + ByteArrayOutputStream uncompressedBytes = new ByteArrayOutputStream(); + DataOutputStream uncompressedOutput = new DataOutputStream(uncompressedBytes); + for(Map keyValues : keyValuesList) { + sendDataFrame(uncompressedOutput, keyValues); + } + uncompressedOutput.close(); + Deflater compressor = new Deflater(); + compressor.setInput(uncompressedBytes.toByteArray()); + compressor.finish(); + + ByteArrayOutputStream compressedBytes = new ByteArrayOutputStream(); + byte[] buffer = new byte[1024]; + while(!compressor.finished()) { + int count = compressor.deflate(buffer); + compressedBytes.write(buffer, 0, count); + } + compressedBytes.close(); + + output.writeInt(compressor.getTotalOut()); + output.write(compressedBytes.toByteArray()); + + return 6 + compressor.getTotalOut(); + } + public int readAckFrame() throws IOException { byte protocolVersion = input.readByte(); if(protocolVersion != PROTOCOL_VERSION) {