Implemented compressed frame.

This commit is contained in:
didfet
2015-03-09 22:13:41 +01:00
parent d88b8b6bd5
commit 0fa6300843

View File

@@ -1,6 +1,7 @@
package info.fetter.logstashforwarder.protocol; package info.fetter.logstashforwarder.protocol;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.FileInputStream; import java.io.FileInputStream;
@@ -11,7 +12,9 @@ import java.security.KeyStore;
import java.security.KeyStoreException; import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException; import java.security.cert.CertificateException;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.zip.Deflater;
import javax.net.SocketFactory; import javax.net.SocketFactory;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
@@ -23,6 +26,7 @@ public class LumberjackClient {
private final static byte FRAME_ACK = 0x41; private final static byte FRAME_ACK = 0x41;
private final static byte FRAME_WINDOW_SIZE = 0x57; private final static byte FRAME_WINDOW_SIZE = 0x57;
private final static byte FRAME_DATA = 0x44; private final static byte FRAME_DATA = 0x44;
private final static byte FRAME_COMPRESSED = 0x43;
private SSLSocket socket; private SSLSocket socket;
private KeyStore keyStore; private KeyStore keyStore;
@@ -88,6 +92,34 @@ public class LumberjackClient {
return sendDataFrame(output, keyValues); return sendDataFrame(output, keyValues);
} }
public int sendCompressedFrame(List<Map<String,byte[]>> keyValuesList) throws IOException {
output.writeByte(PROTOCOL_VERSION);
output.writeByte(FRAME_COMPRESSED);
ByteArrayOutputStream uncompressedBytes = new ByteArrayOutputStream();
DataOutputStream uncompressedOutput = new DataOutputStream(uncompressedBytes);
for(Map<String,byte[]> keyValues : keyValuesList) {
sendDataFrame(uncompressedOutput, keyValues);
}
uncompressedOutput.close();
Deflater compressor = new Deflater();
compressor.setInput(uncompressedBytes.toByteArray());
compressor.finish();
ByteArrayOutputStream compressedBytes = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
while(!compressor.finished()) {
int count = compressor.deflate(buffer);
compressedBytes.write(buffer, 0, count);
}
compressedBytes.close();
output.writeInt(compressor.getTotalOut());
output.write(compressedBytes.toByteArray());
return 6 + compressor.getTotalOut();
}
public int readAckFrame() throws IOException { public int readAckFrame() throws IOException {
byte protocolVersion = input.readByte(); byte protocolVersion = input.readByte();
if(protocolVersion != PROTOCOL_VERSION) { if(protocolVersion != PROTOCOL_VERSION) {