diff --git a/docs/en/extending-doris/doris-on-es.md b/docs/en/extending-doris/doris-on-es.md index f61b458e96364b..0a65943414fd13 100644 --- a/docs/en/extending-doris/doris-on-es.md +++ b/docs/en/extending-doris/doris-on-es.md @@ -130,8 +130,8 @@ The following parameters are accepted by ES table: Parameter | Description ---|--- **hosts** | ES Cluster Connection Address, maybe one or more node, load-balance is also accepted -**index** | the related ES index name -**type** | the type for this index,If not specified, `_doc` will be used +**index** | the related ES index name; an alias is supported, but if you use doc_value you must use the real index name +**type** | the type for this index; if not specified, `_doc` will be used **user** | username for ES **password** | password for the user diff --git a/docs/zh-CN/extending-doris/doris-on-es.md b/docs/zh-CN/extending-doris/doris-on-es.md index 22a7367171ca32..b80bb7b2cc0a46 100644 --- a/docs/zh-CN/extending-doris/doris-on-es.md +++ b/docs/zh-CN/extending-doris/doris-on-es.md @@ -128,7 +128,7 @@ PROPERTIES ( 参数 | 说明 ---|--- **hosts** | ES集群地址,可以是一个或多个,也可以是ES前端的负载均衡地址 -**index** | 对应的ES的index名字 +**index** | 对应的ES的index名字,支持alias,如果使用doc_value,需要使用真实的名称 **type** | index的type,不指定的情况会使用_doc **user** | ES集群用户名 **password** | 对应用户的密码信息 diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index 0d05468ca8fc0a..33e3f68e8127c2 100755 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -127,7 +127,7 @@ import org.apache.doris.deploy.impl.AmbariDeployManager; import org.apache.doris.deploy.impl.K8sDeployManager; import org.apache.doris.deploy.impl.LocalFileDeployManager; -import org.apache.doris.external.elasticsearch.EsStateStore; +import org.apache.doris.external.elasticsearch.EsRepository; import org.apache.doris.ha.BDBHA; import org.apache.doris.ha.FrontendNodeType; import
org.apache.doris.ha.HAProtocol; @@ -301,7 +301,7 @@ public class Catalog { private Daemon replayer; private Daemon timePrinter; private Daemon listener; - private EsStateStore esStateStore; // it is a daemon, so add it here + private EsRepository esRepository; // it is a daemon, so add it here private boolean isFirstTimeStartUp = false; private boolean isElectable; @@ -510,7 +510,7 @@ private Catalog() { this.auth = new PaloAuth(); this.domainResolver = new DomainResolver(auth); - this.esStateStore = new EsStateStore(); + this.esRepository = new EsRepository(); this.metaContext = new MetaContext(); this.metaContext.setThreadLocalInfo(); @@ -1290,7 +1290,7 @@ private void startNonMasterDaemonThreads() { // load and export job label cleaner thread labelCleaner.start(); // ES state store - esStateStore.start(); + esRepository.start(); // domain resolver domainResolver.start(); } @@ -1452,7 +1452,7 @@ public void loadImage(String imageDir) throws IOException, DdlException { // ATTN: this should be done after load Db, and before loadAlterJob recreateTabletInvertIndex(); // rebuild es state state - esStateStore.loadTableFromCatalog(); + esRepository.loadTableFromCatalog(); checksum = loadLoadJob(dis, checksum); checksum = loadAlterJob(dis, checksum); @@ -4376,7 +4376,7 @@ public boolean unprotectDropTable(Database db, long tableId) { } if (table.getType() == TableType.ELASTICSEARCH) { - esStateStore.deRegisterTable(tableId); + esRepository.deRegisterTable(tableId); } else if (table.getType() == TableType.OLAP) { // drop all temp partitions of this table, so that there is no temp partitions in recycle bin, // which make things easier. 
@@ -4828,8 +4828,8 @@ public String getMasterIp() { return this.masterIp; } - public EsStateStore getEsStateStore() { - return this.esStateStore; + public EsRepository getEsRepository() { + return this.esRepository; } public void setMaster(MasterInfo info) { diff --git a/fe/src/main/java/org/apache/doris/catalog/Database.java b/fe/src/main/java/org/apache/doris/catalog/Database.java index 5333d3177ae918..b287ada07e8959 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Database.java +++ b/fe/src/main/java/org/apache/doris/catalog/Database.java @@ -304,7 +304,7 @@ public boolean createTableWithLock(Table table, boolean isReplay, boolean setIfN Catalog.getCurrentCatalog().getEditLog().logCreateTable(info); } if (table.getType() == TableType.ELASTICSEARCH) { - Catalog.getCurrentCatalog().getEsStateStore().registerTable((EsTable)table); + Catalog.getCurrentCatalog().getEsRepository().registerTable((EsTable)table); } } return result; diff --git a/fe/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/src/main/java/org/apache/doris/catalog/EsTable.java index c7d5e579dcd152..2a26d51b71a2fc 100644 --- a/fe/src/main/java/org/apache/doris/catalog/EsTable.java +++ b/fe/src/main/java/org/apache/doris/catalog/EsTable.java @@ -20,8 +20,12 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; +import org.apache.doris.external.elasticsearch.EsFieldInfos; import org.apache.doris.external.elasticsearch.EsMajorVersion; -import org.apache.doris.external.elasticsearch.EsTableState; +import org.apache.doris.external.elasticsearch.EsNodeInfo; +import org.apache.doris.external.elasticsearch.EsRestClient; +import org.apache.doris.external.elasticsearch.EsShardPartitions; +import org.apache.doris.external.elasticsearch.EsTablePartitions; import org.apache.doris.thrift.TEsTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; @@ -71,7 +75,7 @@ public class 
EsTable extends Table { // only save the partition definition, save the partition key, // partition list is got from es cluster dynamically and is saved in esTableState private PartitionInfo partitionInfo; - private EsTableState esTableState; + private EsTablePartitions esTablePartitions; private boolean enableDocValueScan = false; private boolean enableKeywordSniff = true; @@ -114,18 +118,19 @@ public EsTable(long id, String name, List schema, validate(properties); } - public void addFetchField(String originName, String replaceName) { - fieldsContext.put(originName, replaceName); + public void addFieldInfos(EsFieldInfos esFieldInfos) { + if (enableKeywordSniff && esFieldInfos.getFieldsContext() != null) { + fieldsContext = esFieldInfos.getFieldsContext(); + } + if (enableDocValueScan && esFieldInfos.getDocValueContext() != null) { + docValueContext = esFieldInfos.getDocValueContext(); + } } public Map fieldsContext() { return fieldsContext; } - public void addDocValueField(String name, String fieldsName) { - docValueContext.put(name, fieldsName); - } - public Map docValueContext() { return docValueContext; } @@ -386,12 +391,12 @@ public PartitionInfo getPartitionInfo() { return partitionInfo; } - public EsTableState getEsTableState() { - return esTableState; + public EsTablePartitions getEsTablePartitions() { + return esTablePartitions; } - public void setEsTableState(EsTableState esTableState) { - this.esTableState = esTableState; + public void setEsTablePartitions(EsTablePartitions esTablePartitions) { + this.esTablePartitions = esTablePartitions; } public Throwable getLastMetaDataSyncException() { @@ -401,4 +406,29 @@ public Throwable getLastMetaDataSyncException() { public void setLastMetaDataSyncException(Throwable lastMetaDataSyncException) { this.lastMetaDataSyncException = lastMetaDataSyncException; } + + /** + * sync es index meta from remote + * @param client esRestClient + */ + public void syncESIndexMeta(EsRestClient client) { + try { + EsFieldInfos 
fieldInfos = client.getFieldInfos(this.indexName, this.mappingType, this.fullSchema); + EsShardPartitions esShardPartitions = client.getShardPartitions(this.indexName); + Map nodesInfo = client.getHttpNodes(); + if (this.enableKeywordSniff || this.enableDocValueScan) { + addFieldInfos(fieldInfos); + } + + this.esTablePartitions = EsTablePartitions.fromShardPartitions(this, esShardPartitions); + + if (EsTable.TRANSPORT_HTTP.equals(getTransport())) { + this.esTablePartitions.addHttpAddress(nodesInfo); + } + } catch (Throwable e) { + LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster", this.name, e); + this.esTablePartitions = null; + this.lastMetaDataSyncException = e; + } + } } diff --git a/fe/src/main/java/org/apache/doris/common/proc/EsPartitionsProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/EsPartitionsProcDir.java index 896201e51def6b..5b470e991ec3ff 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/EsPartitionsProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/EsPartitionsProcDir.java @@ -24,7 +24,7 @@ import org.apache.doris.catalog.RangePartitionInfo; import org.apache.doris.catalog.Table.TableType; import org.apache.doris.common.AnalysisException; -import org.apache.doris.external.elasticsearch.EsIndexState; +import org.apache.doris.external.elasticsearch.EsShardPartitions; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -66,33 +66,33 @@ public ProcResult fetchResult() throws AnalysisException { try { RangePartitionInfo rangePartitionInfo = null; if (esTable.getPartitionInfo().getType() == PartitionType.RANGE) { - rangePartitionInfo = (RangePartitionInfo) esTable.getEsTableState().getPartitionInfo(); + rangePartitionInfo = (RangePartitionInfo) esTable.getEsTablePartitions().getPartitionInfo(); } Joiner joiner = Joiner.on(", "); - Map unPartitionedIndices = esTable.getEsTableState().getUnPartitionedIndexStates(); - Map partitionedIndices = 
esTable.getEsTableState().getPartitionedIndexStates(); - for (EsIndexState esIndexState : unPartitionedIndices.values()) { + Map unPartitionedIndices = esTable.getEsTablePartitions().getUnPartitionedIndexStates(); + Map partitionedIndices = esTable.getEsTablePartitions().getPartitionedIndexStates(); + for (EsShardPartitions esShardPartitions : unPartitionedIndices.values()) { List partitionInfo = new ArrayList(); - partitionInfo.add(esIndexState.getIndexName()); + partitionInfo.add(esShardPartitions.getIndexName()); partitionInfo.add("-"); // partition key partitionInfo.add("-"); // range partitionInfo.add("-"); // dis - partitionInfo.add(esIndexState.getShardRoutings().size()); // shards + partitionInfo.add(esShardPartitions.getShardRoutings().size()); // shards partitionInfo.add(1); // replica num partitionInfos.add(partitionInfo); } - for (EsIndexState esIndexState : partitionedIndices.values()) { + for (EsShardPartitions esShardPartitions : partitionedIndices.values()) { List partitionInfo = new ArrayList(); - partitionInfo.add(esIndexState.getIndexName()); + partitionInfo.add(esShardPartitions.getIndexName()); List partitionColumns = rangePartitionInfo.getPartitionColumns(); List colNames = new ArrayList(); for (Column column : partitionColumns) { colNames.add(column.getName()); } partitionInfo.add(joiner.join(colNames)); // partition key - partitionInfo.add(rangePartitionInfo.getRange(esIndexState.getPartitionId()).toString()); // range + partitionInfo.add(rangePartitionInfo.getRange(esShardPartitions.getPartitionId()).toString()); // range partitionInfo.add("-"); // dis - partitionInfo.add(esIndexState.getShardRoutings().size()); // shards + partitionInfo.add(esShardPartitions.getShardRoutings().size()); // shards partitionInfo.add(1); // replica num partitionInfos.add(partitionInfo); } diff --git a/fe/src/main/java/org/apache/doris/common/proc/EsShardProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/EsShardProcDir.java index 
4cba64ecaeefbc..45e34ffaea7bd0 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/EsShardProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/EsShardProcDir.java @@ -25,7 +25,7 @@ import org.apache.doris.catalog.EsTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.util.ListComparator; -import org.apache.doris.external.elasticsearch.EsIndexState; +import org.apache.doris.external.elasticsearch.EsShardPartitions; import org.apache.doris.external.elasticsearch.EsShardRouting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -55,9 +55,9 @@ public ProcResult fetchResult() { db.readLock(); try { // get infos - EsIndexState esIndexState = esTable.getEsTableState().getIndexState(indexName); - for (int shardId : esIndexState.getShardRoutings().keySet()) { - List shardRoutings = esIndexState.getShardRoutings().get(shardId); + EsShardPartitions esShardPartitions = esTable.getEsTablePartitions().getEsShardPartitions(indexName); + for (int shardId : esShardPartitions.getShardRoutings().keySet()) { + List shardRoutings = esShardPartitions.getShardRoutings().get(shardId); if (shardRoutings != null && shardRoutings.size() > 0) { for (EsShardRouting esShardRouting : shardRoutings) { List shardInfo = new ArrayList(); diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsFieldInfos.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsFieldInfos.java new file mode 100644 index 00000000000000..59d251118d471f --- /dev/null +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsFieldInfos.java @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.external.elasticsearch; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.EsTable; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONObject; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * It is used to hold the field information obtained from es, currently including the fields and docValue, + * it will eventually be added to the EsTable + **/ +public class EsFieldInfos { + + private static final Logger LOG = LogManager.getLogger(EsFieldInfos.class); + + // userId => userId.keyword + private Map fieldsContext; + + // city => city.raw + private Map docValueContext; + + public EsFieldInfos(Map fieldsContext, Map docValueContext) { + this.fieldsContext = fieldsContext; + this.docValueContext = docValueContext; + } + + public Map getFieldsContext() { + return fieldsContext; + } + + public Map getDocValueContext() { + return docValueContext; + } + + /** + * Parse the required field information from the json + * @param colList the table columns + * @param indexName the index name (alias or real name) + * @param indexMapping the return value of _mapping + * @param docType The docType used by the index + * @return fieldsContext and docValueContext + * @throws ExternalDataSourceException if the mapping has no properties for the index/type + */ + public
static EsFieldInfos fromMapping(List colList, String indexName, String indexMapping, String docType) throws ExternalDataSourceException { + JSONObject jsonObject = new JSONObject(indexMapping); + // the indexName use alias takes the first mapping + Iterator keys = jsonObject.keys(); + String docKey = keys.next(); + JSONObject docData = jsonObject.optJSONObject(docKey); + //{ + // "mappings": { + // "doc": { + // "dynamic": "strict", + // "properties": { + // "time": { + // "type": "long" + // }, + // "type": { + // "type": "keyword" + // }, + // "userId": { + // "type": "text", + // "fields": { + // "keyword": { + // "type": "keyword" + // } + // } + // } + // } + // } + // } + //} + JSONObject mappings = docData.optJSONObject("mappings"); + JSONObject rootSchema = mappings.optJSONObject(docType); + JSONObject properties; + // no type in es7 + if (rootSchema == null) { + properties = mappings.optJSONObject("properties"); + } else { + properties = rootSchema.optJSONObject("properties"); + } + if (properties == null) { + throw new ExternalDataSourceException( "index[" + indexName + "] type[" + docType + "] mapping not found for the Elasticsearch Cluster"); + } + return parseProperties(colList, properties); + } + + // get fields information in properties + private static EsFieldInfos parseProperties(List colList, JSONObject properties) { + if (properties == null) { + return null; + } + Map fieldsMap = new HashMap<>(); + Map docValueMap = new HashMap<>(); + for (Column col : colList) { + String colName = col.getName(); + if (!properties.has(colName)) { + continue; + } + JSONObject fieldObject = properties.optJSONObject(colName); + String keywordField = getKeywordField(fieldObject, colName); + if (StringUtils.isNotEmpty(keywordField)) { + fieldsMap.put(colName, keywordField); + } + String docValueField = getDocValueField(fieldObject, colName); + if (StringUtils.isNotEmpty(docValueField)) { + docValueMap.put(colName, docValueField); + } + } + return new 
EsFieldInfos(fieldsMap, docValueMap); + } + + // get a field of keyword type in the fields + private static String getKeywordField(JSONObject fieldObject, String colName) { + String fieldType = fieldObject.optString("type"); + // string-type field used keyword type to generate predicate + // if text field type seen, we should use the `field` keyword type? + if ("text".equals(fieldType)) { + JSONObject fieldsObject = fieldObject.optJSONObject("fields"); + if (fieldsObject != null) { + for (String key : fieldsObject.keySet()) { + JSONObject innerTypeObject = fieldsObject.optJSONObject(key); + // just for text type + if ("keyword".equals(innerTypeObject.optString("type"))) { + return colName + "." + key; + } + } + } + } + return null; + } + + private static String getDocValueField(JSONObject fieldObject, String colName) { + String fieldType = fieldObject.optString("type"); + String docValueField = null; + if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(fieldType)) { + JSONObject fieldsObject = fieldObject.optJSONObject("fields"); + if (fieldsObject != null) { + for (String key : fieldsObject.keySet()) { + JSONObject innerTypeObject = fieldsObject.optJSONObject(key); + if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(innerTypeObject.optString("type"))) { + continue; + } + if (innerTypeObject.has("doc_values")) { + boolean docValue = innerTypeObject.getBoolean("doc_values"); + if (docValue) { + docValueField = colName; + } + } else { + // a : {c : {}} -> a -> a.c + docValueField = colName + "." 
+ key; + } + } + } + return docValueField; + } + // set doc_value = false manually + if (fieldObject.has("doc_values")) { + boolean docValue = fieldObject.optBoolean("doc_values"); + if (!docValue) { + return docValueField; + } + } + docValueField = colName; + return docValueField; + } +} diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java new file mode 100644 index 00000000000000..fe574ce5b73554 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.external.elasticsearch; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.EsTable; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.Table.TableType; +import org.apache.doris.common.Config; +import org.apache.doris.common.util.MasterDaemon; +import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; + + +/** + * It is used to call es api to get shard allocation state + */ +public class EsRepository extends MasterDaemon { + + private static final Logger LOG = LogManager.getLogger(EsRepository.class); + + private Map esTables; + + private Map esClients; + + public EsRepository() { + super("es repository", Config.es_state_sync_interval_second * 1000); + esTables = Maps.newConcurrentMap(); + esClients = Maps.newConcurrentMap(); + } + + public void registerTable(EsTable esTable) { + if (Catalog.isCheckpointThread()) { + return; + } + esTables.put(esTable.getId(), esTable); + esClients.put(esTable.getId(), + new EsRestClient(esTable.getSeeds(), esTable.getUserName(), esTable.getPasswd())); + LOG.info("register a new table [{}] to sync list", esTable); + } + + public void deRegisterTable(long tableId) { + esTables.remove(tableId); + esClients.remove(tableId); + LOG.info("deregister table [{}] from sync list", tableId); + } + + @Override + protected void runAfterCatalogReady() { + for (EsTable esTable : esTables.values()) { + try { + EsRestClient client = esClients.get(esTable.getId()); + esTable.syncESIndexMeta(client); + } catch (Throwable e) { + LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster", esTable.getName(), e); + esTable.setEsTablePartitions(null); + esTable.setLastMetaDataSyncException(e); + } + } + } + + // should call this method to init the state store after loading image + // the rest of 
tables will be added or removed by replaying edit log + // when fe is start to load image, should call this method to init the state store + public void loadTableFromCatalog() { + if (Catalog.isCheckpointThread()) { + return; + } + List dbIds = Catalog.getCurrentCatalog().getDbIds(); + for (Long dbId : dbIds) { + Database database = Catalog.getCurrentCatalog().getDb(dbId); + + List tables = database.getTables(); + for (Table table : tables) { + if (table.getType() == TableType.ELASTICSEARCH) { + registerTable((EsTable) table); // fills esTables AND esClients; without a client runAfterCatalogReady() would sync with null + } + } + } + } +} diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java index 83cbc0cf42c21a..adfc294faa34b9 100644 --- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java @@ -17,6 +17,8 @@ package org.apache.doris.external.elasticsearch; +import org.apache.doris.catalog.Column; +import org.apache.http.HttpHeaders; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.util.Strings; @@ -24,111 +26,100 @@ import org.codehaus.jackson.map.DeserializationConfig; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; - import java.io.IOException; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; - import okhttp3.Credentials; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; public class EsRestClient { + private static final Logger LOG = LogManager.getLogger(EsRestClient.class); private ObjectMapper mapper; - + { mapper = new ObjectMapper(); mapper.configure(DeserializationConfig.Feature.USE_ANNOTATIONS, false); mapper.configure(SerializationConfig.Feature.USE_ANNOTATIONS, false); } - + private static
OkHttpClient networkClient = new OkHttpClient.Builder() .readTimeout(10, TimeUnit.SECONDS) .build(); - - private String basicAuth; - - private int nextClient = 0; + + private Request.Builder builder; private String[] nodes; private String currentNode; - + private int currentNodeIndex = 0; + public EsRestClient(String[] nodes, String authUser, String authPassword) { this.nodes = nodes; + this.builder = new Request.Builder(); if (!Strings.isEmpty(authUser) && !Strings.isEmpty(authPassword)) { - basicAuth = Credentials.basic(authUser, authPassword); + this.builder.addHeader(HttpHeaders.AUTHORIZATION, + Credentials.basic(authUser, authPassword)); } - selectNextNode(); + this.currentNode = nodes[currentNodeIndex]; } - - private boolean selectNextNode() { - if (nextClient >= nodes.length) { - return false; + + private void selectNextNode() { + currentNodeIndex++; + // reroute, because the previously failed node may have already been restored + if (currentNodeIndex >= nodes.length) { + currentNodeIndex = 0; } - currentNode = nodes[nextClient++]; - return true; + currentNode = nodes[currentNodeIndex]; } - + public Map getHttpNodes() throws Exception { Map> nodesData = get("_nodes/http", "nodes"); if (nodesData == null) { return Collections.emptyMap(); } - Map nodes = new HashMap<>(); + Map nodesMap = new HashMap<>(); for (Map.Entry> entry : nodesData.entrySet()) { EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue()); if (node.hasHttp()) { - nodes.put(node.getId(), node); + nodesMap.put(node.getId(), node); } } - return nodes; + return nodesMap; } - - public String getIndexMetaData(String indexName) throws Exception { - String path = "_cluster/state?indices=" + indexName - + "&metric=routing_table,nodes,metadata&expand_wildcards=open"; - return execute(path); - + + public EsFieldInfos getFieldInfos(String indexName, String docType, List colList) throws Exception { + String path = indexName + "/_mapping"; + String indexMapping = execute(path); + if 
(indexMapping == null) { + throw new ExternalDataSourceException( "index[" + indexName + "] not found for the Elasticsearch Cluster"); + } + return EsFieldInfos.fromMapping(colList, indexName, indexMapping, docType); } - /** - * - * Get the Elasticsearch cluster version - * - * @return - */ - public EsMajorVersion version() throws Exception { - Map versionMap = get("/", "version"); - - EsMajorVersion majorVersion; - try { - majorVersion = EsMajorVersion.parse(versionMap.get("version")); - } catch (Exception e) { - LOG.warn("detect es version failure on node [{}]", currentNode); - return EsMajorVersion.V_5_X; + + public EsShardPartitions getShardPartitions(String indexName) throws Exception { + String path = indexName + "/_search_shards"; + String searchShards = execute(path); + if (searchShards == null) { + throw new ExternalDataSourceException( "index[" + indexName + "] search_shards not found for the Elasticsearch Cluster"); } - return majorVersion; + return EsShardPartitions.findShardPartitions(indexName, searchShards); } - + /** - * execute request for specific path + * execute request for the specific path; it makes at most nodes.length attempts, moving to the next node after each failure * * @param path the path must not leading with '/' - * @return + * @return response */ private String execute(String path) throws Exception { - selectNextNode(); - boolean nextNode; + int retrySize = nodes.length; Exception scratchExceptionForThrow = null; - do { - Request.Builder builder = new Request.Builder(); - if (!Strings.isEmpty(basicAuth)) { - builder.addHeader("Authorization", basicAuth); - } - + for (int i = 0; i < retrySize; i++) { // maybe should add HTTP schema to the address // actually, at this time we can only process http protocol // NOTE. currentNode may have some spaces.
@@ -139,11 +130,13 @@ private String execute(String path) throws Exception { if (!(currentNode.startsWith("http://") || currentNode.startsWith("https://"))) { currentNode = "http://" + currentNode; } - Request request = builder.get() .url(currentNode + "/" + path) .build(); Response response = null; + if (LOG.isTraceEnabled()) { + LOG.trace("es rest client request URL: {}", currentNode + "/" + path); + } try { response = networkClient.newCall(request).execute(); if (response.isSuccessful()) { @@ -157,21 +150,20 @@ private String execute(String path) throws Exception { response.close(); } } - nextNode = selectNextNode(); - if (!nextNode) { - LOG.warn("try all nodes [{}],no other nodes left", nodes); - } - } while (nextNode); + selectNextNode(); + } + LOG.warn("try all nodes [{}],no other nodes left", nodes); if (scratchExceptionForThrow != null) { throw scratchExceptionForThrow; } return null; } - + public T get(String q, String key) throws Exception { return parseContent(execute(q), key); } - + + @SuppressWarnings("unchecked") private T parseContent(String response, String key) { Map map = Collections.emptyMap(); try { @@ -182,5 +174,4 @@ private T parseContent(String response, String key) { } return (T) (key != null ? 
map.get(key) : map); } - } diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsIndexState.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java similarity index 57% rename from fe/src/main/java/org/apache/doris/external/elasticsearch/EsIndexState.java rename to fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java index d2d0e90faa927f..30e12a07acb987 100644 --- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsIndexState.java +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java @@ -17,30 +17,22 @@ package org.apache.doris.external.elasticsearch; -import org.apache.doris.analysis.PartitionKeyDesc; -import org.apache.doris.analysis.PartitionValue; import org.apache.doris.analysis.SingleRangePartitionDesc; -import org.apache.doris.catalog.PartitionInfo; import org.apache.doris.catalog.PartitionKey; -import org.apache.doris.catalog.RangePartitionInfo; -import org.apache.doris.common.AnalysisException; import org.apache.doris.thrift.TNetworkAddress; - import com.google.common.collect.Lists; import com.google.common.collect.Maps; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.json.JSONArray; import org.json.JSONObject; - import java.util.List; import java.util.Map; import java.util.Random; -public class EsIndexState { - - private static final Logger LOG = LogManager.getLogger(EsIndexState.class); +public class EsShardPartitions { + + private static final Logger LOG = LogManager.getLogger(EsShardPartitions.class); private final String indexName; // shardid -> host1, host2, host3 @@ -48,14 +40,48 @@ public class EsIndexState { private SingleRangePartitionDesc partitionDesc; private PartitionKey partitionKey; private long partitionId = -1; - - public EsIndexState(String indexName) { + + public EsShardPartitions(String indexName) { this.indexName = indexName; this.shardRoutings = Maps.newHashMap(); 
this.partitionDesc = null; this.partitionKey = null; } - + + /** + * Parse shardRoutings from the json + * @param indexName indexName(alias or really name) + * @param searchShards the return value of _search_shards + * @return shardRoutings is used for searching + */ + public static EsShardPartitions findShardPartitions(String indexName, String searchShards) throws ExternalDataSourceException { + EsShardPartitions indexState = new EsShardPartitions(indexName); + JSONObject jsonObject = new JSONObject(searchShards); + JSONObject nodesMap = jsonObject.getJSONObject("nodes"); + JSONArray shards = jsonObject.getJSONArray("shards"); + int length = shards.length(); + for (int i = 0; i < length; i++) { + List singleShardRouting = Lists.newArrayList(); + JSONArray shardsArray = shards.getJSONArray(i); + int arrayLength = shardsArray.length(); + for (int j = 0; j < arrayLength; j++) { + JSONObject shard = shardsArray.getJSONObject(j); + String shardState = shard.getString("state"); + if ("STARTED".equalsIgnoreCase(shardState) || "RELOCATING".equalsIgnoreCase(shardState)) { + try { + singleShardRouting.add(EsShardRouting.parseShardRouting(shardState, String.valueOf(i), shard, nodesMap)); + } catch (Exception e) { + throw new ExternalDataSourceException( "index[" + indexName + "] findShardPartitions error"); + } + } + } + if (singleShardRouting.isEmpty()) { + LOG.warn("could not find a healthy allocation for [{}][{}]", indexName, i); + } + indexState.addShardRouting(i, singleShardRouting); + } + return indexState; + } public void addHttpAddress(Map nodesInfo) { for (Map.Entry> entry : shardRoutings.entrySet()) { @@ -76,66 +102,15 @@ public TNetworkAddress randomAddress(Map nodesInfo) { EsNodeInfo[] nodeInfos = (EsNodeInfo[]) nodesInfo.values().toArray(); return nodeInfos[seed].getPublishAddress(); } - - public static EsIndexState parseIndexStateV55(String indexName, JSONObject indicesRoutingMap, - JSONObject nodesMap, - JSONObject indicesMetaMap, PartitionInfo partitionInfo) 
throws AnalysisException { - EsIndexState indexState = new EsIndexState(indexName); - JSONObject shardRoutings = indicesRoutingMap.getJSONObject(indexName).getJSONObject("shards"); - for (String shardKey : shardRoutings.keySet()) { - List singleShardRouting = Lists.newArrayList(); - JSONArray shardRouting = shardRoutings.getJSONArray(shardKey); - for (int i = 0; i < shardRouting.length(); ++i) { - JSONObject shard = shardRouting.getJSONObject(i); - String shardState = shard.getString("state"); - if ("STARTED".equalsIgnoreCase(shardState)) { - try { - singleShardRouting.add(EsShardRouting.parseShardRoutingV55(shardState, - shardKey, shard, nodesMap)); - } catch (Exception e) { - LOG.info("errors while parse shard routing from json [{}], ignore this shard", shard.toString(), e); - } - } - } - if (singleShardRouting.isEmpty()) { - LOG.warn("could not find a healthy allocation for [{}][{}]", indexName, shardKey); - } - indexState.addShardRouting(Integer.valueOf(shardKey), singleShardRouting); - } - // get some meta info from es, could be used to prune index when query - // index.bpack.partition.upperbound: stu_age - if (partitionInfo != null && partitionInfo instanceof RangePartitionInfo) { - JSONObject indexMeta = indicesMetaMap.getJSONObject(indexName); - JSONObject partitionSetting = EsUtil.getJsonObject(indexMeta, "settings.index.bpack.partition", 0); - LOG.debug("index {} range partition setting is {}", indexName, - partitionSetting == null ? 
"" : partitionSetting.toString()); - if (partitionSetting != null && partitionSetting.has("upperbound")) { - String upperBound = partitionSetting.getString("upperbound"); - List upperValues = Lists.newArrayList(new PartitionValue(upperBound)); - PartitionKeyDesc partitionKeyDesc = new PartitionKeyDesc(upperValues); - // use index name as partition name - SingleRangePartitionDesc desc = new SingleRangePartitionDesc(false, - indexName, partitionKeyDesc, null); - PartitionKey partitionKey = PartitionKey.createPartitionKey( - desc.getPartitionKeyDesc().getUpperValues(), - ((RangePartitionInfo) partitionInfo).getPartitionColumns()); - desc.analyze(((RangePartitionInfo) partitionInfo).getPartitionColumns().size(), null); - indexState.setPartitionDesc(desc); - indexState.setPartitionKey(partitionKey); - } - } - return indexState; - } - public void addShardRouting(int shardId, List singleShardRouting) { shardRoutings.put(shardId, singleShardRouting); } - + public String getIndexName() { return indexName; } - + public Map> getShardRoutings() { return shardRoutings; } @@ -167,6 +142,6 @@ public void setPartitionId(long partitionId) { @Override public String toString() { return "EsIndexState [indexName=" + indexName + ", partitionDesc=" + partitionDesc + ", partitionKey=" - + partitionKey + "]"; + + partitionKey + "]"; } } diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardRouting.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardRouting.java index c4474ae4eb50c4..4f3fa2f1864892 100644 --- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardRouting.java +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardRouting.java @@ -41,7 +41,7 @@ public EsShardRouting(String indexName, int shardId, boolean isPrimary, TNetwork this.nodeId = nodeId; } - public static EsShardRouting parseShardRoutingV55(String indexName, String shardKey, + public static EsShardRouting parseShardRouting(String indexName, String 
shardKey, JSONObject shardInfo, JSONObject nodesMap) { String nodeId = shardInfo.getString("node"); JSONObject nodeInfo = nodesMap.getJSONObject(nodeId); @@ -51,10 +51,10 @@ public static EsShardRouting parseShardRoutingV55(String indexName, String shard // In http transport mode, should ignore thrift_port, set address to null TNetworkAddress addr = null; if (!StringUtils.isEmpty(thriftPort)) { - addr = new TNetworkAddress(transportAddr[0], Integer.valueOf(thriftPort)); + addr = new TNetworkAddress(transportAddr[0], Integer.parseInt(thriftPort)); } boolean isPrimary = shardInfo.getBoolean("primary"); - return new EsShardRouting(indexName, Integer.valueOf(shardKey), + return new EsShardRouting(indexName, Integer.parseInt(shardKey), isPrimary, addr, nodeId); } diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsStateStore.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsStateStore.java deleted file mode 100644 index d22bb704b4b51b..00000000000000 --- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsStateStore.java +++ /dev/null @@ -1,282 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.external.elasticsearch; - -import org.apache.doris.catalog.Catalog; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.EsTable; -import org.apache.doris.catalog.PartitionInfo; -import org.apache.doris.catalog.PartitionKey; -import org.apache.doris.catalog.RangePartitionInfo; -import org.apache.doris.catalog.SinglePartitionInfo; -import org.apache.doris.catalog.Table; -import org.apache.doris.catalog.Table.TableType; -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Config; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.util.MasterDaemon; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.json.JSONObject; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Maps; -import com.google.common.collect.Range; - -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - - -/** - * it is used to call es api to get shard allocation state - */ -public class EsStateStore extends MasterDaemon { - private static final Logger LOG = LogManager.getLogger(EsStateStore.class); - - private Map esTables; - - public EsStateStore() { - super("es state store", Config.es_state_sync_interval_second * 1000); - esTables = Maps.newConcurrentMap(); - } - - public void registerTable(EsTable esTable) { - if (Catalog.isCheckpointThread()) { - return; - } - esTables.put(esTable.getId(), esTable); - LOG.info("register a new table [{}] to sync list", esTable.toString()); - } - - public void deRegisterTable(long tableId) { - esTables.remove(tableId); - LOG.info("deregister table [{}] from sync list", tableId); - } - - @Override - protected void runAfterCatalogReady() { - for (EsTable esTable : esTables.values()) { - try { - EsRestClient client = new 
EsRestClient(esTable.getSeeds(), - esTable.getUserName(), esTable.getPasswd()); - // if user not specify the es version, try to get the remote cluster versoin - // in the future, we maybe need this version - String indexMetaData = client.getIndexMetaData(esTable.getIndexName()); - if (indexMetaData == null) { - esTable.setLastMetaDataSyncException(new Exception("fetch index meta data failure from /_cluster/state")); - // set null for checking in EsScanNode#getShardLocations - esTable.setEsTableState(null); - continue; - } - EsTableState esTableState = getTableState(indexMetaData, esTable); - if (esTableState == null) { - continue; - } - if (EsTable.TRANSPORT_HTTP.equals(esTable.getTransport())) { - Map nodesInfo = client.getHttpNodes(); - esTableState.addHttpAddress(nodesInfo); - } - esTable.setEsTableState(esTableState); - } catch (Throwable e) { - LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster", esTable.getName(), e); - esTable.setEsTableState(null); - esTable.setLastMetaDataSyncException(e); - } - } - } - - // should call this method to init the state store after loading image - // the rest of tables will be added or removed by replaying edit log - // when fe is start to load image, should call this method to init the state store - public void loadTableFromCatalog() { - if (Catalog.isCheckpointThread()) { - return; - } - List dbIds = Catalog.getCurrentCatalog().getDbIds(); - for (Long dbId : dbIds) { - Database database = Catalog.getCurrentCatalog().getDb(dbId); - - List
tables = database.getTables(); - for (Table table : tables) { - if (table.getType() == TableType.ELASTICSEARCH) { - esTables.put(table.getId(), (EsTable) table); - } - } - } - } - - @VisibleForTesting - public EsTableState getTableState(String responseStr, EsTable esTable) - throws DdlException, AnalysisException, ExternalDataSourceException { - JSONObject jsonObject = new JSONObject(responseStr); - JSONObject nodesMap = jsonObject.getJSONObject("nodes"); - // we build the doc value context for fields maybe used for scanning - // "properties": { - // "city": { - // "type": "text", // text field does not have docvalue - // "fields": { - // "raw": { - // "type": "keyword" - // } - // } - // } - // } - // then the docvalue context provided the mapping between the select field and real request field : - // {"city": "city.raw"} - JSONObject indicesMetaMap = jsonObject.getJSONObject("metadata").getJSONObject("indices"); - JSONObject indexMetaMap = indicesMetaMap.optJSONObject(esTable.getIndexName()); - if (indexMetaMap == null) { - esTable.setLastMetaDataSyncException(new Exception( "index[" + esTable.getIndexName() + "] not found for the Elasticsearch Cluster")); - return null; - } - if (indexMetaMap != null && (esTable.isKeywordSniffEnable() || esTable.isDocValueScanEnable())) { - JSONObject mappings = indexMetaMap.optJSONObject("mappings"); - JSONObject rootSchema = mappings.optJSONObject(esTable.getMappingType()); - if (rootSchema == null) { - esTable.setLastMetaDataSyncException(new Exception( "type[" + esTable.getMappingType() + "] not found for the Elasticsearch Cluster with index: [" + esTable.getIndexName() + "]")); - return null; - } - JSONObject schema = rootSchema.optJSONObject("properties"); - List colList = esTable.getFullSchema(); - for (Column col : colList) { - String colName = col.getName(); - if (!schema.has(colName)) { - continue; - } - JSONObject fieldObject = schema.optJSONObject(colName); - String fieldType = fieldObject.optString("type"); - // 
string-type field used keyword type to generate predicate - if (esTable.isKeywordSniffEnable()) { - // if text field type seen, we should use the `field` keyword type? - if ("text".equals(fieldType)) { - JSONObject fieldsObject = fieldObject.optJSONObject("fields"); - if (fieldsObject != null) { - for (String key : fieldsObject.keySet()) { - JSONObject innerTypeObject = fieldsObject.optJSONObject(key); - // just for text type - if ("keyword".equals(innerTypeObject.optString("type"))) { - esTable.addFetchField(colName, colName + "." + key); - } - } - } - } - } - if (esTable.isDocValueScanEnable()) { - if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(fieldType)) { - JSONObject fieldsObject = fieldObject.optJSONObject("fields"); - if (fieldsObject != null) { - for (String key : fieldsObject.keySet()) { - JSONObject innerTypeObject = fieldsObject.optJSONObject(key); - if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(innerTypeObject.optString("type"))) { - continue; - } - if (innerTypeObject.has("doc_values")) { - boolean docValue = innerTypeObject.getBoolean("doc_values"); - if (docValue) { - esTable.addDocValueField(colName, colName); - } - } else { - // a : {c : {}} -> a -> a.c - esTable.addDocValueField(colName, colName + "." 
+ key); - } - } - } - // skip this field - continue; - } - // set doc_value = false manually - if (fieldObject.has("doc_values")) { - boolean docValue = fieldObject.optBoolean("doc_values"); - if (!docValue) { - continue; - } - } - esTable.addDocValueField(colName, colName); - } - } - } - - JSONObject indicesRoutingMap = jsonObject.getJSONObject("routing_table").getJSONObject("indices"); - EsTableState esTableState = new EsTableState(); - PartitionInfo partitionInfo = null; - if (esTable.getPartitionInfo() != null) { - if (esTable.getPartitionInfo() instanceof RangePartitionInfo) { - RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) esTable.getPartitionInfo(); - partitionInfo = new RangePartitionInfo(rangePartitionInfo.getPartitionColumns()); - esTableState.setPartitionInfo(partitionInfo); - if (LOG.isDebugEnabled()) { - StringBuilder sb = new StringBuilder(); - int idx = 0; - for (Column column : rangePartitionInfo.getPartitionColumns()) { - if (idx != 0) { - sb.append(", "); - } - sb.append("`").append(column.getName()).append("`"); - idx++; - } - sb.append(")"); - LOG.debug("begin to parse es table [{}] state from cluster state," - + " with partition info [{}]", esTable.getName(), sb.toString()); - } - } else if (esTable.getPartitionInfo() instanceof SinglePartitionInfo) { - LOG.debug("begin to parse es table [{}] state from cluster state, " - + "with no partition info", esTable.getName()); - } else { - throw new ExternalDataSourceException("es table only support range partition, " - + "but current partition type is " - + esTable.getPartitionInfo().getType()); - } - } - - - for (String indexName : indicesRoutingMap.keySet()) { - EsIndexState indexState = EsIndexState.parseIndexStateV55(indexName, - indicesRoutingMap, nodesMap, - indicesMetaMap, partitionInfo); - esTableState.addIndexState(indexName, indexState); - LOG.debug("add index {} to es table {}", indexState, esTable.getName()); - } - - if (partitionInfo instanceof RangePartitionInfo) { - // 
sort the index state according to partition key and then add to range map - List esIndexStates = esTableState.getPartitionedIndexStates().values() - .stream().collect(Collectors.toList()); - Collections.sort(esIndexStates, new Comparator() { - @Override - public int compare(EsIndexState o1, EsIndexState o2) { - return o1.getPartitionKey().compareTo(o2.getPartitionKey()); - } - }); - long partitionId = 0; - for (EsIndexState esIndexState : esIndexStates) { - Range range = ((RangePartitionInfo) partitionInfo).handleNewSinglePartitionDesc( - esIndexState.getPartitionDesc(), partitionId, false); - esTableState.addPartition(esIndexState.getIndexName(), partitionId); - esIndexState.setPartitionId(partitionId); - ++partitionId; - LOG.debug("add parition to es table [{}] with range [{}]", esTable.getName(), range); - } - } - return esTableState; - } -} diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java new file mode 100644 index 00000000000000..52630868edaa7e --- /dev/null +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.external.elasticsearch; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.EsTable; +import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.catalog.SinglePartitionInfo; +import org.apache.doris.common.DdlException; +import org.apache.doris.thrift.TNetworkAddress; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * save the dynamic info parsed from es cluster state such as shard routing, partition info + */ +public class EsTablePartitions { + + private static final Logger LOG = LogManager.getLogger(EsTablePartitions.class); + + private PartitionInfo partitionInfo; + private Map partitionIdToIndices; + private Map partitionedIndexStates; + private Map unPartitionedIndexStates; + + public EsTablePartitions() { + partitionInfo = null; + partitionIdToIndices = Maps.newHashMap(); + partitionedIndexStates = Maps.newHashMap(); + unPartitionedIndexStates = Maps.newHashMap(); + } + + public static EsTablePartitions fromShardPartitions(EsTable esTable, EsShardPartitions shardPartitions) + throws ExternalDataSourceException, DdlException { + EsTablePartitions esTablePartitions = new EsTablePartitions(); + RangePartitionInfo partitionInfo = null; + if (esTable.getPartitionInfo() != null) { + if (esTable.getPartitionInfo() instanceof RangePartitionInfo) { + RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) esTable.getPartitionInfo(); + partitionInfo = new RangePartitionInfo(rangePartitionInfo.getPartitionColumns()); + 
esTablePartitions.setPartitionInfo(partitionInfo); + if (LOG.isDebugEnabled()) { + StringBuilder sb = new StringBuilder(); + int idx = 0; + for (Column column : rangePartitionInfo.getPartitionColumns()) { + if (idx != 0) { + sb.append(", "); + } + sb.append("`").append(column.getName()).append("`"); + idx++; + } + sb.append(")"); + LOG.debug("begin to parse es table [{}] state from search shards," + + " with partition info [{}]", esTable.getName(), sb.toString()); + } + } else if (esTable.getPartitionInfo() instanceof SinglePartitionInfo) { + LOG.debug("begin to parse es table [{}] state from search shards, " + + "with no partition info", esTable.getName()); + } else { + throw new ExternalDataSourceException("es table only support range partition, " + + "but current partition type is " + + esTable.getPartitionInfo().getType()); + } + } + esTablePartitions.addIndexState(esTable.getIndexName(), shardPartitions); + LOG.debug("add index {} to es table {}", shardPartitions, esTable.getName()); + if (partitionInfo != null) { + // sort the index state according to partition key and then add to range map + List esShardPartitionsList = new ArrayList<>( + esTablePartitions.getPartitionedIndexStates().values()); + esShardPartitionsList.sort(Comparator.comparing(EsShardPartitions::getPartitionKey)); + long partitionId = 0; + for (EsShardPartitions esShardPartitions : esShardPartitionsList) { + Range range = partitionInfo.handleNewSinglePartitionDesc( + esShardPartitions.getPartitionDesc(), partitionId, false); + esTablePartitions.addPartition(esShardPartitions.getIndexName(), partitionId); + esShardPartitions.setPartitionId(partitionId); + ++partitionId; + LOG.debug("add parition to es table [{}] with range [{}]", esTable.getName(), + range); + } + } + return esTablePartitions; + } + + public void addHttpAddress(Map nodesInfo) { + for (EsShardPartitions indexState : partitionedIndexStates.values()) { + indexState.addHttpAddress(nodesInfo); + } + for (EsShardPartitions 
indexState : unPartitionedIndexStates.values()) { + indexState.addHttpAddress(nodesInfo); + } + + } + + public TNetworkAddress randomAddress(Map nodesInfo) { + int seed = new Random().nextInt() % nodesInfo.size(); + EsNodeInfo[] nodeInfos = (EsNodeInfo[]) nodesInfo.values().toArray(); + return nodeInfos[seed].getPublishAddress(); + } + + public PartitionInfo getPartitionInfo() { + return partitionInfo; + } + + public void setPartitionInfo(PartitionInfo partitionInfo) { + this.partitionInfo = partitionInfo; + } + + public Map getPartitionIdToIndices() { + return partitionIdToIndices; + } + + public void addPartition(String indexName, long partitionId) { + partitionIdToIndices.put(partitionId, indexName); + } + + public void addIndexState(String indexName, EsShardPartitions indexState) { + if (indexState.getPartitionDesc() != null) { + partitionedIndexStates.put(indexName, indexState); + } else { + unPartitionedIndexStates.put(indexName, indexState); + } + } + + public Map getPartitionedIndexStates() { + return partitionedIndexStates; + } + + public Map getUnPartitionedIndexStates() { + return unPartitionedIndexStates; + } + + public EsShardPartitions getEsShardPartitions(long partitionId) { + if (partitionIdToIndices.containsKey(partitionId)) { + return partitionedIndexStates.get(partitionIdToIndices.get(partitionId)); + } + return null; + } + + public EsShardPartitions getEsShardPartitions(String indexName) { + if (partitionedIndexStates.containsKey(indexName)) { + return partitionedIndexStates.get(indexName); + } + return unPartitionedIndexStates.get(indexName); + } +} diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsTableState.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsTableState.java deleted file mode 100644 index 2b546a68835fd9..00000000000000 --- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsTableState.java +++ /dev/null @@ -1,105 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one 
-// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.external.elasticsearch; - -import java.util.Map; -import java.util.Random; - -import org.apache.doris.catalog.PartitionInfo; -import com.google.common.collect.Maps; -import org.apache.doris.thrift.TNetworkAddress; - -/** - * save the dynamic info parsed from es cluster state such as shard routing, partition info - */ -public class EsTableState { - - private PartitionInfo partitionInfo; - private Map partitionIdToIndices; - private Map partitionedIndexStates; - private Map unPartitionedIndexStates; - - public EsTableState() { - partitionInfo = null; - partitionIdToIndices = Maps.newHashMap(); - partitionedIndexStates = Maps.newHashMap(); - unPartitionedIndexStates = Maps.newHashMap(); - } - - public void addHttpAddress(Map nodesInfo) { - for (EsIndexState indexState : partitionedIndexStates.values()) { - indexState.addHttpAddress(nodesInfo); - } - for (EsIndexState indexState : unPartitionedIndexStates.values()) { - indexState.addHttpAddress(nodesInfo); - } - - } - - public TNetworkAddress randomAddress(Map nodesInfo) { - int seed = new Random().nextInt() % nodesInfo.size(); - EsNodeInfo[] nodeInfos = (EsNodeInfo[]) nodesInfo.values().toArray(); - return nodeInfos[seed].getPublishAddress(); - } - - public 
PartitionInfo getPartitionInfo() { - return partitionInfo; - } - - public void setPartitionInfo(PartitionInfo partitionInfo) { - this.partitionInfo = partitionInfo; - } - - public Map getPartitionIdToIndices() { - return partitionIdToIndices; - } - - public void addPartition(String indexName, long partitionId) { - partitionIdToIndices.put(partitionId, indexName); - } - - public void addIndexState(String indexName, EsIndexState indexState) { - if (indexState.getPartitionDesc() != null) { - partitionedIndexStates.put(indexName, indexState); - } else { - unPartitionedIndexStates.put(indexName, indexState); - } - } - - public Map getPartitionedIndexStates() { - return partitionedIndexStates; - } - - public Map getUnPartitionedIndexStates() { - return unPartitionedIndexStates; - } - - public EsIndexState getIndexState(long partitionId) { - if (partitionIdToIndices.containsKey(partitionId)) { - return partitionedIndexStates.get(partitionIdToIndices.get(partitionId)); - } - return null; - } - - public EsIndexState getIndexState(String indexName) { - if (partitionedIndexStates.containsKey(indexName)) { - return partitionedIndexStates.get(indexName); - } - return unPartitionedIndexStates.get(indexName); - } -} diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java index 754f1eeee43995..085d9a1585cd60 100644 --- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java @@ -17,51 +17,50 @@ package org.apache.doris.external.elasticsearch; -import org.json.JSONObject; - import org.apache.doris.analysis.DistributionDesc; import org.apache.doris.analysis.PartitionDesc; import org.apache.doris.analysis.RangePartitionDesc; import org.apache.doris.common.AnalysisException; +import org.json.JSONObject; public class EsUtil { public static void analyzePartitionAndDistributionDesc(PartitionDesc 
partitionDesc, - DistributionDesc distributionDesc) - throws AnalysisException { + DistributionDesc distributionDesc) throws AnalysisException { if (partitionDesc == null && distributionDesc == null) { return; } - + if (partitionDesc != null) { if (!(partitionDesc instanceof RangePartitionDesc)) { throw new AnalysisException("Elasticsearch table only permit range partition"); } - + RangePartitionDesc rangePartitionDesc = (RangePartitionDesc) partitionDesc; analyzePartitionDesc(rangePartitionDesc); } - + if (distributionDesc != null) { throw new AnalysisException("could not support distribution clause"); } } - + private static void analyzePartitionDesc(RangePartitionDesc partDesc) throws AnalysisException { if (partDesc.getPartitionColNames() == null || partDesc.getPartitionColNames().isEmpty()) { throw new AnalysisException("No partition columns."); } - + if (partDesc.getPartitionColNames().size() > 1) { - throw new AnalysisException("Elasticsearch table's parition column could only be a single column"); + throw new AnalysisException( + "Elasticsearch table's parition column could only be a single column"); } } - /** * get the json object from specified jsonObject + * * @param jsonObject * @param key * @return diff --git a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java index 497fa631a88026..aa04ec66164b9c 100644 --- a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java @@ -26,9 +26,9 @@ import org.apache.doris.catalog.RangePartitionInfo; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; -import org.apache.doris.external.elasticsearch.EsIndexState; +import org.apache.doris.external.elasticsearch.EsShardPartitions; import org.apache.doris.external.elasticsearch.EsShardRouting; -import org.apache.doris.external.elasticsearch.EsTableState; +import 
org.apache.doris.external.elasticsearch.EsTablePartitions; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TEsScanNode; import org.apache.doris.thrift.TEsScanRange; @@ -65,7 +65,7 @@ public class EsScanNode extends ScanNode { private final Random random = new Random(System.currentTimeMillis()); private Multimap backendMap; private List backendList; - private EsTableState esTableState; + private EsTablePartitions esTablePartitions; private List shardScanRanges = Lists.newArrayList(); private EsTable table; @@ -74,7 +74,7 @@ public class EsScanNode extends ScanNode { public EsScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) { super(id, desc, planNodeName); table = (EsTable) (desc.getTable()); - esTableState = table.getEsTableState(); + esTablePartitions = table.getEsTablePartitions(); } @Override @@ -147,23 +147,23 @@ private void assignBackends() throws UserException { // only do partition(es index level) prune private List getShardLocations() throws UserException { // has to get partition info from es state not from table because the partition info is generated from es cluster state dynamically - if (esTableState == null) { + if (esTablePartitions == null) { if (table.getLastMetaDataSyncException() != null) { throw new UserException("fetch es table [" + table.getName() + "] metadata failure: " + table.getLastMetaDataSyncException().getLocalizedMessage()); } throw new UserException("EsTable metadata has not been synced, Try it later"); } - Collection partitionIds = partitionPrune(esTableState.getPartitionInfo()); - List selectedIndex = Lists.newArrayList(); + Collection partitionIds = partitionPrune(esTablePartitions.getPartitionInfo()); + List selectedIndex = Lists.newArrayList(); ArrayList unPartitionedIndices = Lists.newArrayList(); ArrayList partitionedIndices = Lists.newArrayList(); - for (EsIndexState esIndexState : esTableState.getUnPartitionedIndexStates().values()) { - selectedIndex.add(esIndexState); - 
unPartitionedIndices.add(esIndexState.getIndexName()); + for (EsShardPartitions esShardPartitions : esTablePartitions.getUnPartitionedIndexStates().values()) { + selectedIndex.add(esShardPartitions); + unPartitionedIndices.add(esShardPartitions.getIndexName()); } if (partitionIds != null) { for (Long partitionId : partitionIds) { - EsIndexState indexState = esTableState.getIndexState(partitionId); + EsShardPartitions indexState = esTablePartitions.getEsShardPartitions(partitionId); selectedIndex.add(indexState); partitionedIndices.add(indexState.getIndexName()); } @@ -176,7 +176,7 @@ private List getShardLocations() throws UserException { } int beIndex = random.nextInt(backendList.size()); List result = Lists.newArrayList(); - for (EsIndexState indexState : selectedIndex) { + for (EsShardPartitions indexState : selectedIndex) { for (List shardRouting : indexState.getShardRoutings().values()) { // get backends Set colocatedBes = Sets.newHashSet(); diff --git a/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java b/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java index 29536fae9383f7..e799352027d288 100644 --- a/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java +++ b/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java @@ -79,10 +79,8 @@ public class CatalogTestUtil { public static String testTxnLable10 = "testTxnLable10"; public static String testTxnLable11 = "testTxnLable11"; public static String testTxnLable12 = "testTxnLable12"; - public static String testPartitionedEsTable1 = "partitionedEsTable1"; - public static long testPartitionedEsTableId1 = 14; - public static String testUnPartitionedEsTable1 = "unpartitionedEsTable1"; - public static long testUnPartitionedEsTableId1 = 15; + public static String testEsTable1 = "partitionedEsTable1"; + public static long testEsTableId1 = 14; public static Catalog createTestCatalog() throws InstantiationException, IllegalAccessException, IllegalArgumentException, 
InvocationTargetException, NoSuchMethodException, SecurityException { @@ -229,8 +227,7 @@ public static Database createSimpleDb(long dbId, long tableId, long partitionId, // add a es table to catalog try { - createPartitionedEsTable(db); - createUnPartitionedEsTable(db); + createEsTable(db); } catch (DdlException e) { // TODO Auto-generated catch block // e.printStackTrace(); @@ -238,21 +235,18 @@ public static Database createSimpleDb(long dbId, long tableId, long partitionId, return db; } - public static void createPartitionedEsTable(Database db) throws DdlException { + public static void createEsTable(Database db) throws DdlException { // columns - List columns = new ArrayList(); - Column k1 = new Column("k1", PrimitiveType.DATE); - k1.setIsKey(true); - columns.add(k1); - Column k2 = new Column("k2", PrimitiveType.INT); - k2.setIsKey(true); - columns.add(k2); - columns.add(new Column("v", ScalarType.createType(PrimitiveType.DOUBLE), false, AggregateType.SUM, "0", "")); + List columns = new ArrayList<>(); + Column userId = new Column("userId", PrimitiveType.VARCHAR); + columns.add(userId); + columns.add(new Column("time", PrimitiveType.BIGINT)); + columns.add(new Column("type", PrimitiveType.VARCHAR)); // table List partitionColumns = Lists.newArrayList(); List singleRangePartitionDescs = Lists.newArrayList(); - partitionColumns.add(k1); + partitionColumns.add(userId); singleRangePartitionDescs.add(new SingleRangePartitionDesc(false, "p1", new PartitionKeyDesc(Lists @@ -263,44 +257,15 @@ public static void createPartitionedEsTable(Database db) throws DdlException { Map properties = Maps.newHashMap(); properties.put(EsTable.HOSTS, "xxx"); properties.put(EsTable.INDEX, "indexa"); + properties.put(EsTable.TYPE, "doc"); properties.put(EsTable.PASSWORD, ""); properties.put(EsTable.USER, "root"); - EsTable esTable = new EsTable(testPartitionedEsTableId1, testPartitionedEsTable1, + properties.put(EsTable.DOC_VALUE_SCAN, "true"); + properties.put(EsTable.KEYWORD_SNIFF, 
"true"); + EsTable esTable = new EsTable(testEsTableId1, testEsTable1, columns, properties, partitionInfo); db.createTable(esTable); } - - public static void createUnPartitionedEsTable(Database db) throws DdlException { - // columns - List columns = new ArrayList(); - Column k1 = new Column("k1", PrimitiveType.DATE); - k1.setIsKey(true); - columns.add(k1); - Column k2 = new Column("k2", PrimitiveType.INT); - k2.setIsKey(true); - columns.add(k2); - columns.add(new Column("v", ScalarType.createType(PrimitiveType.DOUBLE), false, AggregateType.SUM, "0", "")); - - // table - List partitionColumns = Lists.newArrayList(); - List singleRangePartitionDescs = Lists.newArrayList(); - partitionColumns.add(k1); - - singleRangePartitionDescs.add(new SingleRangePartitionDesc(false, "p1", - new PartitionKeyDesc(Lists - .newArrayList(new PartitionValue("100"))), - null)); - - SinglePartitionInfo partitionInfo = new SinglePartitionInfo(); - Map properties = Maps.newHashMap(); - properties.put(EsTable.HOSTS, "xxx"); - properties.put(EsTable.INDEX, "indexa"); - properties.put(EsTable.PASSWORD, ""); - properties.put(EsTable.USER, "root"); - EsTable esTable = new EsTable(testUnPartitionedEsTableId1, testUnPartitionedEsTable1, - columns, properties, partitionInfo); - db.createTable(esTable); - } public static Backend createBackend(long id, String host, int heartPort, int bePort, int httpPort) { Backend backend = new Backend(id, host, heartPort); diff --git a/fe/src/test/java/org/apache/doris/external/elasticsearch/EsRepositoryTest.java b/fe/src/test/java/org/apache/doris/external/elasticsearch/EsRepositoryTest.java new file mode 100644 index 00000000000000..a7ce9610de2e72 --- /dev/null +++ b/fe/src/test/java/org/apache/doris/external/elasticsearch/EsRepositoryTest.java @@ -0,0 +1,128 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.external.elasticsearch; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.CatalogTestUtil; +import org.apache.doris.catalog.EsTable; +import org.apache.doris.catalog.FakeCatalog; +import org.apache.doris.catalog.FakeEditLog; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeMetaVersion; +import org.apache.doris.meta.MetaContext; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.lang.reflect.InvocationTargetException; +import java.net.URISyntaxException; + +public class EsRepositoryTest { + + private static FakeEditLog fakeEditLog; + private static FakeCatalog fakeCatalog; + private static Catalog masterCatalog; + private static String mappingsStr = ""; + private static String es7MappingsStr = ""; + private static String searchShardsStr = ""; + private EsRepository esRepository; + private EsRestClient fakeClient; + + @BeforeClass + public static void init() throws IOException, 
InstantiationException, IllegalAccessException, + IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException, + URISyntaxException { + fakeEditLog = new FakeEditLog(); + fakeCatalog = new FakeCatalog(); + masterCatalog = CatalogTestUtil.createTestCatalog(); + MetaContext metaContext = new MetaContext(); + metaContext.setMetaVersion(FeMetaVersion.VERSION_40); + metaContext.setThreadLocalInfo(); + // masterCatalog.setJournalVersion(FeMetaVersion.VERSION_40); + FakeCatalog.setCatalog(masterCatalog); + mappingsStr = loadJsonFromFile("data/es/mappings.json"); + es7MappingsStr = loadJsonFromFile("data/es/es7_mappings.json"); + searchShardsStr = loadJsonFromFile("data/es/search_shards.json"); + } + + @Before + public void setUp() { + esRepository = new EsRepository(); + fakeClient = new EsRestClient(new String[]{"localhost:9200"}, null, null); + } + + @Test + public void testSetEsTableContext() throws Exception { + EsTable esTable = (EsTable) Catalog.getCurrentCatalog() + .getDb(CatalogTestUtil.testDb1) + .getTable(CatalogTestUtil.testEsTableId1); + // es5 + EsFieldInfos fieldInfos = EsFieldInfos.fromMapping(esTable.getFullSchema(), esTable.getIndexName(), mappingsStr, esTable.getMappingType()); + esTable.addFieldInfos(fieldInfos); + assertEquals("userId.keyword", esTable.fieldsContext().get("userId")); + assertEquals("userId.keyword", esTable.docValueContext().get("userId")); + // es7 + EsFieldInfos fieldInfos7 = EsFieldInfos.fromMapping(esTable.getFullSchema(), esTable.getIndexName(), es7MappingsStr, ""); + assertEquals("userId.keyword", fieldInfos7.getFieldsContext().get("userId")); + assertEquals("userId.keyword", fieldInfos7.getDocValueContext().get("userId")); + + } + + @Test(expected = ExternalDataSourceException.class) + public void testSetErrorType() throws Exception { + EsTable esTable = (EsTable) Catalog.getCurrentCatalog() + .getDb(CatalogTestUtil.testDb1) + .getTable(CatalogTestUtil.testEsTableId1); + // error type + 
EsFieldInfos.fromMapping(esTable.getFullSchema(), esTable.getIndexName(), mappingsStr, "errorType"); + } + + @Test + public void testSetTableState() throws ExternalDataSourceException, DdlException { + EsTable esTable = (EsTable) Catalog.getCurrentCatalog() + .getDb(CatalogTestUtil.testDb1) + .getTable(CatalogTestUtil.testEsTableId1); + EsShardPartitions esShardPartitions = EsShardPartitions.findShardPartitions(esTable.getIndexName(), searchShardsStr); + EsTablePartitions esTablePartitions = EsTablePartitions.fromShardPartitions(esTable, esShardPartitions); + assertNotNull(esTablePartitions); + assertEquals(1, esTablePartitions.getUnPartitionedIndexStates().size()); + assertEquals(5, esTablePartitions.getEsShardPartitions("indexa").getShardRoutings().size()); + } + + private static String loadJsonFromFile(String fileName) throws IOException, URISyntaxException { + File file = new File(EsRepositoryTest.class.getClassLoader().getResource(fileName).toURI()); + InputStream is = new FileInputStream(file); + BufferedReader br = new BufferedReader(new InputStreamReader(is)); + StringBuilder jsonStr = new StringBuilder(); + String line = ""; + while ((line = br.readLine()) != null) { + jsonStr.append(line); + } + br.close(); + is.close(); + return jsonStr.toString(); + } +} \ No newline at end of file diff --git a/fe/src/test/resources/data/es/es7_mappings.json b/fe/src/test/resources/data/es/es7_mappings.json new file mode 100644 index 00000000000000..cf44130f68b5e9 --- /dev/null +++ b/fe/src/test/resources/data/es/es7_mappings.json @@ -0,0 +1,23 @@ +{ + "indexa_2020.05.02": { + "mappings": { + "dynamic": "strict", + "properties": { + "time": { + "type": "long" + }, + "type": { + "type": "keyword" + }, + "userId": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" + } + } + } + } + } + } +} \ No newline at end of file diff --git a/fe/src/test/resources/data/es/mappings.json b/fe/src/test/resources/data/es/mappings.json new file mode 100644 index 
00000000000000..1c3f23308ce54f --- /dev/null +++ b/fe/src/test/resources/data/es/mappings.json @@ -0,0 +1,25 @@ +{ + "indexa_2020.05.02": { + "mappings": { + "doc": { + "dynamic": "strict", + "properties": { + "time": { + "type": "long" + }, + "type": { + "type": "keyword" + }, + "userId": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" + } + } + } + } + } + } + } +} \ No newline at end of file diff --git a/fe/src/test/resources/data/es/search_shards.json b/fe/src/test/resources/data/es/search_shards.json new file mode 100644 index 00000000000000..0a0446cced6771 --- /dev/null +++ b/fe/src/test/resources/data/es/search_shards.json @@ -0,0 +1,213 @@ +{ + "nodes": { + "yoBi15gPSFe7BKDDOhI-0g": { + "name": "test239-9220-9320", + "ephemeral_id": "Th56FehQT36Zda2l0FQewg", + "transport_address": "192.168.0.1:9320", + "attributes": { + "disk": "hdd", + "ml.max_open_jobs": "10", + "ml.enabled": "true" + } + }, + "PTrurLShRN6POIuNe8WnFg": { + "name": "test057-29210-29310", + "ephemeral_id": "KyIWi165QVa9sqHYvjeJxw", + "transport_address": "192.168.0.1:29310", + "attributes": { + "disk": "hdd", + "ml.max_open_jobs": "10", + "ml.enabled": "true" + } + }, + "7mjpC74LQkGEgcGks3-dYQ": { + "name": "test239-9230-9330", + "ephemeral_id": "g821RcAARpy3hIOnboqnHA", + "transport_address": "192.168.0.1:9330", + "attributes": { + "disk": "hdd", + "ml.max_open_jobs": "10", + "ml.enabled": "true" + } + }, + "_DigEpF0SPmV32lPTzZ_QA": { + "name": "test240-9210-9310", + "ephemeral_id": "sgKUhJ59QuyzUvKXxNnzww", + "transport_address": "192.168.0.1:9310", + "attributes": { + "disk": "hdd", + "ml.max_open_jobs": "10", + "ml.enabled": "true" + } + }, + "zMH8NhsdTy63mnTeNgGILw": { + "name": "test240-9230-9330", + "ephemeral_id": "OEvljKbCSPaQpU3SPavBvQ", + "transport_address": "192.168.0.1:9330", + "attributes": { + "disk": "hdd", + "ml.max_open_jobs": "10", + "ml.enabled": "true" + } + }, + "5D_18rKKRzOnAlnbZUdSRw": { + "name": "test239-9210-9310", + "ephemeral_id": 
"ECEwuqcvTY-qAlT5OcWz2Q", + "transport_address": "192.168.0.1:9310", + "attributes": { + "disk": "hdd", + "ml.max_open_jobs": "10", + "ml.enabled": "true" + } + }, + "8F5Wip4-Qb6og3jhepn5sg": { + "name": "test058-29210-29310", + "ephemeral_id": "QU24dPM_T-S-OPGoDH9VGA", + "transport_address": "192.168.0.1:29310", + "attributes": { + "disk": "hdd", + "ml.max_open_jobs": "10", + "ml.enabled": "true" + } + }, + "7k1HfDdMQ1G8Ob4vUDRl1Q": { + "name": "test240-9220-9320", + "ephemeral_id": "IzSWsgStRriKFdYvBz9hNw", + "transport_address": "192.168.0.1:9320", + "attributes": { + "disk": "hdd", + "ml.max_open_jobs": "10", + "ml.enabled": "true" + } + } + }, + "indices": { + "indexa_2020.05.02": { + "aliases": [ + "indexa" + ] + } + }, + "shards": [ + [ + { + "state": "STARTED", + "primary": true, + "node": "7mjpC74LQkGEgcGks3-dYQ", + "relocating_node": null, + "shard": 0, + "index": "indexa_2020.05.02", + "allocation_id": { + "id": "Q77NsOEYR0SlzOSOQsyixQ" + } + }, + { + "state": "STARTED", + "primary": false, + "node": "8F5Wip4-Qb6og3jhepn5sg", + "relocating_node": null, + "shard": 0, + "index": "indexa_2020.05.02", + "allocation_id": { + "id": "nKsg32sPSyinydeaRyUA8w" + } + } + ], + [ + { + "state": "STARTED", + "primary": false, + "node": "_DigEpF0SPmV32lPTzZ_QA", + "relocating_node": null, + "shard": 1, + "index": "indexa_2020.05.02", + "allocation_id": { + "id": "2L5soNwQTDaNz7EdxNV9yQ" + } + }, + { + "state": "STARTED", + "primary": true, + "node": "yoBi15gPSFe7BKDDOhI-0g", + "relocating_node": null, + "shard": 1, + "index": "indexa_2020.05.02", + "allocation_id": { + "id": "FJKsGnYyRniK0BexSdJCiQ" + } + } + ], + [ + { + "state": "STARTED", + "primary": false, + "node": "zMH8NhsdTy63mnTeNgGILw", + "relocating_node": null, + "shard": 2, + "index": "indexa_2020.05.02", + "allocation_id": { + "id": "qyTTlZYCQ4y0hQPL5uLIaQ" + } + }, + { + "state": "STARTED", + "primary": true, + "node": "5D_18rKKRzOnAlnbZUdSRw", + "relocating_node": null, + "shard": 2, + "index": 
"indexa_2020.05.02", + "allocation_id": { + "id": "SxFS7Vz_S-yxCCgsU4n52g" + } + } + ], + [ + { + "state": "STARTED", + "primary": false, + "node": "8F5Wip4-Qb6og3jhepn5sg", + "relocating_node": null, + "shard": 3, + "index": "indexa_2020.05.02", + "allocation_id": { + "id": "TL2QDmCLQCO51MCLAVmBOA" + } + }, + { + "state": "STARTED", + "primary": true, + "node": "PTrurLShRN6POIuNe8WnFg", + "relocating_node": null, + "shard": 3, + "index": "indexa_2020.05.02", + "allocation_id": { + "id": "9Mvuikm_RomoWi7020F48A" + } + } + ], + [ + { + "state": "STARTED", + "primary": false, + "node": "7k1HfDdMQ1G8Ob4vUDRl1Q", + "relocating_node": null, + "shard": 4, + "index": "indexa_2020.05.02", + "allocation_id": { + "id": "aE4DvgpkQXut2FN-Skv-sw" + } + }, + { + "state": "STARTED", + "primary": true, + "node": "5D_18rKKRzOnAlnbZUdSRw", + "relocating_node": null, + "shard": 4, + "index": "indexa_2020.05.02", + "allocation_id": { + "id": "u8zHP-RwS1SiJD9ZXJL1yw" + } + } + ] + ] +} \ No newline at end of file