Merge pull request #5548 from ruchirK/rk-kgen
kgen: allow avro keys and avro key distributions
ruchirK authored Feb 1, 2021
2 parents f496786 + 2909068 commit 6725cfc
Showing 1 changed file with 63 additions and 20 deletions.
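Overview: this change teaches kgen to generate Avro-encoded keys. --keys avro becomes a valid choice, gated on a new --avro-key-schema flag and a matching --avro-key-distribution, and the fixed --partitions flag is replaced by --partitions-round-robin. As a reading aid only, here is a minimal, hypothetical clap 2.x sketch (not part of the commit) of the flag relationships the diff below establishes; the key-min/key-max definitions are stand-ins for kgen's existing flags of the same name:

// Hypothetical sketch, not from kgen: shows how required_if/conflicts_with_all
// wire the new Avro key flags together under clap 2.x.
use clap::{App, Arg};

fn main() {
    let parse = |argv: &[&str]| {
        App::new("kgen-sketch")
            .arg(
                Arg::with_name("keys")
                    .long("keys")
                    .takes_value(true)
                    .possible_values(&["avro", "sequential", "random"])
                    .default_value("sequential"),
            )
            .arg(
                Arg::with_name("avro-key-schema")
                    .long("avro-key-schema")
                    .takes_value(true)
                    .required_if("keys", "avro")
                    .conflicts_with_all(&["key-min", "key-max"]),
            )
            .arg(Arg::with_name("key-min").long("key-min").takes_value(true))
            .arg(Arg::with_name("key-max").long("key-max").takes_value(true))
            .get_matches_from_safe(argv.to_vec())
    };

    // Allowed: the default sequential keys need no Avro schema.
    assert!(parse(&["kgen-sketch", "--keys", "sequential"]).is_ok());
    // Rejected: --keys avro without --avro-key-schema.
    assert!(parse(&["kgen-sketch", "--keys", "avro"]).is_err());
    // Rejected: an Avro key schema combined with the byte-key bounds.
    assert!(parse(&["kgen-sketch", "--avro-key-schema", "{}", "--key-min", "0"]).is_err());
}

The same guard pattern already protects the value-side Avro flags in the diff below.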
83 changes: 63 additions & 20 deletions src/kafka-util/src/bin/kgen.rs
@@ -16,7 +16,6 @@ use std::rc::Rc;
 use std::thread;
 use std::time::Duration;
 
-use anyhow::bail;
 use chrono::{NaiveDate, NaiveDateTime};
 use clap::{App, Arg};
 use rand::distributions::{
@@ -458,9 +457,8 @@ async fn main() -> anyhow::Result<()> {
         )
         .arg(
             // default 1
-            Arg::with_name("partitions")
-                .short("p")
-                .long("partitions")
+            Arg::with_name("partitions-round-robin")
+                .long("partitions-round-robin")
                 .takes_value(true),
         )
         .arg(
@@ -491,11 +489,25 @@
                 .takes_value(true)
                 .required_if("values", "avro"),
         )
+        .arg(
+            Arg::with_name("avro-key-schema")
+                .long("avro-key-schema")
+                .takes_value(true)
+                .required_if("keys", "avro")
+                .conflicts_with_all(&["key-min", "key-max"]),
+        )
         .arg(
             Arg::with_name("avro-distribution")
                 .long("avro-distribution")
                 .takes_value(true)
-                .required_if("values", "avro"),
+                .required_if("values", "avro")
+                .conflicts_with_all(&["key-min", "key-max"]),
         )
+        .arg(
+            Arg::with_name("avro-key-distribution")
+                .long("avro-key-distribution")
+                .takes_value(true)
+                .required_if("keys", "avro"),
+        )
         .arg(
             Arg::with_name("bootstrap")
@@ -509,7 +521,7 @@
                 .long("keys")
                 .short("k")
                 .takes_value(true)
-                .possible_values(&["sequential", "random"])
+                .possible_values(&["avro", "sequential", "random"])
                 .default_value("sequential"),
         )
         .arg(
@@ -527,16 +539,14 @@
         .get_matches();
     let topic = matches.value_of("topic").unwrap();
     let n: usize = matches.value_of("num-records").unwrap().parse()?;
-    let partitions: usize = matches
-        .value_of("partitions")
+    let partitions_round_robin: usize = matches
+        .value_of("partitions-round-robin")
         .map(str::parse)
         .transpose()?
-        .unwrap_or(1);
-    if partitions == 0 {
-        bail!("Partitions must a positive number.");
-    }
+        .unwrap_or(0);
     let bootstrap = matches.value_of("bootstrap").unwrap();
     let mut _schema_holder = None;
+    let mut _key_schema_holder = None;
     let mut value_gen = match matches.value_of("values").unwrap() {
         "bytes" => {
             let min: usize = matches.value_of("min").unwrap().parse()?;
@@ -569,6 +579,29 @@
         _ => unreachable!(),
     };
 
+    let mut key_gen = match matches.value_of("keys").unwrap() {
+        "avro" => {
+            let key_schema = matches.value_of("avro-key-schema").unwrap();
+            let ccsr =
+                ccsr::ClientConfig::new(Url::parse("http://localhost:8081").unwrap()).build();
+            let key_schema_id = ccsr
+                .publish_schema(&format!("{}-key", topic), key_schema)
+                .await?;
+            _key_schema_holder = Some(Schema::parse_str(key_schema)?);
+            let key_schema = _key_schema_holder.as_ref().unwrap();
+            let annotations = matches.value_of("avro-key-distribution").unwrap();
+            let annotations: serde_json::Value = serde_json::from_str(annotations)?;
+            let generator = RandomAvroGenerator::new(key_schema, &annotations);
+
+            Some(ValueGenerator::RandomAvro {
+                inner: generator,
+                schema: key_schema,
+                schema_id: key_schema_id,
+            })
+        }
+        _ => None,
+    };
+
     let producer: ThreadedProducer<DefaultProducerContext> = ClientConfig::new()
         .set("bootstrap.servers", bootstrap)
         .create()?;
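Note on the hunk above: the new key generator publishes the key schema to a local schema registry under the {topic}-key subject and keeps the returned key_schema_id alongside the generator. How that id is used at serialization time is not shown in this hunk, but schema-registry-aware consumers conventionally expect the Confluent wire format; a sketch of that framing, for orientation only:

// Illustrative only (not from this commit): the Confluent wire format is a
// zero magic byte, the 4-byte big-endian schema id returned by the registry,
// then the Avro-encoded datum. Whether kgen emits exactly this framing is not
// visible in the hunk above.
fn confluent_frame(schema_id: i32, avro_datum: &[u8]) -> Vec<u8> {
    let mut framed = Vec::with_capacity(5 + avro_datum.len());
    framed.push(0u8); // magic byte
    framed.extend_from_slice(&schema_id.to_be_bytes()); // registry schema id
    framed.extend_from_slice(avro_datum); // Avro binary encoding of the key
    framed
}

fn main() {
    let framed = confluent_frame(42, b"avro-encoded key bytes");
    assert_eq!(&framed[..5], &[0, 0, 0, 0, 42]);
}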
@@ -588,16 +621,26 @@
             eprintln!("Generating message {}", i);
         }
         value_gen.next_value(&mut buf);
-        let key_i = if let Some(key_dist) = key_dist.as_ref() {
-            key_dist.sample(&mut rng)
+        let key = if let Some(key_gen) = key_gen.as_mut() {
+            // TODO: find a nicer way of doing this. The API is designed to
+            // reuse the same buffer over and over again for random data, but
+            // the alternative key generation methods currently want to
+            // allocate a new buffer each time, so for now we're going with a
+            // mildly silly workaround.
+            let mut key_buf = vec![];
+            key_gen.next_value(&mut key_buf);
+            key_buf
+        } else if let Some(key_dist) = key_dist.as_ref() {
+            key_dist.sample(&mut rng).to_be_bytes().to_vec()
         } else {
-            i as u64
+            (i as u64).to_be_bytes().to_vec()
         };
-        let key = key_i.to_be_bytes();
-        let mut rec = BaseRecord::to(topic)
-            .key(&key)
-            .payload(&buf)
-            .partition((i % partitions) as i32);
+        let mut rec = BaseRecord::to(topic).key(&key).payload(&buf);
+
+        if partitions_round_robin != 0 {
+            rec = rec.partition((i % partitions_round_robin) as i32);
+        }
+
         loop {
             match producer.send(rec) {
                 Ok(()) => break,
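The --partitions-round-robin default of 0 means a record carries no explicit partition and the producer's configured partitioner decides; any positive value N cycles records across partitions 0..N, as in the final hunk. A hypothetical helper expressing the same rule:

// Hypothetical helper (not in the commit) mirroring the final hunk: None means
// "let the producer's partitioner choose"; Some(p) pins record i to partition p.
fn round_robin_partition(i: usize, partitions_round_robin: usize) -> Option<i32> {
    if partitions_round_robin == 0 {
        None
    } else {
        Some((i % partitions_round_robin) as i32)
    }
}

fn main() {
    assert_eq!(round_robin_partition(7, 0), None);    // flag unset: producer decides
    assert_eq!(round_robin_partition(7, 3), Some(1)); // 7 % 3 == 1
}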
