Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e2d09f5
MOD: fix varchar max error
stalary May 2, 2020
b6ff6b2
MOD: Split /_cluster/state to [indexName/_mappings, indexName/_search…
stalary May 2, 2020
d858a74
MOD: Mod some details
stalary May 2, 2020
a3d2928
MOD: Cancel the folding line,Remove the changes to varchar.md
stalary May 6, 2020
558b8ee
Merge branch 'master' of https://github.com/apache/incubator-doris in…
stalary May 18, 2020
d25b21f
MOD: Make some changes based on comments
stalary May 18, 2020
407e913
MOD: make some changes based on the comments
stalary May 24, 2020
f06f846
FIX: fix dovValueField bug
stalary May 24, 2020
a381b68
MOD: remove useless import
stalary May 24, 2020
8400246
MOD
stalary May 24, 2020
7b79445
Merge branch 'master' of https://github.com/apache/incubator-doris in…
stalary May 24, 2020
451b9a6
MOD: Supplement Doris on es documentation
stalary May 26, 2020
3e768c8
MOD: refactor code
stalary Jun 10, 2020
2ed77b6
Merge branch 'master' into stalary_enhancement
stalary Jun 10, 2020
73af7d5
Merge branch 'master' of https://github.com/apache/incubator-doris in…
stalary Jun 10, 2020
b975c7a
MOD: Adjust import order, resolve conflicts
stalary Jun 10, 2020
3782f4b
MOD: adjust import order
stalary Jun 11, 2020
97b2338
MOD: Abstract code
stalary Jun 12, 2020
c2f7925
Merge branch 'master' of https://github.com/apache/incubator-doris in…
stalary Jun 19, 2020
95b5117
MOD: Refactor some places
stalary Jun 20, 2020
5e42b5b
MOD: EsRepository logic moves to EsTable
stalary Jun 22, 2020
e3748ec
Merge branch 'master' into stalary_enhancement
stalary Jun 23, 2020
a7ba9d1
MOD: Abstract code, change the function name
stalary Jun 23, 2020
303d0a2
MOD: format
stalary Jun 23, 2020
15ba2d3
MOD: mod error info
stalary Jun 23, 2020
5fbae71
MOD: format code
stalary Jun 24, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/en/extending-doris/doris-on-es.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ The following parameters are accepted by ES table:
Parameter | Description
---|---
**hosts** | ES Cluster Connection Address, maybe one or more node, load-balance is also accepted
**index** | the related ES index name
**type** | the type for this indexIf not specified, `_doc` will be used
**index** | the related ES index name, alias is supported, and if you use doc_value, you need to use the real name
**type** | the type for this index, If not specified, `_doc` will be used
**user** | username for ES
**password** | password for the user

Expand Down
2 changes: 1 addition & 1 deletion docs/zh-CN/extending-doris/doris-on-es.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ PROPERTIES (
参数 | 说明
---|---
**hosts** | ES集群地址,可以是一个或多个,也可以是ES前端的负载均衡地址
**index** | 对应的ES的index名字
**index** | 对应的ES的index名字,支持alias,如果使用doc_value,需要使用真实的名称
**type** | index的type,不指定的情况会使用_doc
**user** | ES集群用户名
**password** | 对应用户的密码信息
Expand Down
16 changes: 8 additions & 8 deletions fe/src/main/java/org/apache/doris/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
import org.apache.doris.deploy.impl.AmbariDeployManager;
import org.apache.doris.deploy.impl.K8sDeployManager;
import org.apache.doris.deploy.impl.LocalFileDeployManager;
import org.apache.doris.external.elasticsearch.EsStateStore;
import org.apache.doris.external.elasticsearch.EsRepository;
import org.apache.doris.ha.BDBHA;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.ha.HAProtocol;
Expand Down Expand Up @@ -301,7 +301,7 @@ public class Catalog {
private Daemon replayer;
private Daemon timePrinter;
private Daemon listener;
private EsStateStore esStateStore; // it is a daemon, so add it here
private EsRepository esRepository; // it is a daemon, so add it here

private boolean isFirstTimeStartUp = false;
private boolean isElectable;
Expand Down Expand Up @@ -510,7 +510,7 @@ private Catalog() {
this.auth = new PaloAuth();
this.domainResolver = new DomainResolver(auth);

this.esStateStore = new EsStateStore();
this.esRepository = new EsRepository();

this.metaContext = new MetaContext();
this.metaContext.setThreadLocalInfo();
Expand Down Expand Up @@ -1290,7 +1290,7 @@ private void startNonMasterDaemonThreads() {
// load and export job label cleaner thread
labelCleaner.start();
// ES state store
esStateStore.start();
esRepository.start();
// domain resolver
domainResolver.start();
}
Expand Down Expand Up @@ -1452,7 +1452,7 @@ public void loadImage(String imageDir) throws IOException, DdlException {
// ATTN: this should be done after load Db, and before loadAlterJob
recreateTabletInvertIndex();
// rebuild es state state
esStateStore.loadTableFromCatalog();
esRepository.loadTableFromCatalog();

checksum = loadLoadJob(dis, checksum);
checksum = loadAlterJob(dis, checksum);
Expand Down Expand Up @@ -4376,7 +4376,7 @@ public boolean unprotectDropTable(Database db, long tableId) {
}

if (table.getType() == TableType.ELASTICSEARCH) {
esStateStore.deRegisterTable(tableId);
esRepository.deRegisterTable(tableId);
} else if (table.getType() == TableType.OLAP) {
// drop all temp partitions of this table, so that there is no temp partitions in recycle bin,
// which make things easier.
Expand Down Expand Up @@ -4828,8 +4828,8 @@ public String getMasterIp() {
return this.masterIp;
}

public EsStateStore getEsStateStore() {
return this.esStateStore;
public EsRepository getEsRepository() {
return this.esRepository;
}

public void setMaster(MasterInfo info) {
Expand Down
2 changes: 1 addition & 1 deletion fe/src/main/java/org/apache/doris/catalog/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public boolean createTableWithLock(Table table, boolean isReplay, boolean setIfN
Catalog.getCurrentCatalog().getEditLog().logCreateTable(info);
}
if (table.getType() == TableType.ELASTICSEARCH) {
Catalog.getCurrentCatalog().getEsStateStore().registerTable((EsTable)table);
Catalog.getCurrentCatalog().getEsRepository().registerTable((EsTable)table);
}
}
return result;
Expand Down
54 changes: 42 additions & 12 deletions fe/src/main/java/org/apache/doris/catalog/EsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.external.elasticsearch.EsFieldInfos;
import org.apache.doris.external.elasticsearch.EsMajorVersion;
import org.apache.doris.external.elasticsearch.EsTableState;
import org.apache.doris.external.elasticsearch.EsNodeInfo;
import org.apache.doris.external.elasticsearch.EsRestClient;
import org.apache.doris.external.elasticsearch.EsShardPartitions;
import org.apache.doris.external.elasticsearch.EsTablePartitions;
import org.apache.doris.thrift.TEsTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
Expand Down Expand Up @@ -71,7 +75,7 @@ public class EsTable extends Table {
// only save the partition definition, save the partition key,
// partition list is got from es cluster dynamically and is saved in esTableState
private PartitionInfo partitionInfo;
private EsTableState esTableState;
private EsTablePartitions esTablePartitions;
private boolean enableDocValueScan = false;
private boolean enableKeywordSniff = true;

Expand Down Expand Up @@ -114,18 +118,19 @@ public EsTable(long id, String name, List<Column> schema,
validate(properties);
}

public void addFetchField(String originName, String replaceName) {
fieldsContext.put(originName, replaceName);
public void addFieldInfos(EsFieldInfos esFieldInfos) {
if (enableKeywordSniff && esFieldInfos.getFieldsContext() != null) {
fieldsContext = esFieldInfos.getFieldsContext();
}
if (enableDocValueScan && esFieldInfos.getDocValueContext() != null) {
docValueContext = esFieldInfos.getDocValueContext();
}
}

public Map<String, String> fieldsContext() {
return fieldsContext;
}

public void addDocValueField(String name, String fieldsName) {
docValueContext.put(name, fieldsName);
}

public Map<String, String> docValueContext() {
return docValueContext;
}
Expand Down Expand Up @@ -386,12 +391,12 @@ public PartitionInfo getPartitionInfo() {
return partitionInfo;
}

public EsTableState getEsTableState() {
return esTableState;
public EsTablePartitions getEsTablePartitions() {
return esTablePartitions;
}

public void setEsTableState(EsTableState esTableState) {
this.esTableState = esTableState;
public void setEsTablePartitions(EsTablePartitions esTablePartitions) {
this.esTablePartitions = esTablePartitions;
}

public Throwable getLastMetaDataSyncException() {
Expand All @@ -401,4 +406,29 @@ public Throwable getLastMetaDataSyncException() {
public void setLastMetaDataSyncException(Throwable lastMetaDataSyncException) {
this.lastMetaDataSyncException = lastMetaDataSyncException;
}

/**
* sync es index meta from remote
* @param client esRestClient
*/
public void syncESIndexMeta(EsRestClient client) {
try {
EsFieldInfos fieldInfos = client.getFieldInfos(this.indexName, this.mappingType, this.fullSchema);
EsShardPartitions esShardPartitions = client.getShardPartitions(this.indexName);
Map<String, EsNodeInfo> nodesInfo = client.getHttpNodes();
if (this.enableKeywordSniff || this.enableDocValueScan) {
addFieldInfos(fieldInfos);
}

this.esTablePartitions = EsTablePartitions.fromShardPartitions(this, esShardPartitions);

if (EsTable.TRANSPORT_HTTP.equals(getTransport())) {
this.esTablePartitions.addHttpAddress(nodesInfo);
}
} catch (Throwable e) {
LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster", this.name, e);
this.esTablePartitions = null;
this.lastMetaDataSyncException = e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.Table.TableType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.external.elasticsearch.EsIndexState;
import org.apache.doris.external.elasticsearch.EsShardPartitions;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -66,33 +66,33 @@ public ProcResult fetchResult() throws AnalysisException {
try {
RangePartitionInfo rangePartitionInfo = null;
if (esTable.getPartitionInfo().getType() == PartitionType.RANGE) {
rangePartitionInfo = (RangePartitionInfo) esTable.getEsTableState().getPartitionInfo();
rangePartitionInfo = (RangePartitionInfo) esTable.getEsTablePartitions().getPartitionInfo();
}
Joiner joiner = Joiner.on(", ");
Map<String, EsIndexState> unPartitionedIndices = esTable.getEsTableState().getUnPartitionedIndexStates();
Map<String, EsIndexState> partitionedIndices = esTable.getEsTableState().getPartitionedIndexStates();
for (EsIndexState esIndexState : unPartitionedIndices.values()) {
Map<String, EsShardPartitions> unPartitionedIndices = esTable.getEsTablePartitions().getUnPartitionedIndexStates();
Map<String, EsShardPartitions> partitionedIndices = esTable.getEsTablePartitions().getPartitionedIndexStates();
for (EsShardPartitions esShardPartitions : unPartitionedIndices.values()) {
List<Comparable> partitionInfo = new ArrayList<Comparable>();
partitionInfo.add(esIndexState.getIndexName());
partitionInfo.add(esShardPartitions.getIndexName());
partitionInfo.add("-"); // partition key
partitionInfo.add("-"); // range
partitionInfo.add("-"); // dis
partitionInfo.add(esIndexState.getShardRoutings().size()); // shards
partitionInfo.add(esShardPartitions.getShardRoutings().size()); // shards
partitionInfo.add(1); // replica num
partitionInfos.add(partitionInfo);
}
for (EsIndexState esIndexState : partitionedIndices.values()) {
for (EsShardPartitions esShardPartitions : partitionedIndices.values()) {
List<Comparable> partitionInfo = new ArrayList<Comparable>();
partitionInfo.add(esIndexState.getIndexName());
partitionInfo.add(esShardPartitions.getIndexName());
List<Column> partitionColumns = rangePartitionInfo.getPartitionColumns();
List<String> colNames = new ArrayList<String>();
for (Column column : partitionColumns) {
colNames.add(column.getName());
}
partitionInfo.add(joiner.join(colNames)); // partition key
partitionInfo.add(rangePartitionInfo.getRange(esIndexState.getPartitionId()).toString()); // range
partitionInfo.add(rangePartitionInfo.getRange(esShardPartitions.getPartitionId()).toString()); // range
partitionInfo.add("-"); // dis
partitionInfo.add(esIndexState.getShardRoutings().size()); // shards
partitionInfo.add(esShardPartitions.getShardRoutings().size()); // shards
partitionInfo.add(1); // replica num
partitionInfos.add(partitionInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.doris.catalog.EsTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.external.elasticsearch.EsIndexState;
import org.apache.doris.external.elasticsearch.EsShardPartitions;
import org.apache.doris.external.elasticsearch.EsShardRouting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -55,9 +55,9 @@ public ProcResult fetchResult() {
db.readLock();
try {
// get infos
EsIndexState esIndexState = esTable.getEsTableState().getIndexState(indexName);
for (int shardId : esIndexState.getShardRoutings().keySet()) {
List<EsShardRouting> shardRoutings = esIndexState.getShardRoutings().get(shardId);
EsShardPartitions esShardPartitions = esTable.getEsTablePartitions().getEsShardPartitions(indexName);
for (int shardId : esShardPartitions.getShardRoutings().keySet()) {
List<EsShardRouting> shardRoutings = esShardPartitions.getShardRoutings().get(shardId);
if (shardRoutings != null && shardRoutings.size() > 0) {
for (EsShardRouting esShardRouting : shardRoutings) {
List<Comparable> shardInfo = new ArrayList<Comparable>();
Expand Down
Loading