Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add custom module for fil pubsub #3978

Merged
merged 76 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 70 commits
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
208c139
Add custom module for fil pubsub
elmattic Feb 20, 2024
c34a152
Merge branch 'main' into elmattic/filecoin-pubsub
elmattic Feb 20, 2024
f416ff5
Update module documentation
elmattic Feb 20, 2024
0de0da3
Remove static lifetime
elmattic Feb 20, 2024
11a38c7
Merge branch 'main' into elmattic/filecoin-pubsub
elmattic Feb 20, 2024
b8557ae
Merge branch 'main' into elmattic/filecoin-pubsub
elmattic Feb 21, 2024
241bed0
improve error handling
lemmih Feb 21, 2024
ae32d31
Merge branch 'main' into elmattic/filecoin-pubsub
elmattic Feb 21, 2024
6f0a154
Directly use notif and cancel method names
elmattic Feb 21, 2024
ec30207
Allow unused or dead code
elmattic Feb 21, 2024
7387045
ignore channel cancel requests
lemmih Feb 21, 2024
93868ee
Fix spellcheck
elmattic Feb 21, 2024
9f77646
Add dummy method for testing
elmattic Feb 21, 2024
e51c05e
Increase interval
elmattic Feb 21, 2024
ed369e0
remove unnecessary context
lemmih Feb 21, 2024
8699194
template for `register_subscription`
lemmih Feb 21, 2024
f34c6ed
simplify type signature for 'register_subscription'
lemmih Feb 21, 2024
8956f63
Remove unnecessary clone constraint
lemmih Feb 21, 2024
4265bca
dummy ChainNotify that listens to head changes
lemmih Feb 21, 2024
305b03f
construct proper chain notification messages
lemmih Feb 22, 2024
b09c804
define 'register_subscription' and simplify ChainNotify
lemmih Feb 22, 2024
ee1a889
simplify state handling
lemmih Feb 22, 2024
eae55ea
rename subscription -> channel
lemmih Feb 22, 2024
34b587e
Rename method
elmattic Feb 22, 2024
a79dcc2
move 'channel' map to RpcModule
lemmih Feb 22, 2024
938c45b
remove unused connection id
lemmih Feb 22, 2024
e2f8032
Move tokio task to chain_notify
elmattic Feb 22, 2024
63511ff
support cancelling channels
lemmih Feb 22, 2024
cbf9e75
remove unused code
lemmih Feb 22, 2024
3e24906
send close channel messages
lemmih Feb 22, 2024
37164df
assign unique channel ids
lemmih Feb 22, 2024
2d91d51
Better module-level comment
elmattic Feb 22, 2024
9debe7b
remove more unused code
lemmih Feb 22, 2024
3de81a8
Send current head when channel is opened
elmattic Feb 22, 2024
1d25116
Add debug traces
elmattic Feb 22, 2024
06bdf68
Add top-level documentation for channel lifetime
elmattic Feb 26, 2024
364b08b
Make doc more compact
elmattic Feb 26, 2024
766e8b1
Reorder some fields
elmattic Feb 26, 2024
d81b0f6
Change wording
elmattic Feb 26, 2024
364296c
Add Cancel method to access map
elmattic Feb 26, 2024
6cd72f0
Merge branch 'main' into elmattic/filecoin-pubsub
elmattic Feb 26, 2024
bb9561f
Fix typo
elmattic Feb 26, 2024
de5aa95
Fix params parsing that was expecting a list
elmattic Feb 26, 2024
15c3e60
Update top-level documentation
elmattic Feb 26, 2024
6e11a10
remove unused function
lemmih Feb 26, 2024
75776af
simplify channel registration
lemmih Feb 26, 2024
266992e
rename subscription to channel
lemmih Feb 26, 2024
4bd924f
Move spaces
elmattic Feb 26, 2024
3a2e826
Move RpcModule inside channel.rs
elmattic Feb 26, 2024
515f040
Add debug trace for close response
elmattic Feb 26, 2024
a748621
Add support for close message (wip)
elmattic Feb 26, 2024
04e9292
Fix send of closing message
elmattic Feb 26, 2024
d2ae2d9
Better debug messages
elmattic Feb 26, 2024
9c3fdf6
Remove clone
elmattic Feb 26, 2024
72f5619
Merge branch 'main' into elmattic/filecoin-pubsub
elmattic Feb 26, 2024
feb5981
Remove the todos
elmattic Feb 26, 2024
b19fd1d
Remove public visibility
elmattic Feb 26, 2024
a9e9f31
Remove expect
elmattic Feb 26, 2024
7417349
Apply clippy suggestions
elmattic Feb 27, 2024
065d09a
Merge branch 'main' into elmattic/filecoin-pubsub
elmattic Feb 27, 2024
0268c12
Fix spellcheck error
elmattic Feb 27, 2024
318cb1d
Cleanup
elmattic Feb 27, 2024
0aed8ad
Refactor chain_notify function
elmattic Feb 27, 2024
69e3590
Change doc comment
elmattic Feb 27, 2024
fa6a0a0
Merge branch 'main' into elmattic/filecoin-pubsub
elmattic Feb 27, 2024
007bef7
Merge branch 'main' into elmattic/filecoin-pubsub
elmattic Feb 27, 2024
5323434
Remove usage of expect
elmattic Feb 27, 2024
15a697e
Merge branch 'main' into elmattic/filecoin-pubsub
elmattic Feb 27, 2024
735c0ed
Remove need of fork of jsonrpsee
elmattic Feb 27, 2024
cdf2483
Merge branch 'main' into elmattic/filecoin-pubsub
elmattic Feb 27, 2024
22122d9
Remove Current variant
elmattic Feb 27, 2024
8b11eb9
remove unused code
lemmih Feb 27, 2024
7ecec84
check result value for errors
lemmih Feb 27, 2024
e8ad2f3
remove direct rustc_hash dependency
lemmih Feb 27, 2024
f2ad092
various clean up
lemmih Feb 27, 2024
d5c4177
more clean up
lemmih Feb 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ reqwest = { version = "0.11.24", default-features = false, features = [
] } # use rustls instead of native (openSSL) tls to drop the number of build dependencies
rlimit = "0.10.1"
rs-car-ipfs = "0.3"
rustc-hash = "1.1.0"
rustyline = "13"
scopeguard = "1.1.0"
semver = "1.0"
Expand Down
1 change: 1 addition & 0 deletions src/chain/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ pub mod headchange_json {
#[serde(tag = "type", content = "val")]
pub enum HeadChangeJson {
Apply(LotusJson<Tipset>),
Current(LotusJson<Tipset>),
}

impl From<HeadChange> for HeadChangeJson {
Expand Down
31 changes: 30 additions & 1 deletion src/rpc/chain_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
use crate::blocks::{CachingBlockHeader, Tipset, TipsetKey};
use crate::chain::index::ResolveNullTipset;
use crate::chain::ChainStore;
use crate::chain::HeadChange;
use crate::cid_collections::CidHashSet;
use crate::lotus_json::LotusJson;
use crate::message::ChainMessage;
use crate::rpc::error::JsonRpcError;
use crate::rpc_api::data_types::{ApiMessage, ApiReceipt};
use crate::rpc_api::data_types::{ApiHeadChange, ApiMessage, ApiReceipt};
use crate::rpc_api::{
chain_api::*,
data_types::{ApiTipsetKey, BlockMessages, Data, RPCState},
Expand All @@ -28,6 +29,7 @@ use jsonrpsee::types::Params;
use once_cell::sync::Lazy;
use sha2::Sha256;
use std::sync::Arc;
use tokio::sync::broadcast::{self, Receiver as Subscriber};
use tokio::sync::Mutex;

pub async fn chain_get_message<DB: Blockstore>(
Expand Down Expand Up @@ -445,6 +447,33 @@ pub(crate) async fn chain_get_min_base_fee<DB: Blockstore>(
Ok(min_base_fee.atto().to_string())
}

pub(crate) fn chain_notify<DB: Blockstore>(
_params: Params<'_>,
data: Arc<RPCState<DB>>,
) -> Subscriber<ApiHeadChange> {
let (sender, receiver) = broadcast::channel(100);

// As soon as the channel is created, send the current tipset
let current = data.chain_store.heaviest_tipset();
let (change, headers) = ("current".into(), current.block_headers().clone().into());
let _ = sender.send(ApiHeadChange { change, headers });
lemmih marked this conversation as resolved.
Show resolved Hide resolved

let mut subscriber = data.chain_store.publisher().subscribe();

tokio::spawn(async move {
while let Ok(v) = subscriber.recv().await {
let (change, headers) = match v {
HeadChange::Apply(ts) => ("apply".into(), ts.block_headers().clone().into()),
};

if sender.send(ApiHeadChange { change, headers }).is_err() {
break;
}
}
});
receiver
}

fn load_api_messages_from_tipset(
store: &impl Blockstore,
tipset: &Tipset,
Expand Down
Loading
Loading