Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V1.x develop limit storage #12492

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import com.alibaba.nacos.config.server.service.sql.ModifyRequest;
import com.alibaba.nacos.config.server.service.sql.QueryType;
import com.alibaba.nacos.config.server.service.sql.SelectRequest;
import com.alibaba.nacos.config.server.service.sql.limit.SqlLimiter;
import com.alibaba.nacos.config.server.service.sql.limit.SqlTypeLimiter;
import com.alibaba.nacos.config.server.utils.ConfigExecutor;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.consistency.SerializeFactory;
Expand All @@ -57,8 +59,8 @@
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.distributed.ProtocolManager;
import com.alibaba.nacos.core.utils.ClassUtils;
import com.alibaba.nacos.sys.utils.DiskUtils;
import com.alibaba.nacos.core.utils.GenericType;
import com.alibaba.nacos.sys.utils.DiskUtils;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.springframework.context.annotation.Conditional;
Expand All @@ -71,6 +73,7 @@
import org.springframework.transaction.support.TransactionTemplate;

import java.io.File;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -167,10 +170,13 @@ public class DistributedDatabaseOperateImpl extends RequestProcessor4CP implemen

private ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();

private final SqlLimiter sqlLimiter;

public DistributedDatabaseOperateImpl(ServerMemberManager memberManager, ProtocolManager protocolManager)
throws Exception {
this.memberManager = memberManager;
this.protocol = protocolManager.getCpProtocol();
sqlLimiter = new SqlTypeLimiter();
init();
}

Expand Down Expand Up @@ -223,8 +229,8 @@ public <R> R queryOne(String sql, Class<R> cls) {
SelectRequest.builder().queryType(QueryType.QUERY_ONE_NO_MAPPER_NO_ARGS).sql(sql)
.className(cls.getCanonicalName()).build());

final boolean blockRead = EmbeddedStorageContextUtils
.containsExtendInfo(Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);
final boolean blockRead = EmbeddedStorageContextUtils.containsExtendInfo(
Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);

Response response = innerRead(
ReadRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(data)).build(), blockRead);
Expand All @@ -247,8 +253,8 @@ public <R> R queryOne(String sql, Object[] args, Class<R> cls) {
SelectRequest.builder().queryType(QueryType.QUERY_ONE_NO_MAPPER_WITH_ARGS).sql(sql).args(args)
.className(cls.getCanonicalName()).build());

final boolean blockRead = EmbeddedStorageContextUtils
.containsExtendInfo(Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);
final boolean blockRead = EmbeddedStorageContextUtils.containsExtendInfo(
Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);

Response response = innerRead(
ReadRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(data)).build(), blockRead);
Expand All @@ -271,8 +277,8 @@ public <R> R queryOne(String sql, Object[] args, RowMapper<R> mapper) {
SelectRequest.builder().queryType(QueryType.QUERY_ONE_WITH_MAPPER_WITH_ARGS).sql(sql).args(args)
.className(mapper.getClass().getCanonicalName()).build());

final boolean blockRead = EmbeddedStorageContextUtils
.containsExtendInfo(Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);
final boolean blockRead = EmbeddedStorageContextUtils.containsExtendInfo(
Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);

Response response = innerRead(
ReadRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(data)).build(), blockRead);
Expand All @@ -296,8 +302,8 @@ public <R> List<R> queryMany(String sql, Object[] args, RowMapper<R> mapper) {
SelectRequest.builder().queryType(QueryType.QUERY_MANY_WITH_MAPPER_WITH_ARGS).sql(sql).args(args)
.className(mapper.getClass().getCanonicalName()).build());

final boolean blockRead = EmbeddedStorageContextUtils
.containsExtendInfo(Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);
final boolean blockRead = EmbeddedStorageContextUtils.containsExtendInfo(
Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);

Response response = innerRead(
ReadRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(data)).build(), blockRead);
Expand All @@ -320,8 +326,8 @@ public <R> List<R> queryMany(String sql, Object[] args, Class<R> rClass) {
SelectRequest.builder().queryType(QueryType.QUERY_MANY_NO_MAPPER_WITH_ARGS).sql(sql).args(args)
.className(rClass.getCanonicalName()).build());

final boolean blockRead = EmbeddedStorageContextUtils
.containsExtendInfo(Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);
final boolean blockRead = EmbeddedStorageContextUtils.containsExtendInfo(
Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);

Response response = innerRead(
ReadRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(data)).build(), blockRead);
Expand All @@ -344,8 +350,8 @@ public List<Map<String, Object>> queryMany(String sql, Object[] args) {
SelectRequest.builder().queryType(QueryType.QUERY_MANY_WITH_LIST_WITH_ARGS).sql(sql).args(args)
.build());

final boolean blockRead = EmbeddedStorageContextUtils
.containsExtendInfo(Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);
final boolean blockRead = EmbeddedStorageContextUtils.containsExtendInfo(
Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);

Response response = innerRead(
ReadRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(data)).build(), blockRead);
Expand Down Expand Up @@ -390,9 +396,10 @@ public CompletableFuture<RestResult<String>> dataImport(File file) {
if (submit) {
List<ModifyRequest> requests = batchUpdate.stream().map(ModifyRequest::new)
.collect(Collectors.toList());
CompletableFuture<Response> future = protocol.writeAsync(WriteRequest.newBuilder().setGroup(group())
.setData(ByteString.copyFrom(serializer.serialize(requests)))
.putExtendInfo(DATA_IMPORT_KEY, Boolean.TRUE.toString()).build());
CompletableFuture<Response> future = protocol.writeAsync(
WriteRequest.newBuilder().setGroup(group())
.setData(ByteString.copyFrom(serializer.serialize(requests)))
.putExtendInfo(DATA_IMPORT_KEY, Boolean.TRUE.toString()).build());
futures.add(future);
batchUpdate.clear();
}
Expand Down Expand Up @@ -463,8 +470,8 @@ public List<SnapshotOperation> loadSnapshotOperate() {
@SuppressWarnings("all")
@Override
public Response onRequest(final ReadRequest request) {
final SelectRequest selectRequest = serializer
.deserialize(request.getData().toByteArray(), SelectRequest.class);
final SelectRequest selectRequest = serializer.deserialize(request.getData().toByteArray(),
SelectRequest.class);

LoggerUtils.printIfDebugEnabled(LogUtil.DEFAULT_LOG, "getData info : selectRequest : {}", selectRequest);

Expand All @@ -473,6 +480,7 @@ public Response onRequest(final ReadRequest request) {
readLock.lock();
Object data;
try {
sqlLimiter.doLimitForSelectRequest(selectRequest);
switch (type) {
case QueryType.QUERY_ONE_WITH_MAPPER_WITH_ARGS:
data = queryOne(jdbcTemplate, selectRequest.getSql(), selectRequest.getArgs(), mapper);
Expand Down Expand Up @@ -515,10 +523,11 @@ public Response onApply(WriteRequest log) {
LoggerUtils.printIfDebugEnabled(LogUtil.DEFAULT_LOG, "onApply info : log : {}", log);
final ByteString byteString = log.getData();
Preconditions.checkArgument(byteString != null, "Log.getData() must not null");
List<ModifyRequest> sqlContext = serializer.deserialize(byteString.toByteArray(), List.class);
final Lock lock = readLock;
lock.lock();
try {
List<ModifyRequest> sqlContext = serializer.deserialize(byteString.toByteArray(), List.class);
sqlLimiter.doLimitForModifyRequest(sqlContext);
boolean isOk = false;
if (log.containsExtendInfo(DATA_IMPORT_KEY)) {
isOk = doDataImport(jdbcTemplate, sqlContext);
Expand All @@ -539,6 +548,9 @@ public Response onApply(WriteRequest log) {
return Response.newBuilder().setSuccess(false).setErrMsg(e.toString()).build();
} catch (DataAccessException e) {
throw new ConsistencyException(e.toString());
} catch (SQLException e) {
LogUtil.FATAL_LOG.error("onApply warn : log : {}", log, e);
return Response.newBuilder().setSuccess(false).setErrMsg(e.toString()).build();
} catch (Throwable t) {
throw t;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.alibaba.nacos.config.server.service.datasource.DataSourceService;
import com.alibaba.nacos.config.server.service.datasource.DynamicDataSource;
import com.alibaba.nacos.config.server.service.sql.ModifyRequest;
import com.alibaba.nacos.config.server.service.sql.limit.SqlLimiter;
import com.alibaba.nacos.config.server.service.sql.limit.SqlTypeLimiter;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.sys.utils.DiskUtils;
import org.apache.commons.lang3.BooleanUtils;
Expand Down Expand Up @@ -55,11 +57,14 @@ public class StandaloneDatabaseOperateImpl implements BaseDatabaseOperate {

private TransactionTemplate transactionTemplate;

private SqlLimiter sqlLimiter;

@PostConstruct
protected void init() {
DataSourceService dataSourceService = DynamicDataSource.getInstance().getDataSource();
jdbcTemplate = dataSourceService.getJdbcTemplate();
transactionTemplate = dataSourceService.getTransactionTemplate();
sqlLimiter = new SqlTypeLimiter();
LogUtil.DEFAULT_LOG.info("use StandaloneDatabaseOperateImpl");
}

Expand Down Expand Up @@ -104,6 +109,7 @@ public CompletableFuture<RestResult<String>> dataImport(File file) {
while (iterator.hasNext()) {
String sql = iterator.next();
if (StringUtils.isNotBlank(sql)) {
sqlLimiter.doLimit(sql);
batchUpdate.add(sql);
}
if (batchUpdate.size() == batchSize || !iterator.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 1999-2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.config.server.service.sql.limit;

import com.alibaba.nacos.config.server.service.sql.ModifyRequest;
import com.alibaba.nacos.config.server.service.sql.SelectRequest;

import java.sql.SQLException;
import java.util.List;

/**
* SQL limiter.
*
* @author xiweng.yy
*/
public interface SqlLimiter {

/**
* Do SQL limit for modify request.
*
* @param modifyRequest modify request
* @throws SQLException when SQL match the limit rule.
*/
void doLimitForModifyRequest(ModifyRequest modifyRequest) throws SQLException;

/**
* Do SQL limit for modify request.
*
* @param modifyRequests modify request
* @throws SQLException when SQL match the limit rule.
*/
void doLimitForModifyRequest(List<ModifyRequest> modifyRequests) throws SQLException;

/**
* Do SQL limit for select request.
*
* @param selectRequest select request
* @throws SQLException when SQL match the limit rule.
*/
void doLimitForSelectRequest(SelectRequest selectRequest) throws SQLException;

/**
* Do SQL limit for select request.
*
* @param selectRequests select request
* @throws SQLException when SQL match the limit rule.
*/
void doLimitForSelectRequest(List<SelectRequest> selectRequests) throws SQLException;

/**
* Do SQL limit for sql.
*
* @param sql SQL
* @throws SQLException when SQL match the limit rule.
*/
void doLimit(String sql) throws SQLException;

/**
* Do SQL limit for sql.
*
* @param sql SQL
* @throws SQLException when SQL match the limit rule.
*/
void doLimit(List<String> sql) throws SQLException;
}
Loading
Loading