diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/service/session/NlsFormatInterceptorTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/service/session/NlsFormatInterceptorTest.java index 13b3ff6fb9..5bfdf17fcf 100644 --- a/server/integration-test/src/test/java/com/oceanbase/odc/service/session/NlsFormatInterceptorTest.java +++ b/server/integration-test/src/test/java/com/oceanbase/odc/service/session/NlsFormatInterceptorTest.java @@ -27,6 +27,7 @@ import com.oceanbase.odc.core.sql.execute.model.SqlExecuteStatus; import com.oceanbase.odc.core.sql.execute.model.SqlTuple; import com.oceanbase.odc.service.session.interceptor.NlsFormatInterceptor; +import com.oceanbase.odc.service.session.model.AsyncExecuteContext; import com.oceanbase.odc.service.session.model.SqlExecuteResult; /** @@ -43,7 +44,7 @@ public void afterCompletion_mysql_notingSet() throws Exception { ConnectionSession session = getConnectionSession(ConnectType.OB_MYSQL); NlsFormatInterceptor interceptor = new NlsFormatInterceptor(); SqlExecuteResult r = getResponse("set session nls_date_format='DD-MON-RR'", SqlExecuteStatus.SUCCESS); - interceptor.afterCompletion(r, session, new HashMap<>()); + interceptor.afterCompletion(r, session, getContext()); Assert.assertNull(ConnectionSessionUtil.getNlsDateFormat(session)); } @@ -52,7 +53,7 @@ public void afterCompletion_failedSqlResult_notingSet() throws Exception { ConnectionSession session = getConnectionSession(ConnectType.OB_ORACLE); NlsFormatInterceptor interceptor = new NlsFormatInterceptor(); SqlExecuteResult r = getResponse("set session nls_date_format='DD-MON-RR'", SqlExecuteStatus.FAILED); - interceptor.afterCompletion(r, session, new HashMap<>()); + interceptor.afterCompletion(r, session, getContext()); Assert.assertNull(ConnectionSessionUtil.getNlsDateFormat(session)); } @@ -65,7 +66,7 @@ public void afterCompletion_multiSqls_notingSet() throws Exception { + "begin\n" + "dbms_output.put_line('aaaa');\n" + "end;", SqlExecuteStatus.SUCCESS); - interceptor.afterCompletion(r, session, new HashMap<>()); + interceptor.afterCompletion(r, session, getContext()); Assert.assertNull(ConnectionSessionUtil.getNlsDateFormat(session)); } @@ -74,7 +75,7 @@ public void afterCompletion_noSetVarExists_notingSet() throws Exception { ConnectionSession session = getConnectionSession(ConnectType.OB_ORACLE); NlsFormatInterceptor interceptor = new NlsFormatInterceptor(); SqlExecuteResult r = getResponse("-- comment\nselect 123 from dual;", SqlExecuteStatus.SUCCESS); - interceptor.afterCompletion(r, session, new HashMap<>()); + interceptor.afterCompletion(r, session, getContext()); Assert.assertNull(ConnectionSessionUtil.getNlsDateFormat(session)); } @@ -85,7 +86,7 @@ public void afterCompletion_commentWithSetVar_setSucceed() throws Exception { String expect = "DD-MON-RR"; SqlExecuteResult r = getResponse("-- comment\nset session nls_date_format='" + expect + "';", SqlExecuteStatus.SUCCESS); - interceptor.afterCompletion(r, session, new HashMap<>()); + interceptor.afterCompletion(r, session, getContext()); Assert.assertEquals(expect, ConnectionSessionUtil.getNlsDateFormat(session)); } @@ -96,7 +97,7 @@ public void afterCompletion_multiCommentsWithSetVar_setSucceed() throws Exceptio String expect = "DD-MON-RR"; SqlExecuteResult r = getResponse("/*asdasdasd*/ set session nls_date_format='" + expect + "';", SqlExecuteStatus.SUCCESS); - interceptor.afterCompletion(r, session, new HashMap<>()); + interceptor.afterCompletion(r, session, getContext()); Assert.assertEquals(expect, ConnectionSessionUtil.getNlsDateFormat(session)); } @@ -107,7 +108,7 @@ public void afterCompletion_nlsTimestampFormat_setSucceed() throws Exception { String expect = "DD-MON-RR"; SqlExecuteResult r = getResponse("/*asdasdasd*/ set session nls_timestamp_format='" + expect + "';", SqlExecuteStatus.SUCCESS); - interceptor.afterCompletion(r, session, new HashMap<>()); + interceptor.afterCompletion(r, session, getContext()); Assert.assertEquals(expect, ConnectionSessionUtil.getNlsTimestampFormat(session)); } @@ -118,7 +119,7 @@ public void afterCompletion_nlsTimestampTZFormat_setSucceed() throws Exception { String expect = "DD-MON-RR"; SqlExecuteResult r = getResponse("/*asdasdasd*/ set session nls_timestamp_tz_format='" + expect + "';", SqlExecuteStatus.SUCCESS); - interceptor.afterCompletion(r, session, new HashMap<>()); + interceptor.afterCompletion(r, session, getContext()); Assert.assertEquals(expect, ConnectionSessionUtil.getNlsTimestampTZFormat(session)); } @@ -129,7 +130,7 @@ public void afterCompletion_setGlobal_nothingSet() throws Exception { String expect = "DD-MON-RR"; SqlExecuteResult r = getResponse("/*asdasdasd*/ set global nls_timestamp_tz_format='" + expect + "';", SqlExecuteStatus.SUCCESS); - interceptor.afterCompletion(r, session, new HashMap<>()); + interceptor.afterCompletion(r, session, getContext()); Assert.assertNull(ConnectionSessionUtil.getNlsTimestampTZFormat(session)); } @@ -140,7 +141,7 @@ public void afterCompletion_alterSession_setSucceed() throws Exception { String expect = "DD-MON-RR"; SqlExecuteResult r = getResponse("/*asdsd*/ alter session \n\t\r set \"nls_date_format\"='" + expect + "';", SqlExecuteStatus.SUCCESS); - interceptor.afterCompletion(r, session, new HashMap<>()); + interceptor.afterCompletion(r, session, getContext()); Assert.assertEquals(expect, ConnectionSessionUtil.getNlsDateFormat(session)); } @@ -159,4 +160,8 @@ private ConnectionSession getConnectionSession(ConnectType type) { return session; } + private AsyncExecuteContext getContext() { + return new AsyncExecuteContext(null, new HashMap<>()); + } + } diff --git a/server/odc-core/src/main/java/com/oceanbase/odc/core/session/ConnectionSessionConstants.java b/server/odc-core/src/main/java/com/oceanbase/odc/core/session/ConnectionSessionConstants.java index a969eca792..4182dac5e1 100644 --- a/server/odc-core/src/main/java/com/oceanbase/odc/core/session/ConnectionSessionConstants.java +++ b/server/odc-core/src/main/java/com/oceanbase/odc/core/session/ConnectionSessionConstants.java @@ -80,11 +80,13 @@ public class ConnectionSessionConstants { */ public static final String QUERY_CACHE_KEY = "QUERY_CACHE"; public static final String FUTURE_JDBC_RESULT_KEY = "FUTURE_JDBC_RESULT"; + public static final String ASYNC_EXECUTE_CONTEXT_KEY = "ASYNC_EXECUTE_CONTEXT"; /** * The connection_id current database session needs to be stored in the database session in the form * of attributes, this is the key */ public static final String CONNECTION_ID_KEY = "CONNECTION_ID"; + public static final String OB_PROXY_SESSID_KEY = "PROXY_SESSID"; /** * The {@link BinaryDataManager} session needs to be stored in the database session in the form of * attributes, this is the key diff --git a/server/odc-core/src/main/java/com/oceanbase/odc/core/session/ConnectionSessionUtil.java b/server/odc-core/src/main/java/com/oceanbase/odc/core/session/ConnectionSessionUtil.java index 18763dd959..3d6771c33d 100644 --- a/server/odc-core/src/main/java/com/oceanbase/odc/core/session/ConnectionSessionUtil.java +++ b/server/odc-core/src/main/java/com/oceanbase/odc/core/session/ConnectionSessionUtil.java @@ -200,6 +200,52 @@ public static void removeFutureJdbc(@NonNull ConnectionSession connectionSes id2FutureResult.remove(requestId); } + @SuppressWarnings("all") + public static String setExecuteContext(@NonNull ConnectionSession connectionSession, + @NonNull Object context) { + Object value = connectionSession.getAttribute(ConnectionSessionConstants.ASYNC_EXECUTE_CONTEXT_KEY); + Map id2ExecuteContext; + if (value instanceof Map) { + id2ExecuteContext = (Map) value; + } else { + id2ExecuteContext = new ConcurrentHashMap<>(); + connectionSession.setAttribute(ConnectionSessionConstants.ASYNC_EXECUTE_CONTEXT_KEY, id2ExecuteContext); + } + String uuid = UUID.randomUUID().toString(); + id2ExecuteContext.putIfAbsent(uuid, context); + return uuid; + } + + @SuppressWarnings("all") + public static Object getExecuteContext(@NonNull ConnectionSession connectionSession, + @NonNull String requestId) { + Object value = connectionSession.getAttribute(ConnectionSessionConstants.ASYNC_EXECUTE_CONTEXT_KEY); + Map id2ExecuteContext; + if (value instanceof Map) { + id2ExecuteContext = (Map) value; + } else { + throw new NotFoundException(ResourceType.ODC_ASYNC_SQL_RESULT, "session id", connectionSession.getId()); + } + Object context = id2ExecuteContext.get(requestId); + if (context == null) { + throw new NotFoundException(ResourceType.ODC_ASYNC_SQL_RESULT, "request id", requestId); + } + return context; + } + + @SuppressWarnings("all") + public static void removeExecuteContext(@NonNull ConnectionSession connectionSession, + @NonNull String requestId) { + Object value = connectionSession.getAttribute(ConnectionSessionConstants.ASYNC_EXECUTE_CONTEXT_KEY); + Map id2ExecuteContext; + if (!(value instanceof Map)) { + throw new NullPointerException("Result not found by session id " + connectionSession.getId()); + } else { + id2ExecuteContext = (Map) value; + } + id2ExecuteContext.remove(requestId); + } + public static void setUserId(@NonNull ConnectionSession connectionSession, @NonNull Long userId) { connectionSession.setAttribute(ConnectionSessionConstants.USER_ID_KEY, userId); } @@ -535,6 +581,11 @@ public static String getConsoleConnectionId(@NonNull ConnectionSession connectio return (String) connectionSession.getAttribute(ConnectionSessionConstants.CONNECTION_ID_KEY); } + public static String getConsoleConnectionProxySessId(@NonNull ConnectionSession connectionSession) { + Object proxySessId = connectionSession.getAttribute(ConnectionSessionConstants.OB_PROXY_SESSID_KEY); + return proxySessId == null ? null : (String) proxySessId; + } + public static String getVersion(@NonNull ConnectionSession connectionSession) { return (String) connectionSession.getAttribute(ConnectionSessionConstants.OB_VERSION); } diff --git a/server/odc-core/src/main/java/com/oceanbase/odc/core/sql/util/OBUtils.java b/server/odc-core/src/main/java/com/oceanbase/odc/core/sql/util/OBUtils.java index fc794bf869..286024fb23 100644 --- a/server/odc-core/src/main/java/com/oceanbase/odc/core/sql/util/OBUtils.java +++ b/server/odc-core/src/main/java/com/oceanbase/odc/core/sql/util/OBUtils.java @@ -21,6 +21,8 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; +import java.util.ArrayList; +import java.util.List; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; @@ -380,4 +382,78 @@ public static String queryDBMSOutput(@NonNull Connection connection, Integer max } } + public static String queryOBProxySessId(@NonNull Statement statement, @NonNull DialectType dialectType, + @NonNull String connectionId) throws SQLException { + String proxySessId = null; + String sql = "select proxy_sessid from " + + (dialectType.isMysql() ? "oceanbase" : "sys") + + ".v$ob_processlist where id = " + + connectionId; + try (ResultSet rs = statement.executeQuery(sql)) { + if (rs.next()) { + proxySessId = rs.getString(1); + } + } + return proxySessId; + } + + public static List querySessionIdsByProxySessId(@NonNull Statement statement, + @NonNull String proxySessId, ConnectType connectType) throws SQLException { + DialectType dialectType = connectType.getDialectType(); + SqlBuilder sqlBuilder = getBuilder(connectType) + .append("select id from ") + .append(dialectType.isMysql() ? "oceanbase" : "sys") + .append(".v$ob_processlist where proxy_sessid = ") + .append(proxySessId); + List ids = new ArrayList<>(); + try (ResultSet rs = statement.executeQuery(sqlBuilder.toString())) { + while (rs.next()) { + ids.add(rs.getString(1)); + } + } + return ids; + } + + /** + * OceanBase only supports ASH views in versions higher than 4.0. Therefore, this method is not + * applicable to earlier versions, please use sql_audit instead. + */ + public static String queryTraceIdFromASH(@NonNull Statement statement, + @NonNull List sessionIds, ConnectType connectType) throws SQLException { + DialectType dialectType = connectType.getDialectType(); + SqlBuilder sqlBuilder = getBuilder(connectType) + .append("select trace_id from ") + .append(dialectType.isMysql() ? "oceanbase" : "sys") + .append(".v$active_session_history where session_id in (") + .append(String.join(",", sessionIds)) + .append(")") + .append(dialectType.isMysql() ? " limit 1" : " and rownum=1"); + try (ResultSet rs = statement.executeQuery(sqlBuilder.toString())) { + if (!rs.next()) { + throw new SQLException("No result found in ASH."); + } + return rs.getString(1); + } + } + + /** + * OceanBase only supports ASH views in versions higher than 4.0. Therefore, this method is not + * applicable to earlier versions, please use sql_audit instead. + */ + public static String queryPlanIdByTraceId(@NonNull Statement statement, String traceId, ConnectType connectType) + throws SQLException { + DialectType dialectType = connectType.getDialectType(); + SqlBuilder sqlBuilder = getBuilder(connectType) + .append("select plan_id from ") + .append(dialectType.isMysql() ? "oceanbase" : "sys") + .append(".v$active_session_history where trace_id=") + .value(traceId); + try (ResultSet rs = statement.executeQuery(sqlBuilder.toString())) { + if (!rs.next()) { + throw new SQLException("No result found in ASH."); + } + return rs.getString(1); + } + } + } diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ConnectSessionController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ConnectSessionController.java index 3305741a9d..e633fb6b00 100644 --- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ConnectSessionController.java +++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ConnectSessionController.java @@ -56,6 +56,7 @@ import com.oceanbase.odc.service.partitionplan.model.PartitionPlanPreviewReq; import com.oceanbase.odc.service.session.ConnectConsoleService; import com.oceanbase.odc.service.session.ConnectSessionService; +import com.oceanbase.odc.service.session.model.AsyncExecuteResultResp; import com.oceanbase.odc.service.session.model.BinaryContent; import com.oceanbase.odc.service.session.model.QueryTableOrViewDataReq; import com.oceanbase.odc.service.session.model.SqlAsyncExecuteReq; @@ -120,6 +121,13 @@ public SuccessResponse asyncSqlExecute(@PathVariable String return Responses.success(consoleService.execute(SidUtils.getSessionId(sessionId), req)); } + @RequestMapping(value = {"/sessions/{sessionId}/sqls/streamExecute"}, method = RequestMethod.POST) + @StatefulRoute(stateName = StateName.DB_SESSION, stateIdExpression = "#sessionId") + public SuccessResponse streamExecute(@PathVariable String sessionId, + @RequestBody SqlAsyncExecuteReq req) throws Exception { + return Responses.success(consoleService.streamExecute(SidUtils.getSessionId(sessionId), req, true)); + } + /** * 获取异步执行sql的结果 Todo 这里的sqlIds后续需要改成一个string类型的requestId,异步api请求需要有超时机制 * @@ -134,6 +142,13 @@ public SuccessResponse> getAsyncSqlExecute(@PathVariable return Responses.success(consoleService.getAsyncResult(SidUtils.getSessionId(sessionId), requestId, null)); } + @RequestMapping(value = "/sessions/{sessionId}/sqls/getMoreResults", method = RequestMethod.GET) + @StatefulRoute(stateName = StateName.DB_SESSION, stateIdExpression = "#sessionId") + public SuccessResponse getMoreResults(@PathVariable String sessionId, + @RequestParam String requestId) { + return Responses.success(consoleService.getMoreResults(SidUtils.getSessionId(sessionId), requestId)); + } + /** * 对 sql 脚本的内容进行静态检查 * diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/util/ConnectionInfoUtil.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/util/ConnectionInfoUtil.java index 48d59042cb..9764edfecc 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/util/ConnectionInfoUtil.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/util/ConnectionInfoUtil.java @@ -23,13 +23,16 @@ import org.springframework.jdbc.core.ConnectionCallback; import org.springframework.jdbc.core.StatementCallback; +import com.oceanbase.odc.common.util.VersionUtils; import com.oceanbase.odc.core.datasource.DataSourceFactory; import com.oceanbase.odc.core.session.ConnectionSession; import com.oceanbase.odc.core.session.ConnectionSessionConstants; +import com.oceanbase.odc.core.session.ConnectionSessionUtil; import com.oceanbase.odc.core.shared.Verify; import com.oceanbase.odc.core.shared.constant.DialectType; import com.oceanbase.odc.core.sql.execute.GeneralSyncJdbcExecutor; import com.oceanbase.odc.core.sql.execute.SyncJdbcExecutor; +import com.oceanbase.odc.core.sql.util.OBUtils; import com.oceanbase.odc.plugin.connect.api.InformationExtensionPoint; import com.oceanbase.odc.service.plugin.ConnectionPluginUtil; @@ -61,6 +64,12 @@ public static void initConnectionId(@NonNull Statement statement, @NonNull Conne String sessionId = queryConnectionId(statement, connectionSession.getDialectType()); Verify.notNull(sessionId, "SessionId"); connectionSession.setAttribute(ConnectionSessionConstants.CONNECTION_ID_KEY, sessionId); + if (connectionSession.getDialectType().isOceanbase() && VersionUtils.isGreaterThanOrEqualsTo( + ConnectionSessionUtil.getVersion(connectionSession), "4.2")) { + String proxySessId = + OBUtils.queryOBProxySessId(statement, connectionSession.getDialectType(), sessionId); + connectionSession.setAttribute(ConnectionSessionConstants.OB_PROXY_SESSID_KEY, proxySessId); + } } catch (Exception exception) { log.warn("Failed to get database session ID, session={}", connectionSession, exception); } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/datasecurity/DataMaskingInterceptor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/datasecurity/DataMaskingInterceptor.java index 2cc642cf6f..6bc34cea53 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/datasecurity/DataMaskingInterceptor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/datasecurity/DataMaskingInterceptor.java @@ -20,7 +20,6 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -40,6 +39,7 @@ import com.oceanbase.odc.service.datasecurity.util.DataMaskingUtil; import com.oceanbase.odc.service.db.browser.DBSchemaAccessors; import com.oceanbase.odc.service.session.interceptor.BaseTimeConsumingInterceptor; +import com.oceanbase.odc.service.session.model.AsyncExecuteContext; import com.oceanbase.odc.service.session.model.DBResultSetMetaData; import com.oceanbase.odc.service.session.model.SqlAsyncExecuteReq; import com.oceanbase.odc.service.session.model.SqlAsyncExecuteResp; @@ -64,14 +64,14 @@ public class DataMaskingInterceptor extends BaseTimeConsumingInterceptor { @Override public boolean preHandle(@NonNull SqlAsyncExecuteReq request, @NonNull SqlAsyncExecuteResp response, - @NonNull ConnectionSession session, @NonNull Map context) { + @NonNull ConnectionSession session, @NonNull AsyncExecuteContext context) { return true; } @Override @SuppressWarnings("all") public void doAfterCompletion(@NonNull SqlExecuteResult response, @NonNull ConnectionSession session, - @NonNull Map context) throws Exception { + @NonNull AsyncExecuteContext context) throws Exception { // TODO: May intercept sensitive column operation (WHERE / ORDER BY / HAVING) if (!maskingService.isMaskingEnabled()) { return; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/integration/ExternalSqlInterceptor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/integration/ExternalSqlInterceptor.java index 37976a3c62..84602fd2c0 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/integration/ExternalSqlInterceptor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/integration/ExternalSqlInterceptor.java @@ -16,7 +16,6 @@ package com.oceanbase.odc.service.integration; import java.util.List; -import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; @@ -48,6 +47,7 @@ import com.oceanbase.odc.service.regulation.ruleset.model.Rule.RuleViolation; import com.oceanbase.odc.service.regulation.ruleset.model.SqlConsoleRules; import com.oceanbase.odc.service.session.interceptor.BaseTimeConsumingInterceptor; +import com.oceanbase.odc.service.session.model.AsyncExecuteContext; import com.oceanbase.odc.service.session.model.SqlAsyncExecuteReq; import com.oceanbase.odc.service.session.model.SqlAsyncExecuteResp; import com.oceanbase.odc.service.session.model.SqlExecuteResult; @@ -81,7 +81,7 @@ public class ExternalSqlInterceptor extends BaseTimeConsumingInterceptor { @Override public boolean doPreHandle(@NonNull SqlAsyncExecuteReq request, @NonNull SqlAsyncExecuteResp response, - @NonNull ConnectionSession session, @NonNull Map context) { + @NonNull ConnectionSession session, @NonNull AsyncExecuteContext context) { Long ruleSetId = ConnectionSessionUtil.getRuleSetId(session); if (Objects.isNull(ruleSetId) || isIndividualTeam()) { return true; @@ -147,7 +147,7 @@ public boolean doPreHandle(@NonNull SqlAsyncExecuteReq request, @NonNull SqlAsyn @Override public void afterCompletion(@NonNull SqlExecuteResult response, - @NonNull ConnectionSession session, @NonNull Map context) {} + @NonNull ConnectionSession session, @NonNull AsyncExecuteContext context) {} @Override diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/ConnectConsoleService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/ConnectConsoleService.java index 7edfdfdebf..e55505d00b 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/ConnectConsoleService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/ConnectConsoleService.java @@ -88,11 +88,14 @@ import com.oceanbase.odc.service.db.session.KillSessionResult; import com.oceanbase.odc.service.dml.ValueEncodeType; import com.oceanbase.odc.service.feature.AllFeatures; +import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; import com.oceanbase.odc.service.permission.database.model.DatabasePermissionType; import com.oceanbase.odc.service.permission.database.model.UnauthorizedDatabase; import com.oceanbase.odc.service.session.interceptor.SqlCheckInterceptor; import com.oceanbase.odc.service.session.interceptor.SqlConsoleInterceptor; import com.oceanbase.odc.service.session.interceptor.SqlExecuteInterceptorService; +import com.oceanbase.odc.service.session.model.AsyncExecuteContext; +import com.oceanbase.odc.service.session.model.AsyncExecuteResultResp; import com.oceanbase.odc.service.session.model.BinaryContent; import com.oceanbase.odc.service.session.model.OdcResultSetMetaData.OdcTable; import com.oceanbase.odc.service.session.model.QueryTableOrViewDataReq; @@ -124,7 +127,8 @@ public class ConnectConsoleService { public static final int DEFAULT_GET_RESULT_TIMEOUT_SECONDS = 3; - private static final String SHOW_TABLE_COLUMN_INFO = "SHOW_TABLE_COLUMN_INFO"; + public static final String SHOW_TABLE_COLUMN_INFO = "SHOW_TABLE_COLUMN_INFO"; + @Autowired private ConnectSessionService sessionService; @Autowired @@ -139,6 +143,8 @@ public class ConnectConsoleService { private ConnectionService connectionService; @Autowired private UserConfigFacade userConfigFacade; + @Autowired + private AuthenticationFacade authenticationFacade; public SqlExecuteResult queryTableOrViewData(@NotNull String sessionId, @NotNull @Valid QueryTableOrViewDataReq req) throws Exception { @@ -263,11 +269,12 @@ public SqlAsyncExecuteResp execute(@NotNull String sessionId, context.put(SHOW_TABLE_COLUMN_INFO, request.getShowTableColumnInfo()); context.put(SqlCheckInterceptor.NEED_SQL_CHECK_KEY, needSqlRuleCheck); context.put(SqlConsoleInterceptor.NEED_SQL_CONSOLE_CHECK, needSqlRuleCheck); + AsyncExecuteContext executeContext = new AsyncExecuteContext(sqlTuples, context); List stages = sqlTuples.stream() .map(s -> s.getSqlWatch().start(SqlExecuteStages.SQL_PRE_CHECK)) .collect(Collectors.toList()); try { - if (!sqlInterceptService.preHandle(request, response, connectionSession, context)) { + if (!sqlInterceptService.preHandle(request, response, connectionSession, executeContext)) { return response; } } finally { @@ -306,6 +313,93 @@ public SqlAsyncExecuteResp execute(@NotNull String sessionId, return response; } + public SqlAsyncExecuteResp streamExecute(@NotNull String sessionId, + @NotNull @Valid SqlAsyncExecuteReq request, boolean needSqlRuleCheck) throws Exception { + ConnectionSession connectionSession = sessionService.nullSafeGet(sessionId, true); + + long maxSqlLength = sessionProperties.getMaxSqlLength(); + if (maxSqlLength > 0) { + PreConditions.lessThanOrEqualTo("sqlLength", LimitMetric.SQL_LENGTH, + StringUtils.length(request.getSql()), maxSqlLength); + } + SqlAsyncExecuteResp result = filterKillSession(connectionSession, request); + if (result != null) { + return result; + } + List sqls = request.ifSplitSqls() + ? SqlUtils.splitWithOffset(connectionSession, request.getSql(), + sessionProperties.isOracleRemoveCommentPrefix()) + : Collections.singletonList(new OffsetString(0, request.getSql())); + if (sqls.size() == 0) { + /** + * if a sql only contains delimiter setting(eg. delimiter $$), code will do this + */ + SqlTuple sqlTuple = SqlTuple.newTuple(request.getSql()); + String id = ConnectionSessionUtil.setFutureJdbc(connectionSession, + FutureResult.successResultList(JdbcGeneralResult.successResult(sqlTuple)), null); + return SqlAsyncExecuteResp.newSqlAsyncExecuteResp(id, Collections.singletonList(sqlTuple)); + } + + long maxSqlStatementCount = sessionProperties.getMaxSqlStatementCount(); + if (maxSqlStatementCount > 0) { + PreConditions.lessThanOrEqualTo("sqlStatementCount", + LimitMetric.SQL_STATEMENT_COUNT, sqls.size(), maxSqlStatementCount); + } + + List sqlTuples = generateSqlTuple(sqls, connectionSession, request); + SqlAsyncExecuteResp response = SqlAsyncExecuteResp.newSqlAsyncExecuteResp(sqlTuples); + Map context = new HashMap<>(); + context.put(SHOW_TABLE_COLUMN_INFO, request.getShowTableColumnInfo()); + context.put(SqlCheckInterceptor.NEED_SQL_CHECK_KEY, needSqlRuleCheck); + context.put(SqlConsoleInterceptor.NEED_SQL_CONSOLE_CHECK, needSqlRuleCheck); + AsyncExecuteContext executeContext = new AsyncExecuteContext(sqlTuples, context); + List stages = sqlTuples.stream() + .map(s -> s.getSqlWatch().start(SqlExecuteStages.SQL_PRE_CHECK)) + .collect(Collectors.toList()); + try { + if (!sqlInterceptService.preHandle(request, response, connectionSession, executeContext)) { + return response; + } + } finally { + for (TraceStage stage : stages) { + try { + stage.close(); + } catch (Exception e) { + // eat exception + } + } + } + Integer queryLimit = checkQueryLimit(request.getQueryLimit()); + boolean continueExecutionOnError = + Objects.nonNull(request.getContinueExecutionOnError()) ? request.getContinueExecutionOnError() + : userConfigFacade.isContinueExecutionOnError(); + boolean stopOnError = !continueExecutionOnError; + OdcStatementCallBack statementCallBack = new OdcStatementCallBack(sqlTuples, connectionSession, + request.getAutoCommit(), queryLimit, stopOnError, executeContext); + + statementCallBack.setDbmsoutputMaxRows(sessionProperties.getDbmsOutputMaxRows()); + + boolean fullLinkTraceEnabled = + Objects.nonNull(request.getFullLinkTraceEnabled()) ? request.getFullLinkTraceEnabled() + : userConfigFacade.isFullLinkTraceEnabled(); + statementCallBack.setUseFullLinkTrace(fullLinkTraceEnabled); + + statementCallBack.setFullLinkTraceTimeout(sessionProperties.getFullLinkTraceTimeoutSeconds()); + statementCallBack.setMaxCachedSize(sessionProperties.getResultSetMaxCachedSize()); + statementCallBack.setMaxCachedLines(sessionProperties.getResultSetMaxCachedLines()); + statementCallBack.setLocale(LocaleContextHolder.getLocale()); + if (connectionSession.getDialectType().isOceanbase() && sqlTuples.size() <= 10) { + statementCallBack.getListeners().add(new OBExecutionListener(connectionSession)); + } + + Future> futureResult = connectionSession.getAsyncJdbcExecutor( + ConnectionSessionConstants.CONSOLE_DS_KEY).execute(statementCallBack); + executeContext.setFuture(futureResult); + String id = ConnectionSessionUtil.setExecuteContext(connectionSession, executeContext); + response.setRequestId(id); + return response; + } + public List getAsyncResult(@NotNull String sessionId, @NotNull String requestId) { return getAsyncResult(sessionId, requestId, DEFAULT_GET_RESULT_TIMEOUT_SECONDS); } @@ -321,10 +415,10 @@ public List getAsyncResult(@NotNull String sessionId, String r Map context = ConnectionSessionUtil.getFutureJdbcContext(connectionSession, requestId); ConnectionSessionUtil.removeFutureJdbc(connectionSession, requestId); return resultList.stream().map(jdbcGeneralResult -> { - Map cxt = context == null ? new HashMap<>() : context; - SqlExecuteResult result = generateResult(connectionSession, jdbcGeneralResult, cxt); + Map ctx = context == null ? new HashMap<>() : context; + SqlExecuteResult result = generateResult(connectionSession, jdbcGeneralResult, ctx); try (TraceStage stage = result.getSqlTuple().getSqlWatch().start(SqlExecuteStages.SQL_AFTER_CHECK)) { - sqlInterceptService.afterCompletion(result, connectionSession, cxt); + sqlInterceptService.afterCompletion(result, connectionSession, new AsyncExecuteContext(null, ctx)); } catch (Exception e) { throw new IllegalStateException(e); } @@ -341,6 +435,34 @@ public List getAsyncResult(@NotNull String sessionId, String r } } + public AsyncExecuteResultResp getMoreResults(@NotNull String sessionId, String requestId) { + PreConditions.validArgumentState(Objects.nonNull(requestId), ErrorCodes.SqlRegulationRuleBlocked, null, null); + ConnectionSession connectionSession = sessionService.nullSafeGet(sessionId); + AsyncExecuteContext context = + (AsyncExecuteContext) ConnectionSessionUtil.getExecuteContext(connectionSession, requestId); + boolean shouldRemoveContext = context.isFinished(); + try { + List resultList = context.getMoreSqlExecutionResults(); + List results = resultList.stream().map(jdbcGeneralResult -> { + SqlExecuteResult result = generateResult(connectionSession, jdbcGeneralResult, context.getContextMap()); + try (TraceStage stage = result.getSqlTuple().getSqlWatch().start(SqlExecuteStages.SQL_AFTER_CHECK)) { + sqlInterceptService.afterCompletion(result, connectionSession, context); + } catch (Exception e) { + throw new IllegalStateException(e); + } + return result; + }).collect(Collectors.toList()); + return new AsyncExecuteResultResp(shouldRemoveContext, context, results); + } catch (Exception e) { + shouldRemoveContext = true; + throw e; + } finally { + if (shouldRemoveContext) { + ConnectionSessionUtil.removeExecuteContext(connectionSession, requestId); + } + } + } + public BinaryContent getBinaryContent(@NotNull String sessionId, @NotNull String sqlId, @NotNull Long rowNum, @NotNull Integer colNum, @NotNull Long skip, @NotNull Integer len, @NotNull ValueEncodeType format) throws IOException { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/OBExecutionListener.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/OBExecutionListener.java new file mode 100644 index 0000000000..9c2792ad1f --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/OBExecutionListener.java @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.session; + +import static com.oceanbase.odc.core.session.ConnectionSessionConstants.BACKEND_DS_KEY; +import static com.oceanbase.odc.core.session.ConnectionSessionConstants.CONSOLE_DS_KEY; + +import java.util.Collections; +import java.util.List; + +import org.apache.commons.collections4.CollectionUtils; +import org.springframework.jdbc.core.StatementCallback; + +import com.oceanbase.odc.common.util.StringUtils; +import com.oceanbase.odc.common.util.VersionUtils; +import com.oceanbase.odc.core.session.ConnectionSession; +import com.oceanbase.odc.core.session.ConnectionSessionUtil; +import com.oceanbase.odc.core.sql.execute.model.JdbcGeneralResult; +import com.oceanbase.odc.core.sql.execute.model.SqlTuple; +import com.oceanbase.odc.core.sql.util.OBUtils; +import com.oceanbase.odc.service.session.model.AsyncExecuteContext; + +/** + * @author: liuyizhuo.lyz + * @date: 2024/4/23 + */ +public class OBExecutionListener implements SqlExecutionListener { + private static final Long DEFAULT_QUERY_TRACE_ID_WAIT_MILLIS = 1100L; + private static final String ENABLE_QUERY_PROFILE_VERSION = "4.2"; + + private final ConnectionSession session; + private final List sessionIds; + + public OBExecutionListener(ConnectionSession session) { + this.session = session; + sessionIds = getSessionIds(); + } + + @Override + public void onExecutionStart(SqlTuple sqlTuple, AsyncExecuteContext context) {} + + @Override + public void onExecutionEnd(SqlTuple sqlTuple, List results, AsyncExecuteContext context) {} + + @Override + public void onExecutionCancelled(SqlTuple sqlTuple, List results, AsyncExecuteContext context) {} + + public void onExecutionStartAfter(SqlTuple sqlTuple, AsyncExecuteContext context) { + if (CollectionUtils.isEmpty(sessionIds)) { + return; + } + String traceId = session.getSyncJdbcExecutor(BACKEND_DS_KEY).execute((StatementCallback) stmt -> OBUtils + .queryTraceIdFromASH(stmt, sessionIds, session.getConnectType())); + if (traceId != null) { + context.setCurrentExecutingSqlTraceId(traceId); + } + } + + @Override + public Long getOnExecutionStartAfterMillis() { + return DEFAULT_QUERY_TRACE_ID_WAIT_MILLIS; + } + + private List getSessionIds() { + if (VersionUtils.isLessThan(ConnectionSessionUtil.getVersion(session), ENABLE_QUERY_PROFILE_VERSION)) { + return Collections.emptyList(); + } + String proxySessId = ConnectionSessionUtil.getConsoleConnectionProxySessId(session); + if (StringUtils.isEmpty(proxySessId)) { + return Collections.singletonList(ConnectionSessionUtil.getConsoleConnectionId(session)); + } + return session.getSyncJdbcExecutor(CONSOLE_DS_KEY).execute((StatementCallback>) stmt -> OBUtils + .querySessionIdsByProxySessId(stmt, proxySessId, session.getConnectType())); + } +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/OdcStatementCallBack.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/OdcStatementCallBack.java index 503c763a7e..7c8d148088 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/OdcStatementCallBack.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/OdcStatementCallBack.java @@ -30,11 +30,16 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.BiPredicate; import java.util.regex.Matcher; @@ -78,6 +83,7 @@ import com.oceanbase.odc.core.sql.util.FullLinkTraceUtil; import com.oceanbase.odc.core.sql.util.OBUtils; import com.oceanbase.odc.service.plugin.ConnectionPluginUtil; +import com.oceanbase.odc.service.session.model.AsyncExecuteContext; import lombok.Getter; import lombok.NonNull; @@ -114,6 +120,9 @@ public class OdcStatementCallBack implements StatementCallback listeners = new ArrayList<>(); + private final ExecutorService executor = Executors.newSingleThreadExecutor(); @Setter private boolean useFullLinkTrace = false; @Setter @@ -134,8 +143,14 @@ public OdcStatementCallBack(@NonNull List sqls, @NonNull ConnectionSes this(sqls, connectionSession, autoCommit, queryLimit, true); } + public OdcStatementCallBack(@NonNull List sqls, @NonNull ConnectionSession connectionSession, Boolean autoCommit, Integer queryLimit, boolean stopWhenError) { + this(sqls, connectionSession, autoCommit, queryLimit, stopWhenError, null); + } + + public OdcStatementCallBack(@NonNull List sqls, @NonNull ConnectionSession connectionSession, + Boolean autoCommit, Integer queryLimit, boolean stopWhenError, AsyncExecuteContext context) { this.sqls = sqls; this.autoCommit = autoCommit == null ? connectionSession.getDefaultAutoCommit() : autoCommit; this.connectType = connectionSession.getConnectType(); @@ -150,6 +165,7 @@ public OdcStatementCallBack(@NonNull List sqls, @NonNull ConnectionSes this.binaryDataManager = ConnectionSessionUtil.getBinaryDataManager(connectionSession); ConnectionSessionUtil.setConsoleSessionKillQueryFlag(connectionSession, false); Validate.notNull(this.binaryDataManager, "BinaryDataManager can not be null"); + this.context = context; } @Override @@ -162,18 +178,28 @@ public List doInStatement(Statement statement) throws SQLExce return this.sqls.stream().map(sqlTuple -> { JdbcGeneralResult result = JdbcGeneralResult.canceledResult(sqlTuple); result.setConnectionReset(true); + onExecutionCancelled(sqlTuple, Collections.singletonList(result)); return result; }).collect(Collectors.toList()); } - List returnVal = new LinkedList<>(); boolean currentAutoCommit = statement.getConnection().getAutoCommit(); + List returnVal = new LinkedList<>(); try { applyStatementSettings(statement); // 对于修改表数据DML,如果是自动提交,为了保证原子性,在执行过程设置为手动,执行完成后再进行reset if (this.autoCommit ^ currentAutoCommit) { statement.getConnection().setAutoCommit(this.autoCommit); } + Future handle = null; for (SqlTuple sqlTuple : this.sqls) { + if (handle != null) { + try { + handle.get(); + } catch (Exception e) { + // eat exception + } + } + onExecutionStart(sqlTuple); try { applyConnectionSettings(statement); } catch (Exception e) { @@ -184,15 +210,16 @@ public List doInStatement(Statement statement) throws SQLExce if (Thread.currentThread().isInterrupted() || ConnectionSessionUtil.isConsoleSessionKillQuery(connectionSession)) { executeResults = Collections.singletonList(JdbcGeneralResult.canceledResult(sqlTuple)); + onExecutionCancelled(sqlTuple, executeResults); } else { - try { - executeResults = doExecuteSql(statement, sqlTuple); - } catch (Exception exp) { - executeResults = Collections.singletonList(JdbcGeneralResult.failedResult(sqlTuple, exp)); - } + CountDownLatch latch = new CountDownLatch(1); + handle = executor.submit(() -> onExecutionStartAfterMillis(sqlTuple, latch)); + executeResults = doExecuteSql(statement, sqlTuple, latch); + onExecutionEnd(sqlTuple, executeResults); } } else { executeResults = Collections.singletonList(JdbcGeneralResult.canceledResult(sqlTuple)); + onExecutionCancelled(sqlTuple, executeResults); } returnVal.addAll(executeResults); } @@ -201,7 +228,6 @@ public List doInStatement(Statement statement) throws SQLExce if (failed.isPresent()) { throw failed.get().getThrown(); } - } catch (Exception e) { try { ConnectionSessionUtil.logSocketInfo(statement.getConnection(), "console error"); @@ -218,6 +244,7 @@ public List doInStatement(Statement statement) throws SQLExce log.info("Clear dbms_output cache, dbmsInfo={}", dbmsInfo); } } + executor.shutdownNow(); } return returnVal; } @@ -310,52 +337,65 @@ private List consumeStatement(Statement statement, SqlTuple s return executeResults; } - protected List doExecuteSql(Statement statement, SqlTuple sqlTuple) throws Exception { - String sql = sqlTuple.getExecutedSql(); - if (!ifFunctionCallExists(sql)) { - // use text protocal - try (TraceStage stage = sqlTuple.getSqlWatch().start(SqlExecuteStages.EXECUTE)) { + protected List doExecuteSql(Statement statement, SqlTuple sqlTuple, CountDownLatch latch) { + try { + String sql = sqlTuple.getExecutedSql(); + if (!ifFunctionCallExists(sql)) { + // use text protocal + try (TraceStage stage = sqlTuple.getSqlWatch().start(SqlExecuteStages.EXECUTE)) { + boolean isResultSet; + try { + isResultSet = statement.execute(sql); + } catch (Exception e) { + return handleException(e, statement, sqlTuple); + } + latch.countDown(); + return consumeStatement(statement, sqlTuple, isResultSet); + } + } + // use ps protocal + String preparedSql = OBJECT_VALUE_PATTERN.matcher(sql).replaceAll("?"); + log.info( + "Load_file call is detected in sql, use ps protocol to rewrite " + + "the original sql, originalSql={}, modifiedSql={}", + sql, preparedSql); + List definitions = retrieveFunctionCalls(sql); + log.info("There is a function call in sql, functions={}", definitions); + try (PreparedStatement preparedStatement = statement.getConnection().prepareStatement(preparedSql); + TraceStage stage = sqlTuple.getSqlWatch().start(SqlExecuteStages.EXECUTE)) { + for (int i = 0; i < definitions.size(); i++) { + FunctionDefinition definition = definitions.get(i); + if ("load_file".equalsIgnoreCase(definition.getFunctionName())) { + // load binary data + String fileName = retrieveFileNameFromParameters(definition); + File file = nullSafeFindFileByName(fileName); + preparedStatement.setBinaryStream(i + 1, new FileInputStream(file)); + } else if ("load_clob_file".equalsIgnoreCase(definition.getFunctionName())) { + // load clob data + String fileName = retrieveFileNameFromParameters(definition); + File file = nullSafeFindFileByName(fileName); + preparedStatement.setClob(i + 1, new FileReader(file)); + } else { + throw new NotImplementedException("Unsupport function call " + definition.getFunctionName()); + } + } boolean isResultSet; try { - isResultSet = statement.execute(sql); + isResultSet = preparedStatement.execute(); } catch (Exception e) { return handleException(e, statement, sqlTuple); } + latch.countDown(); return consumeStatement(statement, sqlTuple, isResultSet); } - } - // use ps protocal - String preparedSql = OBJECT_VALUE_PATTERN.matcher(sql).replaceAll("?"); - log.info( - "Load_file call is detected in sql, use ps protocol to rewrite " - + "the original sql, originalSql={}, modifiedSql={}", - sql, preparedSql); - List definitions = retrieveFunctionCalls(sql); - log.info("There is a function call in sql, functions={}", definitions); - try (PreparedStatement preparedStatement = statement.getConnection().prepareStatement(preparedSql); - TraceStage stage = sqlTuple.getSqlWatch().start(SqlExecuteStages.EXECUTE)) { - for (int i = 0; i < definitions.size(); i++) { - FunctionDefinition definition = definitions.get(i); - if ("load_file".equalsIgnoreCase(definition.getFunctionName())) { - // load binary data - String fileName = retrieveFileNameFromParameters(definition); - File file = nullSafeFindFileByName(fileName); - preparedStatement.setBinaryStream(i + 1, new FileInputStream(file)); - } else if ("load_clob_file".equalsIgnoreCase(definition.getFunctionName())) { - // load clob data - String fileName = retrieveFileNameFromParameters(definition); - File file = nullSafeFindFileByName(fileName); - preparedStatement.setClob(i + 1, new FileReader(file)); - } else { - throw new NotImplementedException("Unsupport function call " + definition.getFunctionName()); - } - } - boolean isResult = preparedStatement.execute(); - return consumeStatement(preparedStatement, sqlTuple, isResult); + } catch (Exception e) { + return Collections.singletonList(JdbcGeneralResult.failedResult(sqlTuple, e)); + } finally { + latch.countDown(); } } - private List handleException(Exception exception, Statement statement, SqlTuple sqlTuple) { + protected List handleException(Exception exception, Statement statement, SqlTuple sqlTuple) { if (exception instanceof SQLTransientConnectionException && ((SQLTransientConnectionException) exception).getErrorCode() == 1094) { // ERROR 1094 (HY000) : Unknown thread id: %lu when kill a not exists session @@ -416,7 +456,7 @@ private SqlExecTime getTraceIdAndAndSetStage(Statement statement, TraceWatch tra try { StopWatch stopWatch = StopWatch.createStarted(); String version = ConnectionSessionUtil.getVersion(connectionSession); - SqlExecTime executeDetails = new SqlExecTime();; + SqlExecTime executeDetails = new SqlExecTime(); if (useFullLinkTrace && VersionUtils.isGreaterThanOrEqualsTo(version, "4.1") && connectionSession.getDialectType().isOceanbase()) { try { @@ -527,6 +567,73 @@ private void rollback(Connection connection) { } } + private void onExecutionStart(SqlTuple sqlTuple) { + if (context != null) { + context.setCurrentExecutingSql(sqlTuple.getExecutedSql()); + context.incrementTotalExecutedSqlCount(); + context.setCurrentExecutingSqlTraceId(null); + } + listeners.forEach(listener -> { + try { + listener.onExecutionStart(sqlTuple, context); + } catch (Exception e) { + log.warn("An error occurred in listener {}.", listener.getClass(), e); + } + }); + } + + private void onExecutionCancelled(SqlTuple sqlTuple, List results) { + if (context != null) { + context.addSqlExecutionResults(results); + } + listeners.forEach(listener -> { + try { + listener.onExecutionCancelled(sqlTuple, results, context); + } catch (Exception e) { + log.warn("An error occurred in listener {}.", listener.getClass(), e); + } + }); + } + + private void onExecutionEnd(SqlTuple sqlTuple, List results) { + if (context != null) { + context.addSqlExecutionResults(results); + } + listeners.forEach(listener -> { + try { + listener.onExecutionEnd(sqlTuple, results, context); + } catch (Exception e) { + log.warn("An error occurred in listener {}.", listener.getClass(), e); + } + }); + } + + private Void onExecutionStartAfterMillis(SqlTuple sqlTuple, CountDownLatch latch) { + long startTs = System.currentTimeMillis(); + List sortedListeners = listeners.stream() + .filter(listener -> listener.getOnExecutionStartAfterMillis() != null + && listener.getOnExecutionStartAfterMillis() > 0) + .sorted(Comparator + .comparingLong(SqlExecutionListener::getOnExecutionStartAfterMillis)) + .collect(Collectors.toList()); + for (SqlExecutionListener listener : sortedListeners) { + long waitTs = System.currentTimeMillis() - startTs; + Long expectedTs = listener.getOnExecutionStartAfterMillis(); + try { + if (!latch.await(expectedTs - waitTs, TimeUnit.MILLISECONDS)) { + listener.onExecutionStartAfter(sqlTuple, context); + } else { + break; + } + } catch (InterruptedException e) { + return null; + } catch (Exception e) { + log.warn("An error occurred in listener {}.", listener.getClass(), e); + } + } + return null; + } + @Getter @ToString static class FunctionDefinition { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/SqlExecutionListener.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/SqlExecutionListener.java new file mode 100644 index 0000000000..1143412a21 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/SqlExecutionListener.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.session; + +import java.util.List; + +import com.oceanbase.odc.core.sql.execute.model.JdbcGeneralResult; +import com.oceanbase.odc.core.sql.execute.model.SqlTuple; +import com.oceanbase.odc.service.session.model.AsyncExecuteContext; + +/** + * @author: liuyizhuo.lyz + * @date: 2024/4/23 + */ +public interface SqlExecutionListener { + + void onExecutionStart(SqlTuple sqlTuple, AsyncExecuteContext context); + + void onExecutionEnd(SqlTuple sqlTuple, List results, AsyncExecuteContext context); + + void onExecutionCancelled(SqlTuple sqlTuple, List results, AsyncExecuteContext context); + + void onExecutionStartAfter(SqlTuple sqlTuple, AsyncExecuteContext context); + + Long getOnExecutionStartAfterMillis(); + +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/BaseTimeConsumingInterceptor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/BaseTimeConsumingInterceptor.java index 5ba5bd2bf2..77475f26a4 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/BaseTimeConsumingInterceptor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/BaseTimeConsumingInterceptor.java @@ -17,11 +17,11 @@ package com.oceanbase.odc.service.session.interceptor; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import com.oceanbase.odc.common.util.TraceStage; import com.oceanbase.odc.core.session.ConnectionSession; +import com.oceanbase.odc.service.session.model.AsyncExecuteContext; import com.oceanbase.odc.service.session.model.SqlAsyncExecuteReq; import com.oceanbase.odc.service.session.model.SqlAsyncExecuteResp; import com.oceanbase.odc.service.session.model.SqlExecuteResult; @@ -32,7 +32,7 @@ public abstract class BaseTimeConsumingInterceptor implements SqlExecuteIntercep @Override public boolean preHandle(@NonNull SqlAsyncExecuteReq request, @NonNull SqlAsyncExecuteResp response, - @NonNull ConnectionSession session, @NonNull Map context) throws Exception { + @NonNull ConnectionSession session, @NonNull AsyncExecuteContext context) throws Exception { List stageList = response.getSqls().stream() .map(v -> v.getSqlTuple().getSqlWatch().start(getExecuteStageName())) .collect(Collectors.toList()); @@ -51,19 +51,19 @@ public boolean preHandle(@NonNull SqlAsyncExecuteReq request, @NonNull SqlAsyncE @Override public void afterCompletion(@NonNull SqlExecuteResult response, - @NonNull ConnectionSession session, @NonNull Map context) throws Exception { + @NonNull ConnectionSession session, @NonNull AsyncExecuteContext context) throws Exception { try (TraceStage stage = response.getSqlTuple().getSqlWatch().start(getExecuteStageName())) { doAfterCompletion(response, session, context); } } protected boolean doPreHandle(@NonNull SqlAsyncExecuteReq request, @NonNull SqlAsyncExecuteResp response, - @NonNull ConnectionSession session, @NonNull Map context) throws Exception { + @NonNull ConnectionSession session, @NonNull AsyncExecuteContext context) throws Exception { return true; } protected void doAfterCompletion(@NonNull SqlExecuteResult response, - @NonNull ConnectionSession session, @NonNull Map context) throws Exception {} + @NonNull ConnectionSession session, @NonNull AsyncExecuteContext context) throws Exception {} protected abstract String getExecuteStageName(); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/DatabasePermissionInterceptor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/DatabasePermissionInterceptor.java index 2a3fa79b3f..ba1696c133 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/DatabasePermissionInterceptor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/DatabasePermissionInterceptor.java @@ -37,6 +37,7 @@ import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; import com.oceanbase.odc.service.permission.database.model.DatabasePermissionType; import com.oceanbase.odc.service.permission.database.model.UnauthorizedDatabase; +import com.oceanbase.odc.service.session.model.AsyncExecuteContext; import com.oceanbase.odc.service.session.model.SqlAsyncExecuteReq; import com.oceanbase.odc.service.session.model.SqlAsyncExecuteResp; import com.oceanbase.odc.service.session.model.SqlExecuteResult; @@ -69,7 +70,7 @@ public int getOrder() { @Override public boolean doPreHandle(@NonNull SqlAsyncExecuteReq request, @NonNull SqlAsyncExecuteResp response, - @NonNull ConnectionSession session, @NonNull Map context) throws Exception { + @NonNull ConnectionSession session, @NonNull AsyncExecuteContext context) throws Exception { if (authenticationFacade.currentUser().getOrganizationType() == OrganizationType.INDIVIDUAL) { return true; } @@ -100,7 +101,7 @@ public boolean doPreHandle(@NonNull SqlAsyncExecuteReq request, @NonNull SqlAsyn @Override public void afterCompletion(@NonNull SqlExecuteResult response, @NonNull ConnectionSession session, - @NonNull Map context) {} + @NonNull AsyncExecuteContext context) {} @Override protected String getExecuteStageName() { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/NlsFormatInterceptor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/NlsFormatInterceptor.java index 1a2e79d350..53e74ef523 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/NlsFormatInterceptor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/NlsFormatInterceptor.java @@ -17,7 +17,6 @@ import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; @@ -36,6 +35,7 @@ import com.oceanbase.odc.core.sql.parser.AbstractSyntaxTreeFactories; import com.oceanbase.odc.core.sql.split.OffsetString; import com.oceanbase.odc.core.sql.split.SqlCommentProcessor; +import com.oceanbase.odc.service.session.model.AsyncExecuteContext; import com.oceanbase.odc.service.session.model.SqlAsyncExecuteReq; import com.oceanbase.odc.service.session.model.SqlAsyncExecuteResp; import com.oceanbase.odc.service.session.model.SqlExecuteResult; @@ -67,13 +67,13 @@ public class NlsFormatInterceptor extends BaseTimeConsumingInterceptor { @Override public boolean preHandle(@NonNull SqlAsyncExecuteReq request, @NonNull SqlAsyncExecuteResp response, - @NonNull ConnectionSession session, @NonNull Map context) { + @NonNull ConnectionSession session, @NonNull AsyncExecuteContext context) { return true; } @Override public void doAfterCompletion(@NonNull SqlExecuteResult response, - @NonNull ConnectionSession session, @NonNull Map context) { + @NonNull ConnectionSession session, @NonNull AsyncExecuteContext context) { DialectType dialect = session.getDialectType(); if (response.getStatus() != SqlExecuteStatus.SUCCESS || dialect != DialectType.OB_ORACLE) { return; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/SqlCheckInterceptor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/SqlCheckInterceptor.java index e0d1403db4..19b8585156 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/SqlCheckInterceptor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/SqlCheckInterceptor.java @@ -37,6 +37,7 @@ import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; import com.oceanbase.odc.service.regulation.ruleset.RuleService; import com.oceanbase.odc.service.regulation.ruleset.model.Rule; +import com.oceanbase.odc.service.session.model.AsyncExecuteContext; import com.oceanbase.odc.service.session.model.SqlAsyncExecuteReq; import com.oceanbase.odc.service.session.model.SqlAsyncExecuteResp; import com.oceanbase.odc.service.session.model.SqlExecuteResult; @@ -74,11 +75,12 @@ public class SqlCheckInterceptor extends BaseTimeConsumingInterceptor { @Override public boolean doPreHandle(@NonNull SqlAsyncExecuteReq request, @NonNull SqlAsyncExecuteResp response, - @NonNull ConnectionSession session, @NonNull Map context) { - boolean sqlCheckIntercepted = handle(request, response, session, context); - context.put(SQL_CHECK_INTERCEPTED, sqlCheckIntercepted); - if (Objects.nonNull(context.get(SqlConsoleInterceptor.SQL_CONSOLE_INTERCEPTED))) { - return sqlCheckIntercepted && (Boolean) context.get(SqlConsoleInterceptor.SQL_CONSOLE_INTERCEPTED); + @NonNull ConnectionSession session, @NonNull AsyncExecuteContext context) { + Map ctx = context.getContextMap(); + boolean sqlCheckIntercepted = handle(request, response, session, ctx); + ctx.put(SQL_CHECK_INTERCEPTED, sqlCheckIntercepted); + if (Objects.nonNull(ctx.get(SqlConsoleInterceptor.SQL_CONSOLE_INTERCEPTED))) { + return sqlCheckIntercepted && (Boolean) ctx.get(SqlConsoleInterceptor.SQL_CONSOLE_INTERCEPTED); } else { return true; } @@ -132,11 +134,12 @@ protected String getExecuteStageName() { @Override @SuppressWarnings("all") public void afterCompletion(@NonNull SqlExecuteResult response, @NonNull ConnectionSession session, - @NonNull Map context) throws Exception { - if (!context.containsKey(SQL_CHECK_RESULT_KEY)) { + @NonNull AsyncExecuteContext context) throws Exception { + Map ctx = context.getContextMap(); + if (!ctx.containsKey(SQL_CHECK_RESULT_KEY)) { return; } - Map> map = (Map>) context.get(SQL_CHECK_RESULT_KEY); + Map> map = (Map>) ctx.get(SQL_CHECK_RESULT_KEY); List results = map.get(response.getSqlTuple().getOffset()); if (CollectionUtils.isEmpty(results)) { return; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/SqlConsoleInterceptor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/SqlConsoleInterceptor.java index 7610a3f1ff..35de0eff18 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/SqlConsoleInterceptor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/SqlConsoleInterceptor.java @@ -42,6 +42,7 @@ import com.oceanbase.odc.service.regulation.ruleset.model.Rule; import com.oceanbase.odc.service.regulation.ruleset.model.Rule.RuleViolation; import com.oceanbase.odc.service.regulation.ruleset.model.SqlConsoleRules; +import com.oceanbase.odc.service.session.model.AsyncExecuteContext; import com.oceanbase.odc.service.session.model.SqlAsyncExecuteReq; import com.oceanbase.odc.service.session.model.SqlAsyncExecuteResp; import com.oceanbase.odc.service.session.model.SqlExecuteResult; @@ -74,11 +75,12 @@ public class SqlConsoleInterceptor extends BaseTimeConsumingInterceptor { @Override public boolean doPreHandle(@NonNull SqlAsyncExecuteReq request, @NonNull SqlAsyncExecuteResp response, - @NonNull ConnectionSession session, @NonNull Map context) { - boolean sqlConsoleIntercepted = handle(request, response, session, context); - context.put(SQL_CONSOLE_INTERCEPTED, sqlConsoleIntercepted); - if (Objects.nonNull(context.get(SqlCheckInterceptor.SQL_CHECK_INTERCEPTED))) { - return sqlConsoleIntercepted && (Boolean) context.get(SqlCheckInterceptor.SQL_CHECK_INTERCEPTED); + @NonNull ConnectionSession session, @NonNull AsyncExecuteContext context) { + Map ctx = context.getContextMap(); + boolean sqlConsoleIntercepted = handle(request, response, session, ctx); + ctx.put(SQL_CONSOLE_INTERCEPTED, sqlConsoleIntercepted); + if (Objects.nonNull(ctx.get(SqlCheckInterceptor.SQL_CHECK_INTERCEPTED))) { + return sqlConsoleIntercepted && (Boolean) ctx.get(SqlCheckInterceptor.SQL_CHECK_INTERCEPTED); } else { return true; } @@ -199,7 +201,7 @@ protected String getExecuteStageName() { @Override public void doAfterCompletion(@NonNull SqlExecuteResult response, @NonNull ConnectionSession session, - @NonNull Map context) { + @NonNull AsyncExecuteContext context) { if (response.getStatus() != SqlExecuteStatus.SUCCESS) { return; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/SqlExecuteInterceptor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/SqlExecuteInterceptor.java index 5fb77ecaa9..225e350614 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/SqlExecuteInterceptor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/SqlExecuteInterceptor.java @@ -15,11 +15,10 @@ */ package com.oceanbase.odc.service.session.interceptor; -import java.util.Map; - import org.springframework.core.Ordered; import com.oceanbase.odc.core.session.ConnectionSession; +import com.oceanbase.odc.service.session.model.AsyncExecuteContext; import com.oceanbase.odc.service.session.model.SqlAsyncExecuteReq; import com.oceanbase.odc.service.session.model.SqlAsyncExecuteResp; import com.oceanbase.odc.service.session.model.SqlExecuteResult; @@ -44,11 +43,11 @@ public interface SqlExecuteInterceptor extends Ordered { * @return whether to execute this sql */ default boolean preHandle(@NonNull SqlAsyncExecuteReq request, @NonNull SqlAsyncExecuteResp response, - @NonNull ConnectionSession session, @NonNull Map context) throws Exception { + @NonNull ConnectionSession session, @NonNull AsyncExecuteContext context) throws Exception { return true; } default void afterCompletion(@NonNull SqlExecuteResult response, - @NonNull ConnectionSession session, @NonNull Map context) throws Exception {} + @NonNull ConnectionSession session, @NonNull AsyncExecuteContext context) throws Exception {} } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/SqlExecuteInterceptorService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/SqlExecuteInterceptorService.java index 76ad156cc9..662ed3bb5b 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/SqlExecuteInterceptorService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/SqlExecuteInterceptorService.java @@ -29,6 +29,7 @@ import com.oceanbase.odc.core.authority.util.SkipAuthorize; import com.oceanbase.odc.core.session.ConnectionSession; +import com.oceanbase.odc.service.session.model.AsyncExecuteContext; import com.oceanbase.odc.service.session.model.SqlAsyncExecuteReq; import com.oceanbase.odc.service.session.model.SqlAsyncExecuteResp; import com.oceanbase.odc.service.session.model.SqlExecuteResult; @@ -61,7 +62,7 @@ public void init() { } public boolean preHandle(@NonNull SqlAsyncExecuteReq request, @NonNull SqlAsyncExecuteResp response, - @NonNull ConnectionSession session, @NonNull Map context) throws Exception { + @NonNull ConnectionSession session, @NonNull AsyncExecuteContext context) throws Exception { for (SqlExecuteInterceptor interceptor : interceptors) { if (interceptor.preHandle(request, response, session, context)) { continue; @@ -72,7 +73,7 @@ public boolean preHandle(@NonNull SqlAsyncExecuteReq request, @NonNull SqlAsyncE } public void afterCompletion(@NonNull SqlExecuteResult response, @NonNull ConnectionSession session, - @NonNull Map context) throws Exception { + @NonNull AsyncExecuteContext context) throws Exception { for (SqlExecuteInterceptor interceptor : interceptors) { interceptor.afterCompletion(response, session, context); } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/SwitchDatabaseInterceptor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/SwitchDatabaseInterceptor.java index 73a9b31fa7..9d41f99e8e 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/SwitchDatabaseInterceptor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/interceptor/SwitchDatabaseInterceptor.java @@ -16,7 +16,6 @@ package com.oceanbase.odc.service.session.interceptor; import java.util.Collections; -import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -25,6 +24,7 @@ import com.oceanbase.odc.core.session.ConnectionSession; import com.oceanbase.odc.core.session.ConnectionSessionUtil; import com.oceanbase.odc.core.sql.execute.model.SqlExecuteStatus; +import com.oceanbase.odc.service.session.model.AsyncExecuteContext; import com.oceanbase.odc.service.session.model.SqlExecuteResult; import com.oceanbase.odc.service.session.util.SchemaExtractor; @@ -41,7 +41,7 @@ public class SwitchDatabaseInterceptor implements SqlExecuteInterceptor { @Override public void afterCompletion(@NonNull SqlExecuteResult response, @NonNull ConnectionSession session, - @NonNull Map context) throws Exception { + @NonNull AsyncExecuteContext context) throws Exception { if (response.getStatus() != SqlExecuteStatus.SUCCESS) { return; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/model/AsyncExecuteContext.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/model/AsyncExecuteContext.java new file mode 100644 index 0000000000..11027990fe --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/model/AsyncExecuteContext.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.session.model; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Future; + +import com.oceanbase.odc.core.sql.execute.model.JdbcGeneralResult; +import com.oceanbase.odc.core.sql.execute.model.SqlTuple; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +/** + * @author liuyizhuo.lyz + * @date 2024/4/15 + */ +@Getter +@Setter +@Slf4j +public class AsyncExecuteContext { + private final List sqlTuples; + private final Queue results = new ConcurrentLinkedQueue<>(); + private final Map contextMap; + + private Future> future; + private String currentExecutingSqlTraceId; + private String currentExecutingSql; + private int totalExecutedSqlCount = 0; + + public AsyncExecuteContext(List sqlTuples, Map contextMap) { + this.sqlTuples = sqlTuples; + this.contextMap = contextMap; + } + + public boolean isFinished() { + return future != null && future.isDone(); + } + + public boolean isCancelled() { + return future != null && future.isCancelled(); + } + + public void incrementTotalExecutedSqlCount() { + totalExecutedSqlCount++; + } + + public int getToBeExecutedSqlCount() { + return sqlTuples.size(); + } + + /** + * only return the incremental results + */ + public List getMoreSqlExecutionResults() { + List copiedResults = new ArrayList<>(); + while (!results.isEmpty()) { + copiedResults.add(results.poll()); + } + return copiedResults; + } + + public void addSqlExecutionResults(List results) { + this.results.addAll(results); + } + +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/model/AsyncExecuteResultResp.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/model/AsyncExecuteResultResp.java new file mode 100644 index 0000000000..ae94614894 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/model/AsyncExecuteResultResp.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.session.model; + +import java.util.List; + +import lombok.Data; + +/** + * @author liuyizhuo.lyz + * @date 2024/4/15 + */ +@Data +public class AsyncExecuteResultResp { + List results; + private String traceId; + private int total; + private int count; + private boolean finished; + private String sql; + + public AsyncExecuteResultResp(boolean finished, AsyncExecuteContext context, List results) { + this.finished = finished; + this.results = results; + traceId = context.getCurrentExecutingSqlTraceId(); + total = context.getToBeExecutedSqlCount(); + count = context.getTotalExecutedSqlCount(); + sql = context.getCurrentExecutingSql(); + } +}