Created InputReader.

This commit is contained in:
didfet
2015-04-04 20:17:23 +02:00
parent 920aa3a31a
commit 31c6ee6d9f
4 changed files with 252 additions and 36 deletions

View File

@@ -22,9 +22,6 @@ import info.fetter.logstashforwarder.util.AdapterException;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@@ -33,31 +30,17 @@ import java.util.Map;
import org.apache.log4j.Logger;
public class FileReader {
public class FileReader extends Reader {
private static Logger logger = Logger.getLogger(FileReader.class);
private static final byte[] ZIP_MAGIC = new byte[] {(byte) 0x50, (byte) 0x4b, (byte) 0x03, (byte) 0x04};
private static final byte[] LZW_MAGIC = new byte[] {(byte) 0x1f, (byte) 0x9d};
private static final byte[] LZH_MAGIC = new byte[] {(byte) 0x1f, (byte) 0xa0};
private static final byte[] GZ_MAGIC = new byte[] {(byte) 0x1f, (byte) 0x8b, (byte) 0x08};
private static final byte[][] MAGICS = new byte[][] {ZIP_MAGIC, LZW_MAGIC, LZH_MAGIC, GZ_MAGIC};
private ProtocolAdapter adapter;
private int spoolSize = 0;
private List<Event> eventList;
private Map<File,Long> pointerMap;
private final int STRINGBUILDER_INITIAL_CAPACITY = 1000;
private StringBuilder stringBuilder;
private String hostname;
{
try {
hostname = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
}
public FileReader(int spoolSize) {
this.spoolSize = spoolSize;
eventList = new ArrayList<Event>(spoolSize);
super(spoolSize);
}
public int readFiles(Collection<FileState> fileList) throws AdapterException {
@@ -165,21 +148,4 @@ public class FileReader {
return null;
}
private void addEvent(FileState state, long pos, String line) throws IOException {
Event event = new Event(state.getFields());
event.addField("file", state.getFile().getCanonicalPath())
.addField("offset", pos)
.addField("line", line)
.addField("host", hostname);
eventList.add(event);
}
public ProtocolAdapter getAdapter() {
return adapter;
}
public void setAdapter(ProtocolAdapter adapter) {
this.adapter = adapter;
}
}

View File

@@ -0,0 +1,92 @@
package info.fetter.logstashforwarder;
/*
* Copyright 2015 Didier Fetter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import info.fetter.logstashforwarder.util.AdapterException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import org.apache.log4j.Logger;
public class InputReader extends Reader {
private static Logger logger = Logger.getLogger(InputReader.class);
private BufferedReader reader;
private long position = 0;
private Event fields;
public InputReader(int spoolSize, InputStream in, Event fields) {
super(spoolSize);
reader = new BufferedReader(new InputStreamReader(in));
this.fields = fields;
stringBuilder = new StringBuilder(STRINGBUILDER_INITIAL_CAPACITY);
}
public int readInput() throws AdapterException, IOException {
int eventCount = 0;
logger.trace("Reading stdin");
eventCount += readLines();
if(eventCount > 0) {
adapter.sendEvents(eventList);
}
eventList.clear();
return eventCount;
}
private int readLines() throws IOException {
int lineCount = 0;
String line;
while(lineCount < spoolSize && (line = readLine()) != null) {
position += line.length();
lineCount++;
addEvent("stdin", fields, position, line);
}
return lineCount;
}
private String readLine() throws IOException {
int ch;
boolean seenCR = false;
String line;
while(reader.ready()) {
ch=reader.read();
switch(ch) {
case '\n':
line = stringBuilder.toString();
stringBuilder.setLength(0);
return line;
case '\r':
seenCR = true;
break;
default:
if (seenCR) {
stringBuilder.append('\r');
seenCR = false;
}
stringBuilder.append((char)ch); // add character, not its ascii value
}
}
return null;
}
}

View File

@@ -0,0 +1,68 @@
package info.fetter.logstashforwarder;
/*
* Copyright 2015 Didier Fetter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
public abstract class Reader {
protected ProtocolAdapter adapter;
protected int spoolSize = 0;
protected List<Event> eventList;
protected final int STRINGBUILDER_INITIAL_CAPACITY = 1000;
protected StringBuilder stringBuilder;
private String hostname;
{
try {
hostname = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
}
protected Reader(int spoolSize) {
this.spoolSize = spoolSize;
eventList = new ArrayList<Event>(spoolSize);
}
protected void addEvent(FileState state, long pos, String line) throws IOException {
addEvent(state.getFile().getCanonicalPath(), state.getFields(), pos, line);
}
protected void addEvent(String fileName, Event fields, long pos, String line) throws IOException {
Event event = new Event(fields);
event.addField("file", fileName)
.addField("offset", pos)
.addField("line", line)
.addField("host", hostname);
eventList.add(event);
}
public ProtocolAdapter getAdapter() {
return adapter;
}
public void setAdapter(ProtocolAdapter adapter) {
this.adapter = adapter;
}
}

View File

@@ -0,0 +1,90 @@
package info.fetter.logstashforwarder;
/*
* Copyright 2015 Didier Fetter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import static org.apache.log4j.Level.*;
import info.fetter.logstashforwarder.util.AdapterException;
import java.io.File;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.RootLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class InputReaderTest {
Logger logger = Logger.getLogger(InputReaderTest.class);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
BasicConfigurator.configure();
RootLogger.getRootLogger().setLevel(TRACE);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
BasicConfigurator.resetConfiguration();
}
@Test
public void testInputReader1() throws IOException, InterruptedException, AdapterException {
PipedInputStream in = new PipedInputStream();
PipedOutputStream out = new PipedOutputStream(in);
PrintWriter writer = new PrintWriter(out);
InputReader reader = new InputReader(2, in, null);
reader.setAdapter(new MockProtocolAdapter());
reader.readInput();
writer.println("line1");
writer.flush();
reader.readInput();
writer.print("line2");
writer.flush();
reader.readInput();
writer.println();
writer.flush();
reader.readInput();
writer.println("line3");
writer.println("line4");
writer.println("line5");
writer.flush();
reader.readInput();
reader.readInput();
reader.readInput();
while(in.available() > 0) {
logger.trace("read : " + in.read());
}
writer.close();
}
}