Skip to content

Commit

Permalink
oh you, clippy!
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Wahl <mwahl@wayfair.com>
  • Loading branch information
Matthias Wahl authored and Licenser committed Nov 12, 2020
1 parent 1af1686 commit c807ef5
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 13 deletions.
10 changes: 5 additions & 5 deletions src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ async fn wait_for_delivery(
) -> Result<()> {
let cb = match futures::future::try_join_all(futures).await {
Ok(results) => {
if let Some((kafka_error, _)) = results.into_iter().find_map(|res| res.err()) {
if let Some((kafka_error, _)) = results.into_iter().find_map(std::result::Result::err) {
error!(
"[Sink::{}] Error delivering kafka record: {}",
sink_url, &kafka_error
Expand Down Expand Up @@ -204,14 +204,14 @@ impl Kafka {
while let Ok(e) = self.error_rx.try_recv() {
if !handled {
// only handle on first fatal error
self.handle_fatal_error(e)?;
self.handle_fatal_error(&e)?;
handled = true;
}
}
Ok(())
}

fn handle_fatal_error(&mut self, _fatal_error: KafkaError) -> Result<()> {
fn handle_fatal_error(&mut self, _fatal_error: &KafkaError) -> Result<()> {
let maybe_fatal_error = unsafe { get_fatal_error(self.producer.client()) };
if let Some(fatal_error) = maybe_fatal_error {
error!(
Expand Down Expand Up @@ -265,7 +265,7 @@ impl Sink for Kafka {
error!("[Sink::{}] failed to enque message: {}", &self.sink_url, e);
if is_fatal(&e) {
// handle fatal errors right here, without enqueueing
self.handle_fatal_error(e)?;
self.handle_fatal_error(&e)?;
}
// bail out with a CB fail on enqueue error
insight_event.cb = CBAction::Fail;
Expand Down Expand Up @@ -300,7 +300,7 @@ impl Sink for Kafka {
reply_channel: Sender<sink::Reply>,
) -> Result<()> {
self.postprocessors = make_postprocessors(processors.post)?;
self.reply_tx = reply_channel.clone();
self.reply_tx = reply_channel;
self.sink_url = sink_url.clone();
Ok(())
}
Expand Down
10 changes: 2 additions & 8 deletions tremor-pipeline/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,7 @@ impl Event {
/// normally 1, but for batched events possibly > 1
pub fn len(&self) -> usize {
if self.is_batch {
self.data
.suffix()
.value()
.as_array()
.map(Vec::len)
.unwrap_or_default()
self.data.suffix().value().as_array().map_or(0, Vec::len)
} else {
1
}
Expand All @@ -194,8 +189,7 @@ impl Event {
.suffix()
.value()
.as_array()
.map(Vec::is_empty)
.unwrap_or(true)
.map_or(true, Vec::is_empty)
}
}

Expand Down

0 comments on commit c807ef5

Please sign in to comment.