Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
Lchangliang committed Apr 22, 2024
1 parent b520b64 commit c52664d
Show file tree
Hide file tree
Showing 14 changed files with 60 additions and 56 deletions.
8 changes: 4 additions & 4 deletions be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
st.to_thrift(&response.status);
}

void CloudBackendService::warm_up_cache_async(TPreCacheAsyncResponse& response,
const TPreCacheAsyncRequest& request) {
void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& response,
const TWarmUpCacheAsyncRequest& request) {
std::string brpc_addr = fmt::format("{}:{}", request.host, request.brpc_port);
Status st = Status::OK();
TStatus t_status;
Expand Down Expand Up @@ -178,8 +178,8 @@ void CloudBackendService::warm_up_cache_async(TPreCacheAsyncResponse& response,
response.status = t_status;
}

void CloudBackendService::check_warm_up_cache_async(TCheckPreCacheResponse& response,
const TCheckPreCacheRequest& request) {
void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,
const TCheckWarmUpCacheAsyncRequest& request) {
std::map<int64_t, bool> task_done;
io::FileCacheBlockDownloader::instance()->check_download_task(request.tablets, &task_done);
response.__set_task_done(task_done);
Expand Down
8 changes: 4 additions & 4 deletions be/src/cloud/cloud_backend_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ class CloudBackendService final : public BaseBackendService {
const TWarmUpTabletsRequest& request) override;

// Download the datas which load in other cluster
void warm_up_cache_async(TPreCacheAsyncResponse& response,
const TPreCacheAsyncRequest& request) override;
void warm_up_cache_async(TWarmUpCacheAsyncResponse& response,
const TWarmUpCacheAsyncRequest& request) override;

// Check whether the tablets finish warm up or not
void check_warm_up_cache_async(TCheckPreCacheResponse& response,
const TCheckPreCacheRequest& request) override;
void check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,
const TCheckWarmUpCacheAsyncRequest& request) override;

private:
[[maybe_unused]] CloudStorageEngine& _engine;
Expand Down
8 changes: 4 additions & 4 deletions be/src/service/backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1142,15 +1142,15 @@ void BaseBackendService::query_ingest_binlog(TQueryIngestBinlogResult& result,
result.__set_err_msg("query_ingest_binlog is not implemented");
}

void BaseBackendService::warm_up_cache_async(TPreCacheAsyncResponse& response,
const TPreCacheAsyncRequest& request) {
void BaseBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& response,
const TWarmUpCacheAsyncRequest& request) {
LOG(ERROR) << "warm_up_cache_async is not implemented";
response.__set_status(
Status::NotSupported("warm_up_cache_async is not implemented").to_thrift());
}

void BaseBackendService::check_warm_up_cache_async(TCheckPreCacheResponse& response,
const TCheckPreCacheRequest& request) {
void BaseBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,
const TCheckWarmUpCacheAsyncRequest& request) {
LOG(ERROR) << "check_warm_up_cache_async is not implemented";
response.__set_status(
Status::NotSupported("check_warm_up_cache_async is not implemented").to_thrift());
Expand Down
8 changes: 4 additions & 4 deletions be/src/service/backend_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,11 @@ class BaseBackendService : public BackendServiceIf {
////////////////////////////////////////////////////////////////////////////
// begin cloud backend functions
////////////////////////////////////////////////////////////////////////////
void warm_up_cache_async(TPreCacheAsyncResponse& response,
const TPreCacheAsyncRequest& request) override;
void warm_up_cache_async(TWarmUpCacheAsyncResponse& response,
const TWarmUpCacheAsyncRequest& request) override;

void check_warm_up_cache_async(TCheckPreCacheResponse& response,
const TCheckPreCacheRequest& request) override;
void check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,
const TCheckWarmUpCacheAsyncRequest& request) override;

// If another cluster load, FE need to notify the cluster to sync the load data
void sync_load_for_tablets(TSyncLoadForTabletsResponse& response,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2734,7 +2734,7 @@ public static boolean isNotCloudMode() {
@ConfField(description = {"Stream_Load 导入时,label 被限制的最大长度",
"Stream_Load When importing, the maximum length of label is limited"})
public static int label_regex_length = 128;

@ConfField(mutable = true, masterOnly = true)
public static int history_cloud_warm_up_job_keep_max_second = 7 * 24 * 3600;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.analysis.BinaryPredicate.Operator;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;

public class CancelCloudWarmUpStmt extends CancelStmt {
private Expr whereClause;
Expand All @@ -34,8 +35,8 @@ public long getJobId() {

@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
if (!config.isCloudMode()) {
throw new UserException("The sql is illegal in disk mode ");
if (!Config.isCloudMode()) {
throw new AnalysisException("The sql is illegal in disk mode ");
}
if (whereClause == null) {
throw new AnalysisException("Missing job id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.InternalCatalog;
Expand Down Expand Up @@ -74,7 +75,7 @@ public ShowCacheHotSpotStmt(String url) {

@Override
public void analyze(Analyzer analyzer) throws UserException {
if (!config.isCloudMode()) {
if (!Config.isCloudMode()) {
throw new UserException("The sql is illegal in disk mode ");
}
super.analyze(analyzer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
Expand Down Expand Up @@ -63,7 +64,7 @@ public WarmUpClusterStmt(String dstClusterName, List<Map<TableName, String>> tab

@Override
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
if (!config.isCloudMode()) {
if (!Config.isCloudMode()) {
throw new UserException("The sql is illegal in disk mode ");
}
super.analyze(analyzer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TUniqueId;

import org.apache.commons.text.StringSubstitutor;
Expand Down Expand Up @@ -177,7 +176,7 @@ public static void execUpdate(String sql) throws Exception {
}
}

private static void execCreateDatabase() throws Exception {ClusterNamespace
private static void execCreateDatabase() throws Exception {
CreateDbStmt createDbStmt = new CreateDbStmt(true,
new DbName(null, FeConstants.INTERNAL_DB_NAME),
null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TCheckPreCacheRequest;
import org.apache.doris.thrift.TCheckPreCacheResponse;
import org.apache.doris.thrift.TCheckWarmUpCacheAsyncRequest;
import org.apache.doris.thrift.TCheckWarmUpCacheAsyncResponse;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPreCacheAsyncRequest;
import org.apache.doris.thrift.TPreCacheAsyncResponse;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TWarmUpCacheAsyncRequest;
import org.apache.doris.thrift.TWarmUpCacheAsyncResponse;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -182,7 +182,7 @@ protected void runAfterCatalogReady() {
}

// 3 check whether the inflight preheating task has been completed
checkInflghtPreCache();
checkInflghtWarmUpCacheAsync();

// TODO(merge-cloud): wait add cloud upgrade mgr
// 4 migrate tablet for smooth upgrade
Expand Down Expand Up @@ -301,7 +301,7 @@ public void globalBalance() {
}
}

public void checkInflghtPreCache() {
public void checkInflghtWarmUpCacheAsync() {
Map<Long, List<Long>> beToTabletIds = new HashMap<Long, List<Long>>();

for (Map.Entry<Long, InfightTask> entry : tabletToInfightTask.entrySet()) {
Expand All @@ -319,9 +319,10 @@ public void checkInflghtPreCache() {
continue;
}

Map<Long, Boolean> taskDone = sendCheckPreCacheRpc(entry.getValue(), entry.getKey());
Map<Long, Boolean> taskDone = sendCheckWarmUpCacheAsyncRpc(entry.getValue(), entry.getKey());
if (taskDone == null) {
LOG.warn("sendCheckPreCacheRpc return null be {}, inFight tasks {}", entry.getKey(), entry.getValue());
LOG.warn("sendCheckWarmUpCacheAsyncRpc return null be {}, inFight tasks {}",
entry.getKey(), entry.getValue());
continue;
}

Expand Down Expand Up @@ -571,13 +572,13 @@ private void sendPreHeatingRpc(Tablet pickedTablet, long srcBe, long destBe) thr
try {
address = new TNetworkAddress(destBackend.getHost(), destBackend.getBePort());
client = ClientPool.backendPool.borrowObject(address);
TPreCacheAsyncRequest req = new TPreCacheAsyncRequest();
TWarmUpCacheAsyncRequest req = new TWarmUpCacheAsyncRequest();
req.setHost(srcBackend.getHost());
req.setBrpcPort(srcBackend.getBrpcPort());
List<Long> tablets = new ArrayList<Long>();
tablets.add(pickedTablet.getId());
req.setTabletIds(tablets);
TPreCacheAsyncResponse result = client.preCacheAsync(req);
TWarmUpCacheAsyncResponse result = client.warmUpCacheAsync(req);
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
LOG.warn("pre cache failed status {} {}", result.getStatus().getStatusCode(),
result.getStatus().getErrorMsgs());
Expand All @@ -591,16 +592,16 @@ private void sendPreHeatingRpc(Tablet pickedTablet, long srcBe, long destBe) thr
}
}

private Map<Long, Boolean> sendCheckPreCacheRpc(List<Long> tabletIds, long be) {
private Map<Long, Boolean> sendCheckWarmUpCacheAsyncRpc(List<Long> tabletIds, long be) {
BackendService.Client client = null;
TNetworkAddress address = null;
Backend destBackend = cloudSystemInfoService.getBackend(be);
try {
address = new TNetworkAddress(destBackend.getHost(), destBackend.getBePort());
client = ClientPool.backendPool.borrowObject(address);
TCheckPreCacheRequest req = new TCheckPreCacheRequest();
TCheckWarmUpCacheAsyncRequest req = new TCheckWarmUpCacheAsyncRequest();
req.setTablets(tabletIds);
TCheckPreCacheResponse result = client.checkPreCache(req);
TCheckWarmUpCacheAsyncResponse result = client.checkWarmUpCacheAsync(req);
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
LOG.warn("check pre cache status {} {}", result.getStatus().getStatusCode(),
result.getStatus().getErrorMsgs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
import org.apache.doris.catalog.EncryptKeyHelper;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.load.sync.SyncJobManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import org.apache.doris.thrift.TAgentTaskRequest;
import org.apache.doris.thrift.TCancelPlanFragmentParams;
import org.apache.doris.thrift.TCancelPlanFragmentResult;
import org.apache.doris.thrift.TCheckPreCacheRequest;
import org.apache.doris.thrift.TCheckPreCacheResponse;
import org.apache.doris.thrift.TCheckStorageFormatResult;
import org.apache.doris.thrift.TCheckWarmUpCacheAsyncRequest;
import org.apache.doris.thrift.TCheckWarmUpCacheAsyncResponse;
import org.apache.doris.thrift.TDiskTrashInfo;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TExecPlanFragmentResult;
Expand All @@ -38,8 +38,6 @@
import org.apache.doris.thrift.TIngestBinlogRequest;
import org.apache.doris.thrift.TIngestBinlogResult;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPreCacheAsyncRequest;
import org.apache.doris.thrift.TPreCacheAsyncResponse;
import org.apache.doris.thrift.TPublishTopicRequest;
import org.apache.doris.thrift.TPublishTopicResult;
import org.apache.doris.thrift.TQueryIngestBinlogRequest;
Expand All @@ -60,6 +58,8 @@
import org.apache.doris.thrift.TTransmitDataParams;
import org.apache.doris.thrift.TTransmitDataResult;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TWarmUpCacheAsyncRequest;
import org.apache.doris.thrift.TWarmUpCacheAsyncResponse;
import org.apache.doris.thrift.TWarmUpTabletsRequest;
import org.apache.doris.thrift.TWarmUpTabletsResponse;
import org.apache.doris.utframe.UtFrameUtils;
Expand Down Expand Up @@ -251,12 +251,12 @@ public TQueryIngestBinlogResult queryIngestBinlog(TQueryIngestBinlogRequest quer
}

@Override
public TPreCacheAsyncResponse preCacheAsync(TPreCacheAsyncRequest request) throws TException {
public TWarmUpCacheAsyncResponse warmUpCacheAsync(TWarmUpCacheAsyncRequest request) throws TException {
return null;
}

@Override
public TCheckPreCacheResponse checkPreCache(TCheckPreCacheRequest request) throws TException {
public TCheckWarmUpCacheAsyncResponse checkWarmUpCacheAsync(TCheckWarmUpCacheAsyncRequest request) throws TException {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import org.apache.doris.thrift.TBackendInfo;
import org.apache.doris.thrift.TCancelPlanFragmentParams;
import org.apache.doris.thrift.TCancelPlanFragmentResult;
import org.apache.doris.thrift.TCheckPreCacheRequest;
import org.apache.doris.thrift.TCheckPreCacheResponse;
import org.apache.doris.thrift.TCheckStorageFormatResult;
import org.apache.doris.thrift.TCheckWarmUpCacheAsyncRequest;
import org.apache.doris.thrift.TCheckWarmUpCacheAsyncResponse;
import org.apache.doris.thrift.TCloneReq;
import org.apache.doris.thrift.TDiskTrashInfo;
import org.apache.doris.thrift.TDropTabletReq;
Expand All @@ -57,8 +57,6 @@
import org.apache.doris.thrift.TIngestBinlogResult;
import org.apache.doris.thrift.TMasterInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPreCacheAsyncRequest;
import org.apache.doris.thrift.TPreCacheAsyncResponse;
import org.apache.doris.thrift.TPublishTopicRequest;
import org.apache.doris.thrift.TPublishTopicResult;
import org.apache.doris.thrift.TQueryIngestBinlogRequest;
Expand All @@ -83,6 +81,8 @@
import org.apache.doris.thrift.TTransmitDataParams;
import org.apache.doris.thrift.TTransmitDataResult;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TWarmUpCacheAsyncRequest;
import org.apache.doris.thrift.TWarmUpCacheAsyncResponse;
import org.apache.doris.thrift.TWarmUpTabletsRequest;
import org.apache.doris.thrift.TWarmUpTabletsResponse;

Expand Down Expand Up @@ -424,13 +424,13 @@ public TCheckStorageFormatResult checkStorageFormat() throws TException {
}

@Override
public TPreCacheAsyncResponse preCacheAsync(TPreCacheAsyncRequest request) throws TException {
return new TPreCacheAsyncResponse();
public TWarmUpCacheAsyncResponse warmUpCacheAsync(TWarmUpCacheAsyncRequest request) throws TException {
return new TWarmUpCacheAsyncResponse();
}

@Override
public TCheckPreCacheResponse checkPreCache(TCheckPreCacheRequest request) throws TException {
return new TCheckPreCacheResponse();
public TCheckWarmUpCacheAsyncResponse checkWarmUpCacheAsync(TCheckWarmUpCacheAsyncRequest request) throws TException {
return new TCheckWarmUpCacheAsyncResponse();
}

@Override
Expand Down
12 changes: 6 additions & 6 deletions gensrc/thrift/BackendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -127,21 +127,21 @@ struct TCheckStorageFormatResult {
2: optional list<i64> v2_tablets;
}

struct TPreCacheAsyncRequest {
struct TWarmUpCacheAsyncRequest {
1: required string host
2: required i32 brpc_port
3: required list<i64> tablet_ids
}

struct TPreCacheAsyncResponse {
struct TWarmUpCacheAsyncResponse {
1: required Status.TStatus status
}

struct TCheckPreCacheRequest {
struct TCheckWarmUpCacheAsyncRequest {
1: optional list<i64> tablets
}

struct TCheckPreCacheResponse {
struct TCheckWarmUpCacheAsyncResponse {
1: required Status.TStatus status
2: optional map<i64, bool> task_done;
}
Expand Down Expand Up @@ -385,9 +385,9 @@ service BackendService {
// check tablet rowset type
TCheckStorageFormatResult check_storage_format();

TPreCacheAsyncResponse warm_up_cache_async(1: TPreCacheAsyncRequest request);
TWarmUpCacheAsyncResponse warm_up_cache_async(1: TWarmUpCacheAsyncRequest request);

TCheckPreCacheResponse check_warm_up_cache_async(1: TCheckPreCacheRequest request);
TCheckWarmUpCacheAsyncResponse check_warm_up_cache_async(1: TCheckWarmUpCacheAsyncRequest request);

TSyncLoadForTabletsResponse sync_load_for_tablets(1: TSyncLoadForTabletsRequest request);

Expand Down

0 comments on commit c52664d

Please sign in to comment.