@@ -213,17 +213,32 @@ public long count(Class<?> type, String index, Object indexedValue) throws Exception
 
   @Override
   public void close() throws IOException {
-    DB _db = this._db.getAndSet(null);
-    if (_db == null) {
-      return;
+    synchronized (this._db) {
+      DB _db = this._db.getAndSet(null);
+      if (_db == null) {
+        return;
+      }
+
+      try {
+        _db.close();
+      } catch (IOException ioe) {
+        throw ioe;
+      } catch (Exception e) {
+        throw new IOException(e.getMessage(), e);
+      }
     }
+  }
 
-    try {
-      _db.close();
-    } catch (IOException ioe) {
-      throw ioe;
-    } catch (Exception e) {
-      throw new IOException(e.getMessage(), e);
+  /**
+   * Closes the given iterator if the DB is still open. Trying to close a JNI LevelDB handle
+   * with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
+   */
+  void closeIterator(LevelDBIterator it) throws IOException {
+    synchronized (this._db) {
+      DB _db = this._db.get();
+      if (_db != null) {
+        it.close();
+      }
     }
   }
 
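Note on the hunk above: close() and the new closeIterator() synchronize on the same AtomicReference that holds the native DB handle, so an iterator's JNI handle is only touched while the DB is still open. Below is a minimal, self-contained sketch of that locking scheme, not the Spark source; the class name DbHolderSketch and the placeholder Object handle are assumptions for illustration.

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: the AtomicReference doubles as the monitor, and the value it
// holds stands in for the native LevelDB handle (swapped to null on close).
class DbHolderSketch implements Closeable {

  private final AtomicReference<Object> _db = new AtomicReference<>(new Object());

  @Override
  public void close() throws IOException {
    synchronized (this._db) {
      Object db = this._db.getAndSet(null);
      if (db == null) {
        return;  // another thread already closed the DB
      }
      // ... release the native handle here ...
    }
  }

  // Mirrors closeIterator() above: only touch the iterator's native resources
  // while the DB is still open; otherwise skip the close to avoid a crash.
  void closeIterator(Closeable it) throws IOException {
    synchronized (this._db) {
      if (this._db.get() != null) {
        it.close();
      }
    }
  }
}

Because both methods hold the same monitor, a caller (or finalizer) closing an iterator concurrently with DB shutdown either runs before the native handle is released or becomes a no-op.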
@@ -191,6 +191,16 @@ public synchronized void close() throws IOException {
     }
   }
 
+  /**
+   * Because it's tricky to expose closeable iterators through many internal APIs, especially
+   * when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by
+   * the iterator will eventually be released.
+   */
+  @Override
+  protected void finalize() throws Throwable {
+    db.closeIterator(this);
+  }
+
   private byte[] loadNext() {
     if (count >= max) {
       return null;
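The finalize() override added above is a best-effort safety net for the iterator class: iterators that escape through APIs where an explicit close() is impractical still release their JNI resources once the garbage collector runs, and delegating to closeIterator() means a late finalizer never touches a handle whose DB has already been closed. A hedged sketch, reusing the assumed names from the previous sketch:

import java.io.Closeable;
import java.io.IOException;

// Sketch only: an iterator whose finalizer asks the owning DB to close it.
class IteratorSketch implements Closeable {

  private final DbHolderSketch db;  // owner type from the previous sketch
  private boolean closed = false;

  IteratorSketch(DbHolderSketch db) {
    this.db = db;
  }

  @Override
  public void close() throws IOException {
    if (!closed) {
      closed = true;
      // ... release the underlying JNI iterator handle here ...
    }
  }

  @Override
  protected void finalize() throws Throwable {
    // The DB only closes us while it is itself still open, so a late-running
    // finalizer cannot crash the JVM on a dead native handle.
    db.closeIterator(this);
    super.finalize();
  }
}

Calling close() eagerly is still preferable; finalization only guarantees that leaked iterators are eventually reclaimed, and only once a garbage collection actually runs.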
core/pom.xml (5 changes: 5 additions & 0 deletions)
@@ -67,6 +67,11 @@
       <artifactId>spark-launcher_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-kvstore_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-network-common_${scala.binary.version}</artifactId>
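The pom.xml change makes the new kvstore module a compile-time dependency of core. A rough usage sketch of what that enables follows; the package name and the File-based constructor are assumptions, and only the count(Class<?>, String, Object) signature and close() are confirmed by the hunks above.

import java.io.File;
import org.apache.spark.kvstore.LevelDB;  // assumed package for the new module

public class KvStoreUsageSketch {
  public static void main(String[] args) throws Exception {
    // try-with-resources works if LevelDB implements Closeable, which the
    // @Override on its close() method suggests; the type and index values
    // below are placeholders.
    try (LevelDB store = new LevelDB(new File("/tmp/app-listing.ldb"))) {
      long n = store.count(String.class, "someIndex", "someValue");
      System.out.println("indexed entries: " + n);
    }
  }
}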