Skip to content

Commit

Permalink
Feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Sep 17, 2024
1 parent e8c01a1 commit 806b0ff
Showing 1 changed file with 19 additions and 14 deletions.
33 changes: 19 additions & 14 deletions crates/ingress-kafka/src/consumer_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::metric_definitions::KAFKA_INGRESS_REQUESTS;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;

use base64::Engine;
use bytes::Bytes;
use metrics::counter;
Expand All @@ -18,20 +22,19 @@ use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::message::BorrowedMessage;
use rdkafka::{ClientConfig, Message};
use tokio::sync::oneshot;
use tracing::{debug, info, info_span, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use restate_core::{cancellation_watcher, task_center, TaskId, TaskKind};
use restate_ingress_dispatcher::{
DeduplicationId, DispatchIngressRequest, IngressDispatcher, IngressDispatcherRequest,
};
use restate_types::invocation::{Header, SpanRelation};
use restate_types::message::MessageIndex;
use restate_types::schema::subscriptions::{EventReceiverServiceType, Sink, Subscription};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use tokio::sync::oneshot;
use tracing::{debug, info, info_span, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::metric_definitions::KAFKA_INGRESS_REQUESTS;

#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand Down Expand Up @@ -86,15 +89,21 @@ impl DeduplicationId for KafkaDeduplicationId {

#[derive(Clone)]
pub struct MessageSender {
subscription_id: String,
subscription: Subscription,
dispatcher: IngressDispatcher,

subscription_id: String,
ingress_request_counter: metrics::Counter,
}

impl MessageSender {
pub fn new(subscription: Subscription, dispatcher: IngressDispatcher) -> Self {
Self {
subscription_id: subscription.id().to_string(),
ingress_request_counter: counter!(
KAFKA_INGRESS_REQUESTS,
"subscription" => subscription.id().to_string()
),
subscription,
dispatcher,
}
Expand Down Expand Up @@ -140,11 +149,7 @@ impl MessageSender {
cause,
})?;

counter!(
KAFKA_INGRESS_REQUESTS,
"subscription" => self.subscription_id.clone()
)
.increment(1);
self.ingress_request_counter.increment(1);

self.dispatcher
.dispatch_ingress_request(req)
Expand Down

0 comments on commit 806b0ff

Please sign in to comment.