This commit is contained in:
didfet
2015-09-03 15:04:41 +02:00
parent 081cbcb1f3
commit 5443fdee69
5 changed files with 40 additions and 23 deletions

View File

@@ -17,6 +17,7 @@ package info.fetter.logstashforwarder;
*
*/
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;
@@ -32,7 +33,7 @@ public class Event {
}
}
public Event(Map<String,String> fields) {
public Event(Map<String,String> fields) throws UnsupportedEncodingException {
for(String key : fields.keySet()) {
addField(key, fields.get(key));
}
@@ -43,12 +44,12 @@ public class Event {
return this;
}
public Event addField(String key, String value) {
public Event addField(String key, String value) throws UnsupportedEncodingException {
keyValues.put(key, value.getBytes());
return this;
}
public Event addField(String key, long value) {
public Event addField(String key, long value) throws UnsupportedEncodingException {
keyValues.put(key, String.valueOf(value).getBytes());
return this;
}

View File

@@ -44,7 +44,6 @@ public class FileReader extends Reader {
public int readFiles(Collection<FileState> fileList) throws AdapterException {
int eventCount = 0;
stringBuilder = new StringBuilder(STRINGBUILDER_INITIAL_CAPACITY);
if(logger.isTraceEnabled()) {
logger.trace("Reading " + fileList.size() + " file(s)");
}
@@ -126,10 +125,10 @@ public class FileReader extends Reader {
long pos = state.getPointer();
try {
reader.seek(pos);
String line = readLine(reader);
byte[] line = readLine(reader);
while (line != null && spaceLeftInSpool > 0) {
if(logger.isTraceEnabled()) {
logger.trace("-- Read line : " + line);
logger.trace("-- Read line : " + new String(line));
logger.trace("-- Space left in spool : " + spaceLeftInSpool);
}
pos = reader.getFilePointer();
@@ -144,23 +143,26 @@ public class FileReader extends Reader {
return pos;
}
private String readLine(RandomAccessFile reader) throws IOException {
stringBuilder.setLength(0);
private byte[] readLine(RandomAccessFile reader) throws IOException {
byteBuffer.clear();
int ch;
boolean seenCR = false;
while((ch=reader.read()) != -1) {
switch(ch) {
case '\n':
return stringBuilder.toString();
byte[] line = new byte[byteBuffer.position()];
byteBuffer.rewind();
byteBuffer.get(line);
return line;
case '\r':
seenCR = true;
break;
default:
if (seenCR) {
stringBuilder.append('\r');
byteBuffer.put((byte) '\r');
seenCR = false;
}
stringBuilder.append((char)ch); // add character, not its ascii value
byteBuffer.put((byte)ch);
}
}
return null;

View File

@@ -35,7 +35,6 @@ public class InputReader extends Reader {
public InputReader(int spoolSize, InputStream in) {
super(spoolSize);
reader = new BufferedReader(new InputStreamReader(in));
stringBuilder = new StringBuilder(STRINGBUILDER_INITIAL_CAPACITY);
}
public int readInput() throws AdapterException, IOException {
@@ -54,35 +53,36 @@ public class InputReader extends Reader {
private int readLines() throws IOException {
int lineCount = 0;
String line;
byte[] line;
while(lineCount < spoolSize && (line = readLine()) != null) {
position += line.length();
position += line.length;
lineCount++;
addEvent("stdin", fields, position, line);
}
return lineCount;
}
private String readLine() throws IOException {
private byte[] readLine() throws IOException {
int ch;
boolean seenCR = false;
String line;
while(reader.ready()) {
ch=reader.read();
switch(ch) {
case '\n':
line = stringBuilder.toString();
stringBuilder.setLength(0);
byte[] line = new byte[byteBuffer.position()];
byteBuffer.rewind();
byteBuffer.get(line);
byteBuffer.clear();
return line;
case '\r':
seenCR = true;
break;
default:
if (seenCR) {
stringBuilder.append('\r');
byteBuffer.put((byte) '\r');
seenCR = false;
}
stringBuilder.append((char)ch); // add character, not its ascii value
byteBuffer.put((byte)ch);
}
}
return null;

View File

@@ -21,6 +21,7 @@ package info.fetter.logstashforwarder;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -29,8 +30,8 @@ public abstract class Reader {
protected ProtocolAdapter adapter;
protected int spoolSize = 0;
protected List<Event> eventList;
protected final int STRINGBUILDER_INITIAL_CAPACITY = 1000;
protected StringBuilder stringBuilder;
protected final int BYTEBUFFER_CAPACITY = 1024 * 1024;
protected ByteBuffer byteBuffer = ByteBuffer.allocate(BYTEBUFFER_CAPACITY);
private String hostname;
{
try {
@@ -49,6 +50,19 @@ public abstract class Reader {
addEvent(state.getFile().getCanonicalPath(), state.getFields(), pos, line);
}
protected void addEvent(FileState state, long pos, byte[] line) throws IOException {
addEvent(state.getFile().getCanonicalPath(), state.getFields(), pos, line);
}
protected void addEvent(String fileName, Event fields, long pos, byte[] line) throws IOException {
Event event = new Event(fields);
event.addField("file", fileName)
.addField("offset", pos)
.addField("line", line)
.addField("host", hostname);
eventList.add(event);
}
protected void addEvent(String fileName, Event fields, long pos, String line) throws IOException {
Event event = new Event(fields);
event.addField("file", fileName)