diff --git a/src/sink/kafka.rs b/src/sink/kafka.rs index c898d4ca47..cebe3ed746 100644 --- a/src/sink/kafka.rs +++ b/src/sink/kafka.rs @@ -170,7 +170,7 @@ async fn wait_for_delivery( ) -> Result<()> { let cb = match futures::future::try_join_all(futures).await { Ok(results) => { - if let Some((kafka_error, _)) = results.into_iter().find_map(|res| res.err()) { + if let Some((kafka_error, _)) = results.into_iter().find_map(std::result::Result::err) { error!( "[Sink::{}] Error delivering kafka record: {}", sink_url, &kafka_error @@ -204,14 +204,14 @@ impl Kafka { while let Ok(e) = self.error_rx.try_recv() { if !handled { // only handle on first fatal error - self.handle_fatal_error(e)?; + self.handle_fatal_error(&e)?; handled = true; } } Ok(()) } - fn handle_fatal_error(&mut self, _fatal_error: KafkaError) -> Result<()> { + fn handle_fatal_error(&mut self, _fatal_error: &KafkaError) -> Result<()> { let maybe_fatal_error = unsafe { get_fatal_error(self.producer.client()) }; if let Some(fatal_error) = maybe_fatal_error { error!( @@ -265,7 +265,7 @@ impl Sink for Kafka { error!("[Sink::{}] failed to enque message: {}", &self.sink_url, e); if is_fatal(&e) { // handle fatal errors right here, without enqueueing - self.handle_fatal_error(e)?; + self.handle_fatal_error(&e)?; } // bail out with a CB fail on enqueue error insight_event.cb = CBAction::Fail; @@ -300,7 +300,7 @@ impl Sink for Kafka { reply_channel: Sender, ) -> Result<()> { self.postprocessors = make_postprocessors(processors.post)?; - self.reply_tx = reply_channel.clone(); + self.reply_tx = reply_channel; self.sink_url = sink_url.clone(); Ok(()) } diff --git a/tremor-pipeline/src/event.rs b/tremor-pipeline/src/event.rs index 034fad399a..d96fb3b834 100644 --- a/tremor-pipeline/src/event.rs +++ b/tremor-pipeline/src/event.rs @@ -174,12 +174,7 @@ impl Event { /// normally 1, but for batched events possibly > 1 pub fn len(&self) -> usize { if self.is_batch { - self.data - .suffix() - .value() - .as_array() - .map(Vec::len) - .unwrap_or_default() + self.data.suffix().value().as_array().map_or(0, Vec::len) } else { 1 } @@ -194,8 +189,7 @@ impl Event { .suffix() .value() .as_array() - .map(Vec::is_empty) - .unwrap_or(true) + .map_or(true, Vec::is_empty) } }