From 69d181dfd032c841249e5eb6aac18018832ce15c Mon Sep 17 00:00:00 2001 From: didfet Date: Sat, 4 Apr 2015 23:35:05 +0200 Subject: [PATCH] Implemented stdin input (issue #5). --- .../fetter/logstashforwarder/FileWatcher.java | 17 ++++++++++++++++- .../fetter/logstashforwarder/Forwarder.java | 11 +++++++---- .../fetter/logstashforwarder/InputReader.java | 7 +++++-- .../logstashforwarder/InputReaderTest.java | 4 ++-- 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java index 1518291..3f320a7 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java +++ b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java @@ -44,6 +44,8 @@ public class FileWatcher { private FileState[] savedStates; private int maxSignatureLength; private boolean tail = false; + private Event stdinFields; + private boolean stdinConfigured = false; public FileWatcher() { try { @@ -103,6 +105,17 @@ public class FileWatcher { return numberOfLinesRead; } + public int readStdin(InputReader reader) throws AdapterException, IOException { + if(stdinConfigured) { + logger.debug("Reading stdin"); + reader.setFields(stdinFields); + int numberOfLinesRead = reader.readInput(); + return numberOfLinesRead; + } else { + return 0; + } + } + private void processModifications() throws IOException { for(File file : newWatchMap.keySet()) { @@ -224,7 +237,9 @@ public class FileWatcher { } private void addStdIn(Event fields) { - logger.error("Watching stdin : not implemented yet"); + logger.error("Watching stdin"); + stdinFields = fields; + stdinConfigured = true; } private void initializeWatchMap(File directory, IOFileFilter fileFilter, Event fields) throws Exception { diff --git a/src/main/java/info/fetter/logstashforwarder/Forwarder.java b/src/main/java/info/fetter/logstashforwarder/Forwarder.java index a965f65..43d029e 100644 --- a/src/main/java/info/fetter/logstashforwarder/Forwarder.java +++ b/src/main/java/info/fetter/logstashforwarder/Forwarder.java @@ -53,7 +53,8 @@ public class Forwarder { private static String config; private static ConfigurationManager configManager; private static FileWatcher watcher; - private static FileReader reader; + private static FileReader fileReader; + private static InputReader inputReader; private static Level logLevel = INFO; private static ProtocolAdapter adapter; private static Random random = new Random(); @@ -75,7 +76,8 @@ public class Forwarder { } } watcher.initialize(); - reader = new FileReader(spoolSize); + fileReader = new FileReader(spoolSize); + inputReader = new InputReader(spoolSize, System.in); connectToServer(); infiniteLoop(); } catch(Exception e) { @@ -88,7 +90,8 @@ public class Forwarder { while(true) { try { watcher.checkFiles(); - while(watcher.readFiles(reader) == spoolSize); + while(watcher.readFiles(fileReader) == spoolSize); + while(watcher.readStdin(inputReader) == spoolSize); Thread.sleep(idleTimeout); } catch(AdapterException e) { logger.error("Lost server connection"); @@ -117,7 +120,7 @@ public class Forwarder { String[] serverAndPort = serverList.get(randomServerIndex).split(":"); logger.info("Trying to connect to " + serverList.get(randomServerIndex)); adapter = new LumberjackClient(configManager.getConfig().getNetwork().getSslCA(),serverAndPort[0],Integer.parseInt(serverAndPort[1]), networkTimeout); - reader.setAdapter(adapter); + fileReader.setAdapter(adapter); } catch(Exception ex) { logger.error("Failed to connect to server " + serverList.get(randomServerIndex) + " : " + ex.getMessage()); } diff --git a/src/main/java/info/fetter/logstashforwarder/InputReader.java b/src/main/java/info/fetter/logstashforwarder/InputReader.java index b050bb9..f1751ed 100644 --- a/src/main/java/info/fetter/logstashforwarder/InputReader.java +++ b/src/main/java/info/fetter/logstashforwarder/InputReader.java @@ -32,10 +32,9 @@ public class InputReader extends Reader { private long position = 0; private Event fields; - public InputReader(int spoolSize, InputStream in, Event fields) { + public InputReader(int spoolSize, InputStream in) { super(spoolSize); reader = new BufferedReader(new InputStreamReader(in)); - this.fields = fields; stringBuilder = new StringBuilder(STRINGBUILDER_INITIAL_CAPACITY); } @@ -88,5 +87,9 @@ public class InputReader extends Reader { } return null; } + + public void setFields(Event fields) { + this.fields = fields; + } } diff --git a/src/test/java/info/fetter/logstashforwarder/InputReaderTest.java b/src/test/java/info/fetter/logstashforwarder/InputReaderTest.java index 4eb2ae6..189b337 100644 --- a/src/test/java/info/fetter/logstashforwarder/InputReaderTest.java +++ b/src/test/java/info/fetter/logstashforwarder/InputReaderTest.java @@ -52,7 +52,7 @@ public class InputReaderTest { PipedInputStream in = new PipedInputStream(); PipedOutputStream out = new PipedOutputStream(in); PrintWriter writer = new PrintWriter(out); - InputReader reader = new InputReader(2, in, null); + InputReader reader = new InputReader(2, in); MockProtocolAdapter adapter = new MockProtocolAdapter(); reader.setAdapter(adapter); @@ -103,7 +103,7 @@ public class InputReaderTest { PipedInputStream in = new PipedInputStream(); PipedOutputStream out = new PipedOutputStream(in); PrintWriter writer = new PrintWriter(out); - InputReader reader = new InputReader(2, in, null); + InputReader reader = new InputReader(2, in); MockProtocolAdapter adapter = new MockProtocolAdapter(); reader.setAdapter(adapter);