|
17 | 17 |
|
18 | 18 | #include "agent/task_worker_pool.h" |
19 | 19 |
|
| 20 | +#include <brpc/controller.h> |
20 | 21 | #include <fmt/format.h> |
21 | 22 | #include <gen_cpp/AgentService_types.h> |
22 | 23 | #include <gen_cpp/DataSinks_types.h> |
|
84 | 85 | #include "runtime/memory/global_memory_arbitrator.h" |
85 | 86 | #include "runtime/snapshot_loader.h" |
86 | 87 | #include "service/backend_options.h" |
| 88 | +#include "util/brpc_client_cache.h" |
87 | 89 | #include "util/debug_points.h" |
88 | 90 | #include "util/doris_metrics.h" |
89 | 91 | #include "util/jni-util.h" |
@@ -291,7 +293,7 @@ void alter_cloud_tablet(CloudStorageEngine& engine, const TAgentTaskRequest& age |
291 | 293 | job.process_alter_tablet(agent_task_req.alter_tablet_req_v2), |
292 | 294 | [&](const doris::Exception& ex) { |
293 | 295 | DorisMetrics::instance()->create_rollup_requests_failed->increment(1); |
294 | | - job.clean_up_on_failed(); |
| 296 | + job.clean_up_on_failure(); |
295 | 297 | }); |
296 | 298 | return Status::OK(); |
297 | 299 | }(); |
@@ -604,6 +606,52 @@ Status PriorTaskWorkerPool::submit_task(const TAgentTaskRequest& task) { |
604 | 606 | }); |
605 | 607 | } |
606 | 608 |
|
| 609 | +Status PriorTaskWorkerPool::submit_high_prior_and_cancel_low(const TAgentTaskRequest& task) { |
| 610 | + const TTaskType::type task_type = task.task_type; |
| 611 | + int64_t signature = task.signature; |
| 612 | + std::string type_str; |
| 613 | + EnumToString(TTaskType, task_type, type_str); |
| 614 | + auto req = std::make_unique<TAgentTaskRequest>(task); |
| 615 | + |
| 616 | + DCHECK(req->__isset.priority && req->priority == TPriority::HIGH); |
| 617 | + do { |
| 618 | + std::lock_guard lock(s_task_signatures_mtx); |
| 619 | + auto& set = s_task_signatures[task_type]; |
| 620 | + if (!set.contains(signature)) { |
| 621 | + // If it doesn't exist, put it directly into the priority queue |
| 622 | + add_task_count(*req, 1); |
| 623 | + set.insert(signature); |
| 624 | + std::lock_guard lock(_mtx); |
| 625 | + _high_prior_queue.push_back(std::move(req)); |
| 626 | + _high_prior_condv.notify_one(); |
| 627 | + _normal_condv.notify_one(); |
| 628 | + break; |
| 629 | + } else { |
| 630 | + std::lock_guard lock(_mtx); |
| 631 | + for (auto it = _normal_queue.begin(); it != _normal_queue.end();) { |
| 632 | + // If it exists in the normal queue, cancel the task in the normal queue |
| 633 | + if ((*it)->signature == signature) { |
| 634 | + _normal_queue.erase(it); // cancel the original task |
| 635 | + _high_prior_queue.push_back(std::move(req)); // add the new task to the queue |
| 636 | + _high_prior_condv.notify_one(); |
| 637 | + _normal_condv.notify_one(); |
| 638 | + break; |
| 639 | + } else { |
| 640 | + ++it; // doesn't meet the condition, continue to the next one |
| 641 | + } |
| 642 | + } |
| 643 | + // If it exists in the high priority queue, no operation is needed |
| 644 | + LOG_INFO("task has already existed in high prior queue.").tag("signature", signature); |
| 645 | + } |
| 646 | + } while (false); |
| 647 | + |
| 648 | + // Set the receiving time of task so that we can determine whether it is timed out later |
| 649 | + (const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr)); |
| 650 | + |
| 651 | + LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature); |
| 652 | + return Status::OK(); |
| 653 | +} |
| 654 | + |
607 | 655 | void PriorTaskWorkerPool::normal_loop() { |
608 | 656 | while (true) { |
609 | 657 | std::unique_ptr<TAgentTaskRequest> req; |
@@ -1432,23 +1480,15 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr |
1432 | 1480 | DCHECK_EQ(existed_fs->type(), io::FileSystemType::S3) << param.id << ' ' << param.name; |
1433 | 1481 | auto client = static_cast<io::S3FileSystem*>(existed_fs.get())->client_holder(); |
1434 | 1482 | auto new_s3_conf = S3Conf::get_s3_conf(param.s3_storage_param); |
1435 | | - S3ClientConf conf { |
1436 | | - .endpoint {}, |
1437 | | - .region {}, |
1438 | | - .ak = std::move(new_s3_conf.client_conf.ak), |
1439 | | - .sk = std::move(new_s3_conf.client_conf.sk), |
1440 | | - .token = std::move(new_s3_conf.client_conf.token), |
1441 | | - .bucket {}, |
1442 | | - .provider = new_s3_conf.client_conf.provider, |
1443 | | - }; |
| 1483 | + S3ClientConf conf = std::move(new_s3_conf.client_conf); |
1444 | 1484 | st = client->reset(conf); |
1445 | 1485 | fs = std::move(existed_fs); |
1446 | 1486 | } |
1447 | 1487 |
|
1448 | 1488 | if (!st.ok()) { |
1449 | 1489 | LOG(WARNING) << "update s3 resource failed: " << st; |
1450 | 1490 | } else { |
1451 | | - LOG_INFO("successfully update hdfs resource") |
| 1491 | + LOG_INFO("successfully update s3 resource") |
1452 | 1492 | .tag("resource_id", param.id) |
1453 | 1493 | .tag("resource_name", param.name); |
1454 | 1494 | put_storage_resource(param.id, {std::move(fs)}, param.version); |
|
0 commit comments