Collaborator

@whankinsiv whankinsiv commented Dec 5, 2025

Description

This PR extends the custom_indexer module to support multiple independent indexes within a single indexer instance. Each index is registered using add_index, which creates a dedicated channel for receiving transaction and rollback events. All messages include a oneshot response channel, allowing the manager task to update cursor state and remove corrupted indexes.

The error handling model is explicit and prevents corrupted index state from persisting across runs:

  • Decode failure: Indicates invalid transaction bytes (should never occur for valid blocks). The index is halted immediately and awaits the next rollback event.
  • Handle failure: Indicates an error in user-defined index logic. The index is halted and will be reset on the next process start due to the persisted halted = true flag.
  • Rollback failure: If an index cannot roll back cleanly, the index attempts a full reset.
    • If reset succeeds, the chain sync point is rewound.
    • If reset fails, the index is removed from the runtime (senders). On next startup, the persisted halt flag causes a fresh reset.
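
The three failure modes above can be read as a small state machine. The following is a minimal sketch of those transitions only; the names `Failure`, `Outcome`, and `next_state` are hypothetical and do not come from the PR.

```rust
/// Hypothetical failure kinds, mirroring the three bullets above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Failure {
    Decode,
    Handle,
    /// Rollback failed; `reset_ok` records whether the follow-up reset worked.
    Rollback { reset_ok: bool },
}

/// Hypothetical outcome reported back to the manager task.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Outcome {
    /// Index stops processing until a rollback (decode) or the next start (handle).
    Halted,
    /// Index was reset; the chain sync point is rewound.
    ResetAndRewind,
    /// Index is dropped from the runtime; the persisted halt flag
    /// causes a fresh reset on the next startup.
    Removed,
}

/// Map a failure to the outcome described in the PR text.
fn next_state(failure: Failure) -> Outcome {
    match failure {
        Failure::Decode | Failure::Handle => Outcome::Halted,
        Failure::Rollback { reset_ok: true } => Outcome::ResetAndRewind,
        Failure::Rollback { reset_ok: false } => Outcome::Removed,
    }
}
```

Only a failed rollback followed by a failed reset removes the index from the runtime; every other failure leaves the actor in place.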

Related Issue(s)

Completes #380

How was this tested?

  • Added full coverage for index_actor (apply tx, rollback, reset paths).
  • Verified that the example indexer correctly runs multiple indexes with different starting points.

Checklist

  • My code builds and passes local tests
  • I added/updated tests for my changes, where applicable
  • I updated documentation (if applicable)
  • CI is green for this PR

Impact / Side effects

Adds safe recovery behavior and multi-index support to the custom indexer.

Reviewer notes / Areas to focus

Rollback failure behavior and the halt/persist/reset flow

Signed-off-by: William Hankins <william@sundae.fi>
…ndex channels

@whankinsiv whankinsiv marked this pull request as ready for review December 6, 2025 00:27
Collaborator

@lowhung lowhung left a comment

Just so I understand the flow...

  1. When a new index is added, it spawns an actor per index, each with its own channel
  2. ChainSync messages come in on the subscription and fan out to all actors
  3. The actors process independently, then return a result with updated cursor entries
  4. Main loop collects responses + persists cursors + handles failures

let raw = self.cursor.get("cursor")?;
async fn load(&self) -> Result<HashMap<String, CursorEntry>> {
let mut out = HashMap::new();
let iter = self.partition.prefix("cursor/");
Collaborator

[nit] We reference this prefix "cursor/" in a few places, should we keep it as a const in this file?

Collaborator Author

Added "cursor/" as a const in 7b5d429.


self.cursor.insert("cursor", raw)?;
for (name, point) in tips {
let key = format!("cursor/{name}");
Collaborator

[nit] A method in the cursor store to format the key and return it / get the prefix could be nice.

Collaborator Author

Added key_for, name_from_key, and prefix_iter helper methods in 7b5d429.
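
As an illustration of the key helpers mentioned above, here is a minimal sketch. The helper names `key_for` and `name_from_key` come from the comment, but their signatures and the constant name `CURSOR_PREFIX` are assumptions; `prefix_iter` depends on the underlying store and is left out.

```rust
/// Assumed constant name; the PR only says "cursor/" became a const.
const CURSOR_PREFIX: &str = "cursor/";

/// Build the storage key for an index name, e.g. "cursor/utxo".
fn key_for(name: &str) -> String {
    format!("{CURSOR_PREFIX}{name}")
}

/// Recover the index name from a key, or `None` if the prefix is absent.
fn name_from_key(key: &str) -> Option<&str> {
    key.strip_prefix(CURSOR_PREFIX)
}
```

Keeping both directions next to the constant means the prefix is written in exactly one place.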

}

#[derive(Debug)]
pub struct CursorSaveError {
Collaborator

[nit] Could leverage thiserror here

Suggested change
- pub struct CursorSaveError {
+ #[derive(Debug, thiserror::Error)]
+ #[error("Failed to save cursor tips for: {failed:?}")]
+ pub struct CursorSaveError {
+     pub failed: Vec<String>,
+ }

Collaborator Author

Thanks for this suggestion! Switched to using thiserror in 7b5d429.
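
For readers unfamiliar with thiserror, the derive in the suggestion is roughly equivalent to the hand-written impls below. This is a sketch using only the standard library, not the code that landed in 7b5d429.

```rust
use std::error::Error;
use std::fmt;

#[derive(Debug)]
pub struct CursorSaveError {
    /// Names of the indexes whose cursor tips could not be saved.
    pub failed: Vec<String>,
}

// What `#[error("Failed to save cursor tips for: {failed:?}")]` expands to, roughly.
impl fmt::Display for CursorSaveError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Failed to save cursor tips for: {:?}", self.failed)
    }
}

// What `#[derive(thiserror::Error)]` adds: the marker impl of `std::error::Error`.
impl Error for CursorSaveError {}
```

With either version the error can be boxed as `Box<dyn Error>` or propagated with `?`.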

halted: false,
});

if force_restart || entry.halted {
Collaborator

Nice 👍🏻

new_tips.insert(name.clone(), entry.clone());
change_sync_point(entry.tip, run_context.clone(), &sync_topic.to_string()).await?;
}
Ok(IndexResult::FatalResetError { entry, reason }) => {
Collaborator

Very clear flow here 👍🏻

}

#[tokio::test]
async fn rollback_fails_then_reset_succeeds_clears_halt_and_updates_tip() {
Collaborator

Very clear tests 🔥 nicely done

@whankinsiv
Collaborator Author

Just so I understand the flow...

  1. When a new index is added, it spawns an actor per index, each with its own channel
  2. ChainSync messages come in on the subscription and fan out to all actors
  3. The actors process independently, then return a result with updated cursor entries
  4. Main loop collects responses + persists cursors + handles failures

Yes, that's the flow 😄 One extra detail: the index actors themselves handle halting on error and attempt their own reset on rollback failure.

The main loop/manager just forwards events to the actors, persists tip/halting state into the cursor store, and removes any actors from the runtime that couldn't recover.
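
The flow described here can be sketched with standard-library threads and channels. This is an assumption-heavy stand-in: the PR uses async tasks and a oneshot response channel, whereas this sketch uses `std::sync::mpsc` for both directions, and `Event`, `Reply`, `spawn_actor`, and `dispatch` are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

/// Hypothetical event type; slots stand in for real transactions and points.
#[derive(Clone)]
enum Event {
    Tx(u64),
    Rollback(u64),
}

/// Hypothetical reply: the index's new tip, or an unrecoverable failure.
enum Reply {
    Tip(u64),
    Failed,
}

/// Each message carries its own reply sender (stand-in for a oneshot channel).
type Msg = (Event, mpsc::Sender<Reply>);

/// Spawn one actor thread; `fail_on` makes the actor fail at that slot.
fn spawn_actor(fail_on: Option<u64>) -> mpsc::Sender<Msg> {
    let (tx, rx) = mpsc::channel::<Msg>();
    thread::spawn(move || {
        // The loop ends once the manager drops this actor's sender.
        for (event, reply) in rx {
            let slot = match event {
                Event::Tx(s) | Event::Rollback(s) => s,
            };
            let out = if fail_on == Some(slot) { Reply::Failed } else { Reply::Tip(slot) };
            let _ = reply.send(out);
        }
    });
    tx
}

/// Manager step: fan the event out, collect replies, record tips,
/// and drop the sender of any actor that could not recover.
fn dispatch(
    senders: &mut HashMap<String, mpsc::Sender<Msg>>,
    tips: &mut HashMap<String, u64>,
    event: Event,
) {
    let mut pending = Vec::new();
    for (name, sender) in senders.iter() {
        let (reply_tx, reply_rx) = mpsc::channel();
        if sender.send((event.clone(), reply_tx)).is_ok() {
            pending.push((name.clone(), reply_rx));
        }
    }
    for (name, reply_rx) in pending {
        match reply_rx.recv() {
            Ok(Reply::Tip(slot)) => {
                tips.insert(name, slot);
            }
            _ => {
                senders.remove(&name);
            }
        }
    }
}
```

In this sketch a failed actor keeps its last recorded tip in `tips` but no longer receives events, which mirrors the "remove from the runtime" step.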

Comment on lines 81 to 85
if force_restart || entry.halted {
index.reset(&default_start).await?;
entry.tip = default_start.clone();
entry.halted = false;
}
Collaborator

I'm a little confused by this block. If an index started on slot 50000 and then halted with an error on slot 60000, why are we resetting back to 50000? That implies we successfully processed a few thousand blocks, and I don't think we need to undo that work.

In general, we should avoid resetting indexes if the caller didn't explicitly ask. In production, it will almost certainly cause downtime.

Collaborator Author

The reason we reset to default_start when an index stops in a halted state is that we can't rely on receiving a corrective rollback. If a rollback fails or a block fails to decode, ChainSync won't send a second rollback message on the next run, which leaves the index in a corrupted state with no way to recover. Given that limitation, resetting on startup is the only reliable way to bring the index back to a healthy state.
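
The startup behavior under discussion can be sketched as follows. This is a simplified, synchronous illustration: a `u64` slot stands in for the real `Point` type, the real `reset` is async, and `startup_entry` and `CountingIndex` are hypothetical names.

```rust
/// Simplified stand-in for the persisted cursor entry.
#[derive(Debug, Clone, PartialEq)]
struct CursorEntry {
    tip: u64,
    halted: bool,
}

/// Hypothetical synchronous index; the real trait method is async.
trait Index {
    fn reset(&mut self, start: u64) -> Result<u64, String>;
}

/// Test double that counts how often it was reset.
struct CountingIndex {
    resets: u32,
}

impl Index for CountingIndex {
    fn reset(&mut self, start: u64) -> Result<u64, String> {
        self.resets += 1;
        Ok(start)
    }
}

/// Decide the starting cursor for one index at process start.
fn startup_entry(
    index: &mut dyn Index,
    persisted: Option<CursorEntry>,
    default_start: u64,
    force_restart: bool,
) -> Result<CursorEntry, String> {
    // Default as in the reviewed snippet; a later commit changes it to `halted: true`.
    let mut entry = persisted.unwrap_or(CursorEntry { tip: default_start, halted: false });
    if force_restart || entry.halted {
        index.reset(default_start)?;
        entry.tip = default_start;
        entry.halted = false;
    }
    Ok(entry)
}
```

With the reviewer's numbers, an index that started at slot 50000 and halted at 60000 restarts from 50000, while a healthy index at 60000 keeps its tip.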

Comment on lines +125 to +126
// If the rollback failed, attempt to reset the index
Err(_) => match wrapper.index.reset(&wrapper.default_start).await {
Collaborator

I'm a little concerned about resetting if a rollback fails; it could fail for arbitrary reasons, including for transient issues like a DB failure.

But I think in practice it'll be fine to do this:

  • rollbacks are relatively rare compared to roll forwards
  • most transient failures which happen in a rollback would probably happen in a rollforward, sooner rather than later
  • resets will almost certainly result in downtime in production, but the system will eventually recover by itself

So if it's possible to recover from a rollback without triggering this reset, that'd be ideal. But we can live without it.

Collaborator Author

The reason the rollback failure path triggers a reset is that we don't get a second rollback message if the first one fails. Without that, the index is stuck in a corrupted state with no further corrective signal coming from ChainSync. A reset is the only guaranteed way to bring it back to a healthy state. If you think a retry inside the module would meaningfully reduce resets from transient failures, I can add that.
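
The retry idea floated here could look roughly like the sketch below. It is purely hypothetical: the merged PR resets on the first rollback failure, and the helper name `retry` and its signature are assumptions.

```rust
/// Retry a fallible operation up to `max_attempts` times before giving up.
/// At least one attempt is always made, even if `max_attempts` is 0.
fn retry<T, E>(max_attempts: u32, mut op: impl FnMut() -> Result<T, E>) -> Result<T, E> {
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller.
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => attempt += 1,
        }
    }
}
```

A caller would wrap the rollback in `retry` and fall back to the reset path only when it still returns `Err`. A real version would likely also wait between attempts.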


let mut entry = cursors.get(&name).cloned().unwrap_or(CursorEntry {
tip: default_start.clone(),
halted: false,
Collaborator

Little thing, can we default this to true? That way, we call reset every time an index is created, which means callers can add (re)initialization logic in the reset method.

Collaborator Author

Updated to default to halted: true in 100d2d4.

Comment on lines +19 to +20

async fn reset(&mut self, start: &Point) -> Result<Point>;
Collaborator

Is there a reason we return a point from this? I'm not sure what it makes sense to return besides the start it was passed.

Collaborator Author

The intent here was to give implementations the option to adjust their replay point on reset. For example, an index that only needs the last N blocks could implement reset to resume from a later point than the one provided. If we don't expect any index to diverge from the provided start, then returning a point isn't needed. I can switch it to () unless we want to support this kind of behavior.
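
To make that intent concrete, here is a minimal sketch of an index whose `reset` returns a later point than the one it was given. It is a simplification: the trait is synchronous, a `u64` slot stands in for `Point`, and `RecentOnly` with its fixed `min_slot` is a hypothetical stand-in for "only needs the last N blocks".

```rust
/// Hypothetical synchronous version of the trait; a slot stands in for `Point`.
trait Index {
    fn reset(&mut self, start: u64) -> Result<u64, String>;
}

/// An index that only needs blocks from `min_slot` onward.
struct RecentOnly {
    min_slot: u64,
}

impl Index for RecentOnly {
    fn reset(&mut self, start: u64) -> Result<u64, String> {
        // A real index would clear its stored state here before choosing
        // the replay point; this sketch only picks the point.
        Ok(start.max(self.min_slot))
    }
}
```

If no index is expected to diverge from the provided start, the return value carries no information and `()` would do.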

@whankinsiv
Collaborator Author

Merging. @SupernaviX and I discussed a refactor to improve halt recovery on subsequent runs, which is outlined in #461. It will be implemented in a follow-up PR after our milestones are complete, since these changes are not required.

@whankinsiv whankinsiv merged commit 2e90625 into main Dec 8, 2025
2 checks passed
@whankinsiv whankinsiv deleted the whankinsiv/multi-index-custom-indexer branch December 8, 2025 20:14