feat: replay logs of different tables in parallel #1492

Merged · 10 commits into apache:main · Mar 11, 2024

Conversation

@Lethannn (Contributor) commented Mar 4, 2024

## Rationale

Related to #1466

## Detailed Changes

Replay logs of different tables in parallel

## Test Plan

CI

Cargo.toml Outdated
@@ -107,6 +107,7 @@ cluster = { path = "src/cluster" }
 criterion = "0.5"
 horaedb-client = "1.0.2"
 common_types = { path = "src/common_types" }
+dashmap = "5.5.3"
Contributor commented:

I wonder if this is a required dependency for this task?

If HashMap works, I prefer to stick with it first.

@Lethannn (Contributor Author) replied:

I'm trying to run replay_table_log_entries concurrently, but I faced an issue with serial_exec_ctxs, which is a mutable reference to a HashMap. I had to wrap it in Arc and Mutex, and then every time I grab a mutable reference to a value from the map, it locks the entire map.

DashMap allows concurrent access to different keys. I wonder if there's an approach to make HashMap work in this case.
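
For context, a minimal sketch of the difference, using a hypothetical u64-keyed map of strings rather than the actual SerialExecContext types:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use dashmap::DashMap;

fn main() {
    // Mutex<HashMap>: any access locks the whole map, so two threads
    // working on different keys still contend on the same lock.
    let locked: Arc<Mutex<HashMap<u64, String>>> = Arc::new(Mutex::new(HashMap::new()));
    {
        let mut guard = locked.lock().unwrap();
        guard.insert(1, "table-1".to_string());
    } // the whole map stays locked until the guard is dropped

    // DashMap: the map is sharded internally, so entries under
    // different keys can be mutated concurrently.
    let sharded: DashMap<u64, String> = DashMap::new();
    sharded.insert(1, "table-1".to_string());
    if let Some(mut entry) = sharded.get_mut(&1) {
        entry.push_str("-replayed"); // locks only this entry's shard
    }
}
```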

@jiacai2050 (Contributor) commented Mar 8, 2024:

> then every time I grab a mutable reference to a value from the map, it locks the entire map.

I think it's fine to use a plain Mutex here, since the map accesses are not the bottleneck; replay_table_log_entries is the heaviest task here.

Also, there is a partitioned lock in our codebase; you can use it if you want to optimize here:
https://github.com/apache/incubator-horaedb/blob/9f166f3daa9a02ef8af1e733c22f956ab97e7aaf/src/components/partitioned_lock/src/lib.rs#L130
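
For reference, the rough idea behind a partitioned lock, as a hand-rolled sketch of the general technique rather than the API of the linked file: the key is hashed into one of N independent mutexes, so contention is limited to keys that land in the same partition.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::Mutex;

/// A toy partitioned map: a key locks only the partition it hashes
/// into, so different keys usually proceed without contention.
struct PartitionedMap<V> {
    partitions: Vec<Mutex<HashMap<u64, V>>>,
}

impl<V> PartitionedMap<V> {
    fn new(n: usize) -> Self {
        Self {
            partitions: (0..n).map(|_| Mutex::new(HashMap::new())).collect(),
        }
    }

    fn partition_for(&self, key: u64) -> &Mutex<HashMap<u64, V>> {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        &self.partitions[(hasher.finish() as usize) % self.partitions.len()]
    }

    fn insert(&self, key: u64, value: V) {
        // Locks a single partition; the others stay available.
        self.partition_for(key).lock().unwrap().insert(key, value);
    }
}
```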

@Lethannn (Contributor Author) commented Mar 8, 2024:

> then every time I grab a mutable reference to a value from the map, it locks the entire map.
>
> I think it's fine to use a plain Mutex here, since the map accesses are not the bottleneck; replay_table_log_entries is the heaviest task here.
>
> Also, there is a partitioned lock in our codebase; you can use it if you want to optimize here:
> https://github.com/apache/incubator-horaedb/blob/9f166f3daa9a02ef8af1e733c22f956ab97e7aaf/src/components/partitioned_lock/src/lib.rs#L130

Awesome! I'm gonna check it out. Thx for the heads up


let alter_failed_tables = HashMap::new();
let alter_failed_tables_ref = Arc::new(Mutex::new(alter_failed_tables));

let mut serial_exec_ctxs_dash_map = DashMap::new();
@jiacai2050 (Contributor) commented Mar 8, 2024:

This map seems unnecessary; what I have in mind is something like this:

modified   src/analytic_engine/src/instance/wal_replayer.rs
@@ -29,6 +29,7 @@ use common_types::{
     schema::{IndexInWriterSchema, Schema},
     table::ShardId,
 };
+use futures::StreamExt;
 use generic_error::BoxError;
 use lazy_static::lazy_static;
 use logger::{debug, error, info, trace, warn};
@@ -374,6 +375,7 @@ impl RegionBasedReplay {
         // TODO: No `group_by` method in `VecDeque`, so implement it manually here...
         Self::split_log_batch_by_table(log_batch, &mut table_batches);
 
+        let mut replay_tasks = Vec::with_capacity(table_batches.len());
         // TODO: Replay logs of different tables in parallel.
         for table_batch in table_batches {
             // Some tables may have failed in previous replay, ignore them.
@@ -384,22 +386,27 @@ impl RegionBasedReplay {
             // Replay all log entries of current table.
             // Some tables may have been moved to other shards or dropped, ignore such logs.
             if let Some(ctx) = serial_exec_ctxs.get_mut(&table_batch.table_id) {
-                let result = replay_table_log_entries(
+                replay_tasks.push(replay_table_log_entries(
                     &context.flusher,
                     context.max_retry_flush_limit,
                     &mut ctx.serial_exec,
                     &ctx.table_data,
                     log_batch.range(table_batch.range),
-                )
-                .await;
+                ));
 
-                // If occur error, mark this table as failed and store the cause.
-                if let Err(e) = result {
-                    failed_tables.insert(table_batch.table_id, e);
-                }
+                // If occur error, mark this table as failed and store the
+                // cause. if let Err(e) = result {
+                //     failed_tables.insert(table_batch.table_id, e);
+                // }
             }
         }
-
+        for ret in futures::stream::iter(replay_tasks)
+            .buffer_unordered(20)
+            .collect::<Vec<_>>()
+            .await
+        {
+            // insert into failed_tables if there are errors
+        }
         Ok(())
     }

But this fails to compile due to a mutable reference:

error[E0499]: cannot borrow `*serial_exec_ctxs` as mutable more than once at a time
   --> src/analytic_engine/src/instance/wal_replayer.rs:388:32
    |
388 |             if let Some(ctx) = serial_exec_ctxs.get_mut(&table_batch.table_id) {
    |                                ^^^^^^^^^^^^^^^^ `*serial_exec_ctxs` was mutably borrowed here in the previous iteration of the loop
...
403 |         for ret in futures::stream::iter(replay_tasks)
    |                                          ------------ first borrow used here, in later iteration of loop


So the first step of this task is to remove those mutable references.

The fix should be easy: just define serial_exec_ctxs with the type Arc<Mutex<HashMap>>.

@Lethannn (Contributor Author) replied:

async fn replay_single_batch(
    context: &ReplayContext,
    log_batch: VecDeque<LogEntry<ReadPayload>>,
    serial_exec_ctxs: Arc<tokio::sync::Mutex<HashMap<TableId, SerialExecContext<'_>>>>,
    failed_tables: &mut FailedTables,
) -> Result<()> {
    let mut table_batches = Vec::new();
    // TODO: No `group_by` method in `VecDeque`, so implement it manually here...
    Self::split_log_batch_by_table(log_batch, &mut table_batches);

    // TODO: Replay logs of different tables in parallel.
    let mut replay_tasks = Vec::with_capacity(table_batches.len());

    for table_batch in table_batches {
        // Some tables may have failed in previous replay, ignore them.
        if failed_tables.contains_key(&table_batch.table_id) {
            continue;
        }

        let serial_exec_ctxs = serial_exec_ctxs.clone();
        replay_tasks.push(async move {
            if let Some(ctx) = serial_exec_ctxs.lock().await.get_mut(&table_batch.table_id) {
                let result = replay_table_log_entries(
                    &context.flusher,
                    context.max_retry_flush_limit,
                    &mut ctx.serial_exec,
                    &ctx.table_data,
                    log_batch.range(table_batch.range),
                )
                .await;
                (table_batch.table_id, result)
            } else {
                (table_batch.table_id, Ok(()))
            }
        });
    }

    for (table_id, ret) in futures::stream::iter(replay_tasks)
        .buffer_unordered(20)
        .collect::<Vec<_>>()
        .await
    {
        // If occur error, mark this table as failed and store the cause.
        if let Err(e) = ret {
            failed_tables.insert(table_id, e);
        }
    }

    Ok(())
}

I ran into the same compile failure before. Here is my code. Is this what you were expecting? However, my concern is: wouldn't serial_exec_ctxs.lock().await.get_mut break concurrency?

@jiacai2050 (Contributor) replied:

> I ran into the same compile failure before.

I pushed my commits to your branch; it compiles OK.

> wouldn't serial_exec_ctxs.lock().await.get_mut break concurrency?

Yes, this step will run serially, but we make replay_table_log_entries concurrent, which is what we want.
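
To illustrate the pattern under discussion, here is a self-contained sketch of buffer_unordered driving per-table tasks concurrently; replay_one is a dummy stand-in for replay_table_log_entries, and it assumes the tokio and futures crates:

```rust
use std::collections::HashMap;
use std::time::Duration;

use futures::StreamExt;

// Dummy stand-in for the heavy per-table work done by
// replay_table_log_entries.
async fn replay_one(table_id: u64) -> Result<(), String> {
    tokio::time::sleep(Duration::from_millis(10)).await;
    if table_id == 3 {
        Err(format!("table {table_id} failed"))
    } else {
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    // One future per table, keyed by table id so results can be
    // matched back regardless of completion order.
    let tasks = (0..100u64).map(|id| async move { (id, replay_one(id).await) });

    let mut failed_tables = HashMap::new();
    // buffer_unordered(20) polls up to 20 replay futures at once; only
    // the brief map lookup needs to happen under a lock in the real code.
    for (id, ret) in futures::stream::iter(tasks)
        .buffer_unordered(20)
        .collect::<Vec<_>>()
        .await
    {
        if let Err(e) = ret {
            failed_tables.insert(id, e);
        }
    }
    println!("failed tables: {failed_tables:?}");
}
```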

@jiacai2050 changed the title from "[WIP]fix: Replay logs of different tables in parallel" to "feat: replay logs of different tables in parallel" on Mar 11, 2024
@jiacai2050 (Contributor) commented:

I will merge this PR once CI passes.

@Lethannn If you have more free time, you can measure how much replay time is reduced in your deployment.

Leave a comment here if you have the numbers.

@jiacai2050 (Contributor) left a comment:

LGTM

@jiacai2050 jiacai2050 merged commit 66d7a0d into apache:main Mar 11, 2024
9 checks passed
zealchen pushed a commit to zealchen/incubator-horaedb that referenced this pull request Apr 9, 2024
## Rationale

Related to apache#1466

## Detailed Changes
Replay logs of different tables in parallel

## Test Plan
CI

---------

Co-authored-by: jiacai2050 <dev@liujiacai.net>