From 43ded30d431372be68d1e2716325e6eb8e4a77cc Mon Sep 17 00:00:00 2001
From: gnehil
Date: Fri, 27 Dec 2024 09:54:42 +0800
Subject: [PATCH 1/2] [feature](load) new ingestion load (#45937)
### What problem does this PR solve?
Problem Summary:
Ingestion Load loads pre-processed data into Doris.
Preprocessing means the data is processed according to the partitioning,
bucketing, and aggregation defined by the target Doris table, and the
result is written to an external storage system.
The preprocessing is done by the external system; the BE then reads the
data, converts it into segment files, and saves them.
The basic flow is as follows:
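The external system (for example Spark) runs the ETL job and writes the
result files; the FE then drives push tasks so the BE converts them into
segment files, commits the transaction, and publishes the version (see the
IngestionLoadJob javadoc below for the four steps).

As an illustration only, a client could create such a job through the new
REST endpoint roughly like this (Java 11+ HTTP client; fe_host, port 8030,
user/pass, test_db, and the label are placeholder assumptions; "internal"
is the internal catalog):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Base64;

public class IngestionLoadCreateExample {
    public static void main(String[] args) throws Exception {
        // Request body follows the example documented in LoadAction.createIngestionLoad.
        String body = "{"
                + "\"label\": \"test_ingestion\","
                + "\"tableToPartition\": {\"tbl_test_spark_load\": [\"p1\", \"p2\"]},"
                + "\"properties\": {\"strict_mode\": \"true\", \"timeout\": 3600000}"
                + "}";
        String auth = Base64.getEncoder().encodeToString("user:pass".getBytes());
        HttpRequest request = HttpRequest.newBuilder(
                        URI.create("http://fe_host:8030/api/ingestion_load/internal/test_db/_create"))
                .header("Authorization", "Basic " + auth)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        // On success the response should wrap the load meta built by
        // IngestionLoadJob.getLoadMeta (dbId, signature, tableMeta) for the ETL job to use.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}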

### Release note
[feature](load) new ingestion load
(cherry picked from commit 6580f6bfbcea53efaf0f21ea556f781ce65b98b7)
---
.github/actions/action-pr-title | 2 +-
.github/actions/ccache-action | 2 +-
.github/actions/get-workflow-origin | 2 +-
.github/actions/paths-filter | 2 +-
be/src/apache-orc | 2 +-
be/src/clucene | 2 +-
be/src/olap/push_handler.cpp | 53 +-
.../java/org/apache/doris/common/Config.java | 4 +
.../apache/doris/sparkdpp/EtlJobConfig.java | 6 +-
.../apache/doris/catalog/SparkResource.java | 1 +
.../apache/doris/httpv2/rest/LoadAction.java | 211 +++
.../org/apache/doris/load/EtlJobType.java | 1 +
.../doris/load/loadv2/IngestionLoadJob.java | 1139 +++++++++++++++++
.../org/apache/doris/load/loadv2/LoadJob.java | 5 +
.../apache/doris/load/loadv2/LoadManager.java | 50 +-
.../doris/load/loadv2/SparkEtlJobHandler.java | 1 +
.../load/loadv2/SparkLauncherMonitor.java | 1 +
.../doris/load/loadv2/SparkLoadAppHandle.java | 1 +
.../doris/load/loadv2/SparkLoadJob.java | 1 +
.../load/loadv2/SparkLoadPendingTask.java | 8 +-
.../loadv2/SparkPendingTaskAttachment.java | 1 +
.../doris/load/loadv2/SparkRepository.java | 1 +
.../load/loadv2/SparkYarnConfigFiles.java | 1 +
.../org/apache/doris/master/MasterImpl.java | 7 +-
.../apache/doris/persist/gson/GsonUtils.java | 9 +-
.../load/loadv2/SparkLoadPendingTaskTest.java | 26 +-
.../MinimumCoverageRollupTreeBuilderTest.java | 8 +-
.../load/loadv2/etl/SparkEtlJobTest.java | 4 +-
.../data/load_p0/ingestion_load/data.parquet | Bin 0 -> 5745 bytes
.../data/load_p0/ingestion_load/data1.parquet | Bin 0 -> 4057 bytes
.../load_p0/ingestion_load/data2-0.parquet | Bin 0 -> 851 bytes
.../load_p0/ingestion_load/data2-1.parquet | Bin 0 -> 781 bytes
.../load_p0/ingestion_load/data2-2.parquet | Bin 0 -> 781 bytes
.../load_p0/ingestion_load/data2-3.parquet | Bin 0 -> 839 bytes
.../ingestion_load/test_ingestion_load.out | 37 +
.../test_ingestion_load_multi_table.out | 25 +
...est_ingestion_load_with_inverted_index.out | 13 +
.../test_ingestion_load_with_partition.out | 7 +
.../ingestion_load/test_ingestion_load.groovy | 222 ++++
.../test_ingestion_load_alter_column.groovy | 208 +++
...test_ingestion_load_alter_partition.groovy | 224 ++++
.../test_ingestion_load_drop_table.groovy | 196 +++
.../test_ingestion_load_multi_table.groovy | 208 +++
..._ingestion_load_with_inverted_index.groovy | 166 +++
.../test_ingestion_load_with_partition.groovy | 160 +++
45 files changed, 2972 insertions(+), 45 deletions(-)
create mode 100644 fe/fe-core/src/main/java/org/apache/doris/load/loadv2/IngestionLoadJob.java
create mode 100644 regression-test/data/load_p0/ingestion_load/data.parquet
create mode 100644 regression-test/data/load_p0/ingestion_load/data1.parquet
create mode 100644 regression-test/data/load_p0/ingestion_load/data2-0.parquet
create mode 100644 regression-test/data/load_p0/ingestion_load/data2-1.parquet
create mode 100644 regression-test/data/load_p0/ingestion_load/data2-2.parquet
create mode 100644 regression-test/data/load_p0/ingestion_load/data2-3.parquet
create mode 100644 regression-test/data/load_p0/ingestion_load/test_ingestion_load.out
create mode 100644 regression-test/data/load_p0/ingestion_load/test_ingestion_load_multi_table.out
create mode 100644 regression-test/data/load_p0/ingestion_load/test_ingestion_load_with_inverted_index.out
create mode 100644 regression-test/data/load_p0/ingestion_load/test_ingestion_load_with_partition.out
create mode 100644 regression-test/suites/load_p0/ingestion_load/test_ingestion_load.groovy
create mode 100644 regression-test/suites/load_p0/ingestion_load/test_ingestion_load_alter_column.groovy
create mode 100644 regression-test/suites/load_p0/ingestion_load/test_ingestion_load_alter_partition.groovy
create mode 100644 regression-test/suites/load_p0/ingestion_load/test_ingestion_load_drop_table.groovy
create mode 100644 regression-test/suites/load_p0/ingestion_load/test_ingestion_load_multi_table.groovy
create mode 100644 regression-test/suites/load_p0/ingestion_load/test_ingestion_load_with_inverted_index.groovy
create mode 100644 regression-test/suites/load_p0/ingestion_load/test_ingestion_load_with_partition.groovy
diff --git a/.github/actions/action-pr-title b/.github/actions/action-pr-title
index 10f7ff082a0f12..077bddd7bdabd0 160000
--- a/.github/actions/action-pr-title
+++ b/.github/actions/action-pr-title
@@ -1 +1 @@
-Subproject commit 10f7ff082a0f1239f8cc39ccba39d11f32ca2407
+Subproject commit 077bddd7bdabd0d2b1b25ed0754c7e62e184d7ee
diff --git a/.github/actions/ccache-action b/.github/actions/ccache-action
index ca3acd2731eef1..3cfe8f57e1c7bf 160000
--- a/.github/actions/ccache-action
+++ b/.github/actions/ccache-action
@@ -1 +1 @@
-Subproject commit ca3acd2731eef11f1572ccb126356c2f9298d35e
+Subproject commit 3cfe8f57e1c7bffe434f38879f1ebca09e169288
diff --git a/.github/actions/get-workflow-origin b/.github/actions/get-workflow-origin
index e2dae063368361..3778755869bc9c 160000
--- a/.github/actions/get-workflow-origin
+++ b/.github/actions/get-workflow-origin
@@ -1 +1 @@
-Subproject commit e2dae063368361e4cd1f510e8785cd73bca9352e
+Subproject commit 3778755869bc9ca829e7b45b5d179fa000f97b44
diff --git a/.github/actions/paths-filter b/.github/actions/paths-filter
index 4512585405083f..de90cc6fb38fc0 160000
--- a/.github/actions/paths-filter
+++ b/.github/actions/paths-filter
@@ -1 +1 @@
-Subproject commit 4512585405083f25c027a35db413c2b3b9006d50
+Subproject commit de90cc6fb38fc0963ad72b210f1f284cd68cea36
diff --git a/be/src/apache-orc b/be/src/apache-orc
index ef68c6ff736a84..72787269f5f52a 160000
--- a/be/src/apache-orc
+++ b/be/src/apache-orc
@@ -1 +1 @@
-Subproject commit ef68c6ff736a84c8c7185d4a08397c67eff53ad6
+Subproject commit 72787269f5f52ab0174bac1dbf54050bb7b60242
diff --git a/be/src/clucene b/be/src/clucene
index 4f5449c903778f..30b63dc3406899 160000
--- a/be/src/clucene
+++ b/be/src/clucene
@@ -1 +1 @@
-Subproject commit 4f5449c903778fae32884586c728587c24a58806
+Subproject commit 30b63dc34068996c15d451a27d0593c519cb97fc
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 99637eaf764a7b..896c472f79abe2 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -34,16 +34,17 @@
#include
#include
#include
+#include
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "olap/cumulative_compaction_time_series_policy.h"
+#include "io/hdfs_builder.h"
#include "olap/delete_handler.h"
#include "olap/olap_define.h"
#include "olap/rowset/pending_rowset_helper.h"
-#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/schema.h"
@@ -54,10 +55,11 @@
#include "olap/txn_manager.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
-#include "util/runtime_profile.h"
#include "util/time.h"
#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_nullable.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/functions/simple_function_factory.h"
@@ -355,8 +357,12 @@ PushBrokerReader::PushBrokerReader(const Schema* schema, const TBrokerScanRange&
_file_params.expr_of_dest_slot = _params.expr_of_dest_slot;
_file_params.dest_sid_to_src_sid_without_trans = _params.dest_sid_to_src_sid_without_trans;
_file_params.strict_mode = _params.strict_mode;
- _file_params.__isset.broker_addresses = true;
- _file_params.broker_addresses = t_scan_range.broker_addresses;
+ if (_ranges[0].file_type == TFileType::FILE_HDFS) {
+ _file_params.hdfs_params = parse_properties(_params.properties);
+ } else {
+ _file_params.__isset.broker_addresses = true;
+ _file_params.broker_addresses = t_scan_range.broker_addresses;
+ }
for (const auto& range : _ranges) {
TFileRangeDesc file_range;
@@ -485,17 +491,36 @@ Status PushBrokerReader::_cast_to_input_block() {
auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name());
// remove nullable here, let the get_function decide whether nullable
auto return_type = slot_desc->get_data_type_ptr();
- vectorized::ColumnsWithTypeAndName arguments {
- arg,
- {vectorized::DataTypeString().create_column_const(
- arg.column->size(), remove_nullable(return_type)->get_family_name()),
- std::make_shared<vectorized::DataTypeString>(), ""}};
- auto func_cast = vectorized::SimpleFunctionFactory::instance().get_function(
- "CAST", arguments, return_type);
idx = _src_block_name_to_idx[slot_desc->col_name()];
- RETURN_IF_ERROR(
- func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size()));
- _src_block_ptr->get_by_position(idx).type = std::move(return_type);
+ // bitmap convert:src -> to_base64 -> bitmap_from_base64
+ if (slot_desc->type().is_bitmap_type()) {
+ auto base64_return_type = vectorized::DataTypeFactory::instance().create_data_type(
+ vectorized::DataTypeString().get_type_as_type_descriptor(),
+ slot_desc->is_nullable());
+ auto func_to_base64 = vectorized::SimpleFunctionFactory::instance().get_function(
+ "to_base64", {arg}, base64_return_type);
+ RETURN_IF_ERROR(func_to_base64->execute(nullptr, *_src_block_ptr, {idx}, idx,
+ arg.column->size()));
+ _src_block_ptr->get_by_position(idx).type = std::move(base64_return_type);
+ auto& arg_base64 = _src_block_ptr->get_by_name(slot_desc->col_name());
+ auto func_bitmap_from_base64 =
+ vectorized::SimpleFunctionFactory::instance().get_function(
+ "bitmap_from_base64", {arg_base64}, return_type);
+ RETURN_IF_ERROR(func_bitmap_from_base64->execute(nullptr, *_src_block_ptr, {idx}, idx,
+ arg_base64.column->size()));
+ _src_block_ptr->get_by_position(idx).type = std::move(return_type);
+ } else {
+ vectorized::ColumnsWithTypeAndName arguments {
+ arg,
+ {vectorized::DataTypeString().create_column_const(
+ arg.column->size(), remove_nullable(return_type)->get_family_name()),
+ std::make_shared<vectorized::DataTypeString>(), ""}};
+ auto func_cast = vectorized::SimpleFunctionFactory::instance().get_function(
+ "CAST", arguments, return_type);
+ RETURN_IF_ERROR(
+ func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size()));
+ _src_block_ptr->get_by_position(idx).type = std::move(return_type);
+ }
}
return Status::OK();
}
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index ab09c68998f24b..e7f1f38b2ecafb 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -667,6 +667,10 @@ public class Config extends ConfigBase {
@ConfField(description = {"Yarn 配置文件的路径", "Yarn config path"})
public static String yarn_config_dir = EnvUtils.getDorisHome() + "/lib/yarn-config";
+ @ConfField(mutable = true, masterOnly = true, description = {"Ingestion load 的默认超时时间,单位是秒。",
+ "Default timeout for ingestion load job, in seconds."})
+ public static int ingestion_load_default_timeout_second = 86400; // 1 day
+
@ConfField(mutable = true, masterOnly = true, description = {"Broker Load 的最大等待 job 数量。"
+ "这个值是一个期望值。在某些情况下,比如切换 master,当前等待的 job 数量可能会超过这个值。",
"Maximal number of waiting jobs for Broker Load. This is a desired number. "
diff --git a/fe/fe-common/src/main/java/org/apache/doris/sparkdpp/EtlJobConfig.java b/fe/fe-common/src/main/java/org/apache/doris/sparkdpp/EtlJobConfig.java
index c59901d383b648..8d9d5de54b59f1 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/sparkdpp/EtlJobConfig.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/sparkdpp/EtlJobConfig.java
@@ -371,14 +371,17 @@ public static class EtlIndex implements Serializable {
public String indexType;
@SerializedName(value = "isBaseIndex")
public boolean isBaseIndex;
+ @SerializedName(value = "schemaVersion")
+ public int schemaVersion;
public EtlIndex(long indexId, List<EtlColumn> etlColumns, int schemaHash,
- String indexType, boolean isBaseIndex) {
+ String indexType, boolean isBaseIndex, int schemaVersion) {
this.indexId = indexId;
this.columns = etlColumns;
this.schemaHash = schemaHash;
this.indexType = indexType;
this.isBaseIndex = isBaseIndex;
+ this.schemaVersion = schemaVersion;
}
public EtlColumn getColumn(String name) {
@@ -398,6 +401,7 @@ public String toString() {
+ ", schemaHash=" + schemaHash
+ ", indexType='" + indexType + '\''
+ ", isBaseIndex=" + isBaseIndex
+ + ", schemaVersion=" + schemaVersion
+ '}';
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
index 59b6d16801e3f1..4e85b8208a78a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
@@ -71,6 +71,7 @@
*
* DROP RESOURCE "spark0";
*/
+@Deprecated
public class SparkResource extends Resource {
private static final Logger LOG = LogManager.getLogger(SparkResource.class);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 3511d01d210db7..415e8947467751 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -27,13 +27,21 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
+import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.entity.RestBaseResult;
import org.apache.doris.httpv2.exception.UnauthorizedException;
+import org.apache.doris.httpv2.rest.manager.HttpUtils;
+import org.apache.doris.load.FailMsg;
import org.apache.doris.load.StreamLoadHandler;
+import org.apache.doris.load.loadv2.IngestionLoadJob;
+import org.apache.doris.load.loadv2.LoadJob;
+import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.GroupCommitPlanner;
@@ -45,9 +53,14 @@
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.transaction.BeginTransactionException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.json.JsonMapper;
import com.google.common.base.Strings;
import io.netty.handler.codec.http.HttpHeaderNames;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.validator.routines.InetAddressValidator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -59,10 +72,14 @@
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.view.RedirectView;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
@@ -716,4 +733,198 @@ private Backend selectBackendForGroupCommit(String clusterName, HttpServletReque
}
return backend;
}
+
+ /**
+ * Request body example:
+ * {
+ * "label": "test",
+ * "tableToPartition": {
+ * "tbl_test_spark_load": ["p1","p2"]
+ * },
+ * "properties": {
+ * "strict_mode": "true",
+ * "timeout": 3600000
+ * }
+ * }
+ *
+ */
+ @RequestMapping(path = "/api/ingestion_load/{" + CATALOG_KEY + "}/{" + DB_KEY
+ + "}/_create", method = RequestMethod.POST)
+ public Object createIngestionLoad(HttpServletRequest request, HttpServletResponse response,
+ @PathVariable(value = CATALOG_KEY) String catalog,
+ @PathVariable(value = DB_KEY) String db) {
+ if (needRedirect(request.getScheme())) {
+ return redirectToHttps(request);
+ }
+
+ executeCheckPassword(request, response);
+
+ if (!InternalCatalog.INTERNAL_CATALOG_NAME.equals(catalog)) {
+ return ResponseEntityBuilder.okWithCommonError("Only support internal catalog. "
+ + "Current catalog is " + catalog);
+ }
+
+ Object redirectView = redirectToMaster(request, response);
+ if (redirectView != null) {
+ return redirectView;
+ }
+
+ String fullDbName = getFullDbName(db);
+
+ Map<String, Object> resultMap = new HashMap<>();
+
+ try {
+
+ String body = HttpUtils.getBody(request);
+ JsonMapper mapper = JsonMapper.builder().build();
+ JsonNode jsonNode = mapper.reader().readTree(body);
+
+ String label = jsonNode.get("label").asText();
+ Map<String, List<String>> tableToPartition = mapper.reader()
+ .readValue(jsonNode.get("tableToPartition").traverse(),
+ new TypeReference
+ * Load data files which have been pre-processed.
+ *
+ * There are 4 steps in IngestionLoadJob:
+ * Step1: The outside system executes the ingestion etl job.
+ * Step2: LoadEtlChecker checks the ingestion etl job status periodically
+ * and sends push tasks to BE when the etl job is finished.
+ * Step3: LoadLoadingChecker checks the loading status periodically and commits the transaction when push tasks are finished.
+ * Step4: PublishVersionDaemon sends publish version tasks to BE and finishes the transaction.
+ */
+public class IngestionLoadJob extends LoadJob {
+
+ public static final Logger LOG = LogManager.getLogger(IngestionLoadJob.class);
+
+ @Setter
+ @SerializedName("ests")
+ private EtlStatus etlStatus;
+
+ // members below updated when job state changed to loading
+ // { tableId.partitionId.indexId.bucket.schemaHash -> (etlFilePath, etlFileSize) }
+ @SerializedName(value = "tm2fi")
+ private final Map<String, Pair<String, Long>> tabletMetaToFileInfo = Maps.newHashMap();
+
+ @SerializedName(value = "hp")
+ private final Map<String, String> hadoopProperties = new HashMap<>();
+
+ @SerializedName(value = "i2sv")
+ private final Map<Long, Integer> indexToSchemaVersion = new HashMap<>();
+
+ private final Map<Long, Integer> indexToSchemaHash = Maps.newHashMap();
+
+ private final Map<String, Long> filePathToSize = new HashMap<>();
+
+ private final Set<Long> finishedReplicas = Sets.newHashSet();
+ private final Set<Long> quorumTablets = Sets.newHashSet();
+ private final Set<Long> fullTablets = Sets.newHashSet();
+
+ private final List<TabletCommitInfo> commitInfos = Lists.newArrayList();
+
+ private final Map<Long, Set<Long>> tableToLoadPartitions = Maps.newHashMap();
+
+ private final Map<Long, Map<Long, PushTask>> tabletToSentReplicaPushTask = Maps.newHashMap();
+
+ private long etlStartTimestamp = -1;
+
+ private long quorumFinishTimestamp = -1;
+
+ private List<Long> loadTableIds = new ArrayList<>();
+
+ public IngestionLoadJob() {
+ super(EtlJobType.INGESTION);
+ }
+
+ public IngestionLoadJob(long dbId, String label, List<String> tableNames, UserIdentity userInfo)
+ throws LoadException {
+ super(EtlJobType.INGESTION, dbId, label);
+ this.loadTableIds = getLoadTableIds(tableNames);
+ this.userInfo = userInfo;
+ }
+
+ @Override
+ public Set<String> getTableNamesForShow() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<String> getTableNames() throws MetaNotFoundException {
+ Set<String> result = Sets.newHashSet();
+ Database database = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
+ for (long tableId : loadTableIds) {
+ Table table = database.getTableOrMetaException(tableId);
+ result.add(table.getName());
+ }
+ return result;
+ }
+
+ @Override
+ public void afterVisible(TransactionState txnState, boolean txnOperated) {
+ super.afterVisible(txnState, txnOperated);
+ clearJob();
+ }
+
+ @Override
+ public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason)
+ throws UserException {
+ super.afterAborted(txnState, txnOperated, txnStatusChangeReason);
+ clearJob();
+ }
+
+ @Override
+ public void cancelJobWithoutCheck(FailMsg failMsg, boolean abortTxn, boolean needLog) {
+ super.cancelJobWithoutCheck(failMsg, abortTxn, needLog);
+ clearJob();
+ }
+
+ @Override
+ public void cancelJob(FailMsg failMsg) throws DdlException {
+ super.cancelJob(failMsg);
+ clearJob();
+ }
+
+ private List<Long> getLoadTableIds(List<String> tableNames) throws LoadException {
+ Database db = Env.getCurrentInternalCatalog()
+ .getDbOrException(dbId, s -> new LoadException("db does not exist. id: " + s));
+ List<Long> list = new ArrayList<>(tableNames.size());
+ for (String tableName : tableNames) {
+ OlapTable olapTable = (OlapTable) db.getTableOrException(tableName,
+ s -> new LoadException("table does not exist. id: " + s));
+ list.add(olapTable.getId());
+ }
+ return list;
+ }
+
+ @Override
+ protected long getEtlStartTimestamp() {
+ return etlStartTimestamp;
+ }
+
+ public long beginTransaction()
+ throws BeginTransactionException, MetaNotFoundException, AnalysisException, QuotaExceedException,
+ LabelAlreadyUsedException, DuplicatedRequestException {
+ this.transactionId = Env.getCurrentGlobalTransactionMgr()
+ .beginTransaction(dbId, loadTableIds, label, null,
+ new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
+ FrontendOptions.getLocalHostAddress(), ExecuteEnv.getInstance().getStartupTime()),
+ TransactionState.LoadJobSourceType.FRONTEND, id, getTimeout());
+ return transactionId;
+ }
+
+ public Map<String, Object> getLoadMeta(Map<String, List<String>> tableToPartitionMap)
+ throws LoadException {
+
+ if (tableToPartitionMap == null || tableToPartitionMap.isEmpty()) {
+ throw new IllegalArgumentException("tableToPartitionMap is empty");
+ }
+
+ Database db = Env.getCurrentInternalCatalog()
+ .getDbOrException(dbId, s -> new LoadException("db does not exist. id: " + s));
+ Map<String, Object> loadMeta = new HashMap<>();
+ loadMeta.put("dbId", db.getId());
+ Long signature = Env.getCurrentEnv().getNextId();
+ loadMeta.put("signature", signature);
+
+ List<Table> tables;
+ try {
+ tables = db.getTablesOnIdOrderOrThrowException(loadTableIds);
+ } catch (MetaNotFoundException e) {
+ throw new LoadException(e.getMessage());
+ }
+
+ MetaLockUtils.readLockTables(tables);
+ try {
+ Map<String, Map<String, Object>> tableMeta = new HashMap<>(tableToPartitionMap.size());
+ for (Map.Entry<String, List<String>> entry : tableToPartitionMap.entrySet()) {
+ String tableName = entry.getKey();
+ Map<String, Object> meta = tableMeta.getOrDefault(tableName, new HashMap<>());
+ OlapTable olapTable = (OlapTable) db.getTableOrException(tableName,
+ s -> new LoadException("table does not exist. id: " + s));
+ meta.put("id", olapTable.getId());
+ List<EtlJobConfig.EtlIndex> indices = createEtlIndexes(olapTable);
+ meta.put("indexes", indices);
+ List<String> partitionNames = entry.getValue();
+ Set<Long> partitionIds;
+ if (partitionNames != null && !partitionNames.isEmpty()) {
+ partitionIds = new HashSet<>(partitionNames.size());
+ for (String partitionName : partitionNames) {
+ Partition partition = olapTable.getPartition(partitionName);
+ if (partition == null) {
+ throw new LoadException(String.format("partition %s is not exists", partitionName));
+ }
+ partitionIds.add(partition.getId());
+ }
+ } else {
+ partitionIds =
+ olapTable.getAllPartitions().stream().map(Partition::getId).collect(Collectors.toSet());
+ }
+ EtlJobConfig.EtlPartitionInfo etlPartitionInfo = createEtlPartitionInfo(olapTable, partitionIds);
+ meta.put("partitionInfo", etlPartitionInfo);
+ tableMeta.put(tableName, meta);
+
+ if (tableToLoadPartitions.containsKey(olapTable.getId())) {
+ tableToLoadPartitions.get(olapTable.getId()).addAll(partitionIds);
+ } else {
+ tableToLoadPartitions.put(olapTable.getId(), partitionIds);
+ }
+
+ }
+ loadMeta.put("tableMeta", tableMeta);
+ } finally {
+ MetaLockUtils.readUnlockTables(tables);
+ }
+ return loadMeta;
+
+ }
+
+ private List<EtlJobConfig.EtlIndex> createEtlIndexes(OlapTable table) throws LoadException {
+ List<EtlJobConfig.EtlIndex> etlIndexes = Lists.newArrayList();
+
+ for (Map.Entry<Long, List<Column>> entry : table.getIndexIdToSchema().entrySet()) {
+ long indexId = entry.getKey();
+ // todo(liheng): get schema hash and version from materialized index meta directly
+ MaterializedIndexMeta indexMeta = table.getIndexMetaByIndexId(indexId);
+ int schemaHash = indexMeta.getSchemaHash();
+ int schemaVersion = indexMeta.getSchemaVersion();
+
+ boolean changeAggType = table.getKeysTypeByIndexId(indexId).equals(KeysType.UNIQUE_KEYS)
+ && table.getTableProperty().getEnableUniqueKeyMergeOnWrite();
+
+ // columns
+ List<EtlJobConfig.EtlColumn> etlColumns = Lists.newArrayList();
+ for (Column column : entry.getValue()) {
+ etlColumns.add(createEtlColumn(column, changeAggType));
+ }
+
+ // check distribution type
+ DistributionInfo distributionInfo = table.getDefaultDistributionInfo();
+ if (distributionInfo.getType() != DistributionInfo.DistributionInfoType.HASH) {
+ // RANDOM not supported
+ String errMsg = "Unsupported distribution type. type: " + distributionInfo.getType().name();
+ LOG.warn(errMsg);
+ throw new LoadException(errMsg);
+ }
+
+ // index type
+ String indexType;
+ KeysType keysType = table.getKeysTypeByIndexId(indexId);
+ switch (keysType) {
+ case DUP_KEYS:
+ indexType = "DUPLICATE";
+ break;
+ case AGG_KEYS:
+ indexType = "AGGREGATE";
+ break;
+ case UNIQUE_KEYS:
+ indexType = "UNIQUE";
+ break;
+ default:
+ String errMsg = "unknown keys type. type: " + keysType.name();
+ LOG.warn(errMsg);
+ throw new LoadException(errMsg);
+ }
+
+ indexToSchemaVersion.put(indexId, schemaVersion);
+
+ etlIndexes.add(new EtlJobConfig.EtlIndex(indexId, etlColumns, schemaHash, indexType,
+ indexId == table.getBaseIndexId(), schemaVersion));
+ }
+
+ return etlIndexes;
+ }
+
+ private EtlJobConfig.EtlColumn createEtlColumn(Column column, boolean changeAggType) {
+ // column name
+ String name = column.getName().toLowerCase(Locale.ROOT);
+ // column type
+ PrimitiveType type = column.getDataType();
+ String columnType = column.getDataType().toString();
+ // is allow null
+ boolean isAllowNull = column.isAllowNull();
+ // is key
+ boolean isKey = column.isKey();
+
+ // aggregation type
+ String aggregationType = null;
+ if (column.getAggregationType() != null) {
+ if (changeAggType && !column.isKey()) {
+ aggregationType = AggregateType.REPLACE.toSql();
+ } else {
+ aggregationType = column.getAggregationType().toString();
+ }
+ }
+
+ // default value
+ String defaultValue = null;
+ if (column.getDefaultValue() != null) {
+ defaultValue = column.getDefaultValue();
+ }
+ if (column.isAllowNull() && column.getDefaultValue() == null) {
+ defaultValue = "\\N";
+ }
+
+ // string length
+ int stringLength = 0;
+ if (type.isStringType()) {
+ stringLength = column.getStrLen();
+ }
+
+ // decimal precision scale
+ int precision = 0;
+ int scale = 0;
+ if (type.isDecimalV2Type() || type.isDecimalV3Type()) {
+ precision = column.getPrecision();
+ scale = column.getScale();
+ }
+
+ return new EtlJobConfig.EtlColumn(name, columnType, isAllowNull, isKey, aggregationType, defaultValue,
+ stringLength, precision, scale);
+ }
+
+ private EtlJobConfig.EtlPartitionInfo createEtlPartitionInfo(OlapTable table, Set<Long> partitionIds)
+ throws LoadException {
+ PartitionType type = table.getPartitionInfo().getType();
+
+ List<String> partitionColumnRefs = Lists.newArrayList();
+ List<EtlJobConfig.EtlPartition> etlPartitions = Lists.newArrayList();
+ if (type == PartitionType.RANGE) {
+ RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) table.getPartitionInfo();
+ for (Column column : rangePartitionInfo.getPartitionColumns()) {
+ partitionColumnRefs.add(column.getName());
+ }
+
+ for (Map.Entry<Long, PartitionItem> entry : rangePartitionInfo.getAllPartitionItemEntryList(true)) {
+ long partitionId = entry.getKey();
+ if (!partitionIds.contains(partitionId)) {
+ continue;
+ }
+
+ Partition partition = table.getPartition(partitionId);
+ if (partition == null) {
+ throw new LoadException("partition does not exist. id: " + partitionId);
+ }
+
+ // bucket num
+ int bucketNum = partition.getDistributionInfo().getBucketNum();
+
+ // is max partition
+ Range<PartitionKey> range = entry.getValue().getItems();
+ boolean isMaxPartition = range.upperEndpoint().isMaxValue();
+
+ // start keys
+ List<LiteralExpr> rangeKeyExprs = range.lowerEndpoint().getKeys();
+ List