Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(sql-execute): refactor SQL async execute api into streaming return #2246

Merged
merged 18 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.oceanbase.odc.core.sql.execute.model.SqlExecuteStatus;
import com.oceanbase.odc.core.sql.execute.model.SqlTuple;
import com.oceanbase.odc.service.session.interceptor.NlsFormatInterceptor;
import com.oceanbase.odc.service.session.model.AsyncExecuteContext;
import com.oceanbase.odc.service.session.model.SqlExecuteResult;

/**
Expand All @@ -43,7 +44,7 @@ public void afterCompletion_mysql_notingSet() throws Exception {
ConnectionSession session = getConnectionSession(ConnectType.OB_MYSQL);
NlsFormatInterceptor interceptor = new NlsFormatInterceptor();
SqlExecuteResult r = getResponse("set session nls_date_format='DD-MON-RR'", SqlExecuteStatus.SUCCESS);
interceptor.afterCompletion(r, session, new HashMap<>());
interceptor.afterCompletion(r, session, getContext());
Assert.assertNull(ConnectionSessionUtil.getNlsDateFormat(session));
}

Expand All @@ -52,7 +53,7 @@ public void afterCompletion_failedSqlResult_notingSet() throws Exception {
ConnectionSession session = getConnectionSession(ConnectType.OB_ORACLE);
NlsFormatInterceptor interceptor = new NlsFormatInterceptor();
SqlExecuteResult r = getResponse("set session nls_date_format='DD-MON-RR'", SqlExecuteStatus.FAILED);
interceptor.afterCompletion(r, session, new HashMap<>());
interceptor.afterCompletion(r, session, getContext());
Assert.assertNull(ConnectionSessionUtil.getNlsDateFormat(session));
}

Expand All @@ -65,7 +66,7 @@ public void afterCompletion_multiSqls_notingSet() throws Exception {
+ "begin\n"
+ "dbms_output.put_line('aaaa');\n"
+ "end;", SqlExecuteStatus.SUCCESS);
interceptor.afterCompletion(r, session, new HashMap<>());
interceptor.afterCompletion(r, session, getContext());
Assert.assertNull(ConnectionSessionUtil.getNlsDateFormat(session));
}

Expand All @@ -74,7 +75,7 @@ public void afterCompletion_noSetVarExists_notingSet() throws Exception {
ConnectionSession session = getConnectionSession(ConnectType.OB_ORACLE);
NlsFormatInterceptor interceptor = new NlsFormatInterceptor();
SqlExecuteResult r = getResponse("-- comment\nselect 123 from dual;", SqlExecuteStatus.SUCCESS);
interceptor.afterCompletion(r, session, new HashMap<>());
interceptor.afterCompletion(r, session, getContext());
Assert.assertNull(ConnectionSessionUtil.getNlsDateFormat(session));
}

Expand All @@ -85,7 +86,7 @@ public void afterCompletion_commentWithSetVar_setSucceed() throws Exception {
String expect = "DD-MON-RR";
SqlExecuteResult r = getResponse("-- comment\nset session nls_date_format='" + expect + "';",
SqlExecuteStatus.SUCCESS);
interceptor.afterCompletion(r, session, new HashMap<>());
interceptor.afterCompletion(r, session, getContext());
Assert.assertEquals(expect, ConnectionSessionUtil.getNlsDateFormat(session));
}

Expand All @@ -96,7 +97,7 @@ public void afterCompletion_multiCommentsWithSetVar_setSucceed() throws Exceptio
String expect = "DD-MON-RR";
SqlExecuteResult r = getResponse("/*asdasdasd*/ set session nls_date_format='" + expect + "';",
SqlExecuteStatus.SUCCESS);
interceptor.afterCompletion(r, session, new HashMap<>());
interceptor.afterCompletion(r, session, getContext());
Assert.assertEquals(expect, ConnectionSessionUtil.getNlsDateFormat(session));
}

Expand All @@ -107,7 +108,7 @@ public void afterCompletion_nlsTimestampFormat_setSucceed() throws Exception {
String expect = "DD-MON-RR";
SqlExecuteResult r = getResponse("/*asdasdasd*/ set session nls_timestamp_format='" + expect + "';",
SqlExecuteStatus.SUCCESS);
interceptor.afterCompletion(r, session, new HashMap<>());
interceptor.afterCompletion(r, session, getContext());
Assert.assertEquals(expect, ConnectionSessionUtil.getNlsTimestampFormat(session));
}

Expand All @@ -118,7 +119,7 @@ public void afterCompletion_nlsTimestampTZFormat_setSucceed() throws Exception {
String expect = "DD-MON-RR";
SqlExecuteResult r = getResponse("/*asdasdasd*/ set session nls_timestamp_tz_format='" + expect + "';",
SqlExecuteStatus.SUCCESS);
interceptor.afterCompletion(r, session, new HashMap<>());
interceptor.afterCompletion(r, session, getContext());
Assert.assertEquals(expect, ConnectionSessionUtil.getNlsTimestampTZFormat(session));
}

Expand All @@ -129,7 +130,7 @@ public void afterCompletion_setGlobal_nothingSet() throws Exception {
String expect = "DD-MON-RR";
SqlExecuteResult r = getResponse("/*asdasdasd*/ set global nls_timestamp_tz_format='" + expect + "';",
SqlExecuteStatus.SUCCESS);
interceptor.afterCompletion(r, session, new HashMap<>());
interceptor.afterCompletion(r, session, getContext());
Assert.assertNull(ConnectionSessionUtil.getNlsTimestampTZFormat(session));
}

Expand All @@ -140,7 +141,7 @@ public void afterCompletion_alterSession_setSucceed() throws Exception {
String expect = "DD-MON-RR";
SqlExecuteResult r = getResponse("/*asdsd*/ alter session \n\t\r set \"nls_date_format\"='" + expect + "';",
SqlExecuteStatus.SUCCESS);
interceptor.afterCompletion(r, session, new HashMap<>());
interceptor.afterCompletion(r, session, getContext());
Assert.assertEquals(expect, ConnectionSessionUtil.getNlsDateFormat(session));
}

Expand All @@ -159,4 +160,8 @@ private ConnectionSession getConnectionSession(ConnectType type) {
return session;
}

private AsyncExecuteContext getContext() {
return new AsyncExecuteContext(null, new HashMap<>());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,13 @@ public class ConnectionSessionConstants {
*/
public static final String QUERY_CACHE_KEY = "QUERY_CACHE";
public static final String FUTURE_JDBC_RESULT_KEY = "FUTURE_JDBC_RESULT";
public static final String ASYNC_EXECUTE_CONTEXT_KEY = "ASYNC_EXECUTE_CONTEXT";
/**
* The connection_id current database session needs to be stored in the database session in the form
* of attributes, this is the key
*/
public static final String CONNECTION_ID_KEY = "CONNECTION_ID";
public static final String OB_PROXY_SESSID_KEY = "PROXY_SESSID";
/**
* The {@link BinaryDataManager} session needs to be stored in the database session in the form of
* attributes, this is the key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,52 @@ public static <T> void removeFutureJdbc(@NonNull ConnectionSession connectionSes
id2FutureResult.remove(requestId);
}

@SuppressWarnings("all")
public static <T> String setExecuteContext(@NonNull ConnectionSession connectionSession,
LuckyPickleZZ marked this conversation as resolved.
Show resolved Hide resolved
@NonNull Object context) {
Object value = connectionSession.getAttribute(ConnectionSessionConstants.ASYNC_EXECUTE_CONTEXT_KEY);
Map<String, Object> id2ExecuteContext;
if (value instanceof Map) {
id2ExecuteContext = (Map<String, Object>) value;
} else {
id2ExecuteContext = new ConcurrentHashMap<>();
connectionSession.setAttribute(ConnectionSessionConstants.ASYNC_EXECUTE_CONTEXT_KEY, id2ExecuteContext);
}
String uuid = UUID.randomUUID().toString();
id2ExecuteContext.putIfAbsent(uuid, context);
return uuid;
}

@SuppressWarnings("all")
public static <T> Object getExecuteContext(@NonNull ConnectionSession connectionSession,
yhilmare marked this conversation as resolved.
Show resolved Hide resolved
@NonNull String requestId) {
Object value = connectionSession.getAttribute(ConnectionSessionConstants.ASYNC_EXECUTE_CONTEXT_KEY);
Map<String, Object> id2ExecuteContext;
if (value instanceof Map) {
id2ExecuteContext = (Map<String, Object>) value;
} else {
throw new NotFoundException(ResourceType.ODC_ASYNC_SQL_RESULT, "session id", connectionSession.getId());
}
Object context = id2ExecuteContext.get(requestId);
if (context == null) {
throw new NotFoundException(ResourceType.ODC_ASYNC_SQL_RESULT, "request id", requestId);
}
return context;
}

@SuppressWarnings("all")
public static <T> void removeExecuteContext(@NonNull ConnectionSession connectionSession,
@NonNull String requestId) {
Object value = connectionSession.getAttribute(ConnectionSessionConstants.ASYNC_EXECUTE_CONTEXT_KEY);
Map<String, Object> id2ExecuteContext;
if (!(value instanceof Map)) {
throw new NullPointerException("Result not found by session id " + connectionSession.getId());
} else {
id2ExecuteContext = (Map<String, Object>) value;
}
id2ExecuteContext.remove(requestId);
}

public static void setUserId(@NonNull ConnectionSession connectionSession, @NonNull Long userId) {
connectionSession.setAttribute(ConnectionSessionConstants.USER_ID_KEY, userId);
}
Expand Down Expand Up @@ -535,6 +581,11 @@ public static String getConsoleConnectionId(@NonNull ConnectionSession connectio
return (String) connectionSession.getAttribute(ConnectionSessionConstants.CONNECTION_ID_KEY);
}

public static String getConsoleConnectionProxySessId(@NonNull ConnectionSession connectionSession) {
LuckyPickleZZ marked this conversation as resolved.
Show resolved Hide resolved
Object proxySessId = connectionSession.getAttribute(ConnectionSessionConstants.OB_PROXY_SESSID_KEY);
return proxySessId == null ? null : (String) proxySessId;
}

public static String getVersion(@NonNull ConnectionSession connectionSession) {
return (String) connectionSession.getAttribute(ConnectionSessionConstants.OB_VERSION);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
Expand Down Expand Up @@ -380,4 +382,78 @@ public static String queryDBMSOutput(@NonNull Connection connection, Integer max
}
}

public static String queryOBProxySessId(@NonNull Statement statement, @NonNull DialectType dialectType,
@NonNull String connectionId) throws SQLException {
String proxySessId = null;
String sql = "select proxy_sessid from "
+ (dialectType.isMysql() ? "oceanbase" : "sys")
+ ".v$ob_processlist where id = "
+ connectionId;
try (ResultSet rs = statement.executeQuery(sql)) {
if (rs.next()) {
proxySessId = rs.getString(1);
}
}
return proxySessId;
}

public static List<String> querySessionIdsByProxySessId(@NonNull Statement statement,
@NonNull String proxySessId, ConnectType connectType) throws SQLException {
DialectType dialectType = connectType.getDialectType();
SqlBuilder sqlBuilder = getBuilder(connectType)
.append("select id from ")
.append(dialectType.isMysql() ? "oceanbase" : "sys")
.append(".v$ob_processlist where proxy_sessid = ")
.append(proxySessId);
List<String> ids = new ArrayList<>();
try (ResultSet rs = statement.executeQuery(sqlBuilder.toString())) {
while (rs.next()) {
ids.add(rs.getString(1));
}
}
return ids;
}

/**
* OceanBase only supports ASH views in versions higher than 4.0. Therefore, this method is not
* applicable to earlier versions, please use sql_audit instead.
*/
public static String queryTraceIdFromASH(@NonNull Statement statement,
@NonNull List<String> sessionIds, ConnectType connectType) throws SQLException {
DialectType dialectType = connectType.getDialectType();
SqlBuilder sqlBuilder = getBuilder(connectType)
.append("select trace_id from ")
.append(dialectType.isMysql() ? "oceanbase" : "sys")
.append(".v$active_session_history where session_id in (")
.append(String.join(",", sessionIds))
.append(")")
.append(dialectType.isMysql() ? " limit 1" : " and rownum=1");
try (ResultSet rs = statement.executeQuery(sqlBuilder.toString())) {
if (!rs.next()) {
throw new SQLException("No result found in ASH.");
}
return rs.getString(1);
}
}

/**
* OceanBase only supports ASH views in versions higher than 4.0. Therefore, this method is not
* applicable to earlier versions, please use sql_audit instead.
*/
public static String queryPlanIdByTraceId(@NonNull Statement statement, String traceId, ConnectType connectType)
throws SQLException {
DialectType dialectType = connectType.getDialectType();
SqlBuilder sqlBuilder = getBuilder(connectType)
.append("select plan_id from ")
.append(dialectType.isMysql() ? "oceanbase" : "sys")
.append(".v$active_session_history where trace_id=")
.value(traceId);
try (ResultSet rs = statement.executeQuery(sqlBuilder.toString())) {
if (!rs.next()) {
throw new SQLException("No result found in ASH.");
}
return rs.getString(1);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import com.oceanbase.odc.service.partitionplan.model.PartitionPlanPreviewReq;
import com.oceanbase.odc.service.session.ConnectConsoleService;
import com.oceanbase.odc.service.session.ConnectSessionService;
import com.oceanbase.odc.service.session.model.AsyncExecuteResultResp;
import com.oceanbase.odc.service.session.model.BinaryContent;
import com.oceanbase.odc.service.session.model.QueryTableOrViewDataReq;
import com.oceanbase.odc.service.session.model.SqlAsyncExecuteReq;
Expand Down Expand Up @@ -120,6 +121,13 @@ public SuccessResponse<SqlAsyncExecuteResp> asyncSqlExecute(@PathVariable String
return Responses.success(consoleService.execute(SidUtils.getSessionId(sessionId), req));
}

@RequestMapping(value = {"/sessions/{sessionId}/sqls/streamExecute"}, method = RequestMethod.POST)
@StatefulRoute(stateName = StateName.DB_SESSION, stateIdExpression = "#sessionId")
public SuccessResponse<SqlAsyncExecuteResp> streamExecute(@PathVariable String sessionId,
@RequestBody SqlAsyncExecuteReq req) throws Exception {
return Responses.success(consoleService.streamExecute(SidUtils.getSessionId(sessionId), req, true));
}

/**
* 获取异步执行sql的结果 Todo 这里的sqlIds后续需要改成一个string类型的requestId,异步api请求需要有超时机制
*
Expand All @@ -134,6 +142,13 @@ public SuccessResponse<List<SqlExecuteResult>> getAsyncSqlExecute(@PathVariable
return Responses.success(consoleService.getAsyncResult(SidUtils.getSessionId(sessionId), requestId, null));
}

@RequestMapping(value = "/sessions/{sessionId}/sqls/getMoreResults", method = RequestMethod.GET)
@StatefulRoute(stateName = StateName.DB_SESSION, stateIdExpression = "#sessionId")
public SuccessResponse<AsyncExecuteResultResp> getMoreResults(@PathVariable String sessionId,
@RequestParam String requestId) {
return Responses.success(consoleService.getMoreResults(SidUtils.getSessionId(sessionId), requestId));
}

/**
* 对 sql 脚本的内容进行静态检查
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.StatementCallback;

import com.oceanbase.odc.common.util.VersionUtils;
import com.oceanbase.odc.core.datasource.DataSourceFactory;
import com.oceanbase.odc.core.session.ConnectionSession;
import com.oceanbase.odc.core.session.ConnectionSessionConstants;
import com.oceanbase.odc.core.session.ConnectionSessionUtil;
import com.oceanbase.odc.core.shared.Verify;
import com.oceanbase.odc.core.shared.constant.DialectType;
import com.oceanbase.odc.core.sql.execute.GeneralSyncJdbcExecutor;
import com.oceanbase.odc.core.sql.execute.SyncJdbcExecutor;
import com.oceanbase.odc.core.sql.util.OBUtils;
import com.oceanbase.odc.plugin.connect.api.InformationExtensionPoint;
import com.oceanbase.odc.service.plugin.ConnectionPluginUtil;

Expand Down Expand Up @@ -61,6 +64,12 @@ public static void initConnectionId(@NonNull Statement statement, @NonNull Conne
String sessionId = queryConnectionId(statement, connectionSession.getDialectType());
Verify.notNull(sessionId, "SessionId");
connectionSession.setAttribute(ConnectionSessionConstants.CONNECTION_ID_KEY, sessionId);
if (connectionSession.getDialectType().isOceanbase() && VersionUtils.isGreaterThanOrEqualsTo(
ConnectionSessionUtil.getVersion(connectionSession), "4.2")) {
String proxySessId =
OBUtils.queryOBProxySessId(statement, connectionSession.getDialectType(), sessionId);
LuckyPickleZZ marked this conversation as resolved.
Show resolved Hide resolved
connectionSession.setAttribute(ConnectionSessionConstants.OB_PROXY_SESSID_KEY, proxySessId);
}
} catch (Exception exception) {
log.warn("Failed to get database session ID, session={}", connectionSession, exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -40,6 +39,7 @@
import com.oceanbase.odc.service.datasecurity.util.DataMaskingUtil;
import com.oceanbase.odc.service.db.browser.DBSchemaAccessors;
import com.oceanbase.odc.service.session.interceptor.BaseTimeConsumingInterceptor;
import com.oceanbase.odc.service.session.model.AsyncExecuteContext;
import com.oceanbase.odc.service.session.model.DBResultSetMetaData;
import com.oceanbase.odc.service.session.model.SqlAsyncExecuteReq;
import com.oceanbase.odc.service.session.model.SqlAsyncExecuteResp;
Expand All @@ -64,14 +64,14 @@ public class DataMaskingInterceptor extends BaseTimeConsumingInterceptor {

@Override
public boolean preHandle(@NonNull SqlAsyncExecuteReq request, @NonNull SqlAsyncExecuteResp response,
@NonNull ConnectionSession session, @NonNull Map<String, Object> context) {
@NonNull ConnectionSession session, @NonNull AsyncExecuteContext context) {
return true;
}

@Override
@SuppressWarnings("all")
public void doAfterCompletion(@NonNull SqlExecuteResult response, @NonNull ConnectionSession session,
@NonNull Map<String, Object> context) throws Exception {
@NonNull AsyncExecuteContext context) throws Exception {
// TODO: May intercept sensitive column operation (WHERE / ORDER BY / HAVING)
if (!maskingService.isMaskingEnabled()) {
return;
Expand Down
Loading
Loading