A fast grep
Introduction
In this post, I present a fast Java "grep" application. It is not a full-featured replacement for "grep"; rather, it is a stripped-down "grep" built on top of parts you can extend and configure to implement additional features.
What does the grep application presented here do? It lets you specify a base directory (containing files and sub-directories) under which to search, a "file name" term indicating which files under this base directory to search, and a search term. For example, to search all Java files under the base directory "c:\var\projects" for the literal "class", you would invoke this grep as follows:
java -jar jgrep.jar -d c:\var\projects -f .java class
Note that the "-f" argument specifies a literal string with which a file name must end in order for it to be included in the search.
Functional elements
If "grepping" is defined as building the set of files that lie under a given directory (or its sub-directories), have names conforming to a given pattern, and contain a given search term, then grepping consists of the following independent functional elements:
1. Locating all files and sub-directories under a directory.
2. Filtering out, from the above list, directories and files that do not conform to the given file name pattern, and reading the contents of the remaining files.
3. Searching for the search term within the files obtained in the previous step.
4. Presenting files found in the previous step to the user.
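Before overlapping the steps, it may help to see them run strictly one after another. The following is a minimal sequential sketch, not the author's code: the class name SequentialGrep is hypothetical, and step 2 uses java.nio.file.Files.readAllBytes (Java 7 or later) where the post's code relies on its own FileUtils helper.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sequential grep: each step finishes before the next starts. */
public class SequentialGrep {

    // Step 1: collect every regular file under dir, recursively.
    static void locate(File dir, List<File> out) {
        File[] listing = dir.listFiles();
        if (listing == null) {
            return; // not a directory, or an I/O error / permission problem
        }
        for (File f : listing) {
            if (f.isDirectory()) {
                locate(f, out);
            } else if (f.isFile()) {
                out.add(f);
            }
        }
    }

    // Steps 2 and 3: keep files whose name ends with suffix and whose
    // content contains text.
    static List<File> search(List<File> files, String suffix, String text) {
        List<File> matches = new ArrayList<File>();
        for (File f : files) {
            if (!f.getName().endsWith(suffix)) {
                continue;
            }
            try {
                String content = new String(Files.readAllBytes(f.toPath()),
                        Charset.defaultCharset());
                if (content.contains(text)) {
                    matches.add(f);
                }
            } catch (IOException exc) {
                // Unreadable file: skip it.
            }
        }
        return matches;
    }

    public static List<File> grep(File baseDir, String suffix, String text) {
        List<File> files = new ArrayList<File>();
        locate(baseDir, files);
        return search(files, suffix, text);
    }

    public static void main(String[] args) {
        // Step 4: present the matches (args: baseDir suffix text).
        for (File f : grep(new File(args[0]), args[1], args[2])) {
            System.out.println(f);
        }
    }
}
```

Here the directory walk must complete before a single file is read, which is exactly the restriction the pipelined design removes.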
Diagrammatically, we can represent the above steps as follows:
Thinking about this decomposition, it is apparent that these operations can overlap. In other words, a step does not have to wait for its predecessors to complete before it begins: each step may be seen as a "producer" that produces work for its successor step and, at the same time, a "consumer" of work produced by its predecessor. So, it is possible to redraw the diagram as follows:
Building on the "producer consumer" terminology introduced in the previous paragraph, we can flesh out this diagram with three queues:
In the diagram shown above, the little trapezoids represent queues, with the base (the longer parallel side) representing the producer and the top representing the consumer. Thus, the leftmost trapezoid represents a queue in which items are produced by the directory scanner and consumed by the file reader. The second trapezoid represents a queue in which items are produced by the file reader and consumed by the content finder. The rightmost trapezoid represents a queue in which items are produced by the content finder and consumed by the presentation component.
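The hand-off through one of these queues can be illustrated in isolation. Below is a minimal sketch with one producer thread and one consumer thread sharing a bounded LinkedBlockingQueue, so the consumer starts working while the producer is still producing. It signals the end of input with a "poison pill" sentinel, which is a swapped-in technique: the post's code instead passes item counts from stage to stage (see waitFor in QueueProcessor). The class PipelineSketch and its run method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical two-stage producer/consumer pipeline. */
public class PipelineSketch {
    // Compared by identity, so it can never collide with a real file name.
    private static final String POISON = new String("<end>");

    public static List<String> run(final List<String> names, final String suffix)
            throws InterruptedException {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(100);
        final List<String> accepted = new ArrayList<String>();

        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (String name : names) {
                        queue.put(name); // blocks if the consumer falls behind
                    }
                    queue.put(POISON); // tell the consumer there is no more work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    while (true) {
                        String name = queue.take(); // blocks until work arrives
                        if (name == POISON) {
                            break;
                        }
                        if (name.endsWith(suffix)) {
                            accepted.add(name); // only this thread writes 'accepted'
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // join() makes the consumer's writes visible here
        return accepted;
    }
}
```

A poison pill works well with a single consumer; with several consumers per stage, as in the post, either one pill per consumer or a count-based scheme is needed.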
Code
The code presented below consists of four types (three classes and one interface):
1. DirectoryScannerProducer scans a directory using the "fast directory scanning" technique presented in a previous posting, filling a blocking queue with the files it finds (sub-directories are traversed but not placed in the queue).
2. IProcessor is a generic interface that declares a "process" method, which accepts an input of one generic type and returns a value of another. Implementations must be stateless (that is, they must not keep mutable state in instance variables), because a single instance is invoked concurrently from many threads.
3. QueueProcessor is a generic class that consumes items of a given type from a blocking queue, processes them using a given IProcessor, and puts the results into another blocking queue. Only non-null results are put into the output queue; a null result simply drops the input. Any exception thrown by the "process" method of the IProcessor is ignored, and the corresponding input is dropped. QueueProcessor's "waitFor" method blocks until the given number of inputs has been consumed, then shuts the QueueProcessor down once all of those inputs have been processed.
4. Grep provides a main method that declares and sets up all the components and the queues, parses command line arguments, and launches all the components.
IProcessor
package com.subhajit.util.grep;
import java.lang.reflect.InvocationTargetException;
public interface IProcessor<I,O> {
O process(I input) throws InterruptedException, InvocationTargetException;
}
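As a hedged illustration of the stateless contract, here is a hypothetical IProcessor implementation that passes through files whose names end with a given suffix, plus a small helper that chains two processors. The interface is redeclared as a nested type so the example compiles on its own; SuffixFilter and chain are not part of the post's code. An immutable final field holding configuration is fine; what must be avoided is state that changes between calls.

```java
import java.io.File;
import java.lang.reflect.InvocationTargetException;

public class ProcessorExample {
    /** Same shape as the post's IProcessor interface. */
    public interface IProcessor<I, O> {
        O process(I input) throws InterruptedException, InvocationTargetException;
    }

    /** Hypothetical stateless filter: its only field is final configuration. */
    public static final class SuffixFilter implements IProcessor<File, File> {
        private final String suffix;

        public SuffixFilter(String suffix) {
            this.suffix = suffix;
        }

        public File process(File input) {
            // Returning null tells QueueProcessor to drop this input.
            return input.getName().endsWith(suffix) ? input : null;
        }
    }

    /** Hypothetical helper: runs first, then second; a null from first short-circuits. */
    public static <A, B, C> IProcessor<A, C> chain(final IProcessor<A, B> first,
            final IProcessor<B, C> second) {
        return new IProcessor<A, C>() {
            public C process(A input) throws InterruptedException, InvocationTargetException {
                B mid = first.process(input);
                return mid == null ? null : second.process(mid);
            }
        };
    }
}
```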
DirectoryScannerProducer
package com.subhajit.util.grep;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Scans a directory and puts all files (not directories) into a given
* {@link BlockingQueue}.
*
* @author sdasgupta
*
*/
public class DirectoryScannerProducer {
private final Semaphore sem = new Semaphore(1);
/**
* Number of directory installments that are either queued in
* workingQueue or currently being processed. The scan is complete when
* this count drops to zero.
*/
private final AtomicInteger pendingCount = new AtomicInteger(0);
private final List<ThreadWithList> threads = new ArrayList<ThreadWithList>();
private final int installmentSize;
private final AtomicInteger producedCount = new AtomicInteger(0);
private static class DirInfo {
private final File dir;
private final File[] listing;
private int index;
public DirInfo(File dir) {
super();
this.dir = dir;
// listFiles() returns null on an I/O error or when access is denied.
File[] files = dir.listFiles();
this.listing = (files != null) ? files : new File[0];
this.index = 0;
}
public int getIndex() {
return index;
}
public void setIndex(int index) {
this.index = index;
}
public File getDir() {
return dir;
}
public File[] getListing() {
return listing;
}
}
private final BlockingQueue<DirInfo> workingQueue;
private final BlockingQueue<File> producedQueue;
private final class ThreadWithList extends Thread {
private int useCount = 0;
public ThreadWithList() {
super();
}
public void incrementUseCount() {
useCount++;
}
public int getUseCount() {
return useCount;
}
public void run() {
while (true) {
if (DirectoryScannerProducer.this.scan0()) {
break;
}
}
}
}
public DirectoryScannerProducer(BlockingQueue<File> producedQueue,
int threads, int installmentSize) {
super();
this.producedQueue = producedQueue;
this.installmentSize = installmentSize;
workingQueue = new LinkedBlockingQueue<DirInfo>();
for (int i = 0; i < threads; i++) {
ThreadWithList t = new ThreadWithList();
this.threads.add(t);
}
}
public int scan(final File dir)
throws InterruptedException, ExecutionException {
sem.acquire();
// The root directory is the first pending item.
pendingCount.set(1);
workingQueue.add(new DirInfo(dir));
for (ThreadWithList t : threads) {
t.start();
}
// Blocks until scan0() releases the permit on completion.
sem.acquire();
return producedCount.get();
}
private boolean scan0() {
ThreadWithList thread = ((ThreadWithList) Thread.currentThread());
DirInfo dirInfo;
try {
// Remove the next item from the queue, blocking until one is
// available. Once the scan is complete, the remaining workers stay
// blocked here until close() interrupts them.
dirInfo = workingQueue.take();
} catch (InterruptedException exc) {
Thread.currentThread().interrupt();
return true;
}
int index = dirInfo.getIndex();
File[] listing = dirInfo.getListing();
int upperBound = Math.min(index + installmentSize, listing.length);
for (int i = index; i < upperBound; i++) {
if (listing[i].isFile()) {
try {
producedQueue.put(listing[i]);
producedCount.incrementAndGet();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return true;
}
}
if (listing[i].isDirectory()) {
DirInfo subdirInfo = new DirInfo(listing[i]);
// Count the sub-directory before another thread can take it.
pendingCount.incrementAndGet();
try {
workingQueue.put(subdirInfo);
} catch (InterruptedException exc) {
Thread.currentThread().interrupt();
return true;
}
}
}
thread.useCount += (upperBound - index);
if (upperBound != listing.length) {
// Re-queue the rest of this directory; it is still pending.
dirInfo.setIndex(upperBound);
workingQueue.add(dirInfo);
return false;
}
// This directory is finished. If nothing else is queued or in
// progress, the whole scan is complete.
if (pendingCount.decrementAndGet() == 0) {
sem.release();
return true;
}
return false;
}
public void close() {
for (ThreadWithList t : threads) {
if (t.isAlive()) {
t.interrupt();
}
}
for (ThreadWithList t : threads) {
try {
t.join();
} catch (InterruptedException exc) {
Thread.currentThread().interrupt();
return;
}
}
workingQueue.clear();
}
}
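Deciding when a multi-threaded scan has finished is the subtle part of a class like this: a worker that has just taken the last queued directory will soon add that directory's sub-directories, so an empty queue alone does not mean the scan is done. One common way to detect completion is to count outstanding directories, incrementing the count before a directory is handed to another thread and decrementing it once that directory has been fully listed. The following is a minimal sketch of that idea using an ExecutorService with one task per directory; it is a swapped-in technique (no installments, no semaphore), and the class PendingCountScanner is hypothetical.

```java
import java.io.File;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical parallel directory walk with pending-count termination. */
public class PendingCountScanner {
    public static Queue<File> scan(File root, int threads) throws InterruptedException {
        final Queue<File> files = new ConcurrentLinkedQueue<File>();
        final AtomicInteger pending = new AtomicInteger(1); // the root directory
        final CountDownLatch done = new CountDownLatch(1);
        final ExecutorService pool = Executors.newFixedThreadPool(threads);

        class DirTask implements Runnable {
            private final File dir;

            DirTask(File dir) {
                this.dir = dir;
            }

            public void run() {
                try {
                    File[] listing = dir.listFiles();
                    if (listing != null) {
                        for (File f : listing) {
                            if (f.isDirectory()) {
                                pending.incrementAndGet(); // count it before it can run
                                pool.execute(new DirTask(f));
                            } else if (f.isFile()) {
                                files.add(f);
                            }
                        }
                    }
                } finally {
                    if (pending.decrementAndGet() == 0) {
                        done.countDown(); // last outstanding directory finished
                    }
                }
            }
        }

        pool.execute(new DirTask(root));
        done.await(); // reaches here exactly once, when pending hits zero
        pool.shutdown();
        return files;
    }
}
```

Because the count is raised before a sub-directory is submitted, it cannot reach zero while any directory is queued or being listed.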
QueueProcessor
package com.subhajit.util.grep;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
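/**
* Models a set of threads consuming inputs of type <tt>I</tt> from an input
* queue, processing each input with an {@link IProcessor}, and writing the
* non-<tt>null</tt> outputs of type <tt>O</tt> to an output queue.
*
* @author sdasgupta
*
* @param <I>
* the type of the input messages
* @param <O>
* the type of the output messages
*/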
public class QueueProcessor<I, O> {
/**
* Private class polls for and extracts messages from
* {@link QueueProcessor#inputQueue} and submits each message to
* {@link QueueProcessor#pushService}, which processes it using
* {@link IProcessor#process(Object)} and places the resulting object (if it
* is not <tt>null</tt>) into {@link QueueProcessor#outputQueue}.
*
* @author sdasgupta
*/
private final class ConsumerRunnable implements Runnable {
public void run() {
while (true) {
try {
final I input = QueueProcessor.this.inputQueue.poll(100,
TimeUnit.MILLISECONDS);
if (input != null) {
pushService.submit(new Runnable() {
public void run() {
try {
O result = processor.process(input);
if (result != null) {
QueueProcessor.this.outputQueue
.put(result);
producedMessageCount.incrementAndGet();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (InvocationTargetException exc) {
return;
}
}
});
consumedMessageCount.incrementAndGet();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
}
/**
* The number of threads consuming messages from
* {@link QueueProcessor#inputQueue}.
*/
private final int threadCount;
/**
* The threads consuming messages from {@link QueueProcessor#inputQueue}.
*/
private final List<Thread> consumerThreads;
/**
* The queue from which input messages are read.
*/
private final BlockingQueue<I> inputQueue;
/**
* The queue to which output messages are written.
*/
private final BlockingQueue<O> outputQueue;
/**
* Processes inputs (pulled from the <tt>inputQueue</tt>) and pushes the
* results (obtained by calling {@link IProcessor#process(Object)}) to the
* <tt>outputQueue</tt>.
*/
private final ExecutorService pushService;
/**
* Counts the total number of consumed messages.
*/
private final AtomicInteger consumedMessageCount = new AtomicInteger(0);
/**
* Counts the total number of produced messages.
*/
private final AtomicInteger producedMessageCount = new AtomicInteger(0);
private final IProcessor<I, O> processor;
/**
* Public constructor.
*
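* @param inputQueue
* the queue from which inputs are consumed
* @param outputQueue
* the queue to which non-<tt>null</tt> results are written
* @param threadCount
* the number of threads consuming from <tt>inputQueue</tt>
* @param processor
* the stateless processor applied to each input
*/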
public QueueProcessor(BlockingQueue<I> inputQueue,
BlockingQueue<O> outputQueue, int threadCount,
IProcessor<I, O> processor) {
super();
this.inputQueue = inputQueue;
this.outputQueue = outputQueue;
this.threadCount = threadCount;
this.processor = processor;
pushService = Executors.newFixedThreadPool(10);
consumerThreads = new ArrayList<Thread>();
}
public void startup() {
// Create the consumer threads.
for (int i = 0; i < this.threadCount; i++) {
consumerThreads.add(new Thread(new ConsumerRunnable()));
}
for (Thread thread : consumerThreads) {
thread.start();
}
}
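/**
* Waits until <tt>inputMessageCount</tt> messages have been consumed from
* the input queue, invokes {@link QueueProcessor#shutdown()} (which waits
* for all consumed messages to be processed), and returns
* {@link QueueProcessor#producedMessageCount}.
*
* @param inputMessageCount
* the number of input messages expected
* @return the number of (non-<tt>null</tt>) results written to the output
* queue
* @throws InterruptedException
*/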
public int waitFor(int inputMessageCount) throws InterruptedException {
while (true) {
if (consumedMessageCount.get() >= inputMessageCount) {
break;
}
Thread.sleep(100);
}
shutdown();
return producedMessageCount.get();
}
/**
* Issues a shutdown request to the {@link QueueProcessor#pushService},
* waits for that service to shut down, interrupts the
* {@link QueueProcessor#consumerThreads}, waits for those threads to shut
* down, then returns.
*
* <p>
* All messages which have already been consumed are processed.
* </p>
*
* @throws InterruptedException
*/
public void shutdown() throws InterruptedException {
pushService.shutdown();
while (true) {
if (pushService.isTerminated()) {
break;
}
Thread.sleep(100);
}
for (Thread thread : consumerThreads) {
thread.interrupt();
}
for (Thread thread : consumerThreads) {
thread.join();
}
}
}
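The waitFor() and shutdown() methods above wait by polling with Thread.sleep(100). A hedged alternative is to let ExecutorService.awaitTermination do the waiting. The sketch below is not a drop-in replacement: the names CountedStage, drain and Fn are hypothetical, a single caller thread takes the inputs instead of threadCount consumer threads, and Fn omits IProcessor's checked exceptions to keep the example short.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stage that processes exactly `expected` inputs. */
public class CountedStage {
    /** Stand-in for IProcessor, without checked exceptions. */
    public interface Fn<I, O> {
        O apply(I input);
    }

    public static <I, O> int drain(BlockingQueue<I> in, final BlockingQueue<O> out,
            int expected, int threads, final Fn<I, O> fn) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        final AtomicInteger produced = new AtomicInteger(0);
        for (int i = 0; i < expected; i++) {
            final I input = in.take(); // blocks until the upstream stage delivers
            pool.execute(new Runnable() {
                public void run() {
                    O result = fn.apply(input);
                    if (result != null) { // same null-means-drop rule as the post
                        try {
                            out.put(result);
                            produced.incrementAndGet();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            });
        }
        pool.shutdown(); // accept no new tasks; already submitted ones still run
        while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
            // Still waiting; a real tool might report progress here.
        }
        return produced.get();
    }
}
```

The returned count plays the same role as QueueProcessor.waitFor's return value: it tells the next stage how many inputs to expect.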
Grep
package com.subhajit.util.grep;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import com.subhajit.argutils.ArgumentUtils;
import com.sun.idm.svc.util.common.CommonUtils;
import com.sun.idm.svc.util.streams.FileUtils;
public class Grep {
private static class FileContent {
private final File file;
private final String content;
public FileContent(File file, String content) {
super();
this.file = file;
this.content = content;
}
public File getFile() {
return file;
}
public String getContent() {
return content;
}
}
private static enum Argument {
text, dir, pattern
}
private static Map<Argument, ?> parseCommandLineArguments(String[] args)
throws IOException {
final String argumentSpec = "d:string:true:Base directory under which to search|"
+ "f:string:true:Comma separated file name patterns to search (eg. .java,.properties,.xml)|"
+ "i:boolean:false:Ignore case|"
+ "names:boolean:false:Show file names only if \"true\", else show verbose output";
if (args.length == 0) {
System.out
.println("Usage:\nOption\t\tType\tRequired\tDescription\n"
+ ArgumentUtils.getUsage(argumentSpec));
System.exit(1);
}
Map<String, String> map = ArgumentUtils.parseArgs(args, argumentSpec);
if (!map.containsKey(CommonUtils.UNBOUND_ARGUMENT)) {
throw new IllegalArgumentException(
"Cannot continue, since search terms have not been specified.");
}
final String text = map.get(CommonUtils.UNBOUND_ARGUMENT);
final File dir = new File(map.get("d")).getCanonicalFile();
final String pattern = map.get("f");
Map<Argument, Object> ret = new HashMap<Argument, Object>();
ret.put(Argument.dir, dir);
ret.put(Argument.pattern, pattern);
ret.put(Argument.text, text);
return ret;
}
public static void main(String[] args) {
int status = 0;
long t0 = System.nanoTime();
try {
// Parse the command line arguments.
Map<Argument, ?> commandLineArguments = parseCommandLineArguments(args);
final String text = (String) commandLineArguments
.get(Argument.text);
final File dir = (File) commandLineArguments.get(Argument.dir);
final String fileNameInput = (String) commandLineArguments
.get(Argument.pattern);
// Set up the queue of files. This queue is populated by the
// directory scanner with all files (not directories) found under the
// base directory we are scanning.
BlockingQueue<File> fileQueue = new ArrayBlockingQueue<File>(5000);
// 5 scanner threads, each handling up to 20 directory entries per turn.
final DirectoryScannerProducer scanner = new DirectoryScannerProducer(
fileQueue, 5, 20);
// Setup the content queue. This queue contains FileContent objects
// embodying the content of files read from the fileQueue that match
// the file name pattern we are interested in. This queue is
// populated by the fileReaders object.
final BlockingQueue<FileContent> contentQueue = new LinkedBlockingQueue<FileContent>();
final QueueProcessor<File, FileContent> fileReaders = new QueueProcessor<File, FileContent>(
fileQueue, contentQueue, 5,
new IProcessor<File, FileContent>() {
public FileContent process(File input)
throws InterruptedException,
InvocationTargetException {
try {
if (input.isDirectory()) {
return null;
}
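// The -f argument may list several comma separated suffixes, as its
// usage text says.
boolean nameMatches = false;
for (String suffix : fileNameInput.split(",")) {
if (input.getName().endsWith(suffix.trim())) {
nameMatches = true;
break;
}
}
if (!nameMatches) {
return null;
}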
return new FileContent(input, new String(
FileUtils.loadFile(input)));
} catch (IOException exc) {
throw new InvocationTargetException(exc);
}
}
});
// Setup the output queue containing FileContent objects
// representing matches. The output queue is populated by the file
// finders object.
final BlockingQueue<FileContent> outputQueue = new LinkedBlockingQueue<FileContent>();
final QueueProcessor<FileContent, FileContent> fileFinders = new QueueProcessor<FileContent, FileContent>(
contentQueue, outputQueue, 5,
new IProcessor<FileContent, FileContent>() {
public FileContent process(FileContent input)
throws InterruptedException,
InvocationTargetException {
if (input.getContent().contains(text)) {
return input;
} else {
return null;
}
}
});
// Start the file readers.
fileReaders.startup();
// Start the file finders.
fileFinders.startup();
// Start the thread that dumps the matching output to the console.
final AtomicInteger dumpedCount = new AtomicInteger(0);
final ExecutorService dumpService = Executors
.newFixedThreadPool(10);
Thread dumpingThread = new Thread(new Runnable() {
public void run() {
try {
while (true) {
final FileContent matchingFileInfo = outputQueue
.take();
dumpService.submit(new Runnable() {
public void run() {
System.out.println(matchingFileInfo
.getFile());
}
});
dumpedCount.incrementAndGet();
}
} catch (InterruptedException exc) {
System.out.println("Done (" + dumpedCount.get() + ")");
Thread.currentThread().interrupt();
return;
}
}
});
dumpingThread.start();
// Start the directory scanner.
int workItemCount0 = scanner.scan(dir);
// Wait for the file readers to process its inputs.
int workItemCount1 = fileReaders.waitFor(workItemCount0);
// Wait for the file finders object to process its inputs.
int workItemCount2 = fileFinders.waitFor(workItemCount1);
// Interrupt the dumping thread once it has printed all output.
while (true) {
if (dumpedCount.get() == workItemCount2) {
dumpingThread.interrupt();
break;
}
Thread.sleep(100);
}
dumpingThread.join();
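// shutdownNow() could discard print tasks that have not started yet;
// shutdown() lets every submitted task finish.
dumpService.shutdown();
while (!dumpService.isTerminated()) {
Thread.sleep(100);
}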
} catch (Throwable exc) {
status = 1;
exc.printStackTrace();
} finally {
t0 = System.nanoTime() - t0;
System.out.println((t0 / 1000000) + " ms.");
System.exit(status);
}
}
}
What do you think?
Code won't build. It is missing:
import com.subhajit.argutils.ArgumentUtils;
import com.sun.idm.svc.util.common.CommonUtils;
import com.sun.idm.svc.util.streams.FileUtils;
Posted by sk on May 18, 2009 at 11:19 AM CDT
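The three missing imports appear to come from the author's own libraries (com.subhajit.argutils and com.sun.idm.svc.util), which do not seem to be publicly available, so their exact behavior is unknown. As a hedged workaround, the sketch below provides JDK-only stand-ins for the two things Grep needs from them: loading a file's bytes (here through an NIO FileChannel) and splitting the command line into "-d", "-f" and a trailing search term. The class StdHelpers, its methods and the UNBOUND key are hypothetical; they do not reproduce ArgumentUtils' specification syntax or its usage output.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical JDK-only stand-ins for FileUtils.loadFile and ArgumentUtils.parseArgs. */
public class StdHelpers {
    /** Hypothetical key under which the trailing search term is stored. */
    public static final String UNBOUND = "__unbound__";

    /** Reads a whole file into memory through a FileChannel. */
    public static byte[] loadFile(File file) throws IOException {
        FileInputStream in = new FileInputStream(file);
        try {
            FileChannel channel = in.getChannel();
            long size = channel.size();
            if (size > Integer.MAX_VALUE) {
                throw new IOException("File too large: " + file);
            }
            ByteBuffer buffer = ByteBuffer.allocate((int) size);
            while (buffer.hasRemaining()) {
                if (channel.read(buffer) < 0) {
                    break; // file shrank while being read
                }
            }
            byte[] bytes = new byte[buffer.position()];
            buffer.flip();
            buffer.get(bytes);
            return bytes;
        } finally {
            in.close();
        }
    }

    /**
     * Parses "-d dir -f suffix term": each "-x value" pair is stored under
     * "x"; the first bare word is stored under UNBOUND. No quoting or "--".
     */
    public static Map<String, String> parseArgs(String[] args) {
        Map<String, String> map = new HashMap<String, String>();
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            if (arg.startsWith("-") && arg.length() > 1 && i + 1 < args.length) {
                map.put(arg.substring(1), args[++i]);
            } else if (!map.containsKey(UNBOUND)) {
                map.put(UNBOUND, arg);
            }
        }
        return map;
    }
}
```

In Grep, FileUtils.loadFile(input) could then become StdHelpers.loadFile(input), ArgumentUtils.parseArgs(args, argumentSpec) could become StdHelpers.parseArgs(args), and map.get(CommonUtils.UNBOUND_ARGUMENT) could become map.get(StdHelpers.UNBOUND); the usage message built with ArgumentUtils.getUsage would have to be written by hand.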
Oh, I see you put some significant processing per file after all, so you'll surely benefit from fair work partition.
Maybe a next post talking about "a faster grep" is in order: compile the searched pattern as:
Pattern pattern = Pattern.compile("\Q" + text + "\E"); //if my memory serves right
(this is immutable/thread-safe).
boolean found = pattern.matcher(input).find()
This has the benefit of using the fast Boyer-Moore string matching algorithm, which should be beneficial for highly reused patterns (there is some preprocessing needed to be able to safely skip some portions of input text while searching).
Also, reading the whole file contents into a string before searching is not too memory-usage-friendly. Maybe line by line? (It would be fun though, searching for "lines" in a binary file :-/ )
That's all for now. Bye!
Posted by Dimitris Andreou on May 19, 2009 at 03:27 AM CDT
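A minimal sketch of the line-by-line idea, assuming the search term never spans a line break (a whole-file contains() would find such a term; this will not). Only one line is held in memory at a time. The class LineSearch and its firstMatchingLine method are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;

/** Hypothetical line-at-a-time search. */
public class LineSearch {
    /** Returns the 1-based number of the first line containing text, or -1. */
    public static int firstMatchingLine(Reader source, String text) throws IOException {
        BufferedReader reader = new BufferedReader(source);
        try {
            String line;
            int lineNumber = 0;
            while ((line = reader.readLine()) != null) {
                lineNumber++;
                if (line.contains(text)) {
                    return lineNumber; // stop at the first hit; no need to read further
                }
            }
            return -1;
        } finally {
            reader.close();
        }
    }
}
```

For a real file the Reader would be something like new InputStreamReader(new FileInputStream(file), charset), with the charset chosen by the caller. On a binary file readLine() still works; the "lines" are just arbitrary runs of bytes between newline characters.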
Dimitris,
Once again, thanks for your inputs. About reading the entire file in memory before searching it, I am seeking to use fast nio-based file loading to beat the inherent slowness of disk interaction. But you have a point, so let me think about how to try to reduce the overhead of reading the entire file. Also, I did play a bit with using regular expressions to search the file contents, but the (user side) syntax became somewhat clumsy...again, I will play with this some more to see how best to do this. Finally, my reasoning about ignoring memory consumption is that this procedure is supposed to be used in a script, not in a long running process. Again, thanks for pointing out some things worth investigating...
Posted by Subhajit Dasgupta on May 19, 2009 at 05:09 AM CDT
Whoa! Dimitris' pointer about using the Boyer Moore algorithms pays off handsomely: will post an update sometime later today.
Posted by Subhajit Dasgupta on May 19, 2009 at 07:15 AM CDT
I'm not sure I understand what you mean by "clumsy" in:
>I did play a bit with using regular expressions to search the file contents, but the (user side) syntax became somewhat clumsy
Enclosing a string in \Q and \E (oh, I should have written \\Q and \\E there, right) means that it will be treated as a literal to be searched verbatim; no need to escape "special" characters of regex. So, except for \Q and \E, this doesn't look any different than what you were passing to String.contains() method.
Have fun :)
Posted by Dimitris Andreou on May 19, 2009 at 04:05 PM CDT
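For reference, the JDK can build the literal-quoted pattern itself: Pattern.quote(text) returns a string that matches text literally (in the form \Q...\E) and, unlike plain concatenation, also copes with a search term that itself contains \E. A minimal sketch follows; the class LiteralSearch is hypothetical. It compiles the pattern once and shares it between threads, creating a new Matcher per search, since Pattern instances are immutable but Matcher instances are not thread-safe. Whether a particular JDK uses Boyer-Moore for such literal patterns is an implementation detail, so it is worth measuring before relying on it.

```java
import java.util.regex.Pattern;

/** Hypothetical literal search backed by a precompiled Pattern. */
public class LiteralSearch {
    // Pattern is immutable, so one instance can be shared by all worker threads.
    private final Pattern pattern;

    public LiteralSearch(String text) {
        // Pattern.quote makes regex metacharacters in text match literally.
        this.pattern = Pattern.compile(Pattern.quote(text));
    }

    public boolean foundIn(CharSequence content) {
        // A Matcher is not thread-safe, so create a fresh one per call.
        return pattern.matcher(content).find();
    }
}
```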