Skip to content

Commit

Permalink
First PoC.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <yuryf@bitquilltech.com>
  • Loading branch information
Yury-Fridlyand committed Nov 16, 2022
1 parent 662a938 commit f335101
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@ public LocalTime timeValue() {
return time;
}

/**
 * Returns the date used to anchor this time-only value: the current date in
 * {@link ExprTimestampValue#ZONE}.
 *
 * <p>The zone is passed explicitly so that the date agrees with {@code timestampValue()},
 * which interprets the date and time in that same zone. A bare {@code LocalDate.now()} would
 * use the JVM default zone and could yield a different calendar day.
 *
 * @return the current date in {@link ExprTimestampValue#ZONE}
 */
@Override
public LocalDate dateValue() {
  return LocalDate.now(ExprTimestampValue.ZONE);
}

/**
 * Combines {@code dateValue()} and {@code timeValue()} into a single date-time.
 *
 * @return the date from {@code dateValue()} at the time from {@code timeValue()}
 */
@Override
public LocalDateTime datetimeValue() {
  final LocalDate day = dateValue();
  return day.atTime(timeValue());
}

/**
 * Converts this value to an instant by placing {@code dateValue()} and {@code timeValue()}
 * in {@link ExprTimestampValue#ZONE}.
 *
 * @return the instant for that date and time in {@link ExprTimestampValue#ZONE}
 */
@Override
public Instant timestampValue() {
  final LocalDateTime local = LocalDateTime.of(dateValue(), timeValue());
  return local.atZone(ExprTimestampValue.ZONE).toInstant();
}

@Override
public String toString() {
return String.format("TIME '%s'", value());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class ExprTimestampValue extends AbstractExprValue {
/**
* TODO: only UTC is supported for now.
*/
private static final ZoneId ZONE = ZoneId.of("UTC");
public static final ZoneId ZONE = ZoneId.of("UTC");

private final Instant timestamp;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,22 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import java.time.DateTimeException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeParseException;
import java.time.temporal.TemporalAccessor;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.BiFunction;
import lombok.Getter;
import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.opensearch.common.time.DateFormatters;
import org.opensearch.sql.data.model.ExprBooleanValue;
import org.opensearch.sql.data.model.ExprByteValue;
Expand All @@ -61,6 +67,7 @@
import org.opensearch.sql.opensearch.data.utils.Content;
import org.opensearch.sql.opensearch.data.utils.ObjectContent;
import org.opensearch.sql.opensearch.data.utils.OpenSearchJsonContent;
import org.opensearch.sql.opensearch.mapping.MappingEntry;
import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser;

/**
Expand All @@ -73,6 +80,9 @@ public class OpenSearchExprValueFactory {
@Setter
private Map<String, ExprType> typeMapping;


// Per-field mapping entries, looked up by field name and passed to the type actions while
// parsing. Defaults to an empty map so that instances created through the single-argument
// constructor, which does not assign this field, do not fail with a NullPointerException
// on lookup.
private Map<String, MappingEntry> typeMapping2 = new LinkedHashMap<>();

@Getter
@Setter
private OpenSearchAggregationResponseParser parser;
Expand All @@ -81,26 +91,26 @@ public class OpenSearchExprValueFactory {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private final Map<ExprType, Function<Content, ExprValue>> typeActionMap =
new ImmutableMap.Builder<ExprType, Function<Content, ExprValue>>()
.put(INTEGER, c -> new ExprIntegerValue(c.intValue()))
.put(LONG, c -> new ExprLongValue(c.longValue()))
.put(SHORT, c -> new ExprShortValue(c.shortValue()))
.put(BYTE, c -> new ExprByteValue(c.byteValue()))
.put(FLOAT, c -> new ExprFloatValue(c.floatValue()))
.put(DOUBLE, c -> new ExprDoubleValue(c.doubleValue()))
.put(STRING, c -> new ExprStringValue(c.stringValue()))
.put(BOOLEAN, c -> ExprBooleanValue.of(c.booleanValue()))
private final Map<ExprType, BiFunction<Content, MappingEntry, ExprValue>> typeActionMap =
new ImmutableMap.Builder<ExprType, BiFunction<Content, MappingEntry, ExprValue>>()
.put(INTEGER, (c, m) -> new ExprIntegerValue(c.intValue()))
.put(LONG, (c, m) -> new ExprLongValue(c.longValue()))
.put(SHORT, (c, m) -> new ExprShortValue(c.shortValue()))
.put(BYTE, (c, m) -> new ExprByteValue(c.byteValue()))
.put(FLOAT, (c, m) -> new ExprFloatValue(c.floatValue()))
.put(DOUBLE, (c, m) -> new ExprDoubleValue(c.doubleValue()))
.put(STRING, (c, m) -> new ExprStringValue(c.stringValue()))
.put(BOOLEAN, (c, m) -> ExprBooleanValue.of(c.booleanValue()))
.put(TIMESTAMP, this::parseTimestamp)
.put(DATE, c -> new ExprDateValue(parseTimestamp(c).dateValue().toString()))
.put(TIME, c -> new ExprTimeValue(parseTimestamp(c).timeValue().toString()))
.put(DATETIME, c -> new ExprDatetimeValue(parseTimestamp(c).datetimeValue()))
.put(OPENSEARCH_TEXT, c -> new OpenSearchExprTextValue(c.stringValue()))
.put(OPENSEARCH_TEXT_KEYWORD, c -> new OpenSearchExprTextKeywordValue(c.stringValue()))
.put(OPENSEARCH_IP, c -> new OpenSearchExprIpValue(c.stringValue()))
.put(OPENSEARCH_GEO_POINT, c -> new OpenSearchExprGeoPointValue(c.geoValue().getLeft(),
.put(DATE, (c, m) -> new ExprDateValue(parseTimestamp(c, m).dateValue().toString()))
.put(TIME, (c, m) -> new ExprTimeValue(parseTimestamp(c, m).timeValue().toString()))
.put(DATETIME, (c, m) -> new ExprDatetimeValue(parseTimestamp(c, m).datetimeValue()))
.put(OPENSEARCH_TEXT, (c, m) -> new OpenSearchExprTextValue(c.stringValue()))
.put(OPENSEARCH_TEXT_KEYWORD, (c, m) -> new OpenSearchExprTextKeywordValue(c.stringValue()))
.put(OPENSEARCH_IP, (c, m) -> new OpenSearchExprIpValue(c.stringValue()))
.put(OPENSEARCH_GEO_POINT, (c, m) -> new OpenSearchExprGeoPointValue(c.geoValue().getLeft(),
c.geoValue().getRight()))
.put(OPENSEARCH_BINARY, c -> new OpenSearchExprBinaryValue(c.stringValue()))
.put(OPENSEARCH_BINARY, (c, m) -> new OpenSearchExprBinaryValue(c.stringValue()))
.build();

/**
Expand All @@ -111,6 +121,12 @@ public OpenSearchExprValueFactory(
this.typeMapping = typeMapping;
}

/**
 * Constructor taking both the type mapping and the per-field mapping entries.
 *
 * @param typeMapping map of names to {@link ExprType}
 * @param typeMapping2 map of field names to {@link MappingEntry}, consulted by field name
 *     when a value is parsed
 */
public OpenSearchExprValueFactory(Map<String, ExprType> typeMapping,
                                  Map<String, MappingEntry> typeMapping2) {
  this.typeMapping2 = typeMapping2;
  this.typeMapping = typeMapping;
}

/**
* The struct construction has the following assumption. 1. The field has OpenSearch Object
* data type. https://www.elastic.co/guide/en/elasticsearch/reference/current/object.html 2. The
Expand Down Expand Up @@ -151,7 +167,7 @@ private ExprValue parse(Content content, String field, Optional<ExprType> fieldT
return parseArray(content, field);
} else {
if (typeActionMap.containsKey(type)) {
return typeActionMap.get(type).apply(content);
return typeActionMap.get(type).apply(content, typeMapping2.getOrDefault(field, null));
} else {
throw new IllegalStateException(
String.format(
Expand Down Expand Up @@ -188,11 +204,61 @@ private ExprValue constructTimestamp(String value) {
}
}

private ExprValue parseTimestamp(Content value) {
/**
 * Tries to parse a date/time string with the formatters supplied by the field's mapping.
 *
 * <p>Formatters from {@code getRegularFormatters()} are tried first, then those from
 * {@code getNamedFormatters()}; the result of the first formatter that accepts the value is
 * returned. Any exception thrown by a formatter's {@code parse} is treated as "no match".
 *
 * @param value the text to parse
 * @param mapping mapping entry providing the formatters; may be {@code null}
 * @return the parsed value (per the original note, a {@code java.time.format.Parsed}), or
 *     {@code null} when {@code mapping} is {@code null} or no formatter accepted the value
 */
private TemporalAccessor parseTimestampString(String value, MappingEntry mapping) {
  if (mapping != null) {
    for (var regular : mapping.getRegularFormatters()) {
      try {
        return regular.parse(value);
      } catch (Exception ignored) {
        // this format does not match, move on to the next one
      }
    }
    // Named formatters are only requested once every regular formatter has been rejected.
    for (var named : mapping.getNamedFormatters()) {
      try {
        return named.parse(value);
      } catch (Exception ignored) {
        // this format does not match, move on to the next one
      }
    }
  }
  return null;
}

private ExprValue parseTimestamp(Content value, MappingEntry mapping) {
if (value.isNumber()) {
return new ExprTimestampValue(Instant.ofEpochMilli(value.longValue()));
} else if (value.isString()) {
return constructTimestamp(value.stringValue());
TemporalAccessor parsed = parseTimestampString(value.stringValue(), mapping);
if (parsed == null) { // failed to parse or no formats given
return constructTimestamp(value.stringValue());
}
try {
return new ExprTimestampValue(Instant.from(parsed));
} catch (DateTimeException ignored) {
// nothing to do, try another type
}
// TODO return not ExprTimestampValue
try {
return new ExprTimestampValue(new ExprDateValue(LocalDate.from(parsed)).timestampValue());
} catch (DateTimeException ignored) {
// nothing to do, try another type
}
try {
return new ExprTimestampValue(new ExprDatetimeValue(LocalDateTime.from(parsed)).timestampValue());
} catch (DateTimeException ignored) {
// nothing to do, try another type
}
try {
return new ExprTimestampValue(new ExprTimeValue(LocalTime.from(parsed)).timestampValue());
} catch (DateTimeException ignored) {
// nothing to do, try another type
}
// TODO throw exception
LogManager.getLogger(OpenSearchExprValueFactory.class).error(
String.format("Can't recognize parsed value: %s, %s", parsed, parsed.getClass()));
return new ExprStringValue(value.stringValue());
} else {
return new ExprTimestampValue((Instant) value.objectValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ public class IndexMapping {
/** Field mappings from field name to field type in OpenSearch data type system. */
private final Map<String, String> fieldMappings;

public Map<String, MappingEntry> mapping2;

/**
 * Creates an index mapping from a field-name-to-field-type map.
 *
 * <p>No {@link MappingEntry} data is supplied on this path, so {@code mapping2} is set to an
 * empty map instead of being left {@code null}. Code that iterates it then sees no entries
 * rather than failing with a NullPointerException.
 *
 * @param fieldMappings field name to field type
 */
public IndexMapping(Map<String, String> fieldMappings) {
  this.fieldMappings = fieldMappings;
  this.mapping2 = Map.of();
}

public IndexMapping(MappingMetadata metaData) {
this.mapping2 = flat2(metaData.getSourceAsMap());
this.fieldMappings = flatMappings(metaData.getSourceAsMap());
}

Expand Down Expand Up @@ -65,6 +68,17 @@ public <T> Map<String, T> getAllFieldTypes(Function<String, T> transform) {
.collect(Collectors.toMap(Map.Entry::getKey, e -> transform.apply(e.getValue())));
}

/**
 * Builds a {@link MappingEntry} for every entry directly under the mapping's
 * {@code "properties"} key.
 *
 * <p>Only these top-level entries are visited; nested properties are not descended into.
 * An entry without a {@code "type"} is recorded with type {@code "object"}, and an entry
 * without a {@code "format"} gets a {@code null} format. The data type of each entry is left
 * {@code null}.
 *
 * @param indexMapping index mapping as a map
 * @return field name to mapping entry; empty when there is no {@code "properties"} value
 */
@SuppressWarnings("unchecked")
private Map<String, MappingEntry> flat2(Map<String, Object> indexMapping) {
  // "properties" may be absent (e.g. an index without mapped fields); treat that as
  // "no fields" instead of dereferencing null.
  Object rawProperties = indexMapping.get("properties");
  Map<String, Object> properties = rawProperties == null
      ? Map.<String, Object>of()
      : (Map<String, Object>) rawProperties;
  return properties.entrySet().stream()
      .collect(Collectors.toMap(Map.Entry::getKey, e -> {
        Map<String, Object> mapping = (Map<String, Object>) e.getValue();
        return new MappingEntry((String) mapping.getOrDefault("type", "object"),
            (String) mapping.get("format"), null);
      }));
}


@SuppressWarnings("unchecked")
private Map<String, String> flatMappings(Map<String, Object> indexMapping) {
ImmutableMap.Builder<String, String> builder = new ImmutableMap.Builder<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/


package org.opensearch.sql.opensearch.mapping;

import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.opensearch.common.time.DateFormatter;
import org.opensearch.sql.data.type.ExprType;

/**
 * Mapping details of a single index field: its field type, the raw format string and the
 * expression type assigned to it.
 *
 * <p>NOTE(review): the formatter lists are rebuilt on every call to
 * {@link #getNamedFormatters()} and {@link #getRegularFormatters()}; callers on a hot path may
 * want to hold on to the returned lists.
 */
@AllArgsConstructor
public class MappingEntry {

  /** Regular expression matching the {@code ||} separator between alternative formats. */
  private static final String FORMAT_SEPARATOR = "\\|\\|";

  /** Field type as given to the constructor. */
  @Getter
  private String fieldType;

  /** Raw format string; may contain several formats separated by {@code ||}, or be null. */
  @Getter
  private String formats;

  /** Expression type assigned to this field; may be null until set. */
  @Getter
  @Setter
  private ExprType dataType;

  /**
   * Creates an entry with only a field type; format string and data type are {@code null}.
   *
   * @param fieldType field type
   */
  public MappingEntry(String fieldType) {
    this(fieldType, null, null);
  }

  /**
   * Splits the format string on {@code ||} into individual, trimmed formats.
   *
   * <p>Blank tokens (for example from {@code "||a"} or {@code "a|| ||b"}) are dropped, because
   * an empty pattern would otherwise compile into a formatter that matches only the empty
   * string.
   *
   * @return the non-blank formats in declaration order; empty if no format string is set
   */
  public List<String> getFormatList() {
    if (formats == null || formats.isEmpty()) {
      return List.of();
    }
    return Arrays.stream(formats.split(FORMAT_SEPARATOR))
        .map(String::trim)
        .filter(format -> !format.isEmpty())
        .collect(Collectors.toList());
  }

  /**
   * Returns formatters for the formats that are not valid {@link DateTimeFormatter}
   * patterns; each of those is resolved through {@code DateFormatter.forPattern}
   * (presumably OpenSearch's named formats - confirm against its documentation).
   *
   * @return one formatter per such format, in declaration order
   */
  public List<DateFormatter> getNamedFormatters() {
    return getFormatList().stream()
        .filter(format -> compilePattern(format) == null)
        .map(DateFormatter::forPattern)
        .collect(Collectors.toList());
  }

  /**
   * Returns formatters for the formats that are valid {@link DateTimeFormatter} patterns.
   * Formats that are not valid patterns are skipped.
   *
   * @return one formatter per valid pattern, in declaration order
   */
  public List<DateTimeFormatter> getRegularFormatters() {
    return getFormatList().stream()
        .map(MappingEntry::compilePattern)
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
  }

  /**
   * Compiles a pattern with {@link DateTimeFormatter#ofPattern(String)}.
   *
   * @param pattern candidate pattern
   * @return the formatter, or {@code null} if the pattern is rejected as invalid
   */
  private static DateTimeFormatter compilePattern(String pattern) {
    try {
      return DateTimeFormatter.ofPattern(pattern);
    } catch (IllegalArgumentException ignored) {
      // Not a java.time pattern; the caller decides how to treat such formats.
      return null;
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.Triple;
import org.opensearch.common.collect.Tuple;
import org.opensearch.sql.data.model.ExprTupleValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.opensearch.client.OpenSearchClient;
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType;
import org.opensearch.sql.opensearch.mapping.IndexMapping;
import org.opensearch.sql.opensearch.mapping.MappingEntry;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;

/**
Expand Down Expand Up @@ -121,6 +125,20 @@ public Map<String, ExprType> getFieldTypes() {
return fieldTypes;
}

/**
 * Collects the mapping entries of all fields in the requested indices, keyed by field name.
 *
 * <p>Each entry's data type is set from its field type before the entry is added. The entries
 * are the same objects held by the {@link IndexMapping}, so they are updated in place.
 *
 * <p>TODO possible collision if two indices have fields with same names (the entry added
 * last replaces the earlier one).
 *
 * @return field name to mapping entry; empty if no index provides mapping entries
 */
public Map<String, MappingEntry> getFieldTypes2() {
  Map<String, IndexMapping> indexMappings = client.getIndexMappings(indexName.getIndexNames());
  Map<String, MappingEntry> fieldTypes = new HashMap<>();

  for (IndexMapping indexMapping : indexMappings.values()) {
    // mapping2 may be unset (null) for an IndexMapping that was not built from mapping
    // metadata; such an index simply contributes no entries.
    if (indexMapping.mapping2 == null) {
      continue;
    }
    indexMapping.mapping2.forEach((field, entry) -> {
      entry.setDataType(transformESTypeToExprType(entry.getFieldType()));
      fieldTypes.put(field, entry);
    });
  }
  return fieldTypes;
}

/**
* Get the minimum of the max result windows of the indices.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ public Integer getMaxResultWindow() {
@Override
public PhysicalPlan implement(LogicalPlan plan) {
OpenSearchIndexScan indexScan = new OpenSearchIndexScan(client, settings, indexName,
getMaxResultWindow(), new OpenSearchExprValueFactory(getFieldTypes()));
getMaxResultWindow(), new OpenSearchExprValueFactory(getFieldTypes(),
new OpenSearchDescribeIndexRequest(client, indexName).getFieldTypes2()));

/*
* Visit logical plan with index scan as context so logical operators visited, such as
Expand Down

0 comments on commit f335101

Please sign in to comment.