[docs] Local ingestion custom indexer docs update and example (#18957)
## Description 

Added the local ingestion custom indexer example, along with documentation improvements.

## Test plan 

Manually tested.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:

---------

Co-authored-by: ronny-mysten <118224482+ronny-mysten@users.noreply.github.com>
zihehuang and ronny-mysten authored Aug 12, 2024
1 parent 7d103a9 commit 5ed526a
Showing 5 changed files with 106 additions and 77 deletions.
96 changes: 24 additions & 72 deletions docs/content/guides/developer/advanced/custom-indexer.mdx
@@ -37,6 +37,8 @@ The most straightforward stream source is to subscribe to a remote store of checkpoint files.
- Testnet: `https://checkpoints.testnet.sui.io`
- Mainnet: `https://checkpoints.mainnet.sui.io`

The checkpoint files are stored in the following format: `https://checkpoints.testnet.sui.io/<checkpoint_id>.chk`. You can download the checkpoint file by sending an HTTP GET request to the relevant URL. Try it yourself for checkpoint 1 at [https://checkpoints.testnet.sui.io/1.chk](https://checkpoints.testnet.sui.io/1.chk).
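As a sketch, the URL for a given checkpoint can be assembled from the store's base URL and the checkpoint sequence number. The `checkpoint_url` helper below is illustrative, not part of the Sui framework:

```rust
// Build the download URL for a checkpoint file, following the
// `<base>/<checkpoint_id>.chk` convention described above.
// (Illustrative helper; not part of the Sui framework.)
fn checkpoint_url(base: &str, checkpoint_id: u64) -> String {
    format!("{base}/{checkpoint_id}.chk")
}

fn main() {
    // For checkpoint 1 on Testnet:
    println!("{}", checkpoint_url("https://checkpoints.testnet.sui.io", 1));
}
```

You can then fetch the resulting URL with any HTTP client and send a GET request to download the checkpoint file.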

```mermaid
flowchart LR
A("fa:fa-cloud Cloud storage(S3, GCP)");
@@ -53,6 +55,12 @@ flowchart LR
B-->External
```

The Sui data ingestion framework provides a helper function to quickly bootstrap an indexer workflow.

{@inject: examples/custom-indexer/rust/remote_reader.rs}

This is suitable for setups with a single ingestion pipeline where progress tracking is managed outside of the framework.

### Local reader

Colocate the data ingestion daemon with a Full node and enable checkpoint dumping on the latter to set up a local stream source. After enabling, the Full node starts dumping executed checkpoints as files to a local directory, and the data ingestion daemon subscribes to changes in the directory through an inotify-like mechanism. This approach minimizes ingestion latency (checkpoints are processed immediately after execution on the Full node) and removes the dependency on an externally managed bucket.
@@ -84,99 +92,43 @@ flowchart LR
C<-->D("fa:fa-floppy-disk Progress store");
```


### Hybrid mode

Specify both a local and remote store as a fallback to ensure constant data flow. The framework always prioritizes locally available checkpoint data over remote data. It's useful when you want to start utilizing your own Full node for data ingestion but need to partially backfill historical data or just have a failover.


## Examples

The Sui data ingestion framework provides a helper function to quickly bootstrap an indexer workflow.
```rust
struct CustomWorker;

#[async_trait]
impl Worker for CustomWorker {
    async fn process_checkpoint(&self, checkpoint: CheckpointData) -> Result<()> {
        // custom processing logic
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let (executor, term_sender) = setup_single_workflow(
        CustomWorker,
        "https://checkpoints.mainnet.sui.io".to_string(),
        0,    /* initial checkpoint number */
        5,    /* concurrency */
        None, /* extra reader options */
    ).await?;
    executor.await?;
    Ok(())
}
```
This is suitable for setups with a single ingestion pipeline where progress tracking is managed outside of the framework.

For more complex setups, refer to the following example:
```rust
struct CustomWorker;

#[async_trait]
impl Worker for CustomWorker {
    async fn process_checkpoint(&self, checkpoint: CheckpointData) -> Result<()> {
        // custom processing logic
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let (exit_sender, exit_receiver) = oneshot::channel();
    let metrics = DataIngestionMetrics::new(&Registry::new());
    let progress_store = FileProgressStore::new("path_to_file");
    let mut executor = IndexerExecutor::new(progress_store, 1 /* number of workflow types */, metrics);
    let worker_pool = WorkerPool::new(CustomWorker, "custom worker", 100);
    executor.register(worker_pool).await?;
    executor.run(
        PathBuf::from("..."), // path to a local directory
        Some("https://checkpoints.mainnet.sui.io".to_string()),
        vec![], // optional remote store access options
        exit_receiver,
    ).await?;
    Ok(())
}
```
{@inject: examples/custom-indexer/rust/local_reader.rs}

Let's highlight a couple lines of code:

```rust
let worker_pool = WorkerPool::new(CustomWorker, "custom worker", 100);
let worker_pool = WorkerPool::new(CustomWorker, "local_reader".to_string(), concurrency);
executor.register(worker_pool).await?;
```

The data ingestion executor can run multiple workflows simultaneously. For each workflow, you need to create a separate worker pool and register it in the executor. The `WorkerPool` requires an instance of the `Worker` trait, the name of the workflow (which is used for tracking the progress of the flow in the progress store and metrics), and concurrency.

The concurrency parameter specifies how many threads the workflow uses. Having a concurrency value greater than 1 is helpful when tasks are idempotent and can be processed in parallel and out of order. The executor only updates the progress/watermark to a certain checkpoint when all preceding checkpoints are processed.
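This watermark behavior can be modeled in a short sketch. The following is an illustrative simplification, not the framework's actual implementation; `advance_watermark` is a hypothetical helper:

```rust
use std::collections::HashSet;

// Illustrative model of progress tracking with out-of-order completion:
// the watermark advances only while the next checkpoint is already done,
// so it never skips past an unprocessed checkpoint.
fn advance_watermark(mut watermark: u64, completed: &HashSet<u64>) -> u64 {
    while completed.contains(&watermark) {
        watermark += 1;
    }
    watermark
}

fn main() {
    // Checkpoints 0, 1, and 3 finished, but 2 is still in flight,
    // so the watermark stops at 2 even though 3 is complete.
    let completed: HashSet<u64> = [0, 1, 3].into_iter().collect();
    println!("{}", advance_watermark(0, &completed)); // prints 2
}
```

Once checkpoint 2 completes, the watermark jumps past both 2 and the already-finished 3 in one step.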

## Source code for an implementation {#source-code}
### Hybrid mode

Specify both a local and remote store as a fallback to ensure constant data flow. The framework always prioritizes locally available checkpoint data over remote data. It's useful when you want to start utilizing your own Full node for data ingestion but need to partially backfill historical data or just have a failover.
```rust
executor.run(
    PathBuf::from("./chk".to_string()), // path to a local directory
    Some("https://checkpoints.testnet.sui.io".to_string()), // remote checkpoint store
    vec![], // optional remote store access options
    ReaderOptions::default(),
    exit_receiver,
).await?;
```

Find the following source code in the [Sui repo](https://github.com/mystenlabs/sui/tree/main/examples/custom-indexer/rust).

### Manifest

Code for the `Cargo.toml` manifest file for the custom indexer.

{@inject: examples/custom-indexer/rust/Cargo.toml}

### Rust source
## Source code for an implementation {#source-code}

Code for the main.rs file that creates the custom indexer.
Find the following source code in the [Sui repo](https://github.com/mystenlabs/sui/tree/main/examples/custom-indexer/rust).

{@inject: examples/custom-indexer/rust/main.rs}

## Related links

12 changes: 9 additions & 3 deletions examples/custom-indexer/rust/Cargo.toml
@@ -8,10 +8,16 @@ async-trait = "0.1.81"
tokio = { version = "1.38.0", features = ["full"]}
sui_types = { git = "https://github.com/mystenlabs/sui", package = "sui-types"}
sui_data_ingestion_core = { git = "https://github.com/mystenlabs/sui", package = "sui-data-ingestion-core"}
prometheus = "0.13.3"
anyhow = "1.0.86"

[workspace]
[[bin]]
name = "local_reader"
path = "local_reader.rs"

[[bin]]
name = "custom-indexer"
path = "main.rs"
name = "remote_reader"
path = "remote_reader.rs"

[workspace]

29 changes: 27 additions & 2 deletions examples/custom-indexer/rust/README.md
@@ -13,6 +13,31 @@ cargo build
```

## How to run
```bash
cargo run main.rs
```

### Remote Reader example

```sh
cargo run --bin remote_reader
```

### Local Reader example
The local reader example saves progress in a file called `/tmp/local_reader_progress` and monitors checkpoint files in the `chk` directory.

To test the local reader example, first create the `/tmp/local_reader_progress` file:
```sh
echo "{\"local_reader\": 1}" > /tmp/local_reader_progress
```

Then, create the `chk` directory at the same level as the `local_reader.rs` file:
```sh
mkdir -p chk
```

Then, run the local reader example:
```sh
cargo run --bin local_reader
```

Finally, copy checkpoint files into the `chk` directory; the program processes them as they arrive:
```sh
cp $YOUR_CHECKPOINT_FILE chk/
```
46 changes: 46 additions & 0 deletions examples/custom-indexer/rust/local_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use tokio::sync::oneshot;
use anyhow::Result;
use async_trait::async_trait;
use sui_types::full_checkpoint_content::CheckpointData;
use sui_data_ingestion_core as sdic;
use sdic::{Worker, WorkerPool, ReaderOptions};
use sdic::{DataIngestionMetrics, FileProgressStore, IndexerExecutor};
use prometheus::Registry;
use std::path::PathBuf;
use std::env;

struct CustomWorker;

#[async_trait]
impl Worker for CustomWorker {
    async fn process_checkpoint(&self, checkpoint: CheckpointData) -> Result<()> {
        // custom processing logic
        println!("Processing local checkpoint: {}", checkpoint.checkpoint_summary);
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let concurrency = 5;
    let (exit_sender, exit_receiver) = oneshot::channel();
    let metrics = DataIngestionMetrics::new(&Registry::new());
    let backfill_progress_file_path =
        env::var("BACKFILL_PROGRESS_FILE_PATH").unwrap_or("/tmp/local_reader_progress".to_string());
    let progress_store = FileProgressStore::new(PathBuf::from(backfill_progress_file_path));
    let mut executor = IndexerExecutor::new(progress_store, 1 /* number of workflow types */, metrics);
    let worker_pool = WorkerPool::new(CustomWorker, "local_reader".to_string(), concurrency);

    executor.register(worker_pool).await?;
    executor.run(
        PathBuf::from("./chk".to_string()), // path to a local directory
        None, // no remote store fallback
        vec![], // optional remote store access options
        ReaderOptions::default(),
        exit_receiver,
    ).await?;
    Ok(())
}
File renamed without changes.
