diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/ExternalDataSourceException.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java similarity index 89% rename from fe/src/main/java/org/apache/doris/external/elasticsearch/ExternalDataSourceException.java rename to fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java index abb67440f791e2..c1ea1f4af41137 100644 --- a/fe/src/main/java/org/apache/doris/external/elasticsearch/ExternalDataSourceException.java +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java @@ -19,11 +19,11 @@ import org.apache.doris.common.UserException; -public class ExternalDataSourceException extends UserException { +public class DorisEsException extends UserException { private static final long serialVersionUID = 7912833584319374692L; - public ExternalDataSourceException(String msg) { + public DorisEsException(String msg) { super(msg); } } diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsFieldInfos.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsFieldInfos.java index 59d251118d471f..5edb80bb890b44 100644 --- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsFieldInfos.java +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsFieldInfos.java @@ -64,7 +64,7 @@ public Map getDocValueContext() { * @return fieldsContext and docValueContext * @throws Exception */ - public static EsFieldInfos fromMapping(List colList, String indexName, String indexMapping, String docType) throws ExternalDataSourceException { + public static EsFieldInfos fromMapping(List colList, String indexName, String indexMapping, String docType) throws DorisEsException { JSONObject jsonObject = new JSONObject(indexMapping); // the indexName use alias takes the first mapping Iterator keys = jsonObject.keys(); @@ -103,7 +103,7 @@ public static EsFieldInfos fromMapping(List colList, String indexName, S properties = rootSchema.optJSONObject("properties"); } if (properties == null) { - throw new ExternalDataSourceException( "index[" + indexName + "] type[" + docType + "] mapping not found for the Elasticsearch Cluster"); + throw new DorisEsException( "index[" + indexName + "] type[" + docType + "] mapping not found for the Elasticsearch Cluster"); } return parseProperties(colList, properties); } diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java index adfc294faa34b9..dd7f93fa9693e5 100644 --- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java @@ -95,7 +95,7 @@ public EsFieldInfos getFieldInfos(String indexName, String docType, List String path = indexName + "/_mapping"; String indexMapping = execute(path); if (indexMapping == null) { - throw new ExternalDataSourceException( "index[" + indexName + "] not found for the Elasticsearch Cluster"); + throw new DorisEsException( "index[" + indexName + "] not found for the Elasticsearch Cluster"); } return EsFieldInfos.fromMapping(colList, indexName, indexMapping, docType); } @@ -105,7 +105,7 @@ public EsShardPartitions getShardPartitions(String indexName) throws Exception { String path = indexName + "/_search_shards"; String searchShards = execute(path); if (searchShards == null) { - throw new ExternalDataSourceException( "index[" + indexName + "] search_shards not found for the Elasticsearch Cluster"); + throw new DorisEsException( "index[" + indexName + "] search_shards not found for the Elasticsearch Cluster"); } return EsShardPartitions.findShardPartitions(indexName, searchShards); } diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java index 30e12a07acb987..5caa6c034788fe 100644 --- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java @@ -20,12 +20,15 @@ import org.apache.doris.analysis.SingleRangePartitionDesc; import org.apache.doris.catalog.PartitionKey; import org.apache.doris.thrift.TNetworkAddress; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.json.JSONArray; import org.json.JSONObject; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + import java.util.List; import java.util.Map; import java.util.Random; @@ -47,17 +50,17 @@ public EsShardPartitions(String indexName) { this.partitionDesc = null; this.partitionKey = null; } - + /** * Parse shardRoutings from the json - * @param indexName indexName(alias or really name) + * + * @param indexName indexName(alias or really name) * @param searchShards the return value of _search_shards * @return shardRoutings is used for searching */ - public static EsShardPartitions findShardPartitions(String indexName, String searchShards) throws ExternalDataSourceException { + public static EsShardPartitions findShardPartitions(String indexName, String searchShards) throws DorisEsException { EsShardPartitions indexState = new EsShardPartitions(indexName); JSONObject jsonObject = new JSONObject(searchShards); - JSONObject nodesMap = jsonObject.getJSONObject("nodes"); JSONArray shards = jsonObject.getJSONArray("shards"); int length = shards.length(); for (int i = 0; i < length; i++) { @@ -65,14 +68,20 @@ public static EsShardPartitions findShardPartitions(String indexName, String sea JSONArray shardsArray = shards.getJSONArray(i); int arrayLength = shardsArray.length(); for (int j = 0; j < arrayLength; j++) { - JSONObject shard = shardsArray.getJSONObject(j); - String shardState = shard.getString("state"); + JSONObject indexShard = shardsArray.getJSONObject(j); + String shardState = indexShard.getString("state"); if ("STARTED".equalsIgnoreCase(shardState) || "RELOCATING".equalsIgnoreCase(shardState)) { try { - singleShardRouting.add(EsShardRouting.parseShardRouting(shardState, String.valueOf(i), shard, nodesMap)); + singleShardRouting.add( + EsShardRouting.newSearchShard( + indexShard.getString("index"), + indexShard.getInt("shard"), + indexShard.getBoolean("primary"), + indexShard.getString("node"), + jsonObject.getJSONObject("nodes"))); } catch (Exception e) { - throw new ExternalDataSourceException( "index[" + indexName + "] findShardPartitions error"); - } + LOG.error("fetch index [{}] shard partitions failure", indexName, e); + throw new DorisEsException("fetch [" + indexName + "] shard partitions failure [" + e.getMessage() + "]"); } } } if (singleShardRouting.isEmpty()) { @@ -142,6 +151,6 @@ public void setPartitionId(long partitionId) { @Override public String toString() { return "EsIndexState [indexName=" + indexName + ", partitionDesc=" + partitionDesc + ", partitionKey=" - + partitionKey + "]"; + + partitionKey + "]"; } } diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardRouting.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardRouting.java index 4f3fa2f1864892..980a62d4684a97 100644 --- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardRouting.java +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardRouting.java @@ -18,10 +18,10 @@ package org.apache.doris.external.elasticsearch; +import org.apache.doris.thrift.TNetworkAddress; import org.apache.commons.lang.StringUtils; import org.json.JSONObject; -import org.apache.doris.thrift.TNetworkAddress; public class EsShardRouting { @@ -41,9 +41,8 @@ public EsShardRouting(String indexName, int shardId, boolean isPrimary, TNetwork this.nodeId = nodeId; } - public static EsShardRouting parseShardRouting(String indexName, String shardKey, - JSONObject shardInfo, JSONObject nodesMap) { - String nodeId = shardInfo.getString("node"); + public static EsShardRouting newSearchShard(String indexName, int shardId, boolean isPrimary, + String nodeId, JSONObject nodesMap) { JSONObject nodeInfo = nodesMap.getJSONObject(nodeId); String[] transportAddr = nodeInfo.getString("transport_address").split(":"); // get thrift port from node info @@ -53,9 +52,7 @@ public static EsShardRouting parseShardRouting(String indexName, String shardKey if (!StringUtils.isEmpty(thriftPort)) { addr = new TNetworkAddress(transportAddr[0], Integer.parseInt(thriftPort)); } - boolean isPrimary = shardInfo.getBoolean("primary"); - return new EsShardRouting(indexName, Integer.parseInt(shardKey), - isPrimary, addr, nodeId); + return new EsShardRouting(indexName, shardId, isPrimary, addr, nodeId); } public int getShardId() { diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java index 52630868edaa7e..c1306be8d0af31 100644 --- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java @@ -17,12 +17,6 @@ package org.apache.doris.external.elasticsearch; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Random; - import org.apache.doris.catalog.Column; import org.apache.doris.catalog.EsTable; import org.apache.doris.catalog.PartitionInfo; @@ -31,11 +25,19 @@ import org.apache.doris.catalog.SinglePartitionInfo; import org.apache.doris.common.DdlException; import org.apache.doris.thrift.TNetworkAddress; -import com.google.common.collect.Maps; -import com.google.common.collect.Range; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Random; + /** * save the dynamic info parsed from es cluster state such as shard routing, partition info */ @@ -56,7 +58,7 @@ public EsTablePartitions() { } public static EsTablePartitions fromShardPartitions(EsTable esTable, EsShardPartitions shardPartitions) - throws ExternalDataSourceException, DdlException { + throws DorisEsException, DdlException { EsTablePartitions esTablePartitions = new EsTablePartitions(); RangePartitionInfo partitionInfo = null; if (esTable.getPartitionInfo() != null) { @@ -82,7 +84,7 @@ public static EsTablePartitions fromShardPartitions(EsTable esTable, EsShardPart LOG.debug("begin to parse es table [{}] state from search shards, " + "with no partition info", esTable.getName()); } else { - throw new ExternalDataSourceException("es table only support range partition, " + throw new DorisEsException("es table only support range partition, " + "but current partition type is " + esTable.getPartitionInfo().getType()); } diff --git a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java index aa04ec66164b9c..03cfc7076a9740 100644 --- a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java @@ -40,6 +40,9 @@ import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -47,9 +50,6 @@ import com.google.common.collect.Range; import com.google.common.collect.Sets; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -214,7 +214,7 @@ private List getShardLocations() throws UserException { // Generate on es scan range TEsScanRange esScanRange = new TEsScanRange(); esScanRange.setEs_hosts(shardAllocations); - esScanRange.setIndex(indexState.getIndexName()); + esScanRange.setIndex(shardRouting.get(0).getIndexName()); esScanRange.setType(table.getMappingType()); esScanRange.setShard_id(shardRouting.get(0).getShardId()); // Scan range diff --git a/fe/src/test/java/org/apache/doris/external/elasticsearch/EsRepositoryTest.java b/fe/src/test/java/org/apache/doris/external/elasticsearch/EsRepositoryTest.java index a7ce9610de2e72..c367ec0f8388d1 100644 --- a/fe/src/test/java/org/apache/doris/external/elasticsearch/EsRepositoryTest.java +++ b/fe/src/test/java/org/apache/doris/external/elasticsearch/EsRepositoryTest.java @@ -91,7 +91,7 @@ public void testSetEsTableContext() throws Exception { } - @Test(expected = ExternalDataSourceException.class) + @Test(expected = DorisEsException.class) public void testSetErrorType() throws Exception { EsTable esTable = (EsTable) Catalog.getCurrentCatalog() .getDb(CatalogTestUtil.testDb1) @@ -101,7 +101,7 @@ public void testSetErrorType() throws Exception { } @Test - public void testSetTableState() throws ExternalDataSourceException, DdlException { + public void testSetTableState() throws DorisEsException, DdlException { EsTable esTable = (EsTable) Catalog.getCurrentCatalog() .getDb(CatalogTestUtil.testDb1) .getTable(CatalogTestUtil.testEsTableId1);