Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 23 additions & 51 deletions fe/src/main/java/org/apache/doris/catalog/EsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,19 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.external.elasticsearch.EsFieldInfos;
import org.apache.doris.external.elasticsearch.EsMajorVersion;
import org.apache.doris.external.elasticsearch.EsNodeInfo;
import org.apache.doris.external.elasticsearch.EsMetaStateTracker;
import org.apache.doris.external.elasticsearch.EsRestClient;
import org.apache.doris.external.elasticsearch.EsShardPartitions;
import org.apache.doris.external.elasticsearch.EsTablePartitions;
import org.apache.doris.thrift.TEsTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;

import com.google.common.base.Strings;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.google.common.base.Strings;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
Expand Down Expand Up @@ -83,27 +81,6 @@ public class EsTable extends Table {

private Map<String, String> tableContext = new HashMap<>();

// used to indicate which fields can be fetched from ES doc values.
// Because Elasticsearch has the "fields" (multi-fields) feature, a field can have
// two or more types; the first type may not have doc values while another one
// does, for example (a text field has no doc values, but a keyword field does):
// "properties": {
// "city": {
// "type": "text",
// "fields": {
// "raw": {
// "type": "keyword"
// }
// }
// }
// }
// then the docvalue context provides the mapping between the selected field and the field actually requested:
// {"city": "city.raw"}
// for `select city from table`, if docvalue scan is enabled, the `city` field value is fetched from `city.raw`
private Map<String, String> docValueContext = new HashMap<>();

private Map<String, String> fieldsContext = new HashMap<>();

// records the most recent exception raised while syncing ES table metadata (mapping, shard location)
private Throwable lastMetaDataSyncException = null;

Expand All @@ -118,21 +95,13 @@ public EsTable(long id, String name, List<Column> schema,
validate(properties);
}

/**
 * Applies the contexts carried by {@code esFieldInfos} to this table.
 *
 * <p>{@code fieldsContext} is replaced only when {@code enableKeywordSniff} is set, and
 * {@code docValueContext} only when {@code enableDocValueScan} is set. A {@code null}
 * context leaves the current map untouched.
 *
 * @param esFieldInfos holder of the fields context and doc-value context to apply
 */
public void addFieldInfos(EsFieldInfos esFieldInfos) {
    if (enableKeywordSniff) {
        // Read the getter once so the value that was null-checked is the value stored.
        Map<String, String> sniffedFields = esFieldInfos.getFieldsContext();
        if (sniffedFields != null) {
            fieldsContext = sniffedFields;
        }
    }
    if (enableDocValueScan) {
        // Same single-read rule for the doc-value mapping.
        Map<String, String> docValueFields = esFieldInfos.getDocValueContext();
        if (docValueFields != null) {
            docValueContext = docValueFields;
        }
    }
}

public Map<String, String> fieldsContext() {
return fieldsContext;
return esMetaStateTracker.searchContext().fetchFieldsContext();
}

public Map<String, String> docValueContext() {
return docValueContext;
return esMetaStateTracker.searchContext().docValueFieldsContext();
}

public boolean isDocValueScanEnable() {
Expand Down Expand Up @@ -179,9 +148,12 @@ private void validate(Map<String, String> properties) throws DdlException {
if (properties.containsKey(VERSION)) {
try {
majorVersion = EsMajorVersion.parse(properties.get(VERSION).trim());
if (majorVersion.before(EsMajorVersion.V_5_X)) {
throw new DdlException("Unsupported/Unknown ES Cluster version [" + properties.get(VERSION) + "] ");
}
} catch (Exception e) {
throw new DdlException("fail to parse ES major version, version= "
+ properties.get(VERSION).trim() + ", shoud be like '6.5.3' ");
+ properties.get(VERSION).trim() + ", should be like '6.5.3' ");
}
}

Expand Down Expand Up @@ -399,6 +371,10 @@ public void setEsTablePartitions(EsTablePartitions esTablePartitions) {
this.esTablePartitions = esTablePartitions;
}

/**
 * Returns the Elasticsearch major version held in the {@code majorVersion} field.
 *
 * @return the current value of {@code majorVersion}
 */
public EsMajorVersion esVersion() {
return majorVersion;
}

/**
 * Returns the exception stored in {@code lastMetaDataSyncException}, i.e. the one
 * recorded for the ES table metadata sync.
 *
 * @return the recorded throwable; the field is initialized to {@code null}
 */
public Throwable getLastMetaDataSyncException() {
return lastMetaDataSyncException;
}
Expand All @@ -407,24 +383,20 @@ public void setLastMetaDataSyncException(Throwable lastMetaDataSyncException) {
this.lastMetaDataSyncException = lastMetaDataSyncException;
}

private EsMetaStateTracker esMetaStateTracker;

/**
* sync es index meta from remote
* sync es index meta from remote ES Cluster
*
* @param client esRestClient
*/
public void syncESIndexMeta(EsRestClient client) {
public void syncTableMetaData(EsRestClient client) {
if (esMetaStateTracker == null) {
esMetaStateTracker = new EsMetaStateTracker(client, this);
}
try {
EsFieldInfos fieldInfos = client.getFieldInfos(this.indexName, this.mappingType, this.fullSchema);
EsShardPartitions esShardPartitions = client.getShardPartitions(this.indexName);
Map<String, EsNodeInfo> nodesInfo = client.getHttpNodes();
if (this.enableKeywordSniff || this.enableDocValueScan) {
addFieldInfos(fieldInfos);
}

this.esTablePartitions = EsTablePartitions.fromShardPartitions(this, esShardPartitions);

if (EsTable.TRANSPORT_HTTP.equals(getTransport())) {
this.esTablePartitions.addHttpAddress(nodesInfo);
}
esMetaStateTracker.run();
this.esTablePartitions = esMetaStateTracker.searchContext().tablePartitions();
} catch (Throwable e) {
LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster", this.name, e);
this.esTablePartitions = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@

package org.apache.doris.external.elasticsearch;

import org.apache.doris.common.UserException;

public class DorisEsException extends UserException {

private static final long serialVersionUID = 7912833584319374692L;
public class DorisEsException extends RuntimeException {

public DorisEsException(String msg) {
super(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@

/**
* Elasticsearch major version information, useful to check client's query compatibility with the Rest API.
*
* <p>
* reference es-hadoop:
*
*/
public class EsMajorVersion {

public static final EsMajorVersion V_0_X = new EsMajorVersion((byte) 0, "0.x");
public static final EsMajorVersion V_1_X = new EsMajorVersion((byte) 1, "1.x");
public static final EsMajorVersion V_2_X = new EsMajorVersion((byte) 2, "2.x");
public static final EsMajorVersion V_5_X = new EsMajorVersion((byte) 5, "5.x");
public static final EsMajorVersion V_6_X = new EsMajorVersion((byte) 6, "6.x");
public static final EsMajorVersion V_7_X = new EsMajorVersion((byte) 7, "7.x");
public static final EsMajorVersion V_8_X = new EsMajorVersion((byte) 8, "8.x");
public static final EsMajorVersion LATEST = V_7_X;

public final byte major;
Expand Down Expand Up @@ -62,7 +66,16 @@ public boolean onOrBefore(EsMajorVersion version) {
return version.major >= major;
}

public static EsMajorVersion parse(String version) throws Exception {
public static EsMajorVersion parse(String version) throws DorisEsException {
if (version.startsWith("0.")) {
return new EsMajorVersion((byte) 0, version);
}
if (version.startsWith("1.")) {
return new EsMajorVersion((byte) 1, version);
}
if (version.startsWith("2.")) {
return new EsMajorVersion((byte) 2, version);
}
if (version.startsWith("5.")) {
return new EsMajorVersion((byte) 5, version);
}
Expand All @@ -72,8 +85,12 @@ public static EsMajorVersion parse(String version) throws Exception {
if (version.startsWith("7.")) {
return new EsMajorVersion((byte) 7, version);
}
throw new Exception("Unsupported/Unknown Elasticsearch version [" + version + "]." +
"Highest supported version is [" + LATEST.version + "]. You may need to upgrade ES-Hadoop.");
// used for the next released ES version
if (version.startsWith("8.")) {
return new EsMajorVersion((byte) 8, version);
}
throw new DorisEsException("Unsupported/Unknown ES Cluster version [" + version + "]." +
"Highest supported version is [" + LATEST.version + "].");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.external.elasticsearch;

import org.apache.doris.catalog.EsTable;

import java.util.LinkedList;
import java.util.List;

/**
 * Schedules all network requests sent to the remote ES cluster for one {@link EsTable}.
 *
 * <p>Request sequence:
 * <ol>
 *   <li>GET /</li>
 *   <li>GET {index}/_mapping</li>
 *   <li>GET {index}/_search_shards</li>
 * </ol>
 *
 * <p>note: step 1 is not necessary
 */
public class EsMetaStateTracker {

    // Phases are executed by run() in the order they were added in the constructor.
    private final List<SearchPhase> builtinSearchPhase = new LinkedList<>();
    // Context object handed to every phase; created once per tracker.
    private final SearchContext searchContext;

    /**
     * Registers the built-in phases (version, mapping, partition) and creates the
     * search context for the given table.
     *
     * @param client  REST client passed to every phase
     * @param esTable table the search context is created for
     */
    public EsMetaStateTracker(EsRestClient client, EsTable esTable) {
        builtinSearchPhase.add(new VersionPhase(client));
        builtinSearchPhase.add(new MappingPhase(client));
        builtinSearchPhase.add(new PartitionPhase(client));
        searchContext = new SearchContext(esTable);
    }

    /**
     * Returns the context object that is passed to each phase by {@link #run()}.
     *
     * @return the search context created in the constructor
     */
    public SearchContext searchContext() {
        return searchContext;
    }

    /**
     * Runs every registered phase in order, calling {@code preProcess}, {@code execute}
     * and {@code postProcess} on each with the shared search context.
     *
     * <p>Nothing is caught here, so an exception thrown by any step propagates to the
     * caller and the remaining phases are not run.
     *
     * @throws DorisEsException declared for failures raised by a phase
     */
    public void run() throws DorisEsException {
        for (SearchPhase searchPhase : builtinSearchPhase) {
            searchPhase.preProcess(searchContext);
            searchPhase.execute(searchContext);
            searchPhase.postProcess(searchContext);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class EsNodeInfo {
private boolean hasThrift;
private TNetworkAddress thriftAddress;

public EsNodeInfo(String id, Map<String, Object> map) throws Exception {
public EsNodeInfo(String id, Map<String, Object> map) throws DorisEsException {
this.id = id;
EsMajorVersion version = EsMajorVersion.parse((String) map.get("version"));
this.name = (String) map.get("name");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@

package org.apache.doris.external.elasticsearch;


import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.EsTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Table.TableType;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.MasterDaemon;

import com.google.common.collect.Maps;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -33,7 +36,8 @@


/**
* It is used to call es api to get shard allocation state
* It is responsible for loading all ES external table's meta-data such as `fields`, `partitions` periodically,
* playing the `repo` role at Doris On ES
*/
public class EsRepository extends MasterDaemon {

Expand Down Expand Up @@ -69,8 +73,7 @@ public void deRegisterTable(long tableId) {
protected void runAfterCatalogReady() {
for (EsTable esTable : esTables.values()) {
try {
EsRestClient client = esClients.get(esTable.getId());
esTable.syncESIndexMeta(client);
esTable.syncTableMetaData(esClients.get(esTable.getId()));
} catch (Throwable e) {
LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster", esTable.getName(), e);
esTable.setEsTablePartitions(null);
Expand Down
Loading