From 340076f85023019c48c54266e477bf5211f73ec0 Mon Sep 17 00:00:00 2001 From: didfet Date: Sun, 15 Mar 2015 00:29:05 +0100 Subject: [PATCH] Created FileReader and ProtocolAdapter --- .../fetter/logstashforwarder/FileReader.java | 56 +++++++++++++++++++ .../fetter/logstashforwarder/FileWatcher.java | 31 ++-------- .../logstashforwarder/ProtocolAdapter.java | 26 +++++++++ .../protocol/LumberjackClient.java | 3 +- 4 files changed, 90 insertions(+), 26 deletions(-) create mode 100644 src/main/java/info/fetter/logstashforwarder/FileReader.java create mode 100644 src/main/java/info/fetter/logstashforwarder/ProtocolAdapter.java diff --git a/src/main/java/info/fetter/logstashforwarder/FileReader.java b/src/main/java/info/fetter/logstashforwarder/FileReader.java new file mode 100644 index 0000000..4ccb9d5 --- /dev/null +++ b/src/main/java/info/fetter/logstashforwarder/FileReader.java @@ -0,0 +1,56 @@ +package info.fetter.logstashforwarder; + +import java.util.ArrayList; +import java.util.List; + +/* + * Copyright 2015 Didier Fetter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +public class FileReader { + private static final int DEFAULT_SPOOL_SIZE = 1024; + private ProtocolAdapter adapter; + private int spoolSize = DEFAULT_SPOOL_SIZE; + private List eventList; + + public FileReader(int spoolSize) { + this.spoolSize = spoolSize; + eventList = new ArrayList(spoolSize); + } + + public int readFiles(List fileList) { + // TODO: Read files and send events until there's nothing left to read or spool size reached + int eventCounter = 0; + for(FileState state : fileList) { + eventCounter += readFile(state, spoolSize - eventCounter); + } + return 0; // Return number of events sent to adapter + } + + private int readFile(FileState state, int spaceLeftInSpool) { + // TODO Read file + return 0; // Return number of events read + } + + public ProtocolAdapter getAdapter() { + return adapter; + } + + public void setAdapter(ProtocolAdapter adapter) { + this.adapter = adapter; + } + +} diff --git a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java index b109460..65af0fc 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java +++ b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java @@ -55,11 +55,11 @@ public class FileWatcher implements FileAlterationListener { public void addFilesToWatch(String fileToWatch) { try { if(fileToWatch.equals("-")) { - watchStdIn(); + addStdIn(); } else if(fileToWatch.contains("*")) { - watchWildCardFiles(fileToWatch); + addWildCardFiles(fileToWatch); } else { - watchFile(fileToWatch); + addSingleFile(fileToWatch); } } catch(Exception e) { throw new RuntimeException(e); @@ -69,12 +69,10 @@ public class FileWatcher implements FileAlterationListener { public void checkFiles() throws IOException { logger.debug("Checking files"); logger.trace("=============="); - for(FileAlterationObserver observer : observerList) { observer.checkAndNotify(); } processModifications(); - printWatchMap(); } @@ -95,39 +93,31 @@ public class FileWatcher implements FileAlterationListener { logger.trace("-- Filename : " + state.getFileName()); } - // Determine if file is still the same logger.trace("Determine if file has just been written to"); FileState oldState = watchMap.get(file); if(oldState != null) { if(oldState.getSize() > state.getSize()) { - // File is shorter, can't be the same logger.trace("File shorter : file can't be the same"); } else { if(oldState.getSignatureLength() == state.getSignatureLength() && oldState.getSignature() == state.getSignature()) { - // File is the same state.setOldFileState(oldState); logger.trace("Same signature size and value : file is the same"); continue; } else if(oldState.getSignatureLength() < state.getSignatureLength()){ - // Compute the signature on the new file long signature = FileSigner.computeSignature(state.getRandomAccessFile(), oldState.getSignatureLength()); if(signature == oldState.getSignature()) { - // File is the same state.setOldFileState(oldState); logger.trace("Same signature : file is the same"); continue; } else { - // File can't be the same logger.trace("Signature different : file can't be the same"); } } else if(oldState.getSignatureLength() > state.getSignatureLength()){ - // File can't be the same logger.trace("Signature shorter : file can't be the same"); } } } - // Determine if file has been renamed and/or written to if(state.getOldFileState() == null) { logger.trace("Determine if file has been renamed and/or written to"); for(File otherFile : watchMap.keySet()) { @@ -136,26 +126,20 @@ public class FileWatcher implements FileAlterationListener { if(logger.isTraceEnabled()) { logger.trace("Comparing to : " + otherFile.getCanonicalPath()); } - // File in the same directory which was smaller if(otherState.getSignatureLength() == state.getSignatureLength() && otherState.getSignature() == state.getSignature()) { - // File is the same state.setOldFileState(otherState); logger.trace("Same signature size and value : file is the same"); break; } else if(otherState.getSignatureLength() < state.getSignatureLength()){ - // Compute the signature on the new file long signature = FileSigner.computeSignature(state.getRandomAccessFile(), otherState.getSignatureLength()); if(signature == otherState.getSignature()) { - // File is the same state.setOldFileState(otherState); logger.trace("Same signature : file is the same"); break; } else { - // File can't be the same logger.trace("Signature different : file can't be the same"); } } else if(otherState.getSignatureLength() > state.getSignatureLength()){ - // File can't be the same logger.trace("Signature shorter : file can't be the same"); } } @@ -163,7 +147,6 @@ public class FileWatcher implements FileAlterationListener { } } - // Refresh file state logger.trace("Refreshing file state"); for(File file : changedWatchMap.keySet()) { if(logger.isTraceEnabled()) { @@ -180,10 +163,8 @@ public class FileWatcher implements FileAlterationListener { } } - // Replacing old state logger.trace("Replacing old state"); for(File file : changedWatchMap.keySet()) { - //logger.trace("Replacing file : " + file.getCanonicalPath()); FileState state = changedWatchMap.get(file); watchMap.put(file, state); } @@ -194,7 +175,7 @@ public class FileWatcher implements FileAlterationListener { removeMarkedFilesFromWatchMap(); } - private void watchFile(String fileToWatch) throws Exception { + private void addSingleFile(String fileToWatch) throws Exception { logger.info("Watching file : " + new File(fileToWatch).getCanonicalPath()); String directory = FilenameUtils.getFullPath(fileToWatch); String fileName = FilenameUtils.getName(fileToWatch); @@ -205,7 +186,7 @@ public class FileWatcher implements FileAlterationListener { initializeWatchMap(new File(directory), fileFilter); } - private void watchWildCardFiles(String filesToWatch) throws Exception { + private void addWildCardFiles(String filesToWatch) throws Exception { logger.info("Watching wildcard files : " + filesToWatch); String directory = FilenameUtils.getFullPath(filesToWatch); String wildcard = FilenameUtils.getName(filesToWatch); @@ -217,7 +198,7 @@ public class FileWatcher implements FileAlterationListener { initializeWatchMap(new File(directory), fileFilter); } - private void watchStdIn() { + private void addStdIn() { logger.info("Watching stdin : not implemented yet"); } diff --git a/src/main/java/info/fetter/logstashforwarder/ProtocolAdapter.java b/src/main/java/info/fetter/logstashforwarder/ProtocolAdapter.java new file mode 100644 index 0000000..52d0f2a --- /dev/null +++ b/src/main/java/info/fetter/logstashforwarder/ProtocolAdapter.java @@ -0,0 +1,26 @@ +package info.fetter.logstashforwarder; + +/* + * Copyright 2015 Didier Fetter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import java.io.IOException; +import java.util.List; + +public interface ProtocolAdapter { + public int sendEvents(List eventList) throws IOException; + public void close() throws IOException; +} diff --git a/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java b/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java index 810e140..542969e 100644 --- a/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java +++ b/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java @@ -18,6 +18,7 @@ package info.fetter.logstashforwarder.protocol; */ import info.fetter.logstashforwarder.Event; +import info.fetter.logstashforwarder.ProtocolAdapter; import java.io.BufferedOutputStream; import java.io.ByteArrayOutputStream; @@ -40,7 +41,7 @@ import javax.net.ssl.TrustManagerFactory; import org.apache.commons.io.HexDump; import org.apache.log4j.Logger; -public class LumberjackClient { +public class LumberjackClient implements ProtocolAdapter { private final static Logger logger = Logger.getLogger(LumberjackClient.class); private final static byte PROTOCOL_VERSION = 0x31; private final static byte FRAME_ACK = 0x41;