Skip to content

Commit

Permalink
Kafka sink for raw events (#405)
Browse files Browse the repository at this point in the history
* Add librdkafka-based Kafka sink

* rustfmt run

* Add internal telemetry for kafka sink

* Add config reading for kafka

* Add setup for Kafkas in bin/cernan

* Fix build error

* Put in valve control for messages vector growing out of control

* Add warn message when we need to retry a message - also use iter::filter_map

* Add config tests for kafka

* Kafka sink testability changes and a couple of tests to take advantage

* Go ahead and test all the retryable error types

* Add a test for the failed retry path

* Add unrecoverable kafka error test

* Testability improvements for global statics and remaining untested paths

* Add a test for the default stats collector
  • Loading branch information
dparton authored Feb 2, 2018
1 parent ae43d98 commit ecc2fdf
Show file tree
Hide file tree
Showing 10 changed files with 1,030 additions and 4 deletions.
51 changes: 51 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ elastic = "0.20"
elastic_types = "0.20"
fern = "0.5"
flate2 = "1.0"
futures = "0.1.17"
glob = "0.2.11"
hopper = "0.3"
hyper = "0.10" # 0.10 specifically required by rusoto_kinesis' KinesisClient
Expand All @@ -37,6 +38,7 @@ openssl-probe = "0.1"
protobuf = "1.4"
quantiles = { version = "0.7", features = ["serde_support"] }
rand = "0.4"
rdkafka = "0.14.0"
regex = "0.2"
rusoto_core = "0.30"
rusoto_firehose = "0.30"
Expand Down
26 changes: 26 additions & 0 deletions src/bin/cernan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,19 @@ fn main() {
config_topology.insert(config_path.clone(), Default::default());
}
}
if let Some(ref configs) = args.kafkas {
for config in configs {
let config_path = cfg_conf!(config);
let (send, recv) = hopper::channel_with_max_bytes(
&config_path,
&args.data_directory,
args.max_hopper_queue_bytes,
).unwrap();
senders.insert(config_path.clone(), send);
receivers.insert(config_path.clone(), recv);
config_topology.insert(config_path.clone(), Default::default());
}
}
if let Some(ref configs) = args.kinesises {
for config in configs {
let config_path = cfg_conf!(config);
Expand Down Expand Up @@ -455,6 +468,19 @@ fn main() {
);
}
}
if let Some(cfgs) = mem::replace(&mut args.kafkas, None) {
for config in cfgs {
let recv = receivers
.remove(&config.config_path.clone().unwrap())
.unwrap();
let sources =
adjacency_matrix.pop_nodes(&config.config_path.clone().unwrap());
sinks.insert(
config.config_path.clone().unwrap(),
cernan::sink::Kafka::new(recv, sources, config).run(),
);
}
}
if let Some(cfgs) = mem::replace(&mut args.kinesises, None) {
for config in cfgs {
let recv = receivers
Expand Down
Loading

0 comments on commit ecc2fdf

Please sign in to comment.