Multiline joint character inserted between lines.

Without this, lines get joined without any space in between
which is a problem for some line formats as separate words
get jumbled together.
This commit is contained in:
Alberto González Palomo
2017-02-01 12:46:02 +01:00
parent e1ac798843
commit 94091a0f9f
3 changed files with 32 additions and 26 deletions

View File

@@ -23,6 +23,7 @@ import info.fetter.logstashforwarder.util.RandomAccessFile;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
//import java.io.RandomAccessFile; //import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
@@ -123,6 +124,15 @@ public class FileReader extends Reader {
return false; return false;
} }
private static byte[] extractBytes(ByteBuffer byteBuffer)
{
byte[] bytes = new byte[byteBuffer.position()];
byteBuffer.rewind();
byteBuffer.get(bytes);
byteBuffer.clear();
return bytes;
}
private long readLines(FileState state, int spaceLeftInSpool) { private long readLines(FileState state, int spaceLeftInSpool) {
RandomAccessFile reader = state.getRandomAccessFile(); RandomAccessFile reader = state.getRandomAccessFile();
long pos = state.getPointer(); long pos = state.getPointer();
@@ -130,7 +140,8 @@ public class FileReader extends Reader {
try { try {
reader.seek(pos); reader.seek(pos);
byte[] line = readLine(reader); byte[] line = readLine(reader);
byte[] bufferedLines = null; ByteBuffer bufferedLines = ByteBuffer.allocate(BYTEBUFFER_CAPACITY);
bufferedLines.clear();
while (line != null && spaceLeftInSpool > 0) { while (line != null && spaceLeftInSpool > 0) {
if(logger.isTraceEnabled()) { if(logger.isTraceEnabled()) {
logger.trace("-- Read line : " + new String(line)); logger.trace("-- Read line : " + new String(line));
@@ -148,28 +159,25 @@ public class FileReader extends Reader {
if (multiline.isPatternFound(line)) if (multiline.isPatternFound(line))
{ {
// buffer the line // buffer the line
if (bufferedLines != null) if (bufferedLines.position() > 0) {
{ bufferedLines.put(Multiline.JOINT);
bufferedLines = ArrayUtils.addAll(bufferedLines, line); }
} bufferedLines.put(line);
else
{
bufferedLines = line;
}
} }
else { else {
if (multiline.isPrevious()) { if (multiline.isPrevious()) {
// did not match, so new event started // did not match, so new event started
if (bufferedLines != null) { if (bufferedLines.position() > 0) {
addEvent(state, pos, bufferedLines); addEvent(state, pos, extractBytes(bufferedLines));
} }
bufferedLines = line; bufferedLines.put(line);
} }
else { else {
// did not match, add the current line // did not match, add the current line
if (bufferedLines != null) { if (bufferedLines.position() > 0) {
addEvent(state, pos, ArrayUtils.addAll(bufferedLines, line)); bufferedLines.put(Multiline.JOINT);
bufferedLines = null; bufferedLines.put(line);
addEvent(state, pos, extractBytes(bufferedLines));
} }
else else
addEvent(state, pos, line); addEvent(state, pos, line);
@@ -179,8 +187,8 @@ public class FileReader extends Reader {
line = readLine(reader); line = readLine(reader);
spaceLeftInSpool--; spaceLeftInSpool--;
} }
if (bufferedLines != null) { if (bufferedLines.position() > 0) {
addEvent(state, pos, bufferedLines); // send any buffered lines left addEvent(state, pos, extractBytes(bufferedLines)); // send any buffered lines left
} }
reader.seek(pos); // Ensure we can re-read if necessary reader.seek(pos); // Ensure we can re-read if necessary
} catch(IOException e) { } catch(IOException e) {
@@ -196,10 +204,7 @@ public class FileReader extends Reader {
while((ch=reader.read()) != -1) { while((ch=reader.read()) != -1) {
switch(ch) { switch(ch) {
case '\n': case '\n':
byte[] line = new byte[byteBuffer.position()]; return extractBytes(byteBuffer);
byteBuffer.rewind();
byteBuffer.get(line);
return line;
case '\r': case '\r':
seenCR = true; seenCR = true;
break; break;

View File

@@ -24,6 +24,7 @@ import org.apache.commons.lang.builder.ToStringBuilder;
public class Multiline { public class Multiline {
public enum WhatType { Previous, Next }; public enum WhatType { Previous, Next };
public static byte JOINT = (byte) ' ';
private Pattern pattern = null; private Pattern pattern = null;
private boolean negate = false; private boolean negate = false;

View File

@@ -40,20 +40,20 @@ public abstract class Reader {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
protected Reader(int spoolSize) { protected Reader(int spoolSize) {
this.spoolSize = spoolSize; this.spoolSize = spoolSize;
eventList = new ArrayList<Event>(spoolSize); eventList = new ArrayList<Event>(spoolSize);
} }
protected void addEvent(FileState state, long pos, String line) throws IOException { protected void addEvent(FileState state, long pos, String line) throws IOException {
addEvent(state.getFile().getCanonicalPath(), state.getFields(), pos, line); addEvent(state.getFile().getCanonicalPath(), state.getFields(), pos, line);
} }
protected void addEvent(FileState state, long pos, byte[] line) throws IOException { protected void addEvent(FileState state, long pos, byte[] line) throws IOException {
addEvent(state.getFile().getCanonicalPath(), state.getFields(), pos, line); addEvent(state.getFile().getCanonicalPath(), state.getFields(), pos, line);
} }
protected void addEvent(String fileName, Event fields, long pos, byte[] line) throws IOException { protected void addEvent(String fileName, Event fields, long pos, byte[] line) throws IOException {
Event event = new Event(fields); Event event = new Event(fields);
event.addField("file", fileName) event.addField("file", fileName)
@@ -71,7 +71,7 @@ public abstract class Reader {
.addField("host", hostname); .addField("host", hostname);
eventList.add(event); eventList.add(event);
} }
public ProtocolAdapter getAdapter() { public ProtocolAdapter getAdapter() {
return adapter; return adapter;
} }