Skip to content

Commit

Permalink
close implicitly create tagTranslator after terminal operation
Browse files Browse the repository at this point in the history
Such as count(), sum(), …, reduce(), or stream().close(). This avoids dangling prepared statements to lie around after a query was performed. This is a regression/consequence of #224.

This also fixes the fact that in the current implementation, because of a typo/bug the tagInterpreter objects themselves were never persisted when calling `osmTag` methods. :-/

Note that this solution passes all tests, but doesn't work in the following simple example!

  @test
  public void testFoo() throws Exception {
    MapReducer<OSMContribution> bar = createMapReducerOSMContribution()
        .timestamps(timestamps72)
        .osmEntityFilter(entity -> entity.getId() == 617308093)
        .osmTag("highway");
    assertTrue(bar.count() > -1);
    assertTrue(bar.osmTag("highway", "traffic_signals").count() > -1);
  }

Here, a runtime exception will be thrown, because when calling the first `bar.count()`, the internal tragTranslator of `bar` is closed and further tag translation requests cannot be answered anymore (as performed in the last line).
  • Loading branch information
tyrasd authored and joker234 committed Oct 29, 2020
1 parent 5a73b2d commit 739fa94
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,17 @@ protected MapReducer(MapReducer<?> obj) {
@NotNull
protected abstract MapReducer<X> copy();

private void close() {
if (this.tagTranslator != null) {
try {
this.tagTranslator.close();
this.tagTranslator = null;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}

// -----------------------------------------------------------------------------------------------
// "Setting" methods and associated internal helpers
// -----------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -467,7 +478,7 @@ public MapReducer<X> osmTag(OSMTagInterface tag) {
@Contract(pure = true)
private MapReducer<X> osmTag(OSMTagKey key) {
MapReducer<X> ret = this.copy();
OSHDBTagKey keyId = this.getTagTranslator().getOSHDBTagKeyOf(key);
OSHDBTagKey keyId = ret.getTagTranslator().getOSHDBTagKeyOf(key);
if (!keyId.isPresentInKeytables()) {
LOG.warn("Tag key {} not found. No data will match this filter.", key.toString());
ret.preFilters.add(ignored -> false);
Expand Down Expand Up @@ -502,7 +513,7 @@ public MapReducer<X> osmTag(String key, String value) {
@Contract(pure = true)
private MapReducer<X> osmTag(OSMTag tag) {
MapReducer<X> ret = this.copy();
OSHDBTag keyValueId = this.getTagTranslator().getOSHDBTagOf(tag);
OSHDBTag keyValueId = ret.getTagTranslator().getOSHDBTagOf(tag);
if (!keyValueId.isPresentInKeytables()) {
LOG.warn("Tag {}={} not found. No data will match this filter.",
tag.getKey(), tag.getValue());
Expand All @@ -527,7 +538,7 @@ private MapReducer<X> osmTag(OSMTag tag) {
@Contract(pure = true)
public MapReducer<X> osmTag(String key, Collection<String> values) {
MapReducer<X> ret = this.copy();
OSHDBTagKey oshdbKey = this.getTagTranslator().getOSHDBTagKeyOf(key);
OSHDBTagKey oshdbKey = ret.getTagTranslator().getOSHDBTagKeyOf(key);
int keyId = oshdbKey.toInt();
if (!oshdbKey.isPresentInKeytables() || values.size() == 0) {
LOG.warn((values.size() > 0 ? "Tag key {} not found." : "Empty tag value list.")
Expand All @@ -538,7 +549,7 @@ public MapReducer<X> osmTag(String key, Collection<String> values) {
}
Set<Integer> valueIds = new HashSet<>();
for (String value : values) {
OSHDBTag keyValueId = this.getTagTranslator().getOSHDBTagOf(key, value);
OSHDBTag keyValueId = ret.getTagTranslator().getOSHDBTagOf(key, value);
if (!keyValueId.isPresentInKeytables()) {
LOG.warn("Tag {}={} not found. No data will match this tag value.", key, value);
} else {
Expand Down Expand Up @@ -572,7 +583,7 @@ public MapReducer<X> osmTag(String key, Collection<String> values) {
@Contract(pure = true)
public MapReducer<X> osmTag(String key, Pattern valuePattern) {
MapReducer<X> ret = this.copy();
OSHDBTagKey oshdbKey = this.getTagTranslator().getOSHDBTagKeyOf(key);
OSHDBTagKey oshdbKey = ret.getTagTranslator().getOSHDBTagKeyOf(key);
int keyId = oshdbKey.toInt();
if (!oshdbKey.isPresentInKeytables()) {
LOG.warn("Tag key {} not found. No data will match this filter.", key);
Expand All @@ -588,7 +599,7 @@ public MapReducer<X> osmTag(String key, Pattern valuePattern) {
return false;
}
if (tags[i] == keyId) {
String value = this.getTagTranslator().getOSMTagOf(keyId, tags[i + 1]).getValue();
String value = ret.getTagTranslator().getOSMTagOf(keyId, tags[i + 1]).getValue();
return valuePattern.matcher(value).matches();
}
}
Expand Down Expand Up @@ -622,7 +633,7 @@ public MapReducer<X> osmTag(Collection<? extends OSMTagInterface> tags) {
for (OSMTagInterface tag : tags) {
if (tag instanceof OSMTag) {
OSMTag keyValue = (OSMTag) tag;
OSHDBTag keyValueId = this.getTagTranslator().getOSHDBTagOf(keyValue);
OSHDBTag keyValueId = ret.getTagTranslator().getOSHDBTagOf(keyValue);
if (!keyValueId.isPresentInKeytables()) {
LOG.warn("Tag {}={} not found. No data will match this tag value.",
keyValue.getKey(), keyValue.getValue());
Expand All @@ -631,7 +642,7 @@ public MapReducer<X> osmTag(Collection<? extends OSMTagInterface> tags) {
keyValueIds.add(keyValueId);
}
} else {
OSHDBTagKey keyId = this.getTagTranslator().getOSHDBTagKeyOf((OSMTagKey) tag);
OSHDBTagKey keyId = ret.getTagTranslator().getOSHDBTagKeyOf((OSMTagKey) tag);
preKeyIds.add(keyId.toInt());
keyIds.add(keyId.toInt());
}
Expand Down Expand Up @@ -985,6 +996,7 @@ public <S> S reduce(
SerializableBinaryOperator<S> combiner)
throws Exception {
checkTimeout();
final S result;
switch (this.grouping) {
case NONE:
if (this.mappers.stream().noneMatch(MapFunction::isFlatMapper)) {
Expand All @@ -994,47 +1006,51 @@ public <S> S reduce(
// having just `mapper::apply` here is problematic, see https://github.com/GIScience/oshdb/pull/37
final SerializableFunction<OSMContribution, X> contributionMapper =
data -> mapper.apply(data);
return this.mapReduceCellsOSMContribution(
result = this.mapReduceCellsOSMContribution(
contributionMapper,
identitySupplier,
accumulator,
combiner
);
break;
} else if (this.forClass.equals(OSMEntitySnapshot.class)) {
@SuppressWarnings("Convert2MethodRef")
// having just `mapper::apply` here is problematic, see https://github.com/GIScience/oshdb/pull/37
final SerializableFunction<OSMEntitySnapshot, X> snapshotMapper =
data -> mapper.apply(data);
return this.mapReduceCellsOSMEntitySnapshot(
result = this.mapReduceCellsOSMEntitySnapshot(
snapshotMapper,
identitySupplier,
accumulator,
combiner
);
break;
} else {
throw new UnsupportedOperationException(
"Unimplemented data view: " + this.forClass.toString());
}
} else {
final SerializableFunction<Object, Iterable<X>> flatMapper = this.getFlatMapper();
if (this.forClass.equals(OSMContribution.class)) {
return this.flatMapReduceCellsOSMContributionGroupedById(
result = this.flatMapReduceCellsOSMContributionGroupedById(
(List<OSMContribution> inputList) -> {
List<X> outputList = new LinkedList<>();
inputList.stream()
.map((SerializableFunction<OSMContribution, Iterable<X>>) flatMapper::apply)
.forEach(data -> Iterables.addAll(outputList, data));
return outputList;
}, identitySupplier, accumulator, combiner);
break;
} else if (this.forClass.equals(OSMEntitySnapshot.class)) {
return this.flatMapReduceCellsOSMEntitySnapshotGroupedById(
result = this.flatMapReduceCellsOSMEntitySnapshotGroupedById(
(List<OSMEntitySnapshot> inputList) -> {
List<X> outputList = new LinkedList<>();
inputList.stream()
.map((SerializableFunction<OSMEntitySnapshot, Iterable<X>>) flatMapper::apply)
.forEach(data -> Iterables.addAll(outputList, data));
return outputList;
}, identitySupplier, accumulator, combiner);
break;
} else {
throw new UnsupportedOperationException(
"Unimplemented data view: " + this.forClass.toString());
Expand All @@ -1055,23 +1071,25 @@ public <S> S reduce(
// having just `flatMapper::apply` here is problematic, see https://github.com/GIScience/oshdb/pull/37
final SerializableFunction<List<OSMContribution>, Iterable<X>> contributionFlatMapper =
data -> flatMapper.apply(data);
return this.flatMapReduceCellsOSMContributionGroupedById(
result = this.flatMapReduceCellsOSMContributionGroupedById(
contributionFlatMapper,
identitySupplier,
accumulator,
combiner
);
break;
} else if (this.forClass.equals(OSMEntitySnapshot.class)) {
@SuppressWarnings("Convert2MethodRef")
// having just `flatMapper::apply` here is problematic, see https://github.com/GIScience/oshdb/pull/37
final SerializableFunction<List<OSMEntitySnapshot>, Iterable<X>> snapshotFlatMapper =
data -> flatMapper.apply(data);
return this.flatMapReduceCellsOSMEntitySnapshotGroupedById(
result = this.flatMapReduceCellsOSMEntitySnapshotGroupedById(
snapshotFlatMapper,
identitySupplier,
accumulator,
combiner
);
break;
} else {
throw new UnsupportedOperationException(
"Unimplemented data view: " + this.forClass.toString());
Expand All @@ -1080,6 +1098,8 @@ public <S> S reduce(
throw new UnsupportedOperationException(
"Unsupported grouping: " + this.grouping.toString());
}
this.close();
return result;
}

/**
Expand Down Expand Up @@ -1470,7 +1490,7 @@ public List<X> collect() throws Exception {
@Contract(pure = true)
public Stream<X> stream() throws Exception {
try {
return this.streamInternal();
return this.streamInternal().onClose(this::close);
} catch (UnsupportedOperationException e) {
LOG.info("stream not directly supported by chosen backend, falling back to "
+ ".collect().stream()"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.google.common.collect.Streams;
import com.google.common.primitives.Ints;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -165,7 +164,7 @@ private <S> S reduce(
CellProcessor<S> cellProcessor,
SerializableSupplier<S> identitySupplier,
SerializableBinaryOperator<S> combiner
) throws ParseException, SQLException, IOException {
) throws ParseException, IOException {
this.executionStartTimeMillis = System.currentTimeMillis();

CellIterator cellIterator = new CellIterator(
Expand Down Expand Up @@ -217,7 +216,7 @@ private <S> S reduce(
*/
private Stream<X> stream(
CellProcessor<Stream<X>> cellProcessor
) throws ParseException, SQLException, IOException {
) throws ParseException, IOException {
this.executionStartTimeMillis = System.currentTimeMillis();

CellIterator cellIterator = new CellIterator(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.heigit.bigspatialdata.oshdb.api.mapreducer.backend;

import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
Expand All @@ -20,7 +19,6 @@
import org.jetbrains.annotations.NotNull;
import org.json.simple.parser.ParseException;


public class MapReducerJdbcMultithread<X> extends MapReducerJdbc<X> {
public MapReducerJdbcMultithread(OSHDBDatabase oshdb,
Class<? extends OSHDBMapReducible> forClass) {
Expand All @@ -47,7 +45,7 @@ private <S> S reduce(
CellProcessor<S> processor,
SerializableSupplier<S> identitySupplier,
SerializableBinaryOperator<S> combiner
) throws ParseException, SQLException, IOException {
) throws ParseException, IOException {
this.executionStartTimeMillis = System.currentTimeMillis();

CellIterator cellIterator = new CellIterator(
Expand All @@ -69,7 +67,7 @@ private <S> S reduce(

private Stream<X> stream(
CellProcessor<Stream<X>> processor
) throws ParseException, SQLException, IOException {
) throws ParseException, IOException {
this.executionStartTimeMillis = System.currentTimeMillis();

CellIterator cellIterator = new CellIterator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private <S> S reduce(

private Stream<X> stream(
CellProcessor<Stream<X>> cellProcessor
) throws ParseException, SQLException, IOException, ClassNotFoundException {
) throws ParseException, IOException {
this.executionStartTimeMillis = System.currentTimeMillis();

CellIterator cellIterator = new CellIterator(
Expand Down

0 comments on commit 739fa94

Please sign in to comment.