
Farming cluster #2758

Merged
merged 5 commits into main from farming-cluster on May 21, 2024

Conversation

nazar-pc
Member

This introduces the farming cluster. The general architecture is described in the module documentation of crates/subspace-farmer/src/cluster.rs.

At the CLI level there is a new cluster command with a few generic options and four subcommands with their own specific options: controller, cache, plotter and farmer.

They may run independently or together like this:

subspace-farmer cluster --nats-server nats://IP:4222 \
    controller \
        --base-path /path/to/controller-dir \
        --node-rpc-url ws://IP:9944 \
    -- \
    cache \
        path=/path/to/cache,size=SIZE \
    -- \
    plotter \
    -- \
    farmer \
        --reward-address REWARD_ADDRESS \
        path=/path/to/farm,size=SIZE

-- can be used to separate instantiation of different components if desired.

The farming cluster uses NATS for communication between components, so a NATS server (or a NATS cluster, or one of the more complex setups) is necessary for all of this to work. Annoyingly, the max_payload = 2MB configuration option has to be specified, which right now is only possible with a config file, so a config file containing max_payload = 2MB needs to be created and its path passed as an argument to the NATS server:

nats-server -c nats.config
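For reference, a minimal config file for this (nats.config above is just an example file name; the only non-default setting needed here is the payload limit) could look like:

max_payload = 2MB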

Most of the code in crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster is fairly straightforward and is mostly refactored from existing farmer code. The submodules caches.rs and farms.rs of crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs show how discovery and maintenance of caches and farms is done. I considered moving them into library code, but I'm not fully sure how they will change once metrics are added, so I left them in the CLI part of the codebase for now.

All of the cluster-specific logic (including cluster versions of farms, caches, the plotter and other things) is in submodules of crates/subspace-farmer/src/cluster.rs. Requests, broadcasts and notifications in there are kept as private as possible and are structured according to ownership/origin rules.

Most of the refactoring that was necessary to make this happen already landed earlier, so this PR is focused on farming cluster code.

Yes, the code is quite lengthy, but it is mostly boilerplate, with a bunch of things happening concurrently across numerous async functions.

Early testing happened on the forum and already allowed some of the pain points to be resolved, but more experimentation is necessary: https://forum.subspace.network/t/farming-cluster/3041?u=nazar-pc

Resolves #2402


@tediou5
Contributor

tediou5 commented May 13, 2024

I've been dealing with high-availability issues of nodes in the farmer cluster and hope I can contribute in some way. I've read through the relevant code, and there are a few areas that I find confusing.

  1. I noticed that in the current implementation, the reward signature isn't necessarily returned to the requesting node:

async fn submit_reward_signature(
    &self,
    reward_signature: RewardSignatureResponse,
) -> Result<(), NodeClientError> {
    let last_slot_info_instance = self.last_slot_info_instance.lock().clone();
    Ok(self
        .nats_client
        .notification(
            &ClusterControllerRewardSignatureNotification { reward_signature },
            Some(&last_slot_info_instance),
        )
        .await?)
}

Would this approach cause any issues?
2. The update of last_slot_info_instance doesn't check the slot number, which might result in a scenario where a node that consistently lags behind the highest block height keeps broadcasting stale slots, resulting in an incorrect last_slot_info_instance.

async fn subscribe_slot_info(
    &self,
) -> Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>, NodeClientError> {
    let last_slot_info_instance = Arc::clone(&self.last_slot_info_instance);
    let subscription = self
        .nats_client
        .subscribe_to_broadcasts::<ClusterControllerSlotInfoBroadcast>(None, None)
        .await?
        .map(move |broadcast| {
            *last_slot_info_instance.lock() = broadcast.instance;
            broadcast.slot_info
        });

    Ok(Box::pin(subscription))
}

I think we can maintain a data structure for node status that stores the latest SlotInfo for each node and sorts entries by SlotInfo.number. By providing a range method we can retrieve available nodes during submission and trigger notifications when newer slots arrive. I've implemented a data structure for this purpose (https://github.com/tediou5/valord-map). Regarding the reward signature, perhaps we can record the instance when receiving the request.
May I do something for this? :)
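A minimal sketch of the kind of structure described above (hypothetical names, not the actual valord-map or subspace-farmer API; slot numbers assumed to fit in u64):

use std::collections::HashMap;

/// Track the latest slot number reported by each controller instance and query
/// the instances that are close to the tip
#[derive(Debug, Default)]
struct ControllerSlots {
    /// Instance name -> latest slot number seen from that instance
    latest: HashMap<String, u64>,
}

impl ControllerSlots {
    /// Record a slot info broadcast from `instance`, keeping only the highest slot
    fn update(&mut self, instance: String, slot_number: u64) {
        let entry = self.latest.entry(instance).or_insert(0);
        if slot_number > *entry {
            *entry = slot_number;
        }
    }

    /// Instances whose latest slot is within `max_lag` of the highest slot seen,
    /// i.e. the ones it should be safe to submit solutions and signatures to
    fn up_to_date(&self, max_lag: u64) -> Vec<&str> {
        let tip = self.latest.values().copied().max().unwrap_or(0);
        self.latest
            .iter()
            .filter(|(_, slot)| **slot + max_lag >= tip)
            .map(|(instance, _)| instance.as_str())
            .collect()
    }
}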

@nazar-pc
Member Author

Thanks for reviewing the code!

I noticed that in the current implementation, the reward signature isn't necessarily returned to the requesting node.

I can't answer that because I don't know exactly what issues you are running into, but it doesn't matter which node produces the block/vote.

The update of last_slot_info_instance doesn't check the slot number, which might result in a scenario where a node that consistently lags behind the highest block height keeps broadcasting stale slots, resulting in an incorrect last_slot_info_instance.

In this case you need to fix the node that is lagging behind. Slots are de-duplicated by NATS, and within a narrow time window it should not matter at all. The expectation is that you have multiple synced nodes connected to corresponding controllers, and it doesn't matter which one you're talking to exactly.

If one of the nodes goes out of sync, it'll stop producing slot info notifications until it is back in sync, so storing any instance as the last one is perfectly fine; it is just important to submit the signature back to the node that we sent the solution to, which I think is where the issue might be.

I just pushed a commit with a fix for this that sends the reward signature to all controllers. There shouldn't be too many rewards happening concurrently, so this shouldn't be an issue, and nodes that don't expect a reward signature will simply ignore it.
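A rough sketch of what such a change could look like, assuming that passing None instead of a specific instance to notification makes it visible to all controllers (an assumption for illustration, not necessarily the exact committed code):

/// Submit a block signature (sketch: notify all controllers rather than a single instance)
async fn submit_reward_signature(
    &self,
    reward_signature: RewardSignatureResponse,
) -> Result<(), NodeClientError> {
    // Assumption: a `None` instance addresses every controller; controllers that
    // are not waiting for this particular reward signature simply ignore it
    Ok(self
        .nats_client
        .notification(
            &ClusterControllerRewardSignatureNotification { reward_signature },
            None,
        )
        .await?)
}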

Triggered a new build with this change here: https://github.com/subspace/subspace/actions/runs/9070721118

Thanks!

…ncy and full segment headers cache on controller
@tediou5
Contributor

tediou5 commented May 16, 2024

lgtm, thanks for your work

@shamil-gadelshin
Member

This PR is overwhelmingly complex as it is (the latest discussion showed how much I missed during the first reviews). Please split it into 3-5 parts, similar to the fast-sync PR series. Don't close this one - it will be a good reference to grasp the whole picture.

@nazar-pc
Member Author

This PR is overwhelmingly complex as it is (the latest discussion showed how much I missed during the first reviews). Please split it into 3-5 parts, similar to the fast-sync PR series. Don't close this one - it will be a good reference to grasp the whole picture.

The only reason I opened it this way is that there are no parts that really make sense on their own. I can split individual files or directories into separate PRs, but I do not think it will necessarily help with reviews.

I'm open to suggestions on how exactly you see that working.

@shamil-gadelshin
Member

The only reason I opened it this way is that there are no parts that really make sense on their own. I can split individual files or directories into separate PRs, but I do not think it will necessarily help with reviews.

I'm open to suggestions on how exactly you see that working.

I see two options here:

  • workflow-based PRs (vertical split): each PR contains only the code related to some workflow (e.g. plotting or caching) across all components. This will likely demand more effort to split the existing codebase.
  • component-based PRs: separate the code related to individual components from "the rest" (infrastructure code shared by all components). It will make less sense semantically, but it will at least reduce the PR chunks to a manageable size.

Probably, both options require some "accumulating branch" before the final merge to keep the main branch consistent.

@nazar-pc
Member Author

The thing is, the code here doesn't have any infrastructure; that all went into previous PRs. This PR contains mostly boilerplate.

The split is already done over logical files, so you can review them individually if that makes sense for you, but it will not be complete, since the only truly standalone thing here is the plotter; everything else depends on the others at least a little bit due to message types.

This PR doesn't modify existing code significantly and only introduces a new feature as an atomic, consistent change set. I do not understand how splitting it up and breaking the full picture (especially after explaining during the walkthrough how it is organized) would help with review. I really don't.

@shamil-gadelshin
Member

I got these confusing log lines on start:

2024-05-20T06:38:07.531973Z  INFO async_nats: event: connected
2024-05-20T06:38:07.531973Z  INFO async_nats: event: connected
2024-05-20T06:38:07.532076Z  INFO async_nats: event: connected
2024-05-20T06:38:07.532117Z  INFO async_nats: event: connected
2024-05-20T06:38:07.532116Z  INFO async_nats: event: connected
2024-05-20T06:38:07.532316Z  INFO async_nats: event: connected
2024-05-20T06:38:07.532361Z  INFO async_nats: event: connected
2024-05-20T06:38:07.532525Z  INFO async_nats: event: connected

Is there a way to get rid of them?

crates/subspace-farmer/src/cluster/plotter.rs (resolved)
/// Might be `None` if instance had to respond, but turned out it was fully occupied already
type Response = Option<String>;
}

Member:

State machine description would be helpful.

Member Author:

It is not really a state machine, more like a stream of events, and they all happen exactly in the order defined here, matching the general sector creation workflow (download pieces, encode sector, write sector to disk). This enum is simply used for data transfer over the network.
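For illustration, a hypothetical event enum of this shape (names invented here, not the actual type in crates/subspace-farmer/src/cluster/plotter.rs):

/// Illustrative only: a linear sequence of plotting progress events sent over the
/// network, mirroring the sector creation workflow (download, encode, write)
enum SectorPlottingEventSketch {
    /// Pieces for the sector are being downloaded
    Downloading,
    /// Downloaded pieces are being encoded into a sector
    Encoding,
    /// Encoded sector is being written to disk
    Writing,
    /// Sector was created successfully
    Finished,
    /// Plotting failed with an error message
    Error(String),
}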

pub(super) additional_components: Vec<String>,
}

pub(super) async fn farmer<PosTable>(

Member:

This method is very complex. Consider splitting it into several methods with self-documenting names.

Member Author:

I have to agree, but so is the farm method that this is largely based on. I'd rather look at them both and refactor them together. In fact, the code here is almost identical to farm, with the difference being that the node client and plotter are remote rather than local, and plotted/cached contents no longer need to be maintained because the controller does that in this case.

/// Read piece index from cache
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterCacheReadPieceIndexRequest {
offset: PieceCacheOffset,

Member:

Similar question as below: why do we need an offset as part of the API on the cluster level?

Member Author:

As explained during the walkthrough, both cluster and non-cluster versions have the exact same API. The cache doesn't store its mapping internally and only scans its contents on the farmer cache's request (see ClusterCacheContentsRequest). Since the method read_piece_index has offset as an argument, we have an identical property in this request here.

&self,
) -> Result<
Box<
dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>

Member:

Why do we expose inner details (offsets and slot occupation) here instead of available piece indexes?

Member Author:

Because that is the API the cache works with. If we only know which indices are present, but not where they are, we'll not be able to read them later.
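For illustration, a minimal sketch (hypothetical helper; PieceIndex, PieceCacheOffset and FarmError are assumed to be imported from the crate) of how a consumer might turn this stream into an index-to-offset mapping for later reads:

use std::collections::HashMap;

use futures::{Stream, StreamExt};

/// Build a piece index -> offset mapping from the contents stream so that
/// occupied slots can be read back by offset later; empty slots (None) are skipped
async fn build_piece_mapping<S>(
    contents: S,
) -> Result<HashMap<PieceIndex, PieceCacheOffset>, FarmError>
where
    S: Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>,
{
    futures::pin_mut!(contents);

    let mut mapping = HashMap::new();
    while let Some(entry) = contents.next().await {
        let (offset, maybe_piece_index) = entry?;
        if let Some(piece_index) = maybe_piece_index {
            mapping.insert(piece_index, offset);
        }
    }

    Ok(mapping)
}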

anyhow!("Failed to subscribe to archived segment header notifications: {error}")
})?;

info!("Downloading all segment headers from node...");

Member:

This block seems like an unextracted method.

})
}

fn piece_cache(&self) -> Arc<dyn PieceCache + 'static> {

Member:

I found the only place for DummyPieceCache here. Do we need to implement piece_cache for ClusterFarm in the first place? Can't we split the Farm trait in two and not implement unused methods? The same question applies to the other dummy struct.

Member Author:

piece_cache is a used method; it is just that this kind of farm doesn't contain a piece cache (though it technically could). We could refactor things and maybe will in the future, but I'm not sure how exactly and what benefit it would bring. I'll think about it though, it seems like a logical thing to do.

@nazar-pc nazar-pc left a comment (Member, Author)

Is there a way to get rid of them?

Not right now, but hopefully soon, see discussion at nats-io/nats.rs#1260

@shamil-gadelshin shamil-gadelshin self-requested a review May 21, 2024 08:45
@shamil-gadelshin shamil-gadelshin left a comment (Member)

It seems that we're not going to refactor the cluster much or change the PR structure.

Here are my comments:

  • it would benefit from some form of distributed tracing (this could be added later, once we confirm whether it's necessary and whether the current logs are enough)
  • we shouldn't keep the cluster API so similar to the local API (the cache is a good example, with its exposed inner abstractions)
  • a stream of smaller PRs could simplify the review, in a similar fashion to what we had for fast-sync
  • avoid large god-methods
  • add more documentation explaining the structures/enums/methods (a #![warn(missing_docs)] directive might help in this case)
  • dataflow diagrams could help a lot during the review.

In general, the new NATS component (as well as the bus communication pattern) is responsible for most of the complexity of the PR. The code itself is easy to read and rather straightforward.

@nazar-pc
Member Author

I'll address some of this feedback in the upcoming PRs

@nazar-pc nazar-pc added this pull request to the merge queue May 21, 2024
Merged via the queue into main with commit 873e5e8 May 21, 2024
9 checks passed
@nazar-pc nazar-pc deleted the farming-cluster branch May 21, 2024 09:31