diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index ddd9ed971d408f..f75d0da9ed9fad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -3608,7 +3608,7 @@ public List getReturnTypes(Queriable stmt) { } private HttpStreamParams generateHttpStreamNereidsPlan(TUniqueId queryId) { - LOG.info("TUniqueId: {} generate stream load plan", queryId); + LOG.info("TUniqueId: {} generate stream load plan", DebugUtil.printId(queryId)); context.setQueryId(queryId); context.setStmtId(STMT_ID_GENERATOR.incrementAndGet()); @@ -3617,7 +3617,6 @@ private HttpStreamParams generateHttpStreamNereidsPlan(TUniqueId queryId) { "Nereids only process LogicalPlanAdapter, but parsedStmt is " + parsedStmt.getClass().getName()); context.getState().setNereids(true); InsertIntoTableCommand insert = (InsertIntoTableCommand) ((LogicalPlanAdapter) parsedStmt).getLogicalPlan(); - HttpStreamParams httpStreamParams = new HttpStreamParams(); try { if (!StringUtils.isEmpty(context.getSessionVariable().groupCommit)) { @@ -3627,6 +3626,7 @@ private HttpStreamParams generateHttpStreamNereidsPlan(TUniqueId queryId) { context.setGroupCommit(true); } OlapInsertExecutor insertExecutor = (OlapInsertExecutor) insert.initPlan(context, this); + HttpStreamParams httpStreamParams = new HttpStreamParams(); httpStreamParams.setTxnId(insertExecutor.getTxnId()); httpStreamParams.setDb(insertExecutor.getDatabase()); httpStreamParams.setTable(insertExecutor.getTable()); @@ -3643,6 +3643,7 @@ private HttpStreamParams generateHttpStreamNereidsPlan(TUniqueId queryId) { if (!isValidPlan) { throw new AnalysisException("plan is invalid: " + planRoot.getExplainString()); } + return httpStreamParams; } catch (QueryStateException e) { LOG.debug("Command(" + originStmt.originStmt + ") process failed.", e); context.setState(e.getQueryState()); @@ -3661,7 +3662,6 @@ private HttpStreamParams generateHttpStreamNereidsPlan(TUniqueId queryId) { throw new NereidsException("Command (" + originStmt.originStmt + ") process failed.", new AnalysisException(e.getMessage(), e)); } - return httpStreamParams; } private HttpStreamParams generateHttpStreamLegacyPlan(TUniqueId queryId) throws Exception { @@ -3693,12 +3693,11 @@ private HttpStreamParams generateHttpStreamLegacyPlan(TUniqueId queryId) throws public HttpStreamParams generateHttpStreamPlan(TUniqueId queryId) throws Exception { SessionVariable sessionVariable = context.getSessionVariable(); - HttpStreamParams httpStreamParams = null; try { try { // disable shuffle for http stream (only 1 sink) sessionVariable.setVarOnce(SessionVariable.ENABLE_STRICT_CONSISTENCY_DML, "false"); - httpStreamParams = generateHttpStreamNereidsPlan(queryId); + return generateHttpStreamNereidsPlan(queryId); } catch (NereidsException | ParseException e) { if (context.getMinidump() != null && context.getMinidump().toString(4) != null) { MinidumpUtils.saveMinidumpString(context.getMinidump(), DebugUtil.printId(context.queryId())); @@ -3710,8 +3709,8 @@ public HttpStreamParams generateHttpStreamPlan(TUniqueId queryId) throws Excepti } if (e instanceof NereidsException) { LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e); - throw ((NereidsException) e).getException(); } + throw e; } catch (Exception e) { throw new RuntimeException(e); } @@ -3727,7 +3726,6 @@ public HttpStreamParams generateHttpStreamPlan(TUniqueId queryId) throws Excepti context.getState().setError(e.getMysqlErrorCode(), e.getMessage()); } } - return httpStreamParams; } public SummaryProfile getSummaryProfile() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 6057a20d9f389f..e4c81da4f0660b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2269,8 +2269,8 @@ private HttpStreamParams initHttpStreamPlan(TStreamLoadPutRequest request, Conne LOG.warn("exec sql error", e); throw e; } catch (Throwable e) { - LOG.warn("exec sql error catch unknown result.", e); - throw new UserException("exec sql error catch unknown result." + e); + LOG.warn("exec sql: {} catch unknown result. ", originStmt, e); + throw new UserException("exec sql error catch unknown result. " + e.getMessage()); } return httpStreamParams; } diff --git a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy index 8a7d5763604e4b..81ad780a663b46 100644 --- a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy +++ b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy @@ -63,6 +63,25 @@ suite("test_http_stream", "p0") { } } + // test error sql + streamLoad { + set 'version', '1' + set 'sql', """ + insert into ${db}.${tableName1} (id, name) select + """ + time 10000 + file 'test_http_stream.csv' + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("http_stream result: ${result}".toString()) + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.contains("Nereids parse failed")) + } + } + qt_sql1 "select id, name from ${tableName1}" } finally { try_sql "DROP TABLE IF EXISTS ${tableName1}"