diff --git a/src/edu/jhu/thrax/lexprob/SequenceFileLexprobTable.java b/src/edu/jhu/thrax/lexprob/SequenceFileLexprobTable.java index 65cbd14..0b6398f 100644 --- a/src/edu/jhu/thrax/lexprob/SequenceFileLexprobTable.java +++ b/src/edu/jhu/thrax/lexprob/SequenceFileLexprobTable.java @@ -2,16 +2,18 @@ import java.io.IOException; import java.net.URI; +import java.util.Arrays; import java.util.Iterator; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; +import edu.jhu.thrax.util.ChainedIterators; + /** * A base class for lexical probability tables that will be read from a Hadoop sequence file that is * held on disk. This class serves to hide all the horrible Hadoop filesystem plumbing from more @@ -30,6 +32,7 @@ public SequenceFileLexprobTable(Configuration conf, String fileGlob) throws IOEx fs = FileSystem.get(uri, conf); files = fs.globStatus(new Path(fileGlob)); if (files.length == 0) throw new IOException("no files found in lexprob glob:" + fileGlob); + Arrays.sort(files); // some implementations (like local FS) don't return a sorted list of files } protected abstract void initialize(Iterable entries); @@ -37,7 +40,7 @@ public SequenceFileLexprobTable(Configuration conf, String fileGlob) throws IOEx public abstract float get(int car, int cdr); public abstract boolean contains(int car, int cdr); - + /** * Return an Iterable that will range over all the entries in a series of globbed files. * @@ -46,77 +49,23 @@ public SequenceFileLexprobTable(Configuration conf, String fileGlob) throws IOEx * @param files an array of FileStatus from getGlobStatus * @return an Iterable over all entries in all files in the files glob */ - protected static Iterable getSequenceFileIterator(FileSystem theFS, + protected static Iterable getSequenceFileIterator(FileSystem fs, Configuration conf, FileStatus[] files) { - final LongWritable pair = new LongWritable(); - final FloatWritable d = new FloatWritable(0.0f); - final FileStatus[] theFiles = files; - final Configuration theConf = conf; - final FileSystem fs = theFS; - - final Iterator iterator = new Iterator() { - int fileIndex = 0; - TableEntry lookahead = null; - SequenceFile.Reader reader = null; - - public boolean hasNext() { - try { - // if we've already peeked at the next entry, it can be - // returned - if (lookahead != null) return true; - // if the reader is null, we haven't looked at a single - // file yet, so set the reader to read the first file - if (reader == null) reader = new SequenceFile.Reader(fs, theFiles[0].getPath(), theConf); - // reader is not null here, so try to read an entry - boolean gotNew = reader.next(pair, d); - if (gotNew) { - // there was something to read - lookahead = new TableEntry(pair, d); - return true; - } - fileIndex++; - // else, move to the next file - // but if there are no more, return false - if (fileIndex >= theFiles.length) return false; - reader.close(); - reader = new SequenceFile.Reader(fs, theFiles[fileIndex].getPath(), theConf); - // new file, so try again - gotNew = reader.next(pair, d); - if (gotNew) { - lookahead = new TableEntry(pair, d); - return true; - } - return false; - } catch (IOException e) { - throw new IllegalArgumentException(e); - } - } - - public TableEntry next() { - try { - // return the lookahead, if possible - if (lookahead != null) { - TableEntry val = lookahead; - lookahead = null; - return val; - } - boolean gotNew = reader.next(pair, d); - if (gotNew) - return new TableEntry(pair, d); - else - return null; - } catch (IOException e) { - throw new IllegalArgumentException(); - } - } - - public void remove() { - throw new UnsupportedOperationException(); - } - }; return new Iterable() { + + @Override public Iterator iterator() { - return iterator; + Iterator> fileIterators = Arrays.asList(files).stream() + .map(file -> { + try { + return new SequenceFile.Reader(fs, file.getPath(), conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .map(seqFile -> new SequenceFileTableEntryIterator(seqFile)) + .collect(Collectors.toList()).iterator(); + return new ChainedIterators(fileIterators); } }; } diff --git a/src/edu/jhu/thrax/lexprob/SequenceFileTableEntryIterator.java b/src/edu/jhu/thrax/lexprob/SequenceFileTableEntryIterator.java new file mode 100644 index 0000000..44492dd --- /dev/null +++ b/src/edu/jhu/thrax/lexprob/SequenceFileTableEntryIterator.java @@ -0,0 +1,67 @@ +package edu.jhu.thrax.lexprob; + +import java.io.IOException; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Optional; + +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; + +public class SequenceFileTableEntryIterator implements Iterator { + + private final SequenceFile.Reader reader; + + private final LongWritable pair = new LongWritable(); + private final FloatWritable d = new FloatWritable(0.0f); + + private Optional lookahead = Optional.empty(); + private boolean finishedReading = false; + + public SequenceFileTableEntryIterator(SequenceFile.Reader reader) { + this.reader = reader; + } + + @Override + public boolean hasNext() { + if (lookahead.isPresent()) { + return true; + } + lookahead = tryReadNext(); + if (lookahead.isPresent()) { + return true; + } else { + return false; + } + } + + @Override + public TableEntry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + TableEntry nextEntry = lookahead.get(); + lookahead = Optional.empty(); + return nextEntry; + } + + private Optional tryReadNext() { + if (finishedReading) { + return Optional.empty(); + } + try { + boolean gotNew = reader.next(pair, d); + if (gotNew) { + // there was something to read + return Optional.of(new TableEntry(pair, d)); + } else { + finishedReading = true; + return Optional.empty(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/src/edu/jhu/thrax/util/ChainedIterators.java b/src/edu/jhu/thrax/util/ChainedIterators.java new file mode 100644 index 0000000..1c101f2 --- /dev/null +++ b/src/edu/jhu/thrax/util/ChainedIterators.java @@ -0,0 +1,55 @@ +package edu.jhu.thrax.util; + +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; + +public class ChainedIterators implements Iterator { + + private Iterator> iteratorOfIterators; + private Iterator currentIterator; + private boolean finished = false; + + public ChainedIterators(Iterator> iteratorOfIterators) { + this.iteratorOfIterators = iteratorOfIterators; + moveToNextIterator(); + } + + public ChainedIterators(Collection> iteratorOfIterators) { + this.iteratorOfIterators = iteratorOfIterators.iterator(); + moveToNextIterator(); + } + + @Override + public boolean hasNext() { + if (finished) { + return false; + } + if (currentIterator.hasNext()) { + return true; + } else { + moveToNextIterator(); + return !finished; + } + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return currentIterator.next(); + } + + private void moveToNextIterator() { + while (iteratorOfIterators.hasNext()) { + currentIterator = iteratorOfIterators.next(); + if (currentIterator.hasNext()) { + finished = false; + return; + } + } + finished = true; + } + +}