Skip to content

Commit

Permalink
update the PR.
Browse files Browse the repository at this point in the history
  • Loading branch information
larry-aptos committed Jul 24, 2024
1 parent c811629 commit ed3968e
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 32 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ rust-version = { workspace = true }
anyhow = { workspace = true }
aptos-indexer-grpc-server-framework = { workspace = true }
aptos-indexer-grpc-utils = { workspace = true }
aptos-metrics-core = { workspace = true }
aptos-protos = { workspace = true }
async-trait = { workspace = true }
clap = { workspace = true }
futures = { workspace = true }
once_cell = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

File store backfiller.

# Why do you need a backfiller?

In rare cases, upstream changes that are not reflected in the proto schema require a backfill.
To mitigate the issue, you need a backfiller to replay transactions against the new proto
schema so that the changes can surface.


## How to run it.

Example of config:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

pub mod metrics;
pub mod processor;

use anyhow::Result;
Expand All @@ -25,6 +24,10 @@ pub struct IndexerGrpcFileStoreBackfillerConfig {
pub transactions_count: Option<u64>,
#[serde(default = "default_validation_mode")]
pub validation_mode: bool,
#[serde(default = "default_backfill_processing_task_count")]
pub backfill_processing_task_count: usize,
#[serde(default = "default_validating_task_count")]
pub validating_task_count: usize,
}

const fn default_enable_cache_compression() -> bool {
Expand All @@ -35,6 +38,14 @@ const fn default_validation_mode() -> bool {
false
}

/// Serde fallback for `backfill_processing_task_count`: the value used when
/// the config does not specify that field.
const fn default_backfill_processing_task_count() -> usize {
    const DEFAULT_BACKFILL_PROCESSING_TASK_COUNT: usize = 20;
    DEFAULT_BACKFILL_PROCESSING_TASK_COUNT
}

/// Serde fallback for `validating_task_count`: the value used when the
/// config does not specify that field.
const fn default_validating_task_count() -> usize {
    const DEFAULT_VALIDATING_TASK_COUNT: usize = 50;
    DEFAULT_VALIDATING_TASK_COUNT
}

#[async_trait::async_trait]
impl RunnableConfig for IndexerGrpcFileStoreBackfillerConfig {
async fn run(&self) -> Result<()> {
Expand All @@ -47,6 +58,8 @@ impl RunnableConfig for IndexerGrpcFileStoreBackfillerConfig {
self.starting_version,
self.transactions_count,
self.validation_mode,
self.backfill_processing_task_count,
self.validating_task_count,
)
.await
.expect("Failed to create file store processor");
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub struct Processor {
progress_file_path: String,
ending_version: Option<u64>,
validation_mode: bool,
backfill_processing_task_count: usize,
validating_task_count: usize,
}

#[derive(Debug, Serialize, Deserialize)]
Expand All @@ -49,6 +51,8 @@ impl Processor {
starting_version: Option<u64>,
transactions_count: Option<u64>,
validation_mode: bool,
backfill_processing_task_count: usize,
validating_task_count: usize,
) -> Result<Self> {
let _cache_storage_format = if enable_cache_compression {
StorageFormat::Lz4CompressedProto
Expand All @@ -64,6 +68,8 @@ impl Processor {
progress_file_path,
ending_version: transactions_count.map(|c| starting_version.unwrap_or(0) + c),
validation_mode,
backfill_processing_task_count,
validating_task_count,
});
}

Expand All @@ -84,8 +90,13 @@ impl Processor {
},
};
let expected_starting_version = std::cmp::max(starting_version, progress_file.version);
tracing::info!(
"Starting backfill from version {}",
expected_starting_version
);
if let Some(expected_end_version) = expected_end_version {
if expected_starting_version >= expected_end_version {
tracing::info!("Backfill is already done.");
// Backfill is already done.
exit(0);
}
Expand Down Expand Up @@ -114,6 +125,8 @@ impl Processor {
progress_file_path,
ending_version: expected_end_version,
validation_mode,
backfill_processing_task_count,
validating_task_count,
})
}

Expand All @@ -129,6 +142,7 @@ impl Processor {
pub async fn backfill(&mut self) -> Result<()> {
let (sender, receiver) = tokio::sync::mpsc::channel::<Vec<Transaction>>(1000);
// Get the stream out.
// This is required, since the batch returned by the stream is not guaranteed to be 1000.
let mut transactions_buffer = BTreeMap::new();
let mut next_version_to_process = self.starting_version;
let finished_starting_versions = Arc::new(Mutex::new(BTreeSet::new()));
Expand All @@ -155,7 +169,7 @@ impl Processor {
let mut tasks = Vec::new();
let receiver_ref = std::sync::Arc::new(Mutex::new(receiver));
let file_store_operator = self.file_store_operator.clone_box();
for _ in 0..20 {
for _ in 0..self.backfill_processing_task_count {
tracing::info!("Creating a new task");
let mut current_file_store_operator = file_store_operator.clone_box();
let current_finished_starting_versions = finished_starting_versions.clone();
Expand Down Expand Up @@ -254,7 +268,7 @@ impl Processor {
Ok(response) => response,
Err(e) => {
tracing::error!("Failed to get response: {:?}", e);
panic!();
panic!("Failed to get response: {:?}", e);
},
};

Expand All @@ -264,6 +278,7 @@ impl Processor {
let transactions = txns.transactions;
for txn in transactions {
let version = txn.version;
// Partial batch may be received; split and insert into buffer.
transactions_buffer.insert(version, txn);
}
},
Expand Down Expand Up @@ -308,7 +323,7 @@ impl Processor {
let gap_detector = Arc::new(Mutex::new(BTreeSet::new()));
let mut tasks = Vec::new();

for _ in 0..50 {
for _ in 0..self.validating_task_count {
let version_allocator = version_allocator.clone();
let gap_detector = gap_detector.clone();
let file_operator = self.file_store_operator.clone_box();
Expand Down Expand Up @@ -339,7 +354,7 @@ impl Processor {
// Check the gap detector and update the progress file.
let task = tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(30000)).await;
tokio::time::sleep(Duration::from_millis(5000)).await;
loop {
if current_version >= expected_end_version {
return Ok(());
Expand Down

0 comments on commit ed3968e

Please sign in to comment.