diff --git a/src/main/java/info/fetter/logstashforwarder/Event.java b/src/main/java/info/fetter/logstashforwarder/Event.java index 7775b37..d2752ac 100644 --- a/src/main/java/info/fetter/logstashforwarder/Event.java +++ b/src/main/java/info/fetter/logstashforwarder/Event.java @@ -23,6 +23,15 @@ import java.util.Map; public class Event { private Map keyValues = new HashMap(10); + public Event() { + } + + public Event(Event event) { + if(event != null) { + keyValues.putAll(event.keyValues); + } + } + public void addField(String key, byte[] value) { keyValues.put(key, value); } @@ -31,6 +40,10 @@ public class Event { keyValues.put(key, value.getBytes()); } + public void addField(String key, long value) { + keyValues.put(key, String.valueOf(value).getBytes()); + } + public Map getKeyValues() { return keyValues; } diff --git a/src/main/java/info/fetter/logstashforwarder/FileReader.java b/src/main/java/info/fetter/logstashforwarder/FileReader.java index 4ccb9d5..8f60379 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileReader.java +++ b/src/main/java/info/fetter/logstashforwarder/FileReader.java @@ -1,7 +1,14 @@ package info.fetter.logstashforwarder; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /* * Copyright 2015 Didier Fetter @@ -25,24 +32,88 @@ public class FileReader { private ProtocolAdapter adapter; private int spoolSize = DEFAULT_SPOOL_SIZE; private List eventList; - + private Map pointerMap; + private String hostname; + { + try { + hostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + public FileReader(int spoolSize) { this.spoolSize = spoolSize; eventList = new ArrayList(spoolSize); } - - public int readFiles(List fileList) { - // TODO: Read files and send events until there's nothing left to read or spool size reached - int eventCounter = 0; + + public int readFiles(List fileList) throws IOException { + int eventCount = 0; + pointerMap = new HashMap(fileList.size(),1); for(FileState state : fileList) { - eventCounter += readFile(state, spoolSize - eventCounter); + eventCount += readFile(state, spoolSize - eventCount); } - return 0; // Return number of events sent to adapter + adapter.sendEvents(eventList); + for(FileState state : fileList) { + state.setPointer(pointerMap.get(state.getFile())); + } + eventList.clear(); + return eventCount; // Return number of events sent to adapter } - private int readFile(FileState state, int spaceLeftInSpool) { - // TODO Read file - return 0; // Return number of events read + private int readFile(FileState state, int spaceLeftInSpool) throws IOException { + int eventListSizeBefore = eventList.size(); + File file = state.getFile(); + long pointer = state.getPointer(); + RandomAccessFile reader = state.getRandomAccessFile(); + reader.seek(pointer); + pointer = readLines(state, spaceLeftInSpool); + pointerMap.put(file, pointer); + return eventList.size() - eventListSizeBefore; // Return number of events read + } + + private long readLines(FileState state, int spaceLeftInSpool) throws IOException { + RandomAccessFile reader = state.getRandomAccessFile(); + long pos = reader.getFilePointer(); + String line = readLine(reader); + while (line != null && spaceLeftInSpool > 0) { + pos = reader.getFilePointer(); + addEvent(state, pos, line); + line = readLine(reader); + } + reader.seek(pos); // Ensure we can re-read if necessary + return pos; + } + + private String readLine(RandomAccessFile reader) throws IOException { + StringBuffer sb = new StringBuffer(); + int ch; + boolean seenCR = false; + while((ch=reader.read()) != -1) { + switch(ch) { + case '\n': + return sb.toString(); + case '\r': + seenCR = true; + break; + default: + if (seenCR) { + sb.append('\r'); + seenCR = false; + } + sb.append((char)ch); // add character, not its ascii value + } + } + return null; + } + + private void addEvent(FileState state, long pos, String line) throws IOException { + Event event = new Event(state.getFields()); + event.addField("file", state.getFile().getCanonicalPath()); + event.addField("offset", pos); + event.addField("line", line); + event.addField("host", hostname); + eventList.add(event); } public ProtocolAdapter getAdapter() { diff --git a/src/main/java/info/fetter/logstashforwarder/FileState.java b/src/main/java/info/fetter/logstashforwarder/FileState.java index 10e902a..013507e 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileState.java +++ b/src/main/java/info/fetter/logstashforwarder/FileState.java @@ -34,6 +34,7 @@ public class FileState { private long pointer = 0; private FileState oldFileState; private String fileName; + private Event fields; public FileState(File file) throws IOException { this.file = file; @@ -115,5 +116,13 @@ public class FileState { public String getFileName() { return fileName; } + + public Event getFields() { + return fields; + } + + public void setFields(Event fields) { + this.fields = fields; + } }