Labels
api-review (actively needing an API review), area/load (all kinds of load), kind/api-change (adding, removing, or otherwise changing an API), kind/feature (a new feature)
Description
- User interface:
- Submit Spark load job
LOAD LABEL db_name.label_name
(
DATA INFILE "/tmp/file" FROM TABLE hive_db.table ...,
DATA INFILE "/tmp/file1" ...
-- DATA FROM TABLE hive_cluster.db.table ...
)
WITH spark.cluster_name
[PROPERTIES (key1=value1, ... )]

spark.cluster_name is the name of the cluster used by the Spark ETL application. You can use the following SQL commands to manage it.
- Spark cluster info management
-- Add import cluster for user 'user_name'
SET PROPERTY FOR 'user_name'
'load_cluster.spark.cluster_name.output_path' = '/user/output',
'load_cluster.spark.cluster_name.configs' = 'key1=value1;key2=value2';
-- Delete import cluster for user 'user_name'
SET PROPERTY FOR 'user_name' 'load_cluster.spark.cluster_name' = '';
-- Set default import cluster for user 'user_name'
SET PROPERTY FOR 'user_name' 'default_load_cluster' = 'spark.cluster_name';

- Spark ETL application interface
public EtlSubmitResult submitEtlJob(long jobId, long txnId, JobConf jobConf)
public EtlStatus getEtlJobStatus(String etlJobId)
public void killEtlJob(String etlJobId)
public Map<String, Long> getEtlFiles(String outputPath)

2.1 jobConf includes:
- Spark cluster infos
- ETL output directory and file name format
- Schema of the imported table, including columns, partitions, and rollups
- Infos of the source file, including split rules, corresponding columns, and conversion rules
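As a sketch, the four methods above can be grouped into one handler interface. The method signatures come from this proposal; the SparkEtlJobHandler name and the minimal field layout of the stub types below are assumptions, since the real types would carry more information:

```java
import java.util.Map;

// Hypothetical grouping of the four ETL methods above into one interface.
// EtlSubmitResult, EtlStatus, and JobConf are stubbed minimally here.
interface SparkEtlJobHandler {
    EtlSubmitResult submitEtlJob(long jobId, long txnId, JobConf jobConf);
    EtlStatus getEtlJobStatus(String etlJobId);
    void killEtlJob(String etlJobId);
    // key: file generated by ETL, value: file size in bytes (see 2.4)
    Map<String, Long> getEtlFiles(String outputPath);
}

class EtlSubmitResult {
    final String etlJobId;      // id of the launched Spark ETL application
    EtlSubmitResult(String etlJobId) { this.etlJobId = etlJobId; }
}

class EtlStatus {
    final String state;         // e.g. RUNNING, FINISHED, FAILED (assumed names)
    final int numCompletedTasks;
    final int numTasks;         // see 2.3: these two give job progress
    EtlStatus(String state, int numCompletedTasks, int numTasks) {
        this.state = state;
        this.numCompletedTasks = numCompletedTasks;
        this.numTasks = numTasks;
    }
}

class JobConf { /* cluster infos, output path, schema, sources (see 2.1) */ }
```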
2.2 The config is saved as a local JSON file named config.json for the Spark ETL app.
{
"tables": {
"10014": {
"columns": {
"k1": {
"default_value": "\\N",
"column_type": "DATETIME",
"is_allow_null": true
},
"k2": {
"default_value": "0",
"column_type": "SMALLINT",
"is_allow_null": true
},
"v": {
"default_value": "0",
"column_type": "BIGINT",
"is_allow_null": false
}
},
"indexes": {
"10014": {
"column_refs": [{
"name": "k1",
"is_key": true,
"aggregation_type": "NONE"
}, {
"name": "k2",
"is_key": true,
"aggregation_type": "NONE"
}, {
"name": "v",
"is_key": false,
"aggregation_type": "NONE"
}],
"schema_hash": 1294206574
},
"10017": {
"column_refs": [{
"name": "k1",
"is_key": true,
"aggregation_type": "NONE"
}, {
"name": "v",
"is_key": false,
"aggregation_type": "SUM"
}],
"schema_hash": 1294206575
}
},
"partition_info": {
"partition_type": "range", // or unpartitioned
"partition_column_refs": ["k2"],
"distribution_column_refs": ["k1"],
"partitions": {
"10020": {
"start_keys": [-100],
"end_keys": [10],
"is_max_partition": false,
"buckets_num": 3
}
}
},
"sources": {
"source0": {
"partitions": [10020],
"file_urls": ["hdfs://hdfs_host:port/user/palo/test/file"],
"columns": ["tmp_k1", "k2"],
"column_separator": ",",
"column_mappings": {
"k1": {
"function_name": "strftime",
"args": ["%Y-%m-%d %H:%M:%S", "tmp_k1"]
}
},
"where": "k2 > 10",
"is_negative": false,
"hive_table_name": "hive_db.table" // for global dict
}
}
}
},
"output_path": "hdfs://hdfs_host:port/user/output/10003/label1/1582599203397",
"output_file_pattern": "label1.%(table_id)d.%(index_id)d.%(bucket)d.%(schema_hash)d"
}

2.3 getEtlJobStatus returns the status of the Spark ETL app, including state, numCompletedTasks, numTasks, and counters.
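The partition_info block in the config above is enough to route a row: the range partition is chosen by comparing the value of the partition column k2 against start_keys/end_keys, and the bucket inside that partition by hashing the distribution column k1. A minimal sketch, hardcoding the single example partition and assuming a plain modulo hash (Doris' actual hash function differs):

```java
// Sketch of routing a row using the partition_info from the example
// config.json: range partitioning on k2, hash distribution on k1.
// The ids and bounds below come from the example; the hash is assumed.
class PartitionRouter {
    static final long PARTITION_ID = 10020;           // example partition id
    static final long START_KEY = -100, END_KEY = 10; // start_keys / end_keys
    static final int BUCKETS_NUM = 3;                 // buckets_num

    // Returns the partition id for a k2 value, or -1 if no range matches.
    // Range partitions cover [start_key, end_key).
    static long findPartition(long k2) {
        return (k2 >= START_KEY && k2 < END_KEY) ? PARTITION_ID : -1;
    }

    // Returns the bucket index for a k1 value within a partition.
    static int findBucket(String k1) {
        return Math.floorMod(k1.hashCode(), BUCKETS_NUM);
    }
}
```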
2.4 getEtlFiles returns a Map whose keys are the files generated by ETL and whose values are the corresponding file sizes.
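Since the files returned by getEtlFiles are named according to output_file_pattern, the table id, index id, bucket, and schema hash can be read back from each key. A hypothetical helper that expands the pattern by substituting the %(name)d placeholders textually:

```java
import java.util.Map;

// Hypothetical helper: expand the output_file_pattern from config.json
// by substituting each %(name)d placeholder with a concrete id.
class FilePattern {
    static String expand(String pattern, Map<String, Long> vars) {
        String out = pattern;
        for (Map.Entry<String, Long> e : vars.entrySet()) {
            out = out.replace("%(" + e.getKey() + ")d", Long.toString(e.getValue()));
        }
        return out;
    }
}
```

With the ids from the example config (table 10014, rollup index 10017, bucket 0, schema hash 1294206575), the pattern expands to label1.10014.10017.0.1294206575.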