From 7bc69bb5c707341aef3676538463294f4448a3ff Mon Sep 17 00:00:00 2001 From: Sumanth Bandi Date: Thu, 9 Jul 2015 19:30:17 +0530 Subject: [PATCH] Adding Elasticsearch connector. Please check and suggest improvements. --- presto-elasticsearch/pom.xml | 123 +++++++ .../elasticsearch/ElasticsearchClient.java | 304 ++++++++++++++++++ .../elasticsearch/ElasticsearchColumn.java | 88 +++++ .../ElasticsearchColumnHandle.java | 122 +++++++ .../ElasticsearchColumnMetadata.java | 54 ++++ .../elasticsearch/ElasticsearchConfig.java | 27 ++ .../elasticsearch/ElasticsearchConnector.java | 76 +++++ .../ElasticsearchConnectorFactory.java | 62 ++++ .../ElasticsearchConnectorId.java | 42 +++ .../ElasticsearchHandleResolver.java | 59 ++++ .../elasticsearch/ElasticsearchMetadata.java | 168 ++++++++++ .../elasticsearch/ElasticsearchModule.java | 72 +++++ .../elasticsearch/ElasticsearchPartition.java | 53 +++ .../elasticsearch/ElasticsearchPlugin.java | 48 +++ .../ElasticsearchRecordCursor.java | 257 +++++++++++++++ .../elasticsearch/ElasticsearchRecordSet.java | 59 ++++ .../ElasticsearchRecordSetProvider.java | 43 +++ .../elasticsearch/ElasticsearchSplit.java | 86 +++++ .../ElasticsearchSplitManager.java | 74 +++++ .../elasticsearch/ElasticsearchTable.java | 78 +++++ .../ElasticsearchTableHandle.java | 82 +++++ .../ElasticsearchTableSource.java | 58 ++++ .../presto/elasticsearch/MyJSONTest.java | 76 +++++ .../presto/elasticsearch/TestClient.java | 51 +++ .../facebook/presto/elasticsearch/Types.java | 33 ++ .../services/com.facebook.presto.spi.Plugin | 1 + .../presto/elasticsearch/AppTest.java | 39 +++ 27 files changed, 2235 insertions(+) create mode 100644 presto-elasticsearch/pom.xml create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchClient.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchColumn.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchColumnHandle.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchColumnMetadata.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConfig.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnector.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorFactory.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorId.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchHandleResolver.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchMetadata.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchModule.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPartition.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPlugin.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchRecordCursor.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchRecordSet.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchRecordSetProvider.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchSplit.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchSplitManager.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTable.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableHandle.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableSource.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/MyJSONTest.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/TestClient.java create mode 100644 presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/Types.java create mode 100644 presto-elasticsearch/src/main/resources/META-INF/services/com.facebook.presto.spi.Plugin create mode 100644 presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/AppTest.java diff --git a/presto-elasticsearch/pom.xml b/presto-elasticsearch/pom.xml new file mode 100644 index 000000000000..6375bb7e76aa --- /dev/null +++ b/presto-elasticsearch/pom.xml @@ -0,0 +1,123 @@ + + + 4.0.0 + + com.facebook.presto + presto-root + 0.108-SNAPSHOT + + com.facebook.presto.elasticsearch + presto-elasticsearch + 1.0-SNAPSHOT + presto-elasticsearch + http://maven.apache.org + + + junit + junit + 3.8.1 + test + + + + org.elasticsearch + elasticsearch + 1.6.0 + + + + com.facebook.presto + presto-spi + provided + + + + io.airlift + bootstrap + provided + + + + io.airlift + json + provided + + + + io.airlift + log + provided + + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + + com.fasterxml.jackson.core + jackson-core + provided + + + + com.fasterxml.jackson.core + jackson-databind + provided + + + + io.airlift + units + provided + + + + io.airlift + configuration + provided + + + + io.airlift + slice + provided + + + + com.google.guava + guava + provided + + + + javax.inject + javax.inject + provided + + + + com.google.inject + guice + provided + + + + javax.validation + validation-api + provided + + + + org.json + json + 20080701 + + + + + + + diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchClient.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchClient.java new file mode 100644 index 000000000000..d39a27be964e --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchClient.java @@ -0,0 +1,304 @@ + +package com.facebook.presto.elasticsearch; + +import com.facebook.presto.spi.type.Type; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.io.Resources; +import io.airlift.json.JsonCodec; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.hppc.cursors.ObjectObjectCursor; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.json.JSONException; +import org.json.JSONObject; + +import javax.inject.Inject; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URL; +import java.util.*; +import java.util.concurrent.ExecutionException; + +import static com.facebook.presto.spi.type.BigintType.BIGINT; +import static com.facebook.presto.spi.type.DoubleType.DOUBLE; +import static com.facebook.presto.spi.type.VarcharType.VARCHAR; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.collect.Iterables.transform; +import static com.google.common.collect.Maps.transformValues; +import static com.google.common.collect.Maps.uniqueIndex; +import static io.airlift.json.JsonCodec.listJsonCodec; +import static java.nio.charset.StandardCharsets.UTF_8; + + + +public class ElasticsearchClient +{ + /** + * SchemaName -> (TableName -> TableMetadata) + */ + private Supplier>> schemas; + private ElasticsearchConfig config; + private JsonCodec>> catalogCodec; + + @Inject + public ElasticsearchClient(ElasticsearchConfig config, JsonCodec>> catalogCodec) + throws IOException + { + checkNotNull(config, "config is null"); + checkNotNull(catalogCodec, "catalogCodec is null"); + + this.config = config; + this.catalogCodec = catalogCodec; + + schemas = Suppliers.memoize(schemasSupplier(catalogCodec, config.getMetadata())); + } + + private ElasticsearchColumn makeColumn(String fieldPath_Type) throws JSONException, IOException { + String[] items = fieldPath_Type.split(":"); + + if(items.length != 2) { + /* + System.out.println("The items are :"); + for (String it : items) + { + System.out.println(it); + } + */ + return null; + //assert (items.length == 2); + } + + String type = items[1]; + String path = items[0]; + Type prestoType = VARCHAR; // default. It will be corrected below + + // take only properties from dimensions and measurements for now + if(!(path.startsWith("measurements"))) return null; + + + if(path.endsWith(".type")) + { + path = path.substring(0, path.lastIndexOf('.')); + + // replace '.properties.' with '.' + path = path.replaceAll("\\.properties\\.", "."); + } + + if(type.equals("double") || type.equals("float") || type.equals("integer") || type.equals("string")) + { + if(type.equals("double")) prestoType = DOUBLE; + else if(type.equals("float")) prestoType = DOUBLE; + else if(type.equals("integer")) prestoType = BIGINT; + else if(type.equals("long")) prestoType = BIGINT; + else if(type.equals("string")) prestoType = VARCHAR; + } + else return null; + + ElasticsearchColumn column = new ElasticsearchColumn(path.replaceAll("\\.","_"), prestoType, path, type); + return column; + } + + private void getColumns(ElasticsearchTableSource src, Set columns) throws ExecutionException, InterruptedException, IOException, JSONException { + + /* + Get the current set of columns for one of the sources of a table + */ + String hostaddress = src.getHostaddress(); + int port = src.getPort(); + String clusterName = src.getClusterName(); + String index = src.getIndex(); + String type = src.getType(); + + System.out.println("connecting ...."); + System.out.println("hostaddress :" + hostaddress); + System.out.println("port :" + port); + System.out.println("clusterName :" + clusterName); + System.out.println("index :" + index); + System.out.println("type :" + type); + + + + Settings settings = ImmutableSettings.settingsBuilder() + .put("cluster.name", clusterName) + .build(); + + Client client = new TransportClient(settings) + .addTransportAddress(new InetSocketTransportAddress( + hostaddress, port)); + + GetMappingsResponse res = client.admin().indices().getMappings(new GetMappingsRequest().indices(index).types(type)).get(); + + + ImmutableOpenMap mapping = res.mappings().get(index); + for (ObjectObjectCursor c : mapping) { + //System.out.println(c.key+" = "+c.value.source()); + String data = c.value.source().toString(); + JSONObject json = new JSONObject(data); + json = json.getJSONObject(type).getJSONObject("properties"); + //System.out.println(json.toString(2)); + + List leaves = (new MyJSONTest()).getListJson(json); + for (String fieldPath_Type : leaves) + { + ElasticsearchColumn clm = makeColumn(fieldPath_Type); + if (!(clm == null)) { + columns.add(clm); + } + } + //System.out.println("--------------------"); + } + + client.close(); + } + + private List getColumnsMetadata(List columns) + { + List columnsMetadata = new ArrayList<>(); + for (ElasticsearchColumn clm : columns) + { + columnsMetadata.add(new ElasticsearchColumnMetadata(clm)); + } + return columnsMetadata; + } + + + private void updateTableColumns_ColumnsMetadata(ElasticsearchTable table) + { + Set columns = new HashSet(); + for(ElasticsearchTableSource src : table.getSources()) + { + try { + getColumns(src,columns); + } catch (ExecutionException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } catch (JSONException e) { + System.out.println("JSONException caught !!!"); + e.printStackTrace(); + System.out.println("JSONException caught !!!"); + } + } + + List columnsList = new ArrayList(columns); + table.setColumns(columnsList); + table.setColumnsMetadata(getColumnsMetadata(columnsList)); + } + + private void updateSchemas() throws IOException + { + // load from the metadata json file + schemas = Suppliers.memoize(schemasSupplier(catalogCodec, config.getMetadata())); + + Map> schemasMap = schemas.get(); + for (Map.Entry> entry : schemasMap.entrySet()) { + + Map tablesMap = entry.getValue(); + for (Map.Entry tableEntry : tablesMap.entrySet()) { + updateTableColumns_ColumnsMetadata(tableEntry.getValue()); + } + } + //schemas = Suppliers.ofInstance(schemasMap); + schemas = Suppliers.memoize(Suppliers.ofInstance(schemasMap)); + } + + public Set getSchemaNames() { + + //System.out.println("mark : getSchemaNames()"); + return schemas.get().keySet(); + } + + public Set getTableNames(String schema) + { + checkNotNull(schema, "schema is null"); + Map tables = schemas.get().get(schema); + if (tables == null) { + return ImmutableSet.of(); + } + return tables.keySet(); + } + + public ElasticsearchTable getTable(String schema, String tableName) + { + try { + this.updateSchemas(); + } catch (IOException e) { + e.printStackTrace(); + } + + checkNotNull(schema, "schema is null"); + checkNotNull(tableName, "tableName is null"); + Map tables = schemas.get().get(schema); + if (tables == null) { + return null; + } + return tables.get(tableName); + } + + private static Supplier>> schemasSupplier(final JsonCodec>> catalogCodec, final URI metadataUri) + { + + return () -> { + try { + //System.out.println("mark : executing method schemasSupplier() :"); + return lookupSchemas(metadataUri, catalogCodec); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + }; + + + } + + private static Map> lookupSchemas(URI metadataUri, JsonCodec>> catalogCodec) + throws IOException + { + // This function is called in the constructor of ElasticsearchClient + //System.out.println("mark : in method lookupSchemas()"); + + URL result = metadataUri.toURL(); + System.out.println("result : " + result); + + String json = Resources.toString(result, UTF_8); + System.out.println("json : " + json); + + Map> catalog = catalogCodec.fromJson(json); + + return ImmutableMap.copyOf(transformValues(catalog, resolveAndIndexTables(metadataUri))); + } + + private static Function, Map> resolveAndIndexTables(final URI metadataUri) + { + return tables -> { + Iterable resolvedTables = transform(tables, tableUriResolver(metadataUri)); + return ImmutableMap.copyOf(uniqueIndex(resolvedTables, ElasticsearchTable::getName)); + }; + } + + private static Function tableUriResolver(final URI baseUri) + { + return table -> { + //List sources = ImmutableList.copyOf(transform(table.getSources(), baseUri::resolve)); + List sources = table.getSources(); + return new ElasticsearchTable(table.getName(), table.getColumns(), sources); + }; + } +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchColumn.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchColumn.java new file mode 100644 index 000000000000..58abe6257448 --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchColumn.java @@ -0,0 +1,88 @@ +package com.facebook.presto.elasticsearch; + +import com.facebook.presto.spi.type.Type; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import com.facebook.presto.spi.type.Type; + +import java.util.Objects; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Strings.isNullOrEmpty; + +public final class ElasticsearchColumn +{ + private final String name; + private final Type type; + private final String jsonPath; + private final String jsonType; + + @JsonCreator + public ElasticsearchColumn( + @JsonProperty("name") String name, + @JsonProperty("type") Type type, + @JsonProperty("jsonPath") String jsonPath, + @JsonProperty("jsonType") String jsonType) + { + checkArgument(!isNullOrEmpty(name), "name is null or is empty"); + this.name = name; + this.type = checkNotNull(type, "type is null"); + this.jsonPath = checkNotNull(jsonPath, "jsonPath is null"); + this.jsonType = checkNotNull(jsonType, "jsonType is null"); + } + + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public Type getType() + { + return type; + } + + @JsonProperty + public String getJsonPath() + { + return jsonPath; + } + + @JsonProperty + public String getJsonType() + { + return jsonType; + } + + @Override + public int hashCode() + { + return Objects.hash(name, type, jsonPath, jsonType); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + ElasticsearchColumn other = (ElasticsearchColumn) obj; + return Objects.equals(this.name, other.name) && + Objects.equals(this.type, other.type) && + Objects.equals(this.jsonPath, other.jsonPath) && + Objects.equals(this.jsonType, other.jsonType); + } + + @Override + public String toString() + { + return name + ":" + type + ":" + jsonPath + ":" + jsonType; + } +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchColumnHandle.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchColumnHandle.java new file mode 100644 index 000000000000..d6e463a10d25 --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchColumnHandle.java @@ -0,0 +1,122 @@ + +package com.facebook.presto.elasticsearch; + +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.type.Type; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.base.Preconditions.checkNotNull; + +public final class ElasticsearchColumnHandle + implements ColumnHandle +{ + private final String connectorId; + private final String columnName; + private final Type columnType; + private final String columnJsonPath; + private final String columnJsonType; + /* ordinalPosition of a columnhandle is the -> number of the column in the entire list of columns of this table + IT DOESNT DEPEND ON THE QUERY (select clm3, clm0, clm1 from tablename) + The columnhandle of clm3 : has ordinalposition = 3 + */ + private final int ordinalPosition; + + @JsonCreator + public ElasticsearchColumnHandle( + @JsonProperty("connectorId") String connectorId, + @JsonProperty("columnName") String columnName, + @JsonProperty("columnType") Type columnType, + @JsonProperty("columnJsonPath") String columnJsonPath, + @JsonProperty("columnJsonType") String columnJsonType, + @JsonProperty("ordinalPosition") int ordinalPosition) + { + this.connectorId = checkNotNull(connectorId, "connectorId is null"); + this.columnName = checkNotNull(columnName, "columnName is null"); + this.columnType = checkNotNull(columnType, "columnType is null"); + this.columnJsonPath = checkNotNull(columnJsonPath, "columnJsonPath is null"); + this.columnJsonType = checkNotNull(columnJsonType, "columnJsonType is null"); + this.ordinalPosition = ordinalPosition; + } + + @JsonProperty + public String getConnectorId() + { + return connectorId; + } + + @JsonProperty + public String getColumnName() + { + return columnName; + } + + @JsonProperty + public Type getColumnType() + { + return columnType; + } + + + @JsonProperty + public String getColumnJsonPath() + { + return columnJsonPath; + } + + @JsonProperty + public String getColumnJsonType() + { + return columnJsonType; + } + + + @JsonProperty + public int getOrdinalPosition() + { + return ordinalPosition; + } + + public ColumnMetadata getColumnMetadata() + { + return new ColumnMetadata(columnName, columnType, false); + } + + @Override + public int hashCode() + { + return Objects.hash(connectorId, columnName); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if ((obj == null) || (getClass() != obj.getClass())) { + return false; + } + + ElasticsearchColumnHandle other = (ElasticsearchColumnHandle) obj; + return Objects.equals(this.connectorId, other.connectorId) && + Objects.equals(this.columnName, other.columnName); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("connectorId", connectorId) + .add("columnName", columnName) + .add("columnType", columnType) + .add("columnJsonPath", columnJsonPath) + .add("columnJsonType", columnJsonType) + .add("ordinalPosition", ordinalPosition) + .toString(); + } +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchColumnMetadata.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchColumnMetadata.java new file mode 100644 index 000000000000..05966e7aeb96 --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchColumnMetadata.java @@ -0,0 +1,54 @@ +package com.facebook.presto.elasticsearch; + +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.type.Type; + +/** + * Created by sprinklr on 03/07/15. + */ +public class ElasticsearchColumnMetadata extends ColumnMetadata +{ + private final String jsonPath; + private final String jsonType; + + public ElasticsearchColumnMetadata(String name, Type type, String jsonPath, String jsonType, boolean partitionKey) { + super(name, type, partitionKey, null, false); + this.jsonPath = jsonPath; + this.jsonType = jsonType; + } + + public ElasticsearchColumnMetadata(ElasticsearchColumn column) + { + this(column.getName(), column.getType(), column.getJsonPath(), column.getJsonType(), false); + } + + public String getJsonPath() { + return jsonPath; + } + + public String getJsonType() { + return jsonType; + } + + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + + ElasticsearchColumnMetadata that = (ElasticsearchColumnMetadata) o; + + if (!jsonPath.equals(that.jsonPath)) return false; + return jsonType.equals(that.jsonType); + + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + jsonPath.hashCode(); + result = 31 * result + jsonType.hashCode(); + return result; + } +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConfig.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConfig.java new file mode 100644 index 000000000000..9bc0544e149e --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConfig.java @@ -0,0 +1,27 @@ + +package com.facebook.presto.elasticsearch; + +import io.airlift.configuration.Config; + +import javax.validation.constraints.NotNull; + +import java.net.URI; + +public class ElasticsearchConfig +{ + private URI metadata; + + @NotNull + public URI getMetadata() + { + return metadata; + } + + @Config("metadata-uri") + public ElasticsearchConfig setMetadata(URI metadata) + { + this.metadata = metadata; + return this; + } + +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnector.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnector.java new file mode 100644 index 000000000000..217984ea6429 --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnector.java @@ -0,0 +1,76 @@ + +package com.facebook.presto.elasticsearch; + +import com.facebook.presto.spi.Connector; +import com.facebook.presto.spi.ConnectorHandleResolver; +import com.facebook.presto.spi.ConnectorMetadata; +import com.facebook.presto.spi.ConnectorRecordSetProvider; +import com.facebook.presto.spi.ConnectorSplitManager; +import io.airlift.bootstrap.LifeCycleManager; +import io.airlift.log.Logger; + +import javax.inject.Inject; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class ElasticsearchConnector + implements Connector +{ + private static final Logger log = Logger.get(ElasticsearchConnector.class); + + private final LifeCycleManager lifeCycleManager; + private final ElasticsearchMetadata metadata; + private final ElasticsearchSplitManager splitManager; + private final ElasticsearchRecordSetProvider recordSetProvider; + private final ElasticsearchHandleResolver handleResolver; + + @Inject + public ElasticsearchConnector( + LifeCycleManager lifeCycleManager, + ElasticsearchMetadata metadata, + ElasticsearchSplitManager splitManager, + ElasticsearchRecordSetProvider recordSetProvider, + ElasticsearchHandleResolver handleResolver) + { + this.lifeCycleManager = checkNotNull(lifeCycleManager, "lifeCycleManager is null"); + this.metadata = checkNotNull(metadata, "metadata is null"); + this.splitManager = checkNotNull(splitManager, "splitManager is null"); + this.recordSetProvider = checkNotNull(recordSetProvider, "recordSetProvider is null"); + this.handleResolver = checkNotNull(handleResolver, "handleResolver is null"); + } + + @Override + public ConnectorMetadata getMetadata() + { + return metadata; + } + + @Override + public ConnectorSplitManager getSplitManager() + { + return splitManager; + } + + @Override + public ConnectorRecordSetProvider getRecordSetProvider() + { + return recordSetProvider; + } + + @Override + public ConnectorHandleResolver getHandleResolver() + { + return handleResolver; + } + + @Override + public final void shutdown() + { + try { + lifeCycleManager.stop(); + } + catch (Exception e) { + log.error(e, "Error shutting down connector"); + } + } +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorFactory.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorFactory.java new file mode 100644 index 000000000000..edf037eaa0ce --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorFactory.java @@ -0,0 +1,62 @@ + +package com.facebook.presto.elasticsearch; + +import com.facebook.presto.spi.Connector; +import com.facebook.presto.spi.ConnectorFactory; +import com.facebook.presto.spi.type.TypeManager; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Injector; +import io.airlift.bootstrap.Bootstrap; +import io.airlift.json.JsonModule; + +import java.util.Map; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class ElasticsearchConnectorFactory + implements ConnectorFactory +{ + private final TypeManager typeManager; + private final Map optionalConfig; + + public ElasticsearchConnectorFactory(TypeManager typeManager, Map optionalConfig) + { + this.typeManager = checkNotNull(typeManager, "typeManager is null"); + this.optionalConfig = ImmutableMap.copyOf(checkNotNull(optionalConfig, "optionalConfig is null")); + } + + @Override + public String getName() + { + return "elasticsearch"; + } + + @Override + public Connector create(final String connectorId, Map requiredConfig) + { + checkNotNull(requiredConfig, "requiredConfig is null"); + checkNotNull(optionalConfig, "optionalConfig is null"); + + try { + // A plugin is not required to use Guice; it is just very convenient + Bootstrap app = new Bootstrap( + new JsonModule(), + new ElasticsearchModule(connectorId, typeManager)); + + Injector injector = app + .strictConfig() + .doNotInitializeLogging() + .setRequiredConfigurationProperties(requiredConfig) + .setOptionalConfigurationProperties(optionalConfig) + .initialize(); + + + + return injector.getInstance(ElasticsearchConnector.class); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorId.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorId.java new file mode 100644 index 000000000000..2fbbd7b380c4 --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorId.java @@ -0,0 +1,42 @@ + +package com.facebook.presto.elasticsearch; + +import java.util.Objects; + +import static com.google.common.base.Preconditions.checkNotNull; + +public final class ElasticsearchConnectorId +{ + private final String id; + + public ElasticsearchConnectorId(String id) + { + this.id = checkNotNull(id, "id is null"); + } + + @Override + public String toString() + { + return id; + } + + @Override + public int hashCode() + { + return Objects.hash(id); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if ((obj == null) || (getClass() != obj.getClass())) { + return false; + } + + ElasticsearchConnectorId other = (ElasticsearchConnectorId) obj; + return Objects.equals(this.id, other.id); + } +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchHandleResolver.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchHandleResolver.java new file mode 100644 index 000000000000..12d278068794 --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchHandleResolver.java @@ -0,0 +1,59 @@ + +package com.facebook.presto.elasticsearch; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorHandleResolver; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorTableHandle; + +import javax.inject.Inject; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class ElasticsearchHandleResolver + implements ConnectorHandleResolver +{ + private final String connectorId; + + @Inject + public ElasticsearchHandleResolver(ElasticsearchConnectorId clientId) + { + this.connectorId = checkNotNull(clientId, "clientId is null").toString(); + } + + @Override + public boolean canHandle(ConnectorTableHandle tableHandle) + { + return tableHandle instanceof ElasticsearchTableHandle && ((ElasticsearchTableHandle) tableHandle).getConnectorId().equals(connectorId); + } + + @Override + public boolean canHandle(ColumnHandle columnHandle) + { + return columnHandle instanceof ElasticsearchColumnHandle && ((ElasticsearchColumnHandle) columnHandle).getConnectorId().equals(connectorId); + } + + @Override + public boolean canHandle(ConnectorSplit split) + { + return split instanceof ElasticsearchSplit && ((ElasticsearchSplit) split).getConnectorId().equals(connectorId); + } + + @Override + public Class getTableHandleClass() + { + return ElasticsearchTableHandle.class; + } + + @Override + public Class getColumnHandleClass() + { + return ElasticsearchColumnHandle.class; + } + + @Override + public Class getSplitClass() + { + return ElasticsearchSplit.class; + } +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchMetadata.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchMetadata.java new file mode 100644 index 000000000000..82f71c65108b --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchMetadata.java @@ -0,0 +1,168 @@ + +package com.facebook.presto.elasticsearch; + +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableMetadata; +import com.facebook.presto.spi.ReadOnlyConnectorMetadata; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.SchemaTablePrefix; +import com.facebook.presto.spi.TableNotFoundException; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +import javax.inject.Inject; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static com.facebook.presto.elasticsearch.Types.checkType; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +public class ElasticsearchMetadata + extends ReadOnlyConnectorMetadata +{ + private final String connectorId; + + private final ElasticsearchClient elasticsearchClient; + + @Inject + public ElasticsearchMetadata(ElasticsearchConnectorId connectorId, ElasticsearchClient elasticsearchClient) + { + this.connectorId = checkNotNull(connectorId, "connectorId is null").toString(); + this.elasticsearchClient = checkNotNull(elasticsearchClient, "client is null"); + } + + @Override + public List listSchemaNames(ConnectorSession session) + { + return listSchemaNames(); + } + + public List listSchemaNames() + { + return ImmutableList.copyOf(elasticsearchClient.getSchemaNames()); + } + + @Override + public ElasticsearchTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) + { + if (!listSchemaNames(session).contains(tableName.getSchemaName())) { + return null; + } + + ElasticsearchTable table = elasticsearchClient.getTable(tableName.getSchemaName(), tableName.getTableName()); + if (table == null) { + return null; + } + + return new ElasticsearchTableHandle(connectorId, tableName.getSchemaName(), tableName.getTableName()); + } + + @Override + public ConnectorTableMetadata getTableMetadata(ConnectorTableHandle table) + { + ElasticsearchTableHandle elasticsearchTableHandle = checkType(table, ElasticsearchTableHandle.class, "table"); + checkArgument(elasticsearchTableHandle.getConnectorId().equals(connectorId), "tableHandle is not for this connector"); + SchemaTableName tableName = new SchemaTableName(elasticsearchTableHandle.getSchemaName(), elasticsearchTableHandle.getTableName()); + + return getTableMetadata(tableName); + } + + @Override + public List listTables(ConnectorSession session, String schemaNameOrNull) + { + Set schemaNames; + if (schemaNameOrNull != null) { + schemaNames = ImmutableSet.of(schemaNameOrNull); + } + else { + schemaNames = elasticsearchClient.getSchemaNames(); + } + + ImmutableList.Builder builder = ImmutableList.builder(); + for (String schemaName : schemaNames) { + for (String tableName : elasticsearchClient.getTableNames(schemaName)) { + builder.add(new SchemaTableName(schemaName, tableName)); + } + } + return builder.build(); + } + + @Override + public ColumnHandle getSampleWeightColumnHandle(ConnectorTableHandle tableHandle) + { + return null; + } + + @Override + public Map getColumnHandles(ConnectorTableHandle tableHandle) + { + ElasticsearchTableHandle elasticsearchTableHandle = checkType(tableHandle, ElasticsearchTableHandle.class, "tableHandle"); + checkArgument(elasticsearchTableHandle.getConnectorId().equals(connectorId), "tableHandle is not for this connector"); + + ElasticsearchTable table = elasticsearchClient.getTable(elasticsearchTableHandle.getSchemaName(), elasticsearchTableHandle.getTableName()); + if (table == null) { + throw new TableNotFoundException(elasticsearchTableHandle.toSchemaTableName()); + } + + ImmutableMap.Builder columnHandles = ImmutableMap.builder(); + int index = 0; + for (ColumnMetadata column : table.getColumnsMetadata()) { + ElasticsearchColumnMetadata esColumn = (ElasticsearchColumnMetadata)column; + columnHandles.put(esColumn.getName(), new ElasticsearchColumnHandle(connectorId, column.getName(), column.getType(), + esColumn.getJsonPath(), esColumn.getJsonType(), index)); + index++; + } + return columnHandles.build(); + } + + @Override + public Map> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) + { + checkNotNull(prefix, "prefix is null"); + ImmutableMap.Builder> columns = ImmutableMap.builder(); + for (SchemaTableName tableName : listTables(session, prefix)) { + ConnectorTableMetadata tableMetadata = getTableMetadata(tableName); + // table can disappear during listing operation + if (tableMetadata != null) { + columns.put(tableName, tableMetadata.getColumns()); + } + } + return columns.build(); + } + + private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName) + { + if (!listSchemaNames().contains(tableName.getSchemaName())) { + return null; + } + + ElasticsearchTable table = elasticsearchClient.getTable(tableName.getSchemaName(), tableName.getTableName()); + if (table == null) { + return null; + } + + return new ConnectorTableMetadata(tableName, table.getColumnsMetadata()); + } + + private List listTables(ConnectorSession session, SchemaTablePrefix prefix) + { + if (prefix.getSchemaName() == null) { + return listTables(session, prefix.getSchemaName()); + } + return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName())); + } + + @Override + public ColumnMetadata getColumnMetadata(ConnectorTableHandle tableHandle, ColumnHandle columnHandle) + { + checkType(tableHandle, ElasticsearchTableHandle.class, "tableHandle"); + return checkType(columnHandle, ElasticsearchColumnHandle.class, "columnHandle").getColumnMetadata(); + } +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchModule.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchModule.java new file mode 100644 index 000000000000..b439ab0b21fa --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchModule.java @@ -0,0 +1,72 @@ + +package com.facebook.presto.elasticsearch; + +import com.facebook.presto.spi.type.Type; +import com.facebook.presto.spi.type.TypeManager; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Scopes; + +import javax.inject.Inject; + +import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static io.airlift.configuration.ConfigBinder.configBinder; +import static io.airlift.json.JsonBinder.jsonBinder; +import static io.airlift.json.JsonCodec.listJsonCodec; +import static io.airlift.json.JsonCodecBinder.jsonCodecBinder; + +public class ElasticsearchModule + implements Module +{ + private final String connectorId; + private final TypeManager typeManager; + + public ElasticsearchModule(String connectorId, TypeManager typeManager) + { + this.connectorId = checkNotNull(connectorId, "connector id is null"); + this.typeManager = checkNotNull(typeManager, "typeManager is null"); + } + + @Override + public void configure(Binder binder) + { + binder.bind(TypeManager.class).toInstance(typeManager); + + binder.bind(ElasticsearchConnector.class).in(Scopes.SINGLETON); + binder.bind(ElasticsearchConnectorId.class).toInstance(new ElasticsearchConnectorId(connectorId)); + binder.bind(ElasticsearchMetadata.class).in(Scopes.SINGLETON); + binder.bind(ElasticsearchClient.class).in(Scopes.SINGLETON); + binder.bind(ElasticsearchSplitManager.class).in(Scopes.SINGLETON); + binder.bind(ElasticsearchRecordSetProvider.class).in(Scopes.SINGLETON); + binder.bind(ElasticsearchHandleResolver.class).in(Scopes.SINGLETON); + configBinder(binder).bindConfig(ElasticsearchConfig.class); + + jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class); + jsonCodecBinder(binder).bindMapJsonCodec(String.class, listJsonCodec(ElasticsearchTable.class)); + } + + public static final class TypeDeserializer + extends FromStringDeserializer + { + private final TypeManager typeManager; + + @Inject + public TypeDeserializer(TypeManager typeManager) + { + super(Type.class); + this.typeManager = checkNotNull(typeManager, "typeManager is null"); + } + + @Override + protected Type _deserialize(String value, DeserializationContext context) + { + Type type = typeManager.getType(parseTypeSignature(value)); + checkArgument(type != null, "Unknown type %s", value); + return type; + } + } +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPartition.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPartition.java new file mode 100644 index 000000000000..7cf9249af767 --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPartition.java @@ -0,0 +1,53 @@ + +package com.facebook.presto.elasticsearch; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorPartition; +import com.facebook.presto.spi.TupleDomain; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.base.Preconditions.checkNotNull; + +public class ElasticsearchPartition + implements ConnectorPartition +{ + private final String schemaName; + private final String tableName; + + public ElasticsearchPartition(String schemaName, String tableName) + { + this.schemaName = checkNotNull(schemaName, "schema name is null"); + this.tableName = checkNotNull(tableName, "table name is null"); + } + + @Override + public String getPartitionId() + { + return schemaName + ":" + tableName; + } + + public String getSchemaName() + { + return schemaName; + } + + public String getTableName() + { + return tableName; + } + + @Override + public TupleDomain getTupleDomain() + { + return TupleDomain.all(); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("schemaName", schemaName) + .add("tableName", tableName) + .toString(); + } +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPlugin.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPlugin.java new file mode 100644 index 000000000000..d5b01cf49bfc --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPlugin.java @@ -0,0 +1,48 @@ + +package com.facebook.presto.elasticsearch; + +import com.facebook.presto.spi.ConnectorFactory; +import com.facebook.presto.spi.Plugin; +import com.facebook.presto.spi.type.TypeManager; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import javax.inject.Inject; + +import java.util.List; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class ElasticsearchPlugin + implements Plugin +{ + private TypeManager typeManager; + private Map optionalConfig = ImmutableMap.of(); + + @Override + public synchronized void setOptionalConfig(Map optionalConfig) + { + this.optionalConfig = ImmutableMap.copyOf(checkNotNull(optionalConfig, "optionalConfig is null")); + } + + @Inject + public synchronized void setTypeManager(TypeManager typeManager) + { + this.typeManager = typeManager; + } + + public synchronized Map getOptionalConfig() + { + return optionalConfig; + } + + @Override + public synchronized List getServices(Class type) + { + if (type == ConnectorFactory.class) { + return ImmutableList.of(type.cast(new ElasticsearchConnectorFactory(typeManager, getOptionalConfig()))); + } + return ImmutableList.of(); + } +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchRecordCursor.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchRecordCursor.java new file mode 100644 index 000000000000..61b9c83eaaf9 --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchRecordCursor.java @@ -0,0 +1,257 @@ + +package com.facebook.presto.elasticsearch; + +import com.facebook.presto.spi.RecordCursor; +import com.facebook.presto.spi.type.Type; +import com.google.common.base.Splitter; +import com.google.common.base.Strings; +import com.google.common.base.Throwables; +import com.google.common.io.ByteSource; +import com.google.common.io.CountingInputStream; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHitField; + +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; + +import java.io.IOException; +import java.util.*; + +import static com.facebook.presto.spi.type.BigintType.BIGINT; +import static com.facebook.presto.spi.type.BooleanType.BOOLEAN; +import static com.facebook.presto.spi.type.DoubleType.DOUBLE; +import static com.facebook.presto.spi.type.VarcharType.VARCHAR; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.nio.charset.StandardCharsets.UTF_8; + +public class ElasticsearchRecordCursor + implements RecordCursor +{ + //private static final Splitter LINE_SPLITTER = Splitter.on(",").trimResults(); + + private final List columnHandles; + //private final int[] fieldToColumnIndex; + Map jsonpathToIndex = new HashMap(); + + private final Iterator lines; + private long totalBytes; + + private List fields; + + public ElasticsearchRecordCursor(List columnHandles, ElasticsearchTableSource tableSource) + { + this.columnHandles = columnHandles; + + //fieldToColumnIndex = new int[columnHandles.size()]; + for (int i = 0; i < columnHandles.size(); i++) { + ElasticsearchColumnHandle columnHandle = columnHandles.get(i); + //fieldToColumnIndex[i] = columnHandle.getOrdinalPosition(); + //jsonpathToIndex.put(columnHandle.getColumnJsonPath(), columnHandle.getOrdinalPosition()); + + jsonpathToIndex.put(columnHandle.getColumnJsonPath(), i); + } + + /* + try (CountingInputStream input = new CountingInputStream(byteSource.openStream())) { + lines = byteSource.asCharSource(UTF_8).readLines().iterator(); + totalBytes = input.getCount(); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + */ + + Settings settings = ImmutableSettings.settingsBuilder() + .put("cluster.name", tableSource.getClusterName()) + /*.put("client.transport.sniff", true)*/.build(); + + Client client = new TransportClient(settings) + .addTransportAddress(new InetSocketTransportAddress( + tableSource.getHostaddress(), tableSource.getPort())); + + + //String[] fields = new String[] {"user", "dim.age" , "measurements.FACEBOOK_PAGE_CONSUMPTIONS_UNIQUE"}; + ArrayList fieldsNeeded = new ArrayList(); + for (int i = 0; i < columnHandles.size(); i++) { + ElasticsearchColumnHandle columnHandle = columnHandles.get(i); + fieldsNeeded.add(columnHandle.getColumnJsonPath()); + } + + /*SearchResponse response = client.prepareSearch(tableSource.getIndex()) + .setTypes(tableSource.getType()) + //.setQuery(QueryBuilders.termQuery("dimensions.SN_TYPE", "facebook")) + .addFields(fieldsNeeded.toArray(new String[fieldsNeeded.size()])) + .setFrom(0).setSize(1000000).setExplain(true) + .execute() + .actionGet(); + lines = Arrays.asList(response.getHits().getHits()).iterator();*/ + + + + lines = getRows_faster(client, tableSource, fieldsNeeded).iterator(); + + totalBytes = 0; + + client.close(); + } + + private List getRows(Client client, ElasticsearchTableSource tableSource, ArrayList fieldsNeeded) + { + SearchResponse response = client.prepareSearch(tableSource.getIndex()) + .setTypes(tableSource.getType()) + //.setQuery(QueryBuilders.termQuery("dimensions.SN_TYPE", "facebook")) + .addFields(fieldsNeeded.toArray(new String[fieldsNeeded.size()])) + .setFrom(0).setSize(1000000).setExplain(true) + .execute() + .actionGet(); + return Arrays.asList(response.getHits().getHits()); + } + + private List getRows_faster(Client client, ElasticsearchTableSource tableSource, ArrayList fieldsNeeded) + { + List rows = new ArrayList<>(); + SearchResponse scrollResp = client.prepareSearch(tableSource.getIndex()) + .setTypes(tableSource.getType()) + .addFields(fieldsNeeded.toArray(new String[fieldsNeeded.size()])) + .setSearchType(SearchType.SCAN) + .setScroll(new TimeValue(60000)) + .setSize(20000).execute().actionGet(); //20000 hits per shard will be returned for each scroll + //Scroll until no hits are returned + while (true) { + + for (SearchHit hit : scrollResp.getHits().getHits()) { + //Handle the hit... + rows.add(hit); + } + scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); + //Break condition: No hits are returned + if (scrollResp.getHits().getHits().length == 0) { + break; + } + } + return rows; + } + + @Override + public long getTotalBytes() + { + return totalBytes; + } + + @Override + public long getCompletedBytes() + { + return totalBytes; + } + + @Override + public long getReadTimeNanos() + { + return 0; + } + + @Override + public Type getType(int field) + { + checkArgument(field < columnHandles.size(), "Invalid field index"); + return columnHandles.get(field).getColumnType(); + } + + @Override + public boolean advanceNextPosition() + { + if (!lines.hasNext()) { + return false; + } + SearchHit hit = lines.next(); + + //fields = LINE_SPLITTER.splitToList(line); + fields = new ArrayList(Collections.nCopies(columnHandles.size(), "-1")); + + Map map = hit.getFields(); + for (Map.Entry entry : map.entrySet()) { + String jsonPath = entry.getKey().toString(); + SearchHitField fieldvar = entry.getValue(); + + // we get the value , wrapped in a list (of size 1 ofcourse) -> [value] (The java api returns in this way) + ArrayList lis = new ArrayList(fieldvar.getValues()); + // get the value + String value = String.valueOf(lis.get(0)); + + fields.set(jsonpathToIndex.get(jsonPath), value); + + + //System.out.println("key, " + path + " value " + lis.get(0) ); + } + + totalBytes += fields.size(); + + + return true; + } + + private String getFieldValue(int field) + { + checkState(fields != null, "Cursor has not been advanced yet"); + + //int columnIndex = fieldToColumnIndex[field]; + //return fields.get(columnIndex); + return fields.get(field); + } + + @Override + public boolean getBoolean(int field) + { + checkFieldType(field, BOOLEAN); + return Boolean.parseBoolean(getFieldValue(field)); + } + + @Override + public long getLong(int field) + { + checkFieldType(field, BIGINT); + return Long.parseLong(getFieldValue(field)); + } + + @Override + public double getDouble(int field) + { + checkFieldType(field, DOUBLE); + return Double.parseDouble(getFieldValue(field)); + } + + @Override + public Slice getSlice(int field) + { + checkFieldType(field, VARCHAR); + return Slices.utf8Slice(getFieldValue(field)); + } + + @Override + public boolean isNull(int field) + { + checkArgument(field < columnHandles.size(), "Invalid field index"); + return Strings.isNullOrEmpty(getFieldValue(field)); + } + + private void checkFieldType(int field, Type expected) + { + Type actual = getType(field); + checkArgument(actual.equals(expected), "Expected field %s to be type %s but is %s", field, expected, actual); + } + + @Override + public void close() + { + } +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchRecordSet.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchRecordSet.java new file mode 100644 index 000000000000..9114bfff4ef9 --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchRecordSet.java @@ -0,0 +1,59 @@ + +package com.facebook.presto.elasticsearch; + +import com.facebook.presto.spi.RecordCursor; +import com.facebook.presto.spi.RecordSet; +import com.facebook.presto.spi.type.Type; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.io.ByteSource; +import com.google.common.io.Resources; + +import java.net.MalformedURLException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class ElasticsearchRecordSet + implements RecordSet +{ + private final List columnHandles; + private final List columnTypes; + //private final ByteSource byteSource; + private final ElasticsearchTableSource tableSource; + + + public ElasticsearchRecordSet(ElasticsearchSplit split, List columnHandles) + { + checkNotNull(split, "split is null"); + + this.columnHandles = checkNotNull(columnHandles, "column handles is null"); + ImmutableList.Builder types = ImmutableList.builder(); + for (ElasticsearchColumnHandle column : columnHandles) { + types.add(column.getColumnType()); + } + this.columnTypes = types.build(); + + /* + try { + byteSource = Resources.asByteSource(split.getUri().toURL()); + } + catch (MalformedURLException e) { + throw Throwables.propagate(e); + } + */ + tableSource = split.getUri(); + } + + @Override + public List getColumnTypes() + { + return columnTypes; + } + + @Override + public RecordCursor cursor() + { + return new ElasticsearchRecordCursor(columnHandles, tableSource); + } +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchRecordSetProvider.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchRecordSetProvider.java new file mode 100644 index 000000000000..3da14ca6bf43 --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchRecordSetProvider.java @@ -0,0 +1,43 @@ + +package com.facebook.presto.elasticsearch; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorRecordSetProvider; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.RecordSet; +import com.google.common.collect.ImmutableList; + +import javax.inject.Inject; + +import java.util.List; + +import static com.facebook.presto.elasticsearch.Types.checkType; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +public class ElasticsearchRecordSetProvider + implements ConnectorRecordSetProvider +{ + private final String connectorId; + + @Inject + public ElasticsearchRecordSetProvider(ElasticsearchConnectorId connectorId) + { + this.connectorId = checkNotNull(connectorId, "connectorId is null").toString(); + } + + @Override + public RecordSet getRecordSet(ConnectorSplit split, List columns) + { + checkNotNull(split, "partitionChunk is null"); + ElasticsearchSplit elasticsearchSplit = checkType(split, ElasticsearchSplit.class, "split"); + checkArgument(elasticsearchSplit.getConnectorId().equals(connectorId), "split is not for this connector"); + + ImmutableList.Builder handles = ImmutableList.builder(); + for (ColumnHandle handle : columns) { + handles.add(checkType(handle, ElasticsearchColumnHandle.class, "handle")); + } + + return new ElasticsearchRecordSet(elasticsearchSplit, handles.build()); + } +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchSplit.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchSplit.java new file mode 100644 index 000000000000..90b2bfadb04b --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchSplit.java @@ -0,0 +1,86 @@ + +package com.facebook.presto.elasticsearch; + +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.HostAddress; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; + +import java.net.URI; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class ElasticsearchSplit + implements ConnectorSplit +{ + private final String connectorId; + private final String schemaName; + private final String tableName; + private final ElasticsearchTableSource uri; + private final boolean remotelyAccessible; + private final ImmutableList addresses; + + @JsonCreator + public ElasticsearchSplit( + @JsonProperty("connectorId") String connectorId, + @JsonProperty("schemaName") String schemaName, + @JsonProperty("tableName") String tableName, + @JsonProperty("uri") ElasticsearchTableSource uri) + { + this.schemaName = checkNotNull(schemaName, "schema name is null"); + this.connectorId = checkNotNull(connectorId, "connector id is null"); + this.tableName = checkNotNull(tableName, "table name is null"); + this.uri = checkNotNull(uri, "uri is null"); + +// if ("http".equalsIgnoreCase(uri.getScheme()) || "https".equalsIgnoreCase(uri.getScheme())) { + remotelyAccessible = true; + //addresses = ImmutableList.of(HostAddress.fromUri(uri)); + addresses = ImmutableList.of(); + } + + @JsonProperty + public String getConnectorId() + { + return connectorId; + } + + @JsonProperty + public String getSchemaName() + { + return schemaName; + } + + @JsonProperty + public String getTableName() + { + return tableName; + } + + @JsonProperty + public ElasticsearchTableSource getUri() + { + return uri; + } + + @Override + public boolean isRemotelyAccessible() + { + // only http or https is remotely accessible + return remotelyAccessible; + } + + @Override + public List getAddresses() + { + return addresses; + } + + @Override + public Object getInfo() + { + return this; + } + +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchSplitManager.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchSplitManager.java new file mode 100644 index 000000000000..227dcd7a1e58 --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchSplitManager.java @@ -0,0 +1,74 @@ + +package com.facebook.presto.elasticsearch; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorPartition; +import com.facebook.presto.spi.ConnectorPartitionResult; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorSplitManager; +import com.facebook.presto.spi.ConnectorSplitSource; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.FixedSplitSource; +import com.facebook.presto.spi.TupleDomain; +import com.google.common.collect.ImmutableList; + +import javax.inject.Inject; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static com.facebook.presto.elasticsearch.Types.checkType; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +public class ElasticsearchSplitManager + implements ConnectorSplitManager +{ + private final String connectorId; + private final ElasticsearchClient elasticsearchClient; + + @Inject + public ElasticsearchSplitManager(ElasticsearchConnectorId connectorId, ElasticsearchClient elasticsearchClient) + { + this.connectorId = checkNotNull(connectorId, "connectorId is null").toString(); + this.elasticsearchClient = checkNotNull(elasticsearchClient, "client is null"); + } + + @Override + public ConnectorPartitionResult getPartitions(ConnectorTableHandle tableHandle, TupleDomain tupleDomain) + { + ElasticsearchTableHandle elasticsearchTableHandle = checkType(tableHandle, ElasticsearchTableHandle.class, "tableHandle"); + + // elasticsearch connector has only one partition + List partitions = ImmutableList.of(new ElasticsearchPartition(elasticsearchTableHandle.getSchemaName(), elasticsearchTableHandle.getTableName())); + // elasticsearch connector does not do any additional processing/filtering with the TupleDomain, so just return the whole TupleDomain + return new ConnectorPartitionResult(partitions, tupleDomain); + } + + @Override + public ConnectorSplitSource getPartitionSplits(ConnectorTableHandle tableHandle, List partitions) + { + checkNotNull(partitions, "partitions is null"); + checkArgument(partitions.size() == 1, "Expected one partition but got %s", partitions.size()); + ConnectorPartition partition = partitions.get(0); + + ElasticsearchPartition elasticsearchPartition = checkType(partition, ElasticsearchPartition.class, "partition"); + + ElasticsearchTableHandle elasticsearchTableHandle = (ElasticsearchTableHandle) tableHandle; + ElasticsearchTable table = elasticsearchClient.getTable(elasticsearchTableHandle.getSchemaName(), elasticsearchTableHandle.getTableName()); + // this can happen if table is removed during a query + checkState(table != null, "Table %s.%s no longer exists", elasticsearchTableHandle.getSchemaName(), elasticsearchTableHandle.getTableName()); + + List splits = new ArrayList<>(); + for (ElasticsearchTableSource uri : table.getSources()) { + int clmsCount = table.getColumns().size(); + splits.add(new ElasticsearchSplit(connectorId, elasticsearchPartition.getSchemaName(), elasticsearchPartition.getTableName(), uri)); + } + Collections.shuffle(splits); + + return new FixedSplitSource(connectorId, splits); + } +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTable.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTable.java new file mode 100644 index 000000000000..719fe35c471d --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTable.java @@ -0,0 +1,78 @@ +package com.facebook.presto.elasticsearch; + +import com.facebook.presto.spi.ColumnMetadata; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Strings.isNullOrEmpty; + +public class ElasticsearchTable +{ + private final String name; + + private List columns; + private List columnsMetadata; + private final List sources; + + @JsonCreator + public ElasticsearchTable( + @JsonProperty("name") String name, + @JsonProperty("columns") List columns, + @JsonProperty("sources") List sources) + { + checkArgument(!isNullOrEmpty(name), "name is null or is empty"); + this.name = checkNotNull(name, "name is null"); + this.columns = ImmutableList.copyOf(checkNotNull(columns, "columns is null")); + this.sources = ImmutableList.copyOf(checkNotNull(sources, "sources is null")); + + ImmutableList.Builder columnsMetadata = ImmutableList.builder(); + for (ElasticsearchColumn column : this.columns) { + columnsMetadata.add(new ElasticsearchColumnMetadata(column.getName(), column.getType(), column.getJsonPath(), column.getJsonType(), false)); + } + this.columnsMetadata = columnsMetadata.build(); + } + + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public List getColumns() + { + return columns; + } + + public void setColumns(List columns) { + this.columns = columns; + } + + @JsonProperty + public List getSources() + { + return sources; + } + + public List getColumnsMetadata() + { + //List bases = columnsMetadata; + //return bases; + + //List lis = new ArrayList(columnsMetadata); + //return lis; + + return new ArrayList<>(columnsMetadata); + } + + public void setColumnsMetadata(List columnsMetadata) { + this.columnsMetadata = columnsMetadata; + } +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableHandle.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableHandle.java new file mode 100644 index 000000000000..2b57ed3f659b --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableHandle.java @@ -0,0 +1,82 @@ + +package com.facebook.presto.elasticsearch; + +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.SchemaTableName; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Joiner; + +import java.util.Objects; + +import static com.google.common.base.Preconditions.checkNotNull; + +public final class ElasticsearchTableHandle + implements ConnectorTableHandle +{ + private final String connectorId; + private final String schemaName; + private final String tableName; + + @JsonCreator + public ElasticsearchTableHandle( + @JsonProperty("connectorId") String connectorId, + @JsonProperty("schemaName") String schemaName, + @JsonProperty("tableName") String tableName) + { + this.connectorId = checkNotNull(connectorId, "connectorId is null"); + this.schemaName = checkNotNull(schemaName, "schemaName is null"); + this.tableName = checkNotNull(tableName, "tableName is null"); + } + + @JsonProperty + public String getConnectorId() + { + return connectorId; + } + + @JsonProperty + public String getSchemaName() + { + return schemaName; + } + + @JsonProperty + public String getTableName() + { + return tableName; + } + + public SchemaTableName toSchemaTableName() + { + return new SchemaTableName(schemaName, tableName); + } + + @Override + public int hashCode() + { + return Objects.hash(connectorId, schemaName, tableName); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if ((obj == null) || (getClass() != obj.getClass())) { + return false; + } + + ElasticsearchTableHandle other = (ElasticsearchTableHandle) obj; + return Objects.equals(this.connectorId, other.connectorId) && + Objects.equals(this.schemaName, other.schemaName) && + Objects.equals(this.tableName, other.tableName); + } + + @Override + public String toString() + { + return Joiner.on(":").join(connectorId, schemaName, tableName); + } +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableSource.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableSource.java new file mode 100644 index 000000000000..18fb8d3ac06d --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableSource.java @@ -0,0 +1,58 @@ +package com.facebook.presto.elasticsearch; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Created by sprinklr on 03/07/15. + */ +public class ElasticsearchTableSource { + + private String hostaddress; + private int port; + private String clusterName; + private String index; + private String type; + + @JsonCreator + public ElasticsearchTableSource( + @JsonProperty("hostaddress") String hostaddress, + @JsonProperty("port") int port, + @JsonProperty("clusterName") String clusterName, + @JsonProperty("index") String index, + @JsonProperty("type") String type) + { + this.hostaddress = checkNotNull(hostaddress, "hostaddress is null"); + this.port = checkNotNull(port, "port is null"); + this.clusterName = checkNotNull(clusterName, "clusterName is null"); + this.index = checkNotNull(index, "index is null"); + this.type = checkNotNull(type, "type is null"); + } + + @JsonProperty + public String getHostaddress() { + return hostaddress; + } + + @JsonProperty + public int getPort() { + return port; + } + + @JsonProperty + public String getClusterName() { + return clusterName; + } + + @JsonProperty + public String getIndex() { + return index; + } + + @JsonProperty + public String getType() { + return type; + } +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/MyJSONTest.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/MyJSONTest.java new file mode 100644 index 000000000000..58345538b8df --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/MyJSONTest.java @@ -0,0 +1,76 @@ +package com.facebook.presto.elasticsearch; + +// http://stackoverflow.com/questions/26183948/output-list-of-all-paths-to-leaf-nodes-in-a-json-document-in-java + +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class MyJSONTest { + + /* + leaf is of the form -> path:obj + */ + private ArrayList leaves; + + public MyJSONTest() + { + leaves = new ArrayList(); + } + + public List getListJson(JSONObject json) throws JSONException { + listJSONObject("", json); + return leaves; + } + + private void listObject(String parent, Object data) throws JSONException { + if (data instanceof JSONObject) { + listJSONObject(parent, (JSONObject)data); + } else if (data instanceof JSONArray) { + listJSONArray(parent, (JSONArray) data); + } else { + listPrimitive(parent, data); + } + } + + private void listJSONObject(String parent, JSONObject json) throws JSONException { + Iterator it = json.keys(); + while (it.hasNext()) { + String key = (String)it.next(); + Object child = json.get(key); + String childKey = parent.isEmpty() ? key : parent + "." + key; + listObject(childKey, child); + } + } + + private void listJSONArray(String parent, JSONArray json) throws JSONException { + for (int i = 0; i < json.length(); i++) { + Object data = json.get(i); + listObject(parent, data); + } + } + + private void listPrimitive(String parent, Object obj) { + //System.out.println(parent + ":" + obj); + leaves.add(parent + ":" + obj.toString()); + } + + public static void main(String[] args) throws JSONException { + String data = "{\"store\":{\"book\":[{\"category\":\"reference\",\"author\":\"NigelRees\",\"title\":\"SayingsoftheCentury\",\"price\":8.95},{\"category\":\"fiction\",\"author\":\"HermanMelville\",\"title\":\"MobyDick\",\"isbn\":\"0-553-21311-3\",\"price\":8.99},],\"bicycle\":{\"color\":\"red\",\"price\":19.95}},\"expensive\":10}"; + JSONObject json = new JSONObject(data); + System.out.println(json.get("store")); + //System.out.println(json.toString(2)); + List leaves = (new MyJSONTest()).getListJson(json); + + for(String s : leaves) + { + System.out.println(s); + System.out.println("....."); + } + } + +} \ No newline at end of file diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/TestClient.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/TestClient.java new file mode 100644 index 000000000000..e41ea47a8349 --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/TestClient.java @@ -0,0 +1,51 @@ + +package com.facebook.presto.elasticsearch; + +import com.google.common.base.Function; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.io.Resources; +import io.airlift.json.JsonCodec; + +import javax.inject.Inject; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; + + +import static com.google.common.collect.Iterables.transform; +import static com.google.common.collect.Maps.transformValues; +import static com.google.common.collect.Maps.uniqueIndex; +import static io.airlift.json.JsonCodec.listJsonCodec; +import static java.nio.charset.StandardCharsets.UTF_8; +import static io.airlift.json.JsonCodec.*; + +//import org.json.JSONException; +//import org.json.JSONObject; + +public class TestClient { + + + + public static void main( String[] args ) throws InterruptedException, IOException, ExecutionException { + + //getSpecificFields(); + //getJsonKeys_IndexType(); + + + System.out.println("Hello World!"); + } + + +} diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/Types.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/Types.java new file mode 100644 index 000000000000..ab9d26e4b57e --- /dev/null +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/Types.java @@ -0,0 +1,33 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.elasticsearch; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +final class Types +{ + private Types() {} + + public static B checkType(A value, Class target, String name) + { + checkNotNull(value, "%s is null", name); + checkArgument(target.isInstance(value), + "%s must be of type %s, not %s", + name, + target.getName(), + value.getClass().getName()); + return target.cast(value); + } +} diff --git a/presto-elasticsearch/src/main/resources/META-INF/services/com.facebook.presto.spi.Plugin b/presto-elasticsearch/src/main/resources/META-INF/services/com.facebook.presto.spi.Plugin new file mode 100644 index 000000000000..23045329e32d --- /dev/null +++ b/presto-elasticsearch/src/main/resources/META-INF/services/com.facebook.presto.spi.Plugin @@ -0,0 +1 @@ +com.facebook.presto.elasticsearch.ElasticsearchPlugin diff --git a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/AppTest.java b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/AppTest.java new file mode 100644 index 000000000000..880e5969e160 --- /dev/null +++ b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/AppTest.java @@ -0,0 +1,39 @@ +package com.facebook.presto.elasticsearch; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for simple App. + */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Rigourous Test :-) + */ + public void testApp() + { + assertTrue( true ); + + } +}