Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sources not setting Event::transactional #870

Merged
merged 9 commits into from
Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
* Do not commit an empty topic-partition-list in the kafka onramp and improve logging for better debugging
* Fix kafka consumer offset lag of at least `1` continually by using offset + 1 when committing.
* Fix issue where binary not (`!`) was not getting lexed correctly [#833](https://github.com/tremor-rs/tremor-runtime/issues/833)
* Fix missing ack/fail insight events with offramps that don't support guaranteed delivery (e.g. udp, stdout) [#870](https://github.com/tremor-rs/tremor-runtime/pull/870)

## 0.10.2

Expand Down
2 changes: 1 addition & 1 deletion src/offramp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub trait Impl {
pub fn lookup(name: &str, config: &Option<OpConfig>) -> Result<Box<dyn Offramp>> {
match name {
"blackhole" => blackhole::Blackhole::from_config(config),
"cb" => cb::CB::from_config(config),
"cb" => cb::Cb::from_config(config),
"debug" => debug::Debug::from_config(config),
"elastic" => elastic::Elastic::from_config(config),
"exit" => exit::Exit::from_config(config),
Expand Down
3 changes: 2 additions & 1 deletion src/onramp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::pipeline;
use crate::repository::ServantId;
use crate::source::prelude::*;
use crate::source::{
blaster, crononome, discord, file, kafka, metronome, postgres, rest, tcp, udp, ws,
blaster, cb, crononome, discord, file, kafka, metronome, postgres, rest, tcp, udp, ws,
};
use crate::url::TremorURL;
use async_std::task::{self, JoinHandle};
Expand Down Expand Up @@ -70,6 +70,7 @@ pub(crate) fn lookup(
) -> Result<Box<dyn Onramp>> {
match name {
"blaster" => blaster::Blaster::from_config(id, config),
"cb" => cb::Cb::from_config(id, config),
"file" => file::File::from_config(id, config),
"kafka" => kafka::Kafka::from_config(id, config),
"postgres" => postgres::Postgres::from_config(id, config),
Expand Down
7 changes: 6 additions & 1 deletion src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,13 @@ pub(crate) trait Sink {
/// Is the sink active and ready to process events
fn is_active(&self) -> bool;

/// Is the sink automatically acknowleding events or engaged in some form of delivery
/// Is the sink automatically acknowledging events or engaged in some form of delivery
/// guarantee
///
/// If this sink returns `false` here, it needs to adhere to the following protocol for CB ack/fail insights:
/// send one `ack` or `fail` CB insight for each incoming event if `event.transaction == true`.
/// Otherwise don't send any.
/// For `ack` insights include a `time` field in the insight metadata with the duration it took for handling the event in milliseconds, if it makes sense.
fn auto_ack(&self) -> bool;

fn default_codec(&self) -> &str;
Expand Down
8 changes: 4 additions & 4 deletions src/sink/cb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use crate::errors::Result;
use crate::sink::prelude::*;
use simd_json_derive::Serialize;
pub struct CB {}
pub struct Cb {}

///
/// CB event provoking offramp for testing sources and operators for their handling of CB events
Expand All @@ -24,14 +24,14 @@ pub struct CB {}
///
/// Examples: `{"cb": "ack"}` or `{"cb": ["fail", "close"]}`
///
impl offramp::Impl for CB {
impl offramp::Impl for Cb {
fn from_config(_config: &Option<OpConfig>) -> Result<Box<dyn Offramp>> {
Ok(SinkManager::new_box(Self {}))
}
}

#[async_trait::async_trait]
impl Sink for CB {
impl Sink for Cb {
async fn on_event(
&mut self,
_input: &str,
Expand Down Expand Up @@ -133,7 +133,7 @@ mod tests {
#[async_std::test]
async fn cb_meta() -> Result<()> {
let mut codec = crate::codec::lookup("json")?;
let mut cb = CB {};
let mut cb = Cb {};
let url = TremorURL::parse("/offramp/cb/instance")?;
let codec_map = halfbrown::HashMap::new();
let (tx, _rx) = async_channel::bounded(1);
Expand Down
81 changes: 47 additions & 34 deletions src/sink/elastic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ impl Elastic {
let mut op_meta = OpMeta::default();
std::mem::swap(&mut op_meta, &mut event.op_meta);
let insight_id = id.clone();
let transactional = event.transactional;
let ingest_ns = event.ingest_ns;
let response_origin_uri = if is_linked {
event.origin_uri.clone()
} else {
Expand All @@ -276,22 +278,27 @@ impl Elastic {
let payload = match build_event_payload(&event) {
Ok(payload) => payload,
Err(e) => {
// send fail
self.send_insight_fail(&event).await;

// send error response about event not being able to be serialized into ES payload
let error_msg = format!("Invalid ES Payload: {}", e);
let mut data = Value::object_with_capacity(2);
let payload = event.data.suffix().value().clone_static();
data.insert("success", Value::from(false))?;
data.insert("error", Value::from("Invalid ES Payload"))?;
data.insert("error", Value::from(error_msg))?;
data.insert("payload", payload)?;
let source = build_source(&event.id, event.origin_uri.as_ref());
data.insert("source", source)?;

// send error response
if let Err(e) = response_tx.send(((data, Value::null()).into(), ERR)).await {
error!(
"[Sink::{}] Failed to send build_event_payload error response: {}",
self.sink_url, e
);
}
return Err(e); // this will bubble up the error to the calling on_event, sending a CB fail back
return Err(e);
}
};
let req = self.client.request(BulkRequest::new(payload));
Expand Down Expand Up @@ -356,7 +363,11 @@ impl Elastic {
}
Err(e) => {
// request failed
// TODO update error metric here?
// TODO how to update error metric here?
if m.insert("error", Value::from(e.to_string())).is_err() {
// ALLOW: this is ok
unreachable!()
}
if is_linked {
// send error event via ERR port
let mut error_data = Object::with_capacity(1);
Expand All @@ -375,14 +386,6 @@ impl Elastic {
CBAction::Fail
}
};
let insight = Event {
id: insight_id,
data: (Value::null(), m).into(),
ingest_ns: nanotime(),
op_meta,
cb,
..Event::default()
};

task::block_on(async move {
// send response events
Expand All @@ -392,9 +395,20 @@ impl Elastic {
}
}

// send insight
if let Err(e) = insight_tx.send(sink::Reply::Insight(insight)).await {
error!("Failed to send insight: {}", e);
// send insight - if required
// TODO: we don't have the event here, so we can't use our other convenience functions
if transactional {
let insight = Event {
id: insight_id,
data: (Value::null(), m).into(),
ingest_ns,
op_meta,
cb,
..Event::default()
};
if let Err(e) = insight_tx.send(sink::Reply::Insight(insight)).await {
error!("Failed to send insight: {}", e);
}
}

// mark this task as done in order to free a slot
Expand All @@ -410,26 +424,8 @@ impl Elastic {
async fn maybe_enque(&mut self, event: Event) -> Result<()> {
match self.queue.dequeue() {
Err(SinkDequeueError::NotReady) if !self.queue.has_capacity() => {
let mut m = Object::with_capacity(1);
let error_msg = "Dropped data due to es overload";
m.insert("error".into(), error_msg.into());

let insight = Event {
id: event.id,
data: (Value::null(), m).into(),
ingest_ns: event.ingest_ns,
cb: CBAction::Fail,
..Event::default()
};

if self
.insight_tx
.send(sink::Reply::Insight(insight))
.await
.is_err()
{
error!("[Sink::{}] Failed to send insight", &self.sink_url)
};
self.send_insight_fail(&event).await;

// send error message on overflow to ERR port
let mut data = Object::with_capacity(2);
Expand Down Expand Up @@ -462,6 +458,19 @@ impl Elastic {
}
}
}

// we swallow send errors here, we only log them
async fn send_insight_fail(&mut self, event: &Event) {
if event.transactional {
if let Err(e) = self
.insight_tx
.send(sink::Reply::Insight(event.to_fail()))
.await
{
error!("[Sink::{}] Failed to send insight: {}", &self.sink_url, e)
}
}
}
}

/// task responsible to receive raw response data, build response event and send it off.
Expand Down Expand Up @@ -513,7 +522,11 @@ impl Sink for Elastic {
"[Sink::{}] Received empty event. Won't send it to ES",
self.sink_url
);
Ok(Some(vec![Reply::Insight(event.insight_ack())]))
Ok(Some(if event.transactional {
vec![Reply::Insight(event.insight_ack())]
} else {
vec![]
}))
} else {
// we have either one event or a batched one with > 1 event
self.maybe_enque(event).await?;
Expand Down
55 changes: 46 additions & 9 deletions src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ use rdkafka::{
message::OwnedHeaders,
producer::{FutureProducer, FutureRecord},
};
use std::{fmt, time::Duration};
use std::{
fmt,
time::{Duration, Instant},
};

#[derive(Deserialize)]
pub struct Config {
Expand Down Expand Up @@ -162,10 +165,12 @@ where

/// Waits for actual delivery to kafka cluster and sends ack or fail.
/// Also sends fatal errors for handling in offramp task.
#[allow(clippy::cast_possible_truncation)]
async fn wait_for_delivery(
sink_url: String,
futures: Vec<rdkafka::producer::DeliveryFuture>,
mut insight_event: Event,
processing_start: Instant,
maybe_event: Option<Event>,
reply_tx: Sender<sink::Reply>,
error_tx: Sender<KafkaError>,
) -> Result<()> {
Expand All @@ -177,7 +182,13 @@ async fn wait_for_delivery(
sink_url, &kafka_error
);
if is_fatal(&kafka_error) {
error_tx.send(kafka_error).await?;
let err_msg = format!("{}", &kafka_error);
if error_tx.send(kafka_error).await.is_err() {
error!(
"[Sink::{}] Error notifying the system about kafka error: {}",
&sink_url, &err_msg
)
}
}
CBAction::Fail
} else {
Expand All @@ -194,8 +205,22 @@ async fn wait_for_delivery(
CBAction::Fail
}
};
insight_event.cb = cb;
reply_tx.send(sink::Reply::Insight(insight_event)).await?;
if let Some(mut insight) = maybe_event {
insight.cb = cb;
if cb == CBAction::Ack {
let time = processing_start.elapsed().as_millis() as u64;
let mut m = Object::with_capacity(1);
m.insert("time".into(), time.into());
insight.data = (Value::null(), m).into();
}

if reply_tx.send(sink::Reply::Insight(insight)).await.is_err() {
error!(
"[Sink::{}] Error sending {:?} insight after delivery",
sink_url, cb
);
}
}
Ok(())
}

Expand Down Expand Up @@ -242,7 +267,7 @@ impl Sink for Kafka {

let ingest_ns = event.ingest_ns;
let mut delivery_futures = Vec::with_capacity(event.len()); // might not be enough
let mut insight_event = event.insight_ack(); // we gonna change the success status later, if need be
let processing_start = Instant::now();
for (value, meta) in event.value_meta_iter() {
let encoded = codec.encode(value)?;
let processed = postprocess(self.postprocessors.as_mut_slice(), ingest_ns, encoded)?;
Expand Down Expand Up @@ -281,23 +306,35 @@ impl Sink for Kafka {
delivery_futures.push(delivery_future);
}
Err((e, _)) => {
error!("[Sink::{}] failed to enque message: {}", &self.sink_url, e);
error!(
"[Sink::{}] failed to enqueue message: {}",
&self.sink_url, e
);
if is_fatal(&e) {
// handle fatal errors right here, without enqueueing
self.handle_fatal_error(&e)?;
}
// bail out with a CB fail on enqueue error
insight_event.cb = CBAction::Fail;
return Ok(Some(vec![sink::Reply::Insight(insight_event)]));
if event.transactional {
return Ok(Some(vec![sink::Reply::Insight(event.to_fail())]));
}
return Ok(None);
}
}
}
}
let insight_event = if event.transactional {
// we're going to change the success status later, if need be
Some(event.insight_ack())
} else {
None
};
// successfully enqueued all messages
// spawn the task waiting for delivery and send acks/fails then
task::spawn(wait_for_delivery(
self.sink_url.to_string(),
delivery_futures,
processing_start,
insight_event,
self.reply_tx.clone(),
self.error_tx.clone(),
Expand Down
Loading