Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sync_mem_pool): use tokio based kafka impl #563

Merged
merged 1 commit into from
Jan 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/mem-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use crate::{
custodian::AvailableCustodians,
mem_block::MemBlock,
restore_manager::RestoreManager,
sync::{mq::gw_kafka, publish::MemPoolPublishService},
sync::{mq::tokio_kafka, publish::MemPoolPublishService},
traits::{MemPoolErrorTxHandler, MemPoolProvider},
types::EntryList,
withdrawal::Generator as WithdrawalGenerator,
Expand Down Expand Up @@ -167,7 +167,7 @@ impl MemPool {
.publish
.map(|config| -> Result<MemPoolPublishService> {
log::info!("Setup fan out mem_block handler.");
let producer = gw_kafka::Producer::connect(config.hosts, config.topic)?;
let producer = tokio_kafka::Producer::connect(config.hosts, config.topic)?;
let handler = MemPoolPublishService::start(producer);
Ok(handler)
})
Expand Down
16 changes: 5 additions & 11 deletions crates/mem-pool/src/sync/mq/gw_kafka.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
use anyhow::Result;
use async_trait::async_trait;
use gw_types::{
bytes::Bytes,
packed::{RefreshMemBlockMessage, RefreshMemBlockMessageUnion},
prelude::{Builder, Entity, Reader},
};
use rdkafka::{
consumer::{BaseConsumer, CommitMode, Consumer as RdConsumer},
message::ToBytes,
producer::{BaseRecord, ProducerContext, ThreadedProducer},
ClientConfig, ClientContext, Message,
};

use crate::sync::subscribe::SubscribeMemPoolService;
use crate::sync::{mq::RefreshMemBlockMessageFacade, subscribe::SubscribeMemPoolService};

use super::{Consume, Produce};

Expand Down Expand Up @@ -50,6 +48,7 @@ pub(crate) struct Producer {
}

impl Producer {
#[allow(dead_code)]
pub(crate) fn connect(hosts: Vec<String>, topic: String) -> Result<Self> {
let brokers = hosts.join(",");
let producer: ThreadedProducer<ProducerContextLogger> = ClientConfig::new()
Expand All @@ -61,10 +60,11 @@ impl Producer {
}
}

#[async_trait]
impl Produce for Producer {
type Msg = RefreshMemBlockMessageUnion;

fn send(&mut self, message: Self::Msg) -> Result<()> {
async fn send(&mut self, message: Self::Msg) -> Result<()> {
let msg = RefreshMemBlockMessage::new_builder().set(message).build();
let bytes = msg.as_bytes();
log::trace!("Producer send msg: {:?}", &bytes.to_vec());
Expand All @@ -86,6 +86,7 @@ pub(crate) struct Consumer {
}

impl Consumer {
#[allow(dead_code)]
pub(crate) fn start(
hosts: Vec<String>,
topic: String,
Expand Down Expand Up @@ -171,10 +172,3 @@ impl Consume for Consumer {
Ok(())
}
}

struct RefreshMemBlockMessageFacade(Bytes);
impl ToBytes for RefreshMemBlockMessageFacade {
fn to_bytes(&self) -> &[u8] {
&self.0
}
}
13 changes: 12 additions & 1 deletion crates/mem-pool/src/sync/mq/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
use anyhow::Result;
use async_trait::async_trait;
use gw_types::bytes::Bytes;
use rdkafka::message::ToBytes;

pub(crate) mod gw_kafka;
pub(crate) mod tokio_kafka;

#[async_trait]
pub trait Produce {
type Msg;
fn send(&mut self, message: Self::Msg) -> Result<()>;
async fn send(&mut self, message: Self::Msg) -> Result<()>;
}

/// Consumer side of the mem pool sync message queue.
#[async_trait]
pub trait Consume {
    /// Receive and handle the next message from the queue; resolves once a
    /// single message (or a receive error) has been processed.
    async fn poll(&mut self) -> Result<()>;
}

/// Newtype over the serialized `RefreshMemBlockMessage` bytes so the buffer
/// can be used directly as a kafka record payload via rdkafka's `ToBytes`.
pub(crate) struct RefreshMemBlockMessageFacade(Bytes);
impl ToBytes for RefreshMemBlockMessageFacade {
    fn to_bytes(&self) -> &[u8] {
        // Deref-coerces the inner `Bytes` into a byte slice; no copy.
        &self.0
    }
}
140 changes: 140 additions & 0 deletions crates/mem-pool/src/sync/mq/tokio_kafka.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use gw_types::{
packed::{RefreshMemBlockMessage, RefreshMemBlockMessageUnion},
prelude::{Builder, Entity, Reader},
};
use rdkafka::{
consumer::{CommitMode, Consumer as RdConsumer, StreamConsumer},
producer::{FutureProducer, FutureRecord},
ClientConfig, Message,
};

use crate::sync::{mq::RefreshMemBlockMessageFacade, subscribe::SubscribeMemPoolService};

use super::{Consume, Produce};

/// Kafka producer backed by rdkafka's tokio-compatible `FutureProducer`,
/// publishing mem-block refresh messages to a single topic.
pub(crate) struct Producer {
    producer: FutureProducer,
    topic: String,
}

impl Producer {
    /// Connect a producer to the given brokers, publishing to `topic`.
    ///
    /// `hosts` are joined into a comma-separated `bootstrap.servers` list.
    /// Delivery is idempotent (`enable.idempotence`) and each message is
    /// abandoned after a 5s delivery timeout.
    pub(crate) fn connect(hosts: Vec<String>, topic: String) -> Result<Self> {
        let bootstrap_servers = hosts.join(",");
        let mut config = ClientConfig::new();
        config
            .set("bootstrap.servers", bootstrap_servers)
            .set("message.timeout.ms", "5000")
            .set("enable.idempotence", "true");
        let producer: FutureProducer = config.create()?;
        Ok(Self { producer, topic })
    }
}

#[async_trait]
impl Produce for Producer {
    type Msg = RefreshMemBlockMessageUnion;

    /// Serialize `message` into a `RefreshMemBlockMessage` and publish it to
    /// `self.topic`.
    ///
    /// # Errors
    ///
    /// Returns the underlying kafka error when delivery fails. (Previously the
    /// failure was only logged and `Ok(())` was returned, silently swallowing
    /// the error despite the `Result` return type; callers such as the publish
    /// actor already log an `Err` from `send`, so propagating is compatible.)
    async fn send(&mut self, message: Self::Msg) -> Result<()> {
        let msg = RefreshMemBlockMessage::new_builder().set(message).build();
        let bytes = msg.as_bytes();
        log::trace!("Producer send msg: {:?}", &bytes.to_vec());
        let message = RefreshMemBlockMessageFacade(bytes);

        let record = FutureRecord::to(&self.topic).key("").payload(&message);
        // Zero queue timeout: fail fast rather than block when the local
        // producer queue is full.
        if let Err((err, _msg)) = self.producer.send(record, Duration::from_millis(0)).await {
            log::error!("[kafka] send message failed: {:?}", &err);
            return Err(err.into());
        }
        Ok(())
    }
}

/// Kafka consumer that feeds refresh messages into the mem pool
/// subscription service.
pub(crate) struct Consumer {
    consumer: StreamConsumer,
    subscribe: SubscribeMemPoolService,
    topic: String,
}

impl Consumer {
    /// Create a consumer in consumer group `group` for `topic`.
    ///
    /// Auto-commit is disabled: offsets are committed explicitly (in `poll`)
    /// only after a message has been handled, so unhandled messages can be
    /// re-consumed. New groups start from the earliest available offset.
    pub(crate) fn start(
        hosts: Vec<String>,
        topic: String,
        group: String,
        subscribe: SubscribeMemPoolService,
    ) -> Result<Self> {
        let bootstrap_servers = hosts.join(",");
        let mut config = ClientConfig::new();
        config
            .set("bootstrap.servers", bootstrap_servers)
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "false")
            .set("auto.offset.reset", "earliest")
            .set("group.id", group);
        let consumer: StreamConsumer = config.create()?;
        Ok(Self {
            consumer,
            topic,
            subscribe,
        })
    }
}

#[async_trait]
impl Consume for Consumer {
    /// Receive and handle a single refresh message from kafka, committing its
    /// offset only after it has been processed.
    ///
    /// NOTE(review): `subscribe` is re-invoked on every poll. This looks like
    /// it may be what allows the uncommitted "postponed" `NextMemBlock`
    /// message below to be redelivered (re-subscribing resets the fetch
    /// position to the last committed offset) — confirm before hoisting the
    /// subscription into `start`.
    async fn poll(&mut self) -> Result<()> {
        self.consumer.subscribe(&[&self.topic])?;
        let msg = self.consumer.recv().await;
        match msg {
            Ok(msg) => {
                let topic = msg.topic();
                let partition = msg.partition();
                let offset = msg.offset();
                let payload = msg.payload();
                log::trace!(
                    "Recv kafka msg: {}:{}@{}: {:?}",
                    topic,
                    partition,
                    offset,
                    &payload
                );
                if let Some(payload) = payload {
                    // Decode the molecule-packed message and dispatch on its
                    // union variant.
                    let refresh_msg = RefreshMemBlockMessage::from_slice(payload)?;
                    let reader = refresh_msg.as_reader();
                    let refresh_msg = reader.to_enum();
                    match &refresh_msg {
                        gw_types::packed::RefreshMemBlockMessageUnionReader::NextL2Transaction(
                            next,
                        ) => {
                            // A single L2 transaction to append; failures are
                            // logged and the offset is still committed below.
                            if let Err(err) = self.subscribe.next_tx(next.to_entity()).await {
                                log::error!("[Subscribe tx] error: {:?}", err);
                            }
                        }
                        gw_types::packed::RefreshMemBlockMessageUnionReader::NextMemBlock(next) => {
                            match self.subscribe.next_mem_block(next.to_entity()).await {
                                Ok(None) => {
                                    log::debug!("Invalid tip. Wait for syncing to the new tip.");
                                    //Postpone this message, consume it later.
                                    // Returning early skips the commit below,
                                    // leaving the offset uncommitted so the
                                    // message can be consumed again.
                                    return Ok(());
                                }
                                Ok(Some(block_number)) => {
                                    log::debug!("Refresh mem pool to {}", block_number);
                                }
                                Err(err) => {
                                    log::error!("[Refresh mem pool] error: {:?}", err);
                                }
                            };
                        }
                    };
                    // Only commit after handling (auto-commit is disabled in
                    // `start`); commit is asynchronous.
                    self.consumer.commit_message(&msg, CommitMode::Async)?;
                    log::trace!("Kafka commit offset: {}", offset);
                };
            }
            Err(err) => {
                // Receive errors are logged, not propagated, so the caller's
                // poll loop keeps running.
                log::error!("Receive error from kafka: {:?}", err);
            }
        }
        Ok(())
    }
}
14 changes: 7 additions & 7 deletions crates/mem-pool/src/sync/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,24 @@ use gw_types::{
};
use tokio::sync::mpsc::{Receiver, Sender};

use super::mq::{gw_kafka, Produce};
use super::mq::{tokio_kafka, Produce};

const CHANNEL_BUFFER_SIZE: usize = 1000;
pub(crate) struct PublishMemPoolActor {
receiver: Receiver<RefreshMemBlockMessageUnion>,
producer: gw_kafka::Producer,
producer: tokio_kafka::Producer,
}

impl PublishMemPoolActor {
pub(crate) fn new(
receiver: Receiver<RefreshMemBlockMessageUnion>,
producer: gw_kafka::Producer,
producer: tokio_kafka::Producer,
) -> Self {
Self { receiver, producer }
}

fn handle(&mut self, msg: RefreshMemBlockMessageUnion) {
if let Err(err) = self.producer.send(msg) {
async fn handle(&mut self, msg: RefreshMemBlockMessageUnion) {
if let Err(err) = self.producer.send(msg).await {
log::error!("[Fan out mem block] message failed: {:?}", err);
}
}
Expand All @@ -34,7 +34,7 @@ impl PublishMemPoolActor {
async fn publish_handle(mut actor: PublishMemPoolActor) {
log::info!("Fanout handle is started.");
while let Some(msg) = actor.receiver.recv().await {
actor.handle(msg);
actor.handle(msg).await;
}
}

Expand All @@ -43,7 +43,7 @@ pub(crate) struct MemPoolPublishService {
}

impl MemPoolPublishService {
pub(crate) fn start(producer: gw_kafka::Producer) -> Self {
pub(crate) fn start(producer: tokio_kafka::Producer) -> Self {
let (sender, receiver) = tokio::sync::mpsc::channel(CHANNEL_BUFFER_SIZE);

let actor = PublishMemPoolActor::new(receiver, producer);
Expand Down
4 changes: 2 additions & 2 deletions crates/mem-pool/src/sync/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tokio::sync::Mutex;

use crate::pool::MemPool;

use super::mq::{gw_kafka, Consume};
use super::mq::{tokio_kafka, Consume};

const CONSUME_LATENCY: u64 = 200;

Expand Down Expand Up @@ -63,7 +63,7 @@ pub fn spawn_sub_mem_pool_task(
topic,
group,
} = mem_block_config;
let mut consumer = gw_kafka::Consumer::start(hosts, topic, group, fan_in)?;
let mut consumer = tokio_kafka::Consumer::start(hosts, topic, group, fan_in)?;
tokio::spawn(async move {
log::info!("Spawn fan in mem_block task");
loop {
Expand Down