-
Notifications
You must be signed in to change notification settings - Fork 12
Informer Code Structure
Handler is responsible for run websocket and also manages subscription.
If a user connects to Handler, handler will call a function like this: informer_service.add_newConnetion(Connection::new(sink, interested_events))
struct Handler{
handler: PubSubHandler<T>
}
impl Handler{
pub fn start_ws() {
/// run websocket and RPC
}
pub fn event_subsribtion{
/// Manage Subscriptions
/// Create Connection
/// Send the Connection to InformerService
}
struct Connection {
pub status: bool,
pub sink: Sink,
pub intrested_event: Vec<Events>,
}
- IManage who wants listen which events.
- Receive events from event sources (like network module) and sends or broadcast it to clients.
- Handle its own thread and manage two
mpsc channel
to recieve Events and send Events to subscribers. - If event source calls a function like
write_event(event_name: &str, params: ??)
, InformerService will send events to connections like this:sink.send(event_name, ??)
. - InformerService
struct InformerService {
connections: Vec<Connection>,
pub sender : NotifySender,
receiver: Receiver<Connection>
}
impl InformerService{
pub fn run_service(){
/// establish the thread
/// manange channels in a single point
}
pub fn notify_clients(event){
/// send events to all clients that are subscribed to that event
}
enum Events {
PeerConnection {
message: String
socket_address: SocketAddr
token: usize
}
...
}
-
How other code send Informer events. This is what I
struct Handler{ connecting_lock: Mutex<()>, channel: IoChannel<Message>, ... pub sender : NotifySender, .... } impl Handler{ ... pub fn connect(...){ cinfo!(NETWORK, "New connection to {}({})", socket_address, token); peerConnection:Event = Event::PeerConnection{message:"New connection", socket_adrress: socket_address, token: token) self.sender.notify(PeerConnection); ... }
- Handler should notify InformerService when connection is established or terminated. This relation is managed by a
mpsc channel
that will send new connection to InformerService. - NotifySender is a struct to wrap
mpsc::channel::Sender
. NotifySender will deliever events to InformerService connections. - NotifySender has a function notify that manages sending events over the channel that is defined for sending events.
We call an event cold that is in the history of blocks. Examples of this type of events are A block is generate
, A transaction is mined
, An account's balance is changed
. A client can subscribe itself to a specific event and also specify the start block point, foundry will recover event history and inform the client up until now. Afterward, the client can be kept informed by foundry for pop-up events. This type of event subscription can not be covered by the above structure. Therefore, we need a new structure for cold events.
The subscription handler is the same as the above.
The informer service part needs a strategy.
- The first strategy is to catch all events history in advance; then, as soon as a new client subscribed to an event, foundry informs the client from the point the clients wish.
- The second one is to look up the history of events as soon as the subscription begins.
- The last one could be the combination of these two strategies. Storing partial history, if the startpoint was not covered in the stored file, we can look up the history of blocks and find the specific time point and then update both the stored file and inform the client.
for the first one, we need to store all history of events, it could be stored as a file in a node, then look up the file for a specific time point and then fetch data and inform the client. In addition, as soon as a new event happens, update the stored file. for the second on, we do not need to store additional data, but we need to look up the blocks for each subscription request.
Foundry takes advantage of the second strategy to inform clients.
The handler of cold events is the same as the handler of Hot events. Clients send a similar event subscription request to foundry. However, the way that foundry process the request is different.
ColdEvents enum is designed for referring and storing cold events like BlockGeneration
.
enum ColdEvents {
BlockGeneration {
hash: String,
...
},
}
When a new cold event is added to the client's favorite events, add_event
function calls generate_cold_repsonse
function and notify clients to the client in the process of adding the event. generate_cold_repsonse
is a client of RPC and gain the necessary information to process the request and produce the response.
fn add_events(params:Vec<String>){
match event {
"BlockGeneration" =>{
block_event = ColdEvents::BlockGeneration(..)
params = generate_cold_response(event)
hot_event = Events::BlockGeneration(..)
intrested_events.push(hot_event)
}
}
fn generate_cold_response(event){
// read data of a specific block and its
// successors by sending RPC request to foundry
// Notifying clients with limited rate
}
struct InformerService {
cold_event_generators: Vec<(Connection, dyn ColdEventGenerator)>
}
impl InformerService {
fn register_cold_event(&mut self, event: Event) {
self.cold_event_generators.push(...);
}
fn deregister_cold_event(&mut self, event: Event) {
self.cold_event_generator.remove(...)
}
}
// In a thread like cold event worker thread
loop {
for (connection, generator) in self.cold_event_generators {
let events = generator.try_gen();
if !events.is_empty() {
connection.send_events(events);
}
}
sleep();
}
trait ColdEventGenerator {
fn try_gen(&mut self) -> Vec<ColdEvent>;
}
struct BlockCreatedEventGenerator {
current_blocknumber: u64;
rate_limiter: RateLimiter;
}
impl ColdEventGenerator for BlockCreatedEventGenerator {
fn try_gen(&mut self) -> Vec<ColdEvent> {
if self.rate_limiter.too_fast() {
return Vec::new()
}
let block = chain.get_block(self.current_blocknumber);
rate_limiter.push(1);
self.current_blocknumber += 1;
return vec![block]
}
}
The client can choose the start point in the time for its favorite event. After the client received all the history of the event, foundry will keep informing the client for new event occurrence. Here is the responsibility of InformerService
to manage this connection and notify it whenever the event happens.
enum Events {
PeerConnection(..),
//indeed, it is a cold event
BlockGeneration(..)
}
Todo