Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.nereids.trees.plans.commands.insert;

import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.analysis.StmtType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
Expand Down Expand Up @@ -331,6 +332,15 @@ public StmtType stmtType() {
return StmtType.INSERT;
}

@Override
public RedirectStatus toRedirectStatus() {
if (ConnectContext.get().isGroupCommit()) {
return RedirectStatus.NO_FORWARD;
} else {
return RedirectStatus.FORWARD_WITH_SYNC;
}
}

private static class BuildInsertExecutorResult {
private final NereidsPlanner planner;
private final AbstractInsertExecutor executor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
import org.apache.doris.planner.GroupCommitPlanner;
Expand Down Expand Up @@ -63,6 +64,20 @@ public OlapGroupCommitInsertExecutor(ConnectContext ctx, Table table,
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
}

/**
* check if the sql can run in group commit mode
* @param logicalPlan plan of sql
*/
public static void analyzeGroupCommit(LogicalPlan logicalPlan) {
ConnectContext ctx = ConnectContext.get();
if (ctx.getSessionVariable().isEnableInsertGroupCommit() && logicalPlan instanceof InsertIntoTableCommand) {
LogicalPlan logicalQuery = ((InsertIntoTableCommand) logicalPlan).getLogicalQuery();
TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx);
OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx, targetTableIf, logicalQuery,
Optional.empty());
}
}

protected static void analyzeGroupCommit(ConnectContext ctx, TableIf table, LogicalPlan logicalQuery,
Optional<InsertCommandContext> insertCtx) {
// The flag is set to false before execute sql, if it is true, this is a http stream
Expand Down Expand Up @@ -91,8 +106,10 @@ protected static void analyzeGroupCommit(ConnectContext ctx, TableIf table, Logi
conditions.add(Pair.of(() -> !(insertCtx.isPresent() && insertCtx.get() instanceof OlapInsertCommandContext
&& ((OlapInsertCommandContext) insertCtx.get()).isOverwrite()), () -> "is overwrite command"));
conditions.add(Pair.of(
() -> tableSink.child() instanceof OneRowRelation || tableSink.child() instanceof LogicalUnion,
() -> "not one row relation or union, class: " + tableSink.child().getClass().getName()));
() -> tableSink.child() instanceof OneRowRelation || tableSink.child() instanceof LogicalUnion
|| tableSink.child() instanceof LogicalInlineTable,
() -> "not one row relation or union or inline table, class: " + tableSink.child().getClass()
.getName()));
ctx.setGroupCommit(conditions.stream().allMatch(p -> p.first.getAsBoolean()));
if (!ctx.isGroupCommit() && LOG.isDebugEnabled()) {
for (Pair<BooleanSupplier, Supplier<String>> pair : conditions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@
import org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.OlapGroupCommitInsertExecutor;
import org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertExecutor;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache;
Expand Down Expand Up @@ -723,6 +724,7 @@ private void executeByNereids(TUniqueId queryId) throws Exception {
}
if (logicalPlan instanceof Command) {
if (logicalPlan instanceof Forward) {
OlapGroupCommitInsertExecutor.analyzeGroupCommit(logicalPlan);
redirectStatus = ((Forward) logicalPlan).toRedirectStatus();
if (isForwardToMaster()) {
// before forward to master, we also need to set profileType in this node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1361,8 +1361,12 @@ class Suite implements GroovyInterceptable {
}

String getServerPrepareJdbcUrl(String jdbcUrl, String database) {
return getServerPrepareJdbcUrl(jdbcUrl, database, true)
}

String getServerPrepareJdbcUrl(String jdbcUrl, String database, boolean useMasterIp) {
String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
def sql_ip = getMasterIp()
def sql_ip = useMasterIp ? getMasterIp() : urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":"))
def sql_port
if (urlWithoutSchema.indexOf("/") >= 0) {
// e.g: jdbc:mysql://locahost:8080/?a=b
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ suite("insert_group_commit_with_exception") {

// prepare insert
def db = context.config.defaultDb + "_insert_p0"
String url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db)
String url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db, false)

try (Connection connection = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword)) {
Statement statement = connection.createStatement();
statement.execute("use ${db}");
statement.execute("set group_commit = eventual_consistency;");
statement.execute("set group_commit = sync_mode");
statement.execute("set enable_server_side_prepared_statement = true")
// without column
try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?, ?)")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ suite("insert_group_commit_with_prepare_stmt") {
return serverStatementIds
}

def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, realDb)
def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, realDb, false)
logger.info("url: " + url)

def result1 = connect(user, password, url + "&sessionVariables=group_commit=async_mode") {
Expand Down
Loading