Skip to content

Commit

Permalink
Implement external ID lookup for SQL mode (#3496)
Browse files Browse the repository at this point in the history
  • Loading branch information
jnsrnhld authored and thoniTUB committed Jul 25, 2024
1 parent a2aad8a commit 12b0497
Show file tree
Hide file tree
Showing 29 changed files with 639 additions and 331 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.bakdata.conquery.apiv1.query.concept.specific.CQConcept;
import com.bakdata.conquery.apiv1.query.concept.specific.CQOr;
import com.bakdata.conquery.apiv1.query.concept.specific.external.CQExternal;
import com.bakdata.conquery.apiv1.query.concept.specific.external.EntityResolver;
import com.bakdata.conquery.io.result.ResultRender.ResultRendererProvider;
import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.metrics.ExecutionMetrics;
Expand Down Expand Up @@ -108,7 +109,6 @@ public class QueryProcessor {
private Validator validator;



public Stream<ExecutionStatus> getAllQueries(Dataset dataset, HttpServletRequest req, Subject subject, boolean allProviders) {
final Collection<ManagedExecution> allQueries = storage.getAllExecutions();

Expand Down Expand Up @@ -294,14 +294,13 @@ public FullExecutionStatus getQueryFullStatus(ManagedExecution query, Subject su
public ExternalUploadResult uploadEntities(Subject subject, Dataset dataset, ExternalUpload upload) {

final Namespace namespace = datasetRegistry.get(dataset.getId());
final CQExternal.ResolveStatistic statistic = CQExternal.resolveEntities(
final EntityResolver.ResolveStatistic statistic = namespace.getEntityResolver().resolveEntities(
upload.getValues(),
upload.getFormat(),
namespace.getStorage().getIdMapping(),
config.getIdColumns(),
config.getLocale().getDateReader(),
upload.isOneRowPerEntity(),
true
upload.isOneRowPerEntity()
);

// Resolving nothing is a problem thus we fail.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.bakdata.conquery.apiv1.query.CQElement;
Expand All @@ -21,7 +19,6 @@
import com.bakdata.conquery.models.config.IdColumnConfig;
import com.bakdata.conquery.models.error.ConqueryError;
import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId;
import com.bakdata.conquery.models.identifiable.mapping.EntityIdMap;
import com.bakdata.conquery.models.query.QueryExecutionContext;
import com.bakdata.conquery.models.query.QueryPlanContext;
import com.bakdata.conquery.models.query.QueryResolveContext;
Expand All @@ -33,16 +30,12 @@
import com.bakdata.conquery.models.query.resultinfo.ResultInfo;
import com.bakdata.conquery.models.query.resultinfo.SimpleResultInfo;
import com.bakdata.conquery.models.types.ResultType;
import com.bakdata.conquery.util.DateReader;
import com.bakdata.conquery.util.io.IdColumnUtil;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonView;
import com.google.common.collect.Streams;
import io.dropwizard.validation.ValidationMethod;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -55,8 +48,6 @@
@NoArgsConstructor
public class CQExternal extends CQElement {

private static final String FORMAT_EXTRA = "EXTRA";

/**
* Describes the format of {@code values}, how to extract data from each row:
* <p>
Expand Down Expand Up @@ -118,7 +109,7 @@ public QPNode createQueryPlan(QueryPlanContext context, ConceptQueryPlan plan) {
final String[] extraHeaders = Streams.zip(
Arrays.stream(headers),
format.stream(),
(header, format) -> format.equals(FORMAT_EXTRA) ? header : null
(header, format) -> format.equals(EntityResolverUtil.FORMAT_EXTRA) ? header : null
)
.filter(Objects::nonNull)
.toArray(String[]::new);
Expand Down Expand Up @@ -180,78 +171,18 @@ private ExternalNode<List<String>> createExternalNodeForList(QueryPlanContext co
);
}

/**
* For each row try and collect all dates.
*
* @return Row -> Dates
*/
private static CDateSet[] readDates(String[][] values, List<String> format, DateReader dateReader) {
final CDateSet[] out = new CDateSet[values.length];

final List<DateFormat> dateFormats = format.stream()
.map(CQExternal::resolveDateFormat)
// Don't use Stream#toList to preserve null-values
.collect(Collectors.toList());


/*
If no format is provided, put empty dates into output.
This indicates that no date context was provided and
the entries are not restricted by any date restriction,
but can also don't contribute to any date aggregation.
*/
if (dateFormats.stream().allMatch(Objects::isNull)) {
// Initialize empty
for (int row = 0; row < values.length; row++) {
out[row] = CDateSet.createEmpty();
}
return out;
}

for (int row = 1; row < values.length; row++) {
try {
final CDateSet dates = CDateSet.createEmpty();

// Collect all specified dates into a single set.
for (int col = 0; col < dateFormats.size(); col++) {
final DateFormat dateFormat = dateFormats.get(col);

if (dateFormat == null) {
continue;
}
dateFormat.readDates(values[row][col], dateReader, dates);
}

if (dates.isEmpty()) {
continue;
}

if (out[row] == null) {
out[row] = CDateSet.createEmpty();
}

out[row].addAll(dates);
}
catch (Exception e) {
log.warn("Failed to parse Date from {}", row, e);
}
}

return out;
}

@Override
public void resolve(QueryResolveContext context) {
headers = values[0];

final ResolveStatistic resolved =
resolveEntities(values, format,
context.getNamespace().getStorage().getIdMapping(),
context.getConfig().getIdColumns(),
context.getConfig().getLocale().getDateReader(),
onlySingles,
context.getConfig().getSqlConnectorConfig().isEnabled()
);
final EntityResolver.ResolveStatistic resolved = context.getNamespace().getEntityResolver().resolveEntities(
values,
format,
context.getNamespace().getStorage().getIdMapping(),
context.getConfig().getIdColumns(),
context.getConfig().getLocale().getDateReader(),
onlySingles
);

if (resolved.getResolved().isEmpty()) {
throw new ConqueryError.ExternalResolveEmptyError();
Expand All @@ -277,158 +208,6 @@ public void resolve(QueryResolveContext context) {
extra = resolved.getExtra();
}

@Data
public static class ResolveStatistic {

@JsonIgnore
private final Map<String, CDateSet> resolved;

/**
* Entity -> Column -> Values
*/
@JsonIgnore
private final Map<String, Map<String, List<String>>> extra;

private final List<String[]> unreadableDate;
private final List<String[]> unresolvedId;

}

/**
* Helper method to try and resolve entities in values using the specified format.
*/
public static ResolveStatistic resolveEntities(@NotEmpty String[][] values, @NotEmpty List<String> format, EntityIdMap mapping, IdColumnConfig idColumnConfig, @NotNull DateReader dateReader, boolean onlySingles, boolean isInSqlMode) {
final Map<String, CDateSet> resolved = new HashMap<>();

final List<String[]> unresolvedDate = new ArrayList<>();
final List<String[]> unresolvedId = new ArrayList<>();

// extract dates from rows
final CDateSet[] rowDates = readDates(values, format, dateReader);

// Extract extra data from rows by Row, to be collected into by entities
// Row -> Column -> Value
final Map<String, String>[] extraDataByRow = readExtras(values, format);

final List<Function<String[], EntityIdMap.ExternalId>> readers = IdColumnUtil.getIdReaders(format, idColumnConfig.getIdMappers());

// We will not be able to resolve anything...
if (readers.isEmpty()) {
return new ResolveStatistic(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyList(), List.of(values));
}

// Entity -> Column -> Values
final Map<String, Map<String, List<String>>> extraDataByEntity = new HashMap<>();

// ignore the first row, because this is the header
for (int rowNum = 1; rowNum < values.length; rowNum++) {

final String[] row = values[rowNum];

if (rowDates[rowNum] == null) {
unresolvedDate.add(row);
continue;
}

// TODO proper implementation of EntityIdMap#resolve for SQL mode
String resolvedId = isInSqlMode
? String.valueOf(row[0])
: tryResolveId(row, readers, mapping);

if (resolvedId == null) {
unresolvedId.add(row);
continue;
}

//read the dates from the row
resolved.put(resolvedId, rowDates[rowNum]);

// Entity was resolved for row so we collect the data.
if (extraDataByRow[rowNum] != null) {

for (Map.Entry<String, String> entry : extraDataByRow[rowNum].entrySet()) {
extraDataByEntity.computeIfAbsent(resolvedId, (ignored) -> new HashMap<>())
.computeIfAbsent(entry.getKey(), (ignored) -> new ArrayList<>())
.add(entry.getValue());
}
}
}

if (onlySingles) {
// Check that there is at most one value per entity and per column
final boolean alright = extraDataByEntity.values().stream()
.map(Map::values)
.flatMap(Collection::stream)
.allMatch(l -> l.size() <= 1);
if (!alright) {
throw new ConqueryError.ExternalResolveOnePerRowError();
}
}

return new ResolveStatistic(resolved, extraDataByEntity, unresolvedDate, unresolvedId);
}

/**
* Try to extract a {@link com.bakdata.conquery.models.identifiable.mapping.EntityIdMap.ExternalId} from the row,
* then try to map it to an internal {@link com.bakdata.conquery.models.query.entity.Entity}
*/
private static String tryResolveId(String[] row, List<Function<String[], EntityIdMap.ExternalId>> readers, EntityIdMap mapping) {
String resolvedId = null;

for (Function<String[], EntityIdMap.ExternalId> reader : readers) {
final EntityIdMap.ExternalId externalId = reader.apply(row);

if (externalId == null) {
continue;
}

String innerResolved = mapping.resolve(externalId);

if (innerResolved == null) {
continue;
}

// Only if all resolvable ids agree on the same entity, do we return the id.
if (resolvedId != null && !innerResolved.equals(resolvedId)) {
log.error("`{}` maps to different Entities", (Object) row);
continue;
}

resolvedId = innerResolved;
}
return resolvedId;
}

/**
* Try and extract Extra data from input to be returned as extra-data in output.
* <p>
* Line -> ( Column -> Value )
*/
private static Map<String, String>[] readExtras(String[][] values, List<String> format) {
final String[] names = values[0];
final Map<String, String>[] extrasByRow = new Map[values.length];


for (int line = 1; line < values.length; line++) {
for (int col = 0; col < format.size(); col++) {
if (!format.get(col).equals(FORMAT_EXTRA)) {
continue;
}


if (extrasByRow[line] == null) {
extrasByRow[line] = new HashMap<>(names.length);
}

extrasByRow[line].put(names[col], values[line][col]);
}
}


return extrasByRow;
}


@Override
public RequiredEntities collectRequiredEntities(QueryExecutionContext context) {
return new RequiredEntities(valuesResolved.keySet());
Expand All @@ -441,7 +220,7 @@ public List<ResultInfo> getResultInfos() {
}
List<ResultInfo> resultInfos = new ArrayList<>();
for (int col = 0; col < format.size(); col++) {
if (!format.get(col).equals(FORMAT_EXTRA)) {
if (!format.get(col).equals(EntityResolverUtil.FORMAT_EXTRA)) {
continue;
}

Expand Down Expand Up @@ -484,15 +263,4 @@ public boolean isHeadersUnique() {
return false;
}

/**
* Try to resolve a date format, return nothing if not possible.
*/
private static DateFormat resolveDateFormat(String name) {
try {
return DateFormat.valueOf(name);
}
catch (IllegalArgumentException e) {
return null; // Does not exist
}
}
}
Loading

0 comments on commit 12b0497

Please sign in to comment.