20 changes: 20 additions & 0 deletions fe/pom.xml
@@ -547,6 +547,26 @@ under the License.
<version>0.8.13</version>
</dependency>

<!-- spark -->
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-launcher_2.12 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-launcher_2.12</artifactId>
<version>2.4.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.12 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>2.4.5</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
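The three new Spark artifacts pull in the client-side pieces for Spark-based ETL; spark-launcher in particular provides a programmatic job-submission API the FE can drive. A minimal sketch of that API, using hypothetical jar, class, and config values rather than anything from this PR:

import java.io.IOException;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

public class SparkEtlSubmitSketch {
    public static void main(String[] args) throws IOException {
        // Placeholder values; the real submission wiring is not part of this diff.
        SparkAppHandle handle = new SparkLauncher()
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setAppResource("/path/to/etl-job.jar")    // hypothetical ETL jar
                .setMainClass("com.example.EtlJob")        // hypothetical entry class
                .setConf(SparkLauncher.EXECUTOR_MEMORY, "1g")
                .startApplication();
        System.out.println("Submitted, state: " + handle.getState());
    }
}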
28 changes: 18 additions & 10 deletions fe/src/main/cup/sql_parser.cup
@@ -423,8 +423,8 @@ nonterminal List<DataDescription> data_desc_list;
nonterminal LabelName job_label;
nonterminal String opt_with_label;
nonterminal String opt_system;
nonterminal String opt_cluster;
nonterminal BrokerDesc opt_broker;
nonterminal ResourceDesc resource_desc;
nonterminal List<String> opt_col_list, col_list, opt_dup_keys, opt_columns_from_path;
nonterminal List<ColWithComment> opt_col_with_comment_list, col_with_comment_list;
nonterminal ColWithComment col_with_comment;
@@ -1197,6 +1197,13 @@ load_stmt ::=
{:
RESULT = new LoadStmt(label, dataDescList, broker, system, properties);
:}
| KW_LOAD KW_LABEL job_label:label
LPAREN data_desc_list:dataDescList RPAREN
resource_desc:resource
opt_properties:properties
{:
RESULT = new LoadStmt(label, dataDescList, resource, properties);
:}
;

job_label ::=
@@ -1364,15 +1371,16 @@ opt_broker ::=
:}
;

opt_cluster ::=
Contributor: Did you just remove this grammar?

@wyb (Contributor, Author), Jun 9, 2020: hadoop uses opt_system; opt_cluster is no longer used.

{:
RESULT = null;
:}
| KW_BY ident_or_text:cluster
{:
RESULT = cluster;
:}
;
resource_desc ::=
KW_WITH KW_RESOURCE ident_or_text:resourceName
{:
RESULT = new ResourceDesc(resourceName, null);
:}
| KW_WITH KW_RESOURCE ident_or_text:resourceName LPAREN key_value_map:properties RPAREN
{:
RESULT = new ResourceDesc(resourceName, properties);
:}
;
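The two resource_desc alternatives map one-to-one onto the new ResourceDesc constructor: no parenthesized list yields null properties, otherwise the parsed key_value_map is passed through. A minimal sketch of what the semantic actions build, with a hypothetical resource name and property:

import java.util.Map;

import com.google.common.collect.Maps;

import org.apache.doris.analysis.ResourceDesc;

class ResourceDescSketch {
    static void examples() {
        // WITH RESOURCE "spark0"
        ResourceDesc bare = new ResourceDesc("spark0", null);

        // WITH RESOURCE "spark0" ("spark.executor.memory" = "2g")
        Map<String, String> props = Maps.newHashMap();
        props.put("spark.executor.memory", "2g");
        ResourceDesc withProps = new ResourceDesc("spark0", props);
    }
}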

// Routine load statement
create_routine_load_stmt ::=
11 changes: 9 additions & 2 deletions fe/src/main/java/org/apache/doris/analysis/BrokerDesc.java
@@ -29,6 +29,13 @@
import java.util.Map;

// Broker descriptor
//
// Broker example:
// WITH BROKER "broker0"
// (
// "username" = "user0",
// "password" = "password0"
// )
public class BrokerDesc implements Writable {
private String name;
private Map<String, String> properties;
@@ -82,9 +89,9 @@ public static BrokerDesc read(DataInput in) throws IOException {

public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append(" WITH BROKER ").append(name);
sb.append("WITH BROKER ").append(name);
if (properties != null && !properties.isEmpty()) {
PrintableMap<String, String> printableMap = new PrintableMap<>(properties, " = ", true, false);
PrintableMap<String, String> printableMap = new PrintableMap<>(properties, " = ", true, false, true);
sb.append(" (").append(printableMap.toString()).append(")");
}
return sb.toString();
83 changes: 75 additions & 8 deletions fe/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -17,12 +17,16 @@

package org.apache.doris.analysis;

import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.Load;
import org.apache.doris.load.loadv2.SparkLoadJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;

import com.google.common.base.Function;
@@ -39,7 +43,9 @@
// syntax:
// LOAD LABEL load_label
// (data_desc, ...)
// [broker_desc]
// [BY cluster]
// [resource_desc]
// [PROPERTIES (key1=value1, )]
//
// load_label:
@@ -53,6 +59,14 @@
// [COLUMNS TERMINATED BY separator ]
// [(col1, ...)]
// [SET (k1=f1(xx), k2=f2(xx))]
//
// broker_desc:
// WITH BROKER name
// (key2=value2, ...)
//
// resource_desc:
// WITH RESOURCE name
// (key3=value3, ...)
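//
// e.g. a spark load combining the pieces above (editor's sketch; all names hypothetical):
// LOAD LABEL example_db.label1
// (DATA INFILE("hdfs://hdfs_host:9000/input/file") INTO TABLE example_tbl)
// WITH RESOURCE 'spark0' ("spark.executor.memory" = "2g")
// PROPERTIES ("timeout" = "3600")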
public class LoadStmt extends DdlStmt {
public static final String TIMEOUT_PROPERTY = "timeout";
public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio";
@@ -80,11 +94,16 @@ public class LoadStmt extends DdlStmt {
private final List<DataDescription> dataDescriptions;
private final BrokerDesc brokerDesc;
private final String cluster;
private final ResourceDesc resourceDesc;
private final Map<String, String> properties;
private String user;
private EtlJobType etlJobType = EtlJobType.UNKNOWN;

private String version = "v2";

// TODO(wyb): spark-load
public static boolean disableSparkLoad = true;

// properties set
private final static ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(TIMEOUT_PROPERTY)
@@ -96,13 +115,25 @@ public class LoadStmt extends DdlStmt {
.add(VERSION)
.add(TIMEZONE)
.build();

public LoadStmt(LabelName label, List<DataDescription> dataDescriptions,
BrokerDesc brokerDesc, String cluster, Map<String, String> properties) {
this.label = label;
this.dataDescriptions = dataDescriptions;
this.brokerDesc = brokerDesc;
this.cluster = cluster;
this.resourceDesc = null;
this.properties = properties;
this.user = null;
}

public LoadStmt(LabelName label, List<DataDescription> dataDescriptions,
ResourceDesc resourceDesc, Map<String, String> properties) {
this.label = label;
this.dataDescriptions = dataDescriptions;
this.brokerDesc = null;
this.cluster = null;
this.resourceDesc = resourceDesc;
this.properties = properties;
this.user = null;
}
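// Editor's note: the two constructors mirror the two load_stmt grammar alternatives.
// The broker/hadoop path leaves resourceDesc null; the resource (spark) path leaves
// brokerDesc and cluster null.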
@@ -123,6 +154,10 @@ public String getCluster() {
return cluster;
}

public ResourceDesc getResourceDesc() {
return resourceDesc;
}

public Map<String, String> getProperties() {
return properties;
}
@@ -131,12 +166,21 @@ public String getUser() {
return user;
}

public EtlJobType getEtlJobType() {
return etlJobType;
}

public static void checkProperties(Map<String, String> properties) throws DdlException {
if (properties == null) {
return;
}

for (Entry<String, String> entry : properties.entrySet()) {
// temporary use for global dict
if (entry.getKey().startsWith(SparkLoadJob.BITMAP_DATA_PROPERTY)) {
continue;
}

if (!PROPERTIES_SET.contains(entry.getKey())) {
throw new DdlException(entry.getKey() + " is invalid property");
}
@@ -224,11 +268,34 @@ public void analyze(Analyzer analyzer) throws UserException {
throw new AnalysisException("No data file in load statement.");
}
for (DataDescription dataDescription : dataDescriptions) {
if (brokerDesc == null) {
if (brokerDesc == null && resourceDesc == null) {
dataDescription.setIsHadoopLoad(true);
}
dataDescription.analyze(label.getDbName());
}

if (resourceDesc != null) {
resourceDesc.analyze();
etlJobType = resourceDesc.getEtlJobType();
// TODO(wyb): spark-load
if (disableSparkLoad) {
throw new AnalysisException("Spark Load is comming soon");
}
// check resource usage privilege
if (!Catalog.getCurrentCatalog().getAuth().checkResourcePriv(ConnectContext.get(),
resourceDesc.getName(),
PrivPredicate.USAGE)) {
throw new AnalysisException("USAGE denied to user '" + ConnectContext.get().getQualifiedUser()
+ "'@'" + ConnectContext.get().getRemoteIP()
+ "' for resource '" + resourceDesc.getName() + "'");
}
} else if (brokerDesc != null) {
etlJobType = EtlJobType.BROKER;
} else {
// if cluster is null, use default hadoop cluster
// if cluster is not null, use this hadoop cluster
etlJobType = EtlJobType.HADOOP;
}
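// Editor's note: the branch above resolves the job type as follows:
// resource_desc present: etlJobType comes from ResourceDesc.analyze() (SPARK for spark resources);
// broker_desc present: BROKER; neither: HADOOP.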

try {
checkProperties(properties);
@@ -242,7 +309,7 @@

@Override
public boolean needAuditEncryption() {
if (brokerDesc != null) {
if (brokerDesc != null || resourceDesc != null) {
return true;
}
return false;
@@ -263,16 +330,16 @@ public Object apply(DataDescription dataDescription) {
return dataDescription.toSql();
}
})).append(")");
if (brokerDesc != null) {
sb.append("\n").append(brokerDesc.toSql());
}
if (cluster != null) {
sb.append("\nBY '");
sb.append(cluster);
sb.append("'");
}

if (brokerDesc != null) {
sb.append("\n WITH BROKER '").append(brokerDesc.getName()).append("' (");
sb.append(new PrintableMap<String, String>(brokerDesc.getProperties(), "=", true, false, true));
sb.append(")");
if (resourceDesc != null) {
sb.append("\n").append(resourceDesc.toSql());
}

if (properties != null && !properties.isEmpty()) {
90 changes: 90 additions & 0 deletions fe/src/main/java/org/apache/doris/analysis/ResourceDesc.java
@@ -0,0 +1,90 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.analysis;

import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Resource;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.load.EtlJobType;

import com.google.common.collect.Maps;

import java.util.Map;

// Resource descriptor
//
// Spark example:
// WITH RESOURCE "spark0"
// (
// "spark.jars" = "xxx.jar,yyy.jar",
// "spark.files" = "/tmp/aaa,/tmp/bbb",
// "spark.executor.memory" = "1g",
// "spark.yarn.queue" = "queue0"
// )
public class ResourceDesc {
protected String name;
protected Map<String, String> properties;
protected EtlJobType etlJobType;

// Only used for recovery
private ResourceDesc() {
}

public ResourceDesc(String name, Map<String, String> properties) {
this.name = name;
this.properties = properties;
if (this.properties == null) {
this.properties = Maps.newHashMap();
}
this.etlJobType = EtlJobType.UNKNOWN;
}

public String getName() {
return name;
}

public Map<String, String> getProperties() {
return properties;
}

public EtlJobType getEtlJobType() {
return etlJobType;
}

public void analyze() throws AnalysisException {
// check resource exist or not
Resource resource = Catalog.getCurrentCatalog().getResourceMgr().getResource(getName());
if (resource == null) {
throw new AnalysisException("Resource does not exist. name: " + getName());
}
if (resource.getType() == Resource.ResourceType.SPARK) {
etlJobType = EtlJobType.SPARK;
}
}

public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append("WITH RESOURCE '").append(name).append("'");
if (properties != null && !properties.isEmpty()) {
PrintableMap<String, String> printableMap = new PrintableMap<>(properties, " = ", true, false, true);
sb.append(" (").append(printableMap.toString()).append(")");
}
return sb.toString();
}
}
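A quick sketch of the new class in use: analyze() only resolves the named resource and derives the job type, while toSql() round-trips the clause. Assuming PrintableMap renders quoted key = "value" pairs, the output shape would be roughly as commented below (inferred from the flags, not verified):

import java.util.Map;

import com.google.common.collect.Maps;

class ResourceDescToSqlSketch {
    public static void main(String[] args) {
        Map<String, String> props = Maps.newHashMap();
        props.put("spark.executor.memory", "1g");
        System.out.println(new ResourceDesc("spark0", props).toSql());
        // expected roughly: WITH RESOURCE 'spark0' ("spark.executor.memory" = "1g")
    }
}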