Skip to content

Commit

Permalink
Refactoring of sequence file iterator.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobias Domhan committed Dec 1, 2016
1 parent 0d766be commit 705fdb7
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 71 deletions.
91 changes: 20 additions & 71 deletions src/edu/jhu/thrax/lexprob/SequenceFileLexprobTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@

import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Iterator;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

import edu.jhu.thrax.util.ChainedIterators;

/**
* A base class for lexical probability tables that will be read from a Hadoop sequence file that is
* held on disk. This class serves to hide all the horrible Hadoop filesystem plumbing from more
Expand All @@ -30,14 +32,15 @@ public SequenceFileLexprobTable(Configuration conf, String fileGlob) throws IOEx
fs = FileSystem.get(uri, conf);
files = fs.globStatus(new Path(fileGlob));
if (files.length == 0) throw new IOException("no files found in lexprob glob:" + fileGlob);
Arrays.sort(files); // some implementations (like local FS) don't return a sorted list of files
}

protected abstract void initialize(Iterable<TableEntry> entries);

public abstract float get(int car, int cdr);

public abstract boolean contains(int car, int cdr);

/**
* Return an Iterable that will range over all the entries in a series of globbed files.
*
Expand All @@ -46,77 +49,23 @@ public SequenceFileLexprobTable(Configuration conf, String fileGlob) throws IOEx
* @param files an array of FileStatus from getGlobStatus
* @return an Iterable over all entries in all files in the files glob
*/
protected static Iterable<TableEntry> getSequenceFileIterator(FileSystem theFS,
protected static Iterable<TableEntry> getSequenceFileIterator(FileSystem fs,
Configuration conf, FileStatus[] files) {
final LongWritable pair = new LongWritable();
final FloatWritable d = new FloatWritable(0.0f);
final FileStatus[] theFiles = files;
final Configuration theConf = conf;
final FileSystem fs = theFS;

final Iterator<TableEntry> iterator = new Iterator<TableEntry>() {
int fileIndex = 0;
TableEntry lookahead = null;
SequenceFile.Reader reader = null;

public boolean hasNext() {
try {
// if we've already peeked at the next entry, it can be
// returned
if (lookahead != null) return true;
// if the reader is null, we haven't looked at a single
// file yet, so set the reader to read the first file
if (reader == null) reader = new SequenceFile.Reader(fs, theFiles[0].getPath(), theConf);
// reader is not null here, so try to read an entry
boolean gotNew = reader.next(pair, d);
if (gotNew) {
// there was something to read
lookahead = new TableEntry(pair, d);
return true;
}
fileIndex++;
// else, move to the next file
// but if there are no more, return false
if (fileIndex >= theFiles.length) return false;
reader.close();
reader = new SequenceFile.Reader(fs, theFiles[fileIndex].getPath(), theConf);
// new file, so try again
gotNew = reader.next(pair, d);
if (gotNew) {
lookahead = new TableEntry(pair, d);
return true;
}
return false;
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}

public TableEntry next() {
try {
// return the lookahead, if possible
if (lookahead != null) {
TableEntry val = lookahead;
lookahead = null;
return val;
}
boolean gotNew = reader.next(pair, d);
if (gotNew)
return new TableEntry(pair, d);
else
return null;
} catch (IOException e) {
throw new IllegalArgumentException();
}
}

public void remove() {
throw new UnsupportedOperationException();
}
};
return new Iterable<TableEntry>() {

@Override
public Iterator<TableEntry> iterator() {
return iterator;
Iterator<? extends Iterator<TableEntry>> fileIterators = Arrays.asList(files).stream()
.map(file -> {
try {
return new SequenceFile.Reader(fs, file.getPath(), conf);
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(seqFile -> new SequenceFileTableEntryIterator(seqFile))
.collect(Collectors.toList()).iterator();
return new ChainedIterators<TableEntry>(fileIterators);
}
};
}
Expand Down
67 changes: 67 additions & 0 deletions src/edu/jhu/thrax/lexprob/SequenceFileTableEntryIterator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package edu.jhu.thrax.lexprob;

import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

public class SequenceFileTableEntryIterator implements Iterator<TableEntry> {

private final SequenceFile.Reader reader;

private final LongWritable pair = new LongWritable();
private final FloatWritable d = new FloatWritable(0.0f);

private Optional<TableEntry> lookahead = Optional.empty();
private boolean finishedReading = false;

public SequenceFileTableEntryIterator(SequenceFile.Reader reader) {
this.reader = reader;
}

@Override
public boolean hasNext() {
if (lookahead.isPresent()) {
return true;
}
lookahead = tryReadNext();
if (lookahead.isPresent()) {
return true;
} else {
return false;
}
}

@Override
public TableEntry next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
TableEntry nextEntry = lookahead.get();
lookahead = Optional.empty();
return nextEntry;
}

private Optional<TableEntry> tryReadNext() {
if (finishedReading) {
return Optional.empty();
}
try {
boolean gotNew = reader.next(pair, d);
if (gotNew) {
// there was something to read
return Optional.of(new TableEntry(pair, d));
} else {
finishedReading = true;
return Optional.empty();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

}
55 changes: 55 additions & 0 deletions src/edu/jhu/thrax/util/ChainedIterators.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package edu.jhu.thrax.util;

import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;

public class ChainedIterators<T> implements Iterator<T> {

private Iterator<? extends Iterator<T>> iteratorOfIterators;
private Iterator<T> currentIterator;
private boolean finished = false;

public ChainedIterators(Iterator<? extends Iterator<T>> iteratorOfIterators) {
this.iteratorOfIterators = iteratorOfIterators;
moveToNextIterator();
}

public ChainedIterators(Collection<? extends Iterator<T>> iteratorOfIterators) {
this.iteratorOfIterators = iteratorOfIterators.iterator();
moveToNextIterator();
}

@Override
public boolean hasNext() {
if (finished) {
return false;
}
if (currentIterator.hasNext()) {
return true;
} else {
moveToNextIterator();
return !finished;
}
}

@Override
public T next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return currentIterator.next();
}

private void moveToNextIterator() {
while (iteratorOfIterators.hasNext()) {
currentIterator = iteratorOfIterators.next();
if (currentIterator.hasNext()) {
finished = false;
return;
}
}
finished = true;
}

}

0 comments on commit 705fdb7

Please sign in to comment.