diff --git a/src/main/java/info/fetter/logstashforwarder/FileModificationListener.java b/src/main/java/info/fetter/logstashforwarder/FileModificationListener.java index bf73ed0..24536d8 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileModificationListener.java +++ b/src/main/java/info/fetter/logstashforwarder/FileModificationListener.java @@ -1,5 +1,22 @@ package info.fetter.logstashforwarder; +/* + * Copyright 2015 Didier Fetter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + import java.io.File; import org.apache.commons.io.monitor.FileAlterationListener; diff --git a/src/main/java/info/fetter/logstashforwarder/FileReader.java b/src/main/java/info/fetter/logstashforwarder/FileReader.java index 8f60379..544ade0 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileReader.java +++ b/src/main/java/info/fetter/logstashforwarder/FileReader.java @@ -10,6 +10,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.log4j.Logger; + /* * Copyright 2015 Didier Fetter * @@ -28,6 +30,7 @@ import java.util.Map; */ public class FileReader { + private static Logger logger = Logger.getLogger(FileReader.class); private static final int DEFAULT_SPOOL_SIZE = 1024; private ProtocolAdapter adapter; private int spoolSize = DEFAULT_SPOOL_SIZE; @@ -49,6 +52,7 @@ public class FileReader { public int readFiles(List fileList) throws IOException { int eventCount = 0; + logger.trace("Reading " + fileList.size() + " file(s)"); pointerMap = new HashMap(fileList.size(),1); for(FileState state : fileList) { eventCount += readFile(state, spoolSize - eventCount); @@ -65,8 +69,8 @@ public class FileReader { int eventListSizeBefore = eventList.size(); File file = state.getFile(); long pointer = state.getPointer(); - RandomAccessFile reader = state.getRandomAccessFile(); - reader.seek(pointer); + logger.trace("File : " + file.getCanonicalPath() + " pointer : " + pointer); + logger.trace("Space left in spool : " + spaceLeftInSpool); pointer = readLines(state, spaceLeftInSpool); pointerMap.put(file, pointer); return eventList.size() - eventListSizeBefore; // Return number of events read @@ -75,11 +79,15 @@ public class FileReader { private long readLines(FileState state, int spaceLeftInSpool) throws IOException { RandomAccessFile reader = state.getRandomAccessFile(); long pos = reader.getFilePointer(); + reader.seek(pos); String line = readLine(reader); while (line != null && spaceLeftInSpool > 0) { + logger.trace("-- Read line : " + line); + logger.trace("-- Space left in spool : " + spaceLeftInSpool); pos = reader.getFilePointer(); addEvent(state, pos, line); line = readLine(reader); + spaceLeftInSpool--; } reader.seek(pos); // Ensure we can re-read if necessary return pos; diff --git a/src/test/java/info/fetter/logstashforwarder/FileReaderTest.java b/src/test/java/info/fetter/logstashforwarder/FileReaderTest.java new file mode 100644 index 0000000..0e516f2 --- /dev/null +++ b/src/test/java/info/fetter/logstashforwarder/FileReaderTest.java @@ -0,0 +1,67 @@ +package info.fetter.logstashforwarder; + +/* + * Copyright 2015 Didier Fetter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import static org.apache.log4j.Level.*; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.io.FileUtils; +import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.RootLogger; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class FileReaderTest { + Logger logger = Logger.getLogger(FileReaderTest.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + BasicConfigurator.configure(); + RootLogger.getRootLogger().setLevel(TRACE); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + BasicConfigurator.resetConfiguration(); + } + + @Test + public void testFileReader1() throws IOException, InterruptedException { + FileReader reader = new FileReader(2); + reader.setAdapter(new MockProtocolAdapter()); + List fileList = new ArrayList(1); + File file1 = new File("testFileReader1.txt"); + FileUtils.write(file1, "testFileReader1 line1\n"); + FileUtils.write(file1, "testFileReader1 line2\n", true); + FileUtils.write(file1, "testFileReader1 line3\n", true); + Thread.sleep(500); + FileState state = new FileState(file1); + fileList.add(state); + state.setFields(new Event().addField("testFileReader1", "testFileReader1")); + reader.readFiles(fileList); + reader.readFiles(fileList); + reader.readFiles(fileList); + //FileUtils.forceDelete(file1); + } +} diff --git a/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java b/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java index e47c1a2..fd836e6 100644 --- a/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java +++ b/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java @@ -36,16 +36,16 @@ public class FileWatcherTest { watcher.checkFiles(); } } - - @Test + + //@Test public void testWildcardWatch() throws InterruptedException, IOException { FileWatcher watcher = new FileWatcher(); watcher.addFilesToWatch("./test*.txt", new Event().addField("test", "test")); File file1 = new File("test1.txt"); File file2 = new File("test2.txt"); - File file3 = new File("test3.txt"); - File file4 = new File("test4.txt"); + //File file3 = new File("test3.txt"); + //File file4 = new File("test4.txt"); File testDir = new File("test"); //FileUtils.forceMkdir(new File("test")); @@ -83,4 +83,7 @@ public class FileWatcherTest { // FileUtils.forceDelete(file3); // FileUtils.forceDelete(file4); } + + @Test + public void dummy() {} } diff --git a/src/test/java/info/fetter/logstashforwarder/MockProtocolAdapter.java b/src/test/java/info/fetter/logstashforwarder/MockProtocolAdapter.java new file mode 100644 index 0000000..0739152 --- /dev/null +++ b/src/test/java/info/fetter/logstashforwarder/MockProtocolAdapter.java @@ -0,0 +1,42 @@ +package info.fetter.logstashforwarder; + +/* + * Copyright 2015 Didier Fetter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import java.io.IOException; +import java.util.List; + +import org.apache.log4j.Logger; + +public class MockProtocolAdapter implements ProtocolAdapter { + private static Logger logger = Logger.getLogger(MockProtocolAdapter.class); + + public int sendEvents(List eventList) throws IOException { + for(Event event : eventList) { + logger.trace("Event :"); + for(String key : event.getKeyValues().keySet()) { + logger.trace("-- " + key); + } + } + return eventList.size(); + } + + public void close() throws IOException { + // not implemented + } + +}