Skip to content

Commit e1b4e8c

Browse files
committed
[+] Add async processing in kafka
1 parent 210e36b commit e1b4e8c

File tree

2 files changed

+201
-1
lines changed

2 files changed

+201
-1
lines changed

kafka/Cargo.toml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,22 @@ edition = "2018"
1010
tokio = { version = "1.8.1", features = ["full"] }
1111
clap = "2.33.3"
1212
log = "0.4.14"
13+
rand = "0.8.4"
14+
futures = "0.3.15"
1315
chrono = "0.4.19"
1416
env_logger = "0.9.0"
1517
rdkafka = { version = "0.26", features = ["cmake-build"] }
18+
reqwest = { version = "0.11", features = ["blocking", "json"] }
19+
serde = { version = "1.0.126", features = ["derive"] }
1620

1721
[[bin]]
1822
name = "producer"
1923
path = "src/producer.rs"
2024

2125
[[bin]]
2226
name = "consumer"
23-
path = "src/consumer.rs"
27+
path = "src/consumer.rs"
28+
29+
[[bin]]
30+
name = "async_processing"
31+
path = "src/async_processing.rs"

kafka/src/async_processing.rs

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
use clap::{value_t, App, Arg};
2+
use futures::stream::FuturesUnordered;
3+
use futures::{StreamExt, TryStreamExt};
4+
use log::info;
5+
use rand::Rng;
6+
use rdkafka::config::ClientConfig;
7+
use rdkafka::consumer::stream_consumer::StreamConsumer;
8+
use rdkafka::consumer::Consumer;
9+
use rdkafka::message::{BorrowedMessage, OwnedMessage};
10+
use rdkafka::producer::{FutureProducer, FutureRecord};
11+
use rdkafka::Message;
12+
use serde::Deserialize;
13+
use std::time::Duration;
14+
15+
use crate::utils::setup_logger;
16+
17+
mod utils;
18+
19+
/// The subset of a JSONPlaceholder `/todos/{id}` response that this
/// example deserializes — only the `title` field is used downstream.
#[derive(Deserialize, Debug)]
struct ResponseJSONPlaceholder {
    // Human-readable title of the fetched todo item.
    title: String,
}
23+
24+
fn fetch_json_placeholder() -> Result<String, Box<dyn std::error::Error>> {
25+
let num = rand::thread_rng().gen_range(0..100);
26+
let url = format!("https://jsonplaceholder.typicode.com/todos/{}", &num);
27+
let resp = reqwest::blocking::Client::new()
28+
.get(url)
29+
.send()?
30+
.json::<ResponseJSONPlaceholder>()?;
31+
32+
Ok(resp.title)
33+
}
34+
35+
/// Log receipt of a message while it is still borrowed from the consumer.
///
/// This runs in stream order, before the message is detached and handed off
/// to parallel processing, so any ordering-sensitive bookkeeping belongs
/// here.
async fn record_borrowed_message_receipt(msg: &BorrowedMessage<'_>) {
    let offset = msg.offset();
    info!("Message received: {}", offset);
}
40+
41+
/// Receipt hook that runs after the message has been detached into an
/// `OwnedMessage`; intentionally a no-op in this example.
async fn record_owned_message_receipt(_msg: &OwnedMessage) {
    // Like `record_borrowed_message_receipt`, but takes an `OwnedMessage`
    // instead, as in a real-world use case an `OwnedMessage` might be more
    // convenient than a `BorrowedMessage`.
}
46+
47+
fn get_title(msg: OwnedMessage) -> String {
48+
match msg.payload_view::<str>() {
49+
Some(Ok(_)) => match fetch_json_placeholder() {
50+
Ok(title) => format!("Payload title: {}", title),
51+
Err(_) => "Error: fetch_json_placeholder somethings".to_owned(),
52+
},
53+
Some(Err(_)) => "Message payload is not a string".to_owned(),
54+
None => "No payload".to_owned(),
55+
}
56+
}
57+
58+
/// Consume messages from `input_topic`, enrich each one via `get_title`,
/// and produce the result to `output_topic`.
///
/// One call runs one consumer/producer pair forever (until the stream
/// errors). Per-message processing is fire-and-forget: each message is
/// detached and handed to a spawned task, so the stream acknowledges
/// `Ok(())` before the enrichment and produce actually complete.
async fn run_async_processor(
    brokers: String,
    group_id: String,
    input_topic: String,
    output_topic: String,
) {
    // NOTE(review): auto-commit is disabled but no offsets are ever
    // committed in this function, so messages are re-delivered on
    // restart — confirm this at-least-many-times behavior is intended.
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", &group_id)
        .set("bootstrap.servers", &brokers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .create()
        .expect("Consumer creation failed");

    consumer
        .subscribe(&[&input_topic])
        .expect("Can't subscribe to specified topic");

    // Create the `FutureProducer` to produce asynchronously.
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", &brokers)
        .set("message.timeout.ms", "5000")
        .create()
        .expect("Producer creation error");

    // Create the outer pipeline on the message stream. The closure clones
    // the producer and output topic so the spawned task can own them.
    let stream_processor = consumer.stream().try_for_each(|borrowed_message| {
        let producer = producer.clone();
        let output_topic = output_topic.to_string();
        async move {
            // Process each message
            record_borrowed_message_receipt(&borrowed_message).await;
            // Borrowed messages can't outlive the consumer they are received from, so they need to
            // be owned in order to be sent to a separate thread.
            let owned_message = borrowed_message.detach();
            record_owned_message_receipt(&owned_message).await;
            // The JoinHandle is dropped, detaching the task: failures inside
            // it are only visible via the println! below.
            tokio::spawn(async move {
                // The body of this block will be executed on the main thread pool,
                // but we perform `get_title` on a separate thread pool
                // for CPU-intensive tasks via `tokio::task::spawn_blocking`.
                let computation_result = tokio::task::spawn_blocking(|| get_title(owned_message))
                    .await
                    .expect("failed to wait for expensive computation");
                let produce_future = producer.send(
                    FutureRecord::to(&output_topic)
                        .key("some key")
                        .payload(&computation_result),
                    Duration::from_secs(0),
                );
                match produce_future.await {
                    Ok(delivery) => println!("Sent: {:?}", delivery),
                    Err((e, _)) => println!("Error: {:?}", e),
                }
            });
            Ok(())
        }
    });

    info!("Starting event loop");
    stream_processor.await.expect("stream processing failed");
    info!("Stream processing terminated");
}
121+
122+
#[tokio::main]
123+
async fn main() {
124+
let matches = App::new("Async example")
125+
.version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
126+
.about("Asynchronous processing example")
127+
.arg(
128+
Arg::with_name("brokers")
129+
.short("b")
130+
.long("brokers")
131+
.help("Broker list in kafka format")
132+
.takes_value(true)
133+
.default_value("localhost:9092"),
134+
)
135+
.arg(
136+
Arg::with_name("group-id")
137+
.short("g")
138+
.long("group-id")
139+
.help("Consumer group id")
140+
.takes_value(true)
141+
.default_value("consumer_group_id"),
142+
)
143+
.arg(
144+
Arg::with_name("log-conf")
145+
.long("log-conf")
146+
.help("Configure the logging format (example: 'rdkafka=trace')")
147+
.takes_value(true),
148+
)
149+
.arg(
150+
Arg::with_name("input-topic")
151+
.long("input-topic")
152+
.help("Input topic")
153+
.takes_value(true)
154+
.required(true),
155+
)
156+
.arg(
157+
Arg::with_name("output-topic")
158+
.long("output-topic")
159+
.help("Output topic")
160+
.takes_value(true)
161+
.required(true),
162+
)
163+
.arg(
164+
Arg::with_name("num-workers")
165+
.long("num-workers")
166+
.help("Number of workers")
167+
.takes_value(true)
168+
.default_value("1"),
169+
)
170+
.get_matches();
171+
172+
setup_logger(true, matches.value_of("log-conf"));
173+
174+
let brokers = matches.value_of("brokers").unwrap();
175+
let group_id = matches.value_of("group-id").unwrap();
176+
let input_topic = matches.value_of("input-topic").unwrap();
177+
let output_topic = matches.value_of("output-topic").unwrap();
178+
let num_workers = value_t!(matches, "num-workers", usize).unwrap();
179+
180+
(0..num_workers)
181+
.map(|_| {
182+
tokio::spawn(run_async_processor(
183+
brokers.to_owned(),
184+
group_id.to_owned(),
185+
input_topic.to_owned(),
186+
output_topic.to_owned(),
187+
))
188+
})
189+
.collect::<FuturesUnordered<_>>()
190+
.for_each(|_| async { () })
191+
.await
192+
}

0 commit comments

Comments
 (0)