Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -3602,7 +3602,7 @@ public List<Type> getReturnTypes(Queriable stmt) {
}

private HttpStreamParams generateHttpStreamNereidsPlan(TUniqueId queryId) {
LOG.info("TUniqueId: {} generate stream load plan", queryId);
LOG.info("TUniqueId: {} generate stream load plan", DebugUtil.printId(queryId));
context.setQueryId(queryId);
context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());

Expand All @@ -3611,7 +3611,6 @@ private HttpStreamParams generateHttpStreamNereidsPlan(TUniqueId queryId) {
"Nereids only process LogicalPlanAdapter, but parsedStmt is " + parsedStmt.getClass().getName());
context.getState().setNereids(true);
InsertIntoTableCommand insert = (InsertIntoTableCommand) ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
HttpStreamParams httpStreamParams = new HttpStreamParams();

try {
if (!StringUtils.isEmpty(context.getSessionVariable().groupCommit)) {
Expand All @@ -3621,6 +3620,7 @@ private HttpStreamParams generateHttpStreamNereidsPlan(TUniqueId queryId) {
context.setGroupCommit(true);
}
OlapInsertExecutor insertExecutor = (OlapInsertExecutor) insert.initPlan(context, this);
HttpStreamParams httpStreamParams = new HttpStreamParams();
httpStreamParams.setTxnId(insertExecutor.getTxnId());
httpStreamParams.setDb(insertExecutor.getDatabase());
httpStreamParams.setTable(insertExecutor.getTable());
Expand All @@ -3637,6 +3637,7 @@ private HttpStreamParams generateHttpStreamNereidsPlan(TUniqueId queryId) {
if (!isValidPlan) {
throw new AnalysisException("plan is invalid: " + planRoot.getExplainString());
}
return httpStreamParams;
} catch (QueryStateException e) {
LOG.debug("Command(" + originStmt.originStmt + ") process failed.", e);
context.setState(e.getQueryState());
Expand All @@ -3655,7 +3656,6 @@ private HttpStreamParams generateHttpStreamNereidsPlan(TUniqueId queryId) {
throw new NereidsException("Command (" + originStmt.originStmt + ") process failed.",
new AnalysisException(e.getMessage(), e));
}
return httpStreamParams;
}

private HttpStreamParams generateHttpStreamLegacyPlan(TUniqueId queryId) throws Exception {
Expand Down Expand Up @@ -3687,12 +3687,11 @@ private HttpStreamParams generateHttpStreamLegacyPlan(TUniqueId queryId) throws

public HttpStreamParams generateHttpStreamPlan(TUniqueId queryId) throws Exception {
SessionVariable sessionVariable = context.getSessionVariable();
HttpStreamParams httpStreamParams = null;
try {
try {
// disable shuffle for http stream (only 1 sink)
sessionVariable.setVarOnce(SessionVariable.ENABLE_STRICT_CONSISTENCY_DML, "false");
httpStreamParams = generateHttpStreamNereidsPlan(queryId);
return generateHttpStreamNereidsPlan(queryId);
} catch (NereidsException | ParseException e) {
if (context.getMinidump() != null && context.getMinidump().toString(4) != null) {
MinidumpUtils.saveMinidumpString(context.getMinidump(), DebugUtil.printId(context.queryId()));
Expand All @@ -3704,8 +3703,8 @@ public HttpStreamParams generateHttpStreamPlan(TUniqueId queryId) throws Excepti
}
if (e instanceof NereidsException) {
LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e);
throw ((NereidsException) e).getException();
}
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -3721,7 +3720,6 @@ public HttpStreamParams generateHttpStreamPlan(TUniqueId queryId) throws Excepti
context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
}
}
return httpStreamParams;
}

public SummaryProfile getSummaryProfile() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2213,8 +2213,8 @@ private HttpStreamParams initHttpStreamPlan(TStreamLoadPutRequest request, Conne
LOG.warn("exec sql error", e);
throw e;
} catch (Throwable e) {
LOG.warn("exec sql error catch unknown result.", e);
throw new UserException("exec sql error catch unknown result." + e);
LOG.warn("exec sql: {} catch unknown result. ", originStmt, e);
throw new UserException("exec sql error catch unknown result. " + e.getMessage());
}
return httpStreamParams;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,25 @@ suite("test_http_stream", "p0") {
}
}

// test error sql
streamLoad {
set 'version', '1'
set 'sql', """
insert into ${db}.${tableName1} (id, name) select
"""
time 10000
file 'test_http_stream.csv'
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("http_stream result: ${result}".toString())
def json = parseJson(result)
assertEquals("fail", json.Status.toLowerCase())
assertTrue(json.Message.contains("Nereids parse failed"))
}
}

qt_sql1 "select id, name from ${tableName1}"
} finally {
try_sql "DROP TABLE IF EXISTS ${tableName1}"
Expand Down
Loading