diff --git a/src/main/java/info/fetter/logstashforwarder/FileReader.java b/src/main/java/info/fetter/logstashforwarder/FileReader.java index 34c818a..b5a5588 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileReader.java +++ b/src/main/java/info/fetter/logstashforwarder/FileReader.java @@ -22,9 +22,6 @@ import info.fetter.logstashforwarder.util.AdapterException; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -33,31 +30,17 @@ import java.util.Map; import org.apache.log4j.Logger; -public class FileReader { +public class FileReader extends Reader { private static Logger logger = Logger.getLogger(FileReader.class); private static final byte[] ZIP_MAGIC = new byte[] {(byte) 0x50, (byte) 0x4b, (byte) 0x03, (byte) 0x04}; private static final byte[] LZW_MAGIC = new byte[] {(byte) 0x1f, (byte) 0x9d}; private static final byte[] LZH_MAGIC = new byte[] {(byte) 0x1f, (byte) 0xa0}; private static final byte[] GZ_MAGIC = new byte[] {(byte) 0x1f, (byte) 0x8b, (byte) 0x08}; private static final byte[][] MAGICS = new byte[][] {ZIP_MAGIC, LZW_MAGIC, LZH_MAGIC, GZ_MAGIC}; - private ProtocolAdapter adapter; - private int spoolSize = 0; - private List eventList; private Map pointerMap; - private final int STRINGBUILDER_INITIAL_CAPACITY = 1000; - private StringBuilder stringBuilder; - private String hostname; - { - try { - hostname = InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - throw new RuntimeException(e); - } - } public FileReader(int spoolSize) { - this.spoolSize = spoolSize; - eventList = new ArrayList(spoolSize); + super(spoolSize); } public int readFiles(Collection fileList) throws AdapterException { @@ -165,21 +148,4 @@ public class FileReader { return null; } - private void addEvent(FileState state, long pos, String line) throws IOException { - Event event = new Event(state.getFields()); - event.addField("file", state.getFile().getCanonicalPath()) - .addField("offset", pos) - .addField("line", line) - .addField("host", hostname); - eventList.add(event); - } - - public ProtocolAdapter getAdapter() { - return adapter; - } - - public void setAdapter(ProtocolAdapter adapter) { - this.adapter = adapter; - } - } diff --git a/src/main/java/info/fetter/logstashforwarder/InputReader.java b/src/main/java/info/fetter/logstashforwarder/InputReader.java new file mode 100644 index 0000000..b050bb9 --- /dev/null +++ b/src/main/java/info/fetter/logstashforwarder/InputReader.java @@ -0,0 +1,92 @@ +package info.fetter.logstashforwarder; + +/* + * Copyright 2015 Didier Fetter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import info.fetter.logstashforwarder.util.AdapterException; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +import org.apache.log4j.Logger; + +public class InputReader extends Reader { + private static Logger logger = Logger.getLogger(InputReader.class); + private BufferedReader reader; + private long position = 0; + private Event fields; + + public InputReader(int spoolSize, InputStream in, Event fields) { + super(spoolSize); + reader = new BufferedReader(new InputStreamReader(in)); + this.fields = fields; + stringBuilder = new StringBuilder(STRINGBUILDER_INITIAL_CAPACITY); + } + + public int readInput() throws AdapterException, IOException { + int eventCount = 0; + logger.trace("Reading stdin"); + + eventCount += readLines(); + + if(eventCount > 0) { + adapter.sendEvents(eventList); + } + + eventList.clear(); + return eventCount; + } + + private int readLines() throws IOException { + int lineCount = 0; + String line; + while(lineCount < spoolSize && (line = readLine()) != null) { + position += line.length(); + lineCount++; + addEvent("stdin", fields, position, line); + } + return lineCount; + } + + private String readLine() throws IOException { + int ch; + boolean seenCR = false; + String line; + while(reader.ready()) { + ch=reader.read(); + switch(ch) { + case '\n': + line = stringBuilder.toString(); + stringBuilder.setLength(0); + return line; + case '\r': + seenCR = true; + break; + default: + if (seenCR) { + stringBuilder.append('\r'); + seenCR = false; + } + stringBuilder.append((char)ch); // add character, not its ascii value + } + } + return null; + } + +} diff --git a/src/main/java/info/fetter/logstashforwarder/Reader.java b/src/main/java/info/fetter/logstashforwarder/Reader.java new file mode 100644 index 0000000..ce980b7 --- /dev/null +++ b/src/main/java/info/fetter/logstashforwarder/Reader.java @@ -0,0 +1,68 @@ +package info.fetter.logstashforwarder; + + +/* + * Copyright 2015 Didier Fetter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; + + +public abstract class Reader { + protected ProtocolAdapter adapter; + protected int spoolSize = 0; + protected List eventList; + protected final int STRINGBUILDER_INITIAL_CAPACITY = 1000; + protected StringBuilder stringBuilder; + private String hostname; + { + try { + hostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + + protected Reader(int spoolSize) { + this.spoolSize = spoolSize; + eventList = new ArrayList(spoolSize); + } + + protected void addEvent(FileState state, long pos, String line) throws IOException { + addEvent(state.getFile().getCanonicalPath(), state.getFields(), pos, line); + } + + protected void addEvent(String fileName, Event fields, long pos, String line) throws IOException { + Event event = new Event(fields); + event.addField("file", fileName) + .addField("offset", pos) + .addField("line", line) + .addField("host", hostname); + eventList.add(event); + } + + public ProtocolAdapter getAdapter() { + return adapter; + } + + public void setAdapter(ProtocolAdapter adapter) { + this.adapter = adapter; + } +} diff --git a/src/test/java/info/fetter/logstashforwarder/InputReaderTest.java b/src/test/java/info/fetter/logstashforwarder/InputReaderTest.java new file mode 100644 index 0000000..90e32dd --- /dev/null +++ b/src/test/java/info/fetter/logstashforwarder/InputReaderTest.java @@ -0,0 +1,90 @@ +package info.fetter.logstashforwarder; + +/* + * Copyright 2015 Didier Fetter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import static org.apache.log4j.Level.*; +import info.fetter.logstashforwarder.util.AdapterException; + +import java.io.File; +import java.io.IOException; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.io.FileUtils; +import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.RootLogger; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class InputReaderTest { + Logger logger = Logger.getLogger(InputReaderTest.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + BasicConfigurator.configure(); + RootLogger.getRootLogger().setLevel(TRACE); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + BasicConfigurator.resetConfiguration(); + } + + @Test + public void testInputReader1() throws IOException, InterruptedException, AdapterException { + PipedInputStream in = new PipedInputStream(); + PipedOutputStream out = new PipedOutputStream(in); + PrintWriter writer = new PrintWriter(out); + InputReader reader = new InputReader(2, in, null); + reader.setAdapter(new MockProtocolAdapter()); + + reader.readInput(); + + writer.println("line1"); + writer.flush(); + reader.readInput(); + + writer.print("line2"); + writer.flush(); + reader.readInput(); + + writer.println(); + writer.flush(); + reader.readInput(); + + writer.println("line3"); + writer.println("line4"); + writer.println("line5"); + writer.flush(); + reader.readInput(); + + reader.readInput(); + reader.readInput(); + + while(in.available() > 0) { + logger.trace("read : " + in.read()); + } + + writer.close(); + } +}