Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

import org.apache.doris.common.UserException;

public class ExternalDataSourceException extends UserException {
public class DorisEsException extends UserException {

private static final long serialVersionUID = 7912833584319374692L;

public ExternalDataSourceException(String msg) {
public DorisEsException(String msg) {
super(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public Map<String, String> getDocValueContext() {
* @return fieldsContext and docValueContext
* @throws Exception
*/
public static EsFieldInfos fromMapping(List<Column> colList, String indexName, String indexMapping, String docType) throws ExternalDataSourceException {
public static EsFieldInfos fromMapping(List<Column> colList, String indexName, String indexMapping, String docType) throws DorisEsException {
JSONObject jsonObject = new JSONObject(indexMapping);
// the indexName use alias takes the first mapping
Iterator<String> keys = jsonObject.keys();
Expand Down Expand Up @@ -103,7 +103,7 @@ public static EsFieldInfos fromMapping(List<Column> colList, String indexName, S
properties = rootSchema.optJSONObject("properties");
}
if (properties == null) {
throw new ExternalDataSourceException( "index[" + indexName + "] type[" + docType + "] mapping not found for the Elasticsearch Cluster");
throw new DorisEsException( "index[" + indexName + "] type[" + docType + "] mapping not found for the Elasticsearch Cluster");
}
return parseProperties(colList, properties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public EsFieldInfos getFieldInfos(String indexName, String docType, List<Column>
String path = indexName + "/_mapping";
String indexMapping = execute(path);
if (indexMapping == null) {
throw new ExternalDataSourceException( "index[" + indexName + "] not found for the Elasticsearch Cluster");
throw new DorisEsException( "index[" + indexName + "] not found for the Elasticsearch Cluster");
}
return EsFieldInfos.fromMapping(colList, indexName, indexMapping, docType);
}
Expand All @@ -105,7 +105,7 @@ public EsShardPartitions getShardPartitions(String indexName) throws Exception {
String path = indexName + "/_search_shards";
String searchShards = execute(path);
if (searchShards == null) {
throw new ExternalDataSourceException( "index[" + indexName + "] search_shards not found for the Elasticsearch Cluster");
throw new DorisEsException( "index[" + indexName + "] search_shards not found for the Elasticsearch Cluster");
}
return EsShardPartitions.findShardPartitions(indexName, searchShards);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
import org.apache.doris.analysis.SingleRangePartitionDesc;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.util.List;
import java.util.Map;
import java.util.Random;
Expand All @@ -47,32 +50,38 @@ public EsShardPartitions(String indexName) {
this.partitionDesc = null;
this.partitionKey = null;
}

/**
* Parse shardRoutings from the json
* @param indexName indexName(alias or really name)
*
* @param indexName indexName(alias or really name)
* @param searchShards the return value of _search_shards
* @return shardRoutings is used for searching
*/
public static EsShardPartitions findShardPartitions(String indexName, String searchShards) throws ExternalDataSourceException {
public static EsShardPartitions findShardPartitions(String indexName, String searchShards) throws DorisEsException {
EsShardPartitions indexState = new EsShardPartitions(indexName);
JSONObject jsonObject = new JSONObject(searchShards);
JSONObject nodesMap = jsonObject.getJSONObject("nodes");
JSONArray shards = jsonObject.getJSONArray("shards");
int length = shards.length();
for (int i = 0; i < length; i++) {
List<EsShardRouting> singleShardRouting = Lists.newArrayList();
JSONArray shardsArray = shards.getJSONArray(i);
int arrayLength = shardsArray.length();
for (int j = 0; j < arrayLength; j++) {
JSONObject shard = shardsArray.getJSONObject(j);
String shardState = shard.getString("state");
JSONObject indexShard = shardsArray.getJSONObject(j);
String shardState = indexShard.getString("state");
if ("STARTED".equalsIgnoreCase(shardState) || "RELOCATING".equalsIgnoreCase(shardState)) {
try {
singleShardRouting.add(EsShardRouting.parseShardRouting(shardState, String.valueOf(i), shard, nodesMap));
singleShardRouting.add(
EsShardRouting.newSearchShard(
indexShard.getString("index"),
indexShard.getInt("shard"),
indexShard.getBoolean("primary"),
indexShard.getString("node"),
jsonObject.getJSONObject("nodes")));
} catch (Exception e) {
throw new ExternalDataSourceException( "index[" + indexName + "] findShardPartitions error");
}
LOG.error("fetch index [{}] shard partitions failure", indexName, e);
throw new DorisEsException("fetch [" + indexName + "] shard partitions failure [" + e.getMessage() + "]"); }
}
}
if (singleShardRouting.isEmpty()) {
Expand Down Expand Up @@ -142,6 +151,6 @@ public void setPartitionId(long partitionId) {
@Override
public String toString() {
return "EsIndexState [indexName=" + indexName + ", partitionDesc=" + partitionDesc + ", partitionKey="
+ partitionKey + "]";
+ partitionKey + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package org.apache.doris.external.elasticsearch;


import org.apache.doris.thrift.TNetworkAddress;
import org.apache.commons.lang.StringUtils;
import org.json.JSONObject;

import org.apache.doris.thrift.TNetworkAddress;

public class EsShardRouting {

Expand All @@ -41,9 +41,8 @@ public EsShardRouting(String indexName, int shardId, boolean isPrimary, TNetwork
this.nodeId = nodeId;
}

public static EsShardRouting parseShardRouting(String indexName, String shardKey,
JSONObject shardInfo, JSONObject nodesMap) {
String nodeId = shardInfo.getString("node");
public static EsShardRouting newSearchShard(String indexName, int shardId, boolean isPrimary,
String nodeId, JSONObject nodesMap) {
JSONObject nodeInfo = nodesMap.getJSONObject(nodeId);
String[] transportAddr = nodeInfo.getString("transport_address").split(":");
// get thrift port from node info
Expand All @@ -53,9 +52,7 @@ public static EsShardRouting parseShardRouting(String indexName, String shardKey
if (!StringUtils.isEmpty(thriftPort)) {
addr = new TNetworkAddress(transportAddr[0], Integer.parseInt(thriftPort));
}
boolean isPrimary = shardInfo.getBoolean("primary");
return new EsShardRouting(indexName, Integer.parseInt(shardKey),
isPrimary, addr, nodeId);
return new EsShardRouting(indexName, shardId, isPrimary, addr, nodeId);
}

public int getShardId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@

package org.apache.doris.external.elasticsearch;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.EsTable;
import org.apache.doris.catalog.PartitionInfo;
Expand All @@ -31,11 +25,19 @@
import org.apache.doris.catalog.SinglePartitionInfo;
import org.apache.doris.common.DdlException;
import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.google.common.collect.Maps;
import com.google.common.collect.Range;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
* save the dynamic info parsed from es cluster state such as shard routing, partition info
*/
Expand All @@ -56,7 +58,7 @@ public EsTablePartitions() {
}

public static EsTablePartitions fromShardPartitions(EsTable esTable, EsShardPartitions shardPartitions)
throws ExternalDataSourceException, DdlException {
throws DorisEsException, DdlException {
EsTablePartitions esTablePartitions = new EsTablePartitions();
RangePartitionInfo partitionInfo = null;
if (esTable.getPartitionInfo() != null) {
Expand All @@ -82,7 +84,7 @@ public static EsTablePartitions fromShardPartitions(EsTable esTable, EsShardPart
LOG.debug("begin to parse es table [{}] state from search shards, "
+ "with no partition info", esTable.getName());
} else {
throw new ExternalDataSourceException("es table only support range partition, "
throw new DorisEsException("es table only support range partition, "
+ "but current partition type is "
+ esTable.getPartitionInfo().getType());
}
Expand Down
8 changes: 4 additions & 4 deletions fe/src/main/java/org/apache/doris/planner/EsScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,16 @@
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -214,7 +214,7 @@ private List<TScanRangeLocations> getShardLocations() throws UserException {
// Generate on es scan range
TEsScanRange esScanRange = new TEsScanRange();
esScanRange.setEs_hosts(shardAllocations);
esScanRange.setIndex(indexState.getIndexName());
esScanRange.setIndex(shardRouting.get(0).getIndexName());
esScanRange.setType(table.getMappingType());
esScanRange.setShard_id(shardRouting.get(0).getShardId());
// Scan range
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void testSetEsTableContext() throws Exception {

}

@Test(expected = ExternalDataSourceException.class)
@Test(expected = DorisEsException.class)
public void testSetErrorType() throws Exception {
EsTable esTable = (EsTable) Catalog.getCurrentCatalog()
.getDb(CatalogTestUtil.testDb1)
Expand All @@ -101,7 +101,7 @@ public void testSetErrorType() throws Exception {
}

@Test
public void testSetTableState() throws ExternalDataSourceException, DdlException {
public void testSetTableState() throws DorisEsException, DdlException {
EsTable esTable = (EsTable) Catalog.getCurrentCatalog()
.getDb(CatalogTestUtil.testDb1)
.getTable(CatalogTestUtil.testEsTableId1);
Expand Down