diff --git a/pom.xml b/pom.xml index a99cbcc..82bccbe 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 logstash-forwarder-java logstash-forwarder-java - 0.2.3 + 0.2.4-SNAPSHOT logstash-forwarder-java Java version of logstash forwarder https://github.com/didfet/logstash-forwarder-java diff --git a/src/main/java/info/fetter/logstashforwarder/Event.java b/src/main/java/info/fetter/logstashforwarder/Event.java index 1ae1c50..a4be5b0 100644 --- a/src/main/java/info/fetter/logstashforwarder/Event.java +++ b/src/main/java/info/fetter/logstashforwarder/Event.java @@ -17,6 +17,7 @@ package info.fetter.logstashforwarder; * */ +import java.io.UnsupportedEncodingException; import java.util.HashMap; import java.util.Map; @@ -32,7 +33,7 @@ public class Event { } } - public Event(Map fields) { + public Event(Map fields) throws UnsupportedEncodingException { for(String key : fields.keySet()) { addField(key, fields.get(key)); } @@ -43,12 +44,12 @@ public class Event { return this; } - public Event addField(String key, String value) { + public Event addField(String key, String value) throws UnsupportedEncodingException { keyValues.put(key, value.getBytes()); return this; } - public Event addField(String key, long value) { + public Event addField(String key, long value) throws UnsupportedEncodingException { keyValues.put(key, String.valueOf(value).getBytes()); return this; } diff --git a/src/main/java/info/fetter/logstashforwarder/FileReader.java b/src/main/java/info/fetter/logstashforwarder/FileReader.java index 53b787b..c092a1e 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileReader.java +++ b/src/main/java/info/fetter/logstashforwarder/FileReader.java @@ -44,7 +44,6 @@ public class FileReader extends Reader { public int readFiles(Collection fileList) throws AdapterException { int eventCount = 0; - stringBuilder = new StringBuilder(STRINGBUILDER_INITIAL_CAPACITY); if(logger.isTraceEnabled()) { logger.trace("Reading " + fileList.size() + " file(s)"); } @@ -126,10 +125,10 @@ public class FileReader extends Reader { long pos = state.getPointer(); try { reader.seek(pos); - String line = readLine(reader); + byte[] line = readLine(reader); while (line != null && spaceLeftInSpool > 0) { if(logger.isTraceEnabled()) { - logger.trace("-- Read line : " + line); + logger.trace("-- Read line : " + new String(line)); logger.trace("-- Space left in spool : " + spaceLeftInSpool); } pos = reader.getFilePointer(); @@ -144,23 +143,26 @@ public class FileReader extends Reader { return pos; } - private String readLine(RandomAccessFile reader) throws IOException { - stringBuilder.setLength(0); + private byte[] readLine(RandomAccessFile reader) throws IOException { + byteBuffer.clear(); int ch; boolean seenCR = false; while((ch=reader.read()) != -1) { switch(ch) { case '\n': - return stringBuilder.toString(); + byte[] line = new byte[byteBuffer.position()]; + byteBuffer.rewind(); + byteBuffer.get(line); + return line; case '\r': seenCR = true; break; default: if (seenCR) { - stringBuilder.append('\r'); + byteBuffer.put((byte) '\r'); seenCR = false; } - stringBuilder.append((char)ch); // add character, not its ascii value + byteBuffer.put((byte)ch); } } return null; diff --git a/src/main/java/info/fetter/logstashforwarder/InputReader.java b/src/main/java/info/fetter/logstashforwarder/InputReader.java index f1751ed..6b6b3d8 100644 --- a/src/main/java/info/fetter/logstashforwarder/InputReader.java +++ b/src/main/java/info/fetter/logstashforwarder/InputReader.java @@ -35,7 +35,6 @@ public class InputReader extends Reader { public InputReader(int spoolSize, InputStream in) { super(spoolSize); reader = new BufferedReader(new InputStreamReader(in)); - stringBuilder = new StringBuilder(STRINGBUILDER_INITIAL_CAPACITY); } public int readInput() throws AdapterException, IOException { @@ -54,35 +53,36 @@ public class InputReader extends Reader { private int readLines() throws IOException { int lineCount = 0; - String line; + byte[] line; while(lineCount < spoolSize && (line = readLine()) != null) { - position += line.length(); + position += line.length; lineCount++; addEvent("stdin", fields, position, line); } return lineCount; } - private String readLine() throws IOException { + private byte[] readLine() throws IOException { int ch; boolean seenCR = false; - String line; while(reader.ready()) { ch=reader.read(); switch(ch) { case '\n': - line = stringBuilder.toString(); - stringBuilder.setLength(0); + byte[] line = new byte[byteBuffer.position()]; + byteBuffer.rewind(); + byteBuffer.get(line); + byteBuffer.clear(); return line; case '\r': seenCR = true; break; default: if (seenCR) { - stringBuilder.append('\r'); + byteBuffer.put((byte) '\r'); seenCR = false; } - stringBuilder.append((char)ch); // add character, not its ascii value + byteBuffer.put((byte)ch); } } return null; diff --git a/src/main/java/info/fetter/logstashforwarder/Reader.java b/src/main/java/info/fetter/logstashforwarder/Reader.java index ce980b7..c6b73ac 100644 --- a/src/main/java/info/fetter/logstashforwarder/Reader.java +++ b/src/main/java/info/fetter/logstashforwarder/Reader.java @@ -21,6 +21,7 @@ package info.fetter.logstashforwarder; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -29,8 +30,8 @@ public abstract class Reader { protected ProtocolAdapter adapter; protected int spoolSize = 0; protected List eventList; - protected final int STRINGBUILDER_INITIAL_CAPACITY = 1000; - protected StringBuilder stringBuilder; + protected final int BYTEBUFFER_CAPACITY = 1024 * 1024; + protected ByteBuffer byteBuffer = ByteBuffer.allocate(BYTEBUFFER_CAPACITY); private String hostname; { try { @@ -49,6 +50,19 @@ public abstract class Reader { addEvent(state.getFile().getCanonicalPath(), state.getFields(), pos, line); } + protected void addEvent(FileState state, long pos, byte[] line) throws IOException { + addEvent(state.getFile().getCanonicalPath(), state.getFields(), pos, line); + } + + protected void addEvent(String fileName, Event fields, long pos, byte[] line) throws IOException { + Event event = new Event(fields); + event.addField("file", fileName) + .addField("offset", pos) + .addField("line", line) + .addField("host", hostname); + eventList.add(event); + } + protected void addEvent(String fileName, Event fields, long pos, String line) throws IOException { Event event = new Event(fields); event.addField("file", fileName)