diff --git a/executor/api/kafka/server.go b/executor/api/kafka/server.go index 421f1d91b8..bf5349829b 100644 --- a/executor/api/kafka/server.go +++ b/executor/api/kafka/server.go @@ -36,11 +36,13 @@ const ( ENV_KAFKA_OUTPUT_TOPIC = "KAFKA_OUTPUT_TOPIC" ENV_KAFKA_FULL_GRAPH = "KAFKA_FULL_GRAPH" ENV_KAFKA_WORKERS = "KAFKA_WORKERS" + ENV_KAFKA_AUTO_COMMIT = "KAFKA_AUTO_COMMIT" ) type SeldonKafkaServer struct { Client client.SeldonApiClient Producer *kafka.Producer + Consumer *kafka.Consumer DeploymentName string Namespace string Transport string @@ -53,9 +55,10 @@ type SeldonKafkaServer struct { Log logr.Logger Protocol string FullHealthCheck bool + AutoCommit bool } -func NewKafkaServer(fullGraph bool, workers int, deploymentName, namespace, protocol, transport string, annotations map[string]string, serverUrl *url.URL, predictor *v1.PredictorSpec, broker, topicIn, topicOut string, log logr.Logger, fullHealthCheck bool) (*SeldonKafkaServer, error) { +func NewKafkaServer(fullGraph bool, workers int, deploymentName, namespace, protocol, transport string, annotations map[string]string, serverUrl *url.URL, predictor *v1.PredictorSpec, broker, topicIn, topicOut string, log logr.Logger, fullHealthCheck bool, autoCommit bool) (*SeldonKafkaServer, error) { var apiClient client.SeldonApiClient var err error if fullGraph { @@ -101,6 +104,11 @@ func NewKafkaServer(fullGraph bool, workers int, deploymentName, namespace, prot } } + + if !autoCommit && workers > 1 { + log.Info("Disabling auto commit for kafka can have undesired side effects with multiple workers") + } + // Create Producer log.Info("Creating producer", "broker", broker) p, err := kafka.NewProducer(&producerConfigMap) @@ -124,6 +132,7 @@ func NewKafkaServer(fullGraph bool, workers int, deploymentName, namespace, prot Log: log.WithName("KafkaServer"), Protocol: protocol, FullHealthCheck: fullHealthCheck, + AutoCommit: autoCommit, }, nil } @@ -167,8 +176,9 @@ func (ks *SeldonKafkaServer) Serve() error { "broker.address.family": "v4", "group.id": ks.getGroupName(), "session.timeout.ms": 6000, - "enable.auto.commit": true, - "auto.offset.reset": "earliest"} + "enable.auto.commit": ks.AutoCommit, + "auto.offset.reset": "earliest", + } if util.GetKafkaSecurityProtocol() == "SSL" { sslKakfaServer := util.GetSslElements() @@ -190,6 +200,7 @@ func (ks *SeldonKafkaServer) Serve() error { if err != nil { return err } + ks.Consumer = c ks.Log.Info("Created", "consumer", c.String(), "consumer group", ks.getGroupName(), "topic", ks.TopicIn) err = c.SubscribeTopics([]string{ks.TopicIn}, nil) @@ -275,7 +286,7 @@ func (ks *SeldonKafkaServer) Serve() error { job := KafkaJob{ headers: headers, - reqKey: e.Key, + message: e, reqPayload: reqPayload, } // enqueue a job diff --git a/executor/api/kafka/worker.go b/executor/api/kafka/worker.go index acb4b3bf07..eded70c719 100644 --- a/executor/api/kafka/worker.go +++ b/executor/api/kafka/worker.go @@ -2,6 +2,7 @@ package kafka import ( "context" + "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" @@ -12,7 +13,7 @@ import ( type KafkaJob struct { headers map[string][]string - reqKey []byte + message *kafka.Message reqPayload payload.SeldonPayload } @@ -63,11 +64,22 @@ func (ks *SeldonKafkaServer) processKafkaRequest(job *KafkaJob) { err = ks.Producer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &ks.TopicOut, Partition: kafka.PartitionAny}, - Key: job.reqKey, + Key: job.message.Key, Value: resBytes, Headers: kafkaHeaders, }, nil) + if err != nil { ks.Log.Error(err, "Failed to produce response") } + + // Commit the messages here + if !ks.AutoCommit { + _, err = ks.Consumer.CommitMessage(job.message) + + if err != nil { + ks.Log.Error(err, "Failed to commit offsets") + } + } + } diff --git a/executor/cmd/executor/main.go b/executor/cmd/executor/main.go index 437943a6e3..4413f48d82 100644 --- a/executor/cmd/executor/main.go +++ b/executor/cmd/executor/main.go @@ -79,6 +79,7 @@ var ( kafkaTopicOut = flag.String("kafka_output_topic", "", "The kafka output topic") kafkaFullGraph = flag.Bool("kafka_full_graph", false, "Use kafka for internal graph processing") kafkaWorkers = flag.Int("kafka_workers", 4, "Number of kafka workers") + kafkaAutoCommit = flag.Bool("kafka_auto_commit", true, "Use auto committing in the kafka consumer") logKafkaBroker = flag.String("log_kafka_broker", "", "The kafka log broker") logKafkaTopic = flag.String("log_kafka_topic", "", "The kafka log topic") fullHealthChecks = flag.Bool("full_health_checks", false, "Full health checks via chosen protocol API") @@ -285,6 +286,17 @@ func main() { } } + // Get Kafka Auto Commit + kafkaAutoCommitFromEnv := os.Getenv(kafka.ENV_KAFKA_AUTO_COMMIT) + if kafkaAutoCommitFromEnv != "" { + kafkaAutoCommitFromEnvBool, err := strconv.ParseBool(kafkaAutoCommitFromEnv) + if err != nil { + log.Fatalf("Failed to parse %s %s", kafka.ENV_KAFKA_AUTO_COMMIT, kafkaAutoCommitFromEnv) + } else { + *kafkaAutoCommit = kafkaAutoCommitFromEnvBool + } + } + //Kafka workers kafkaWorkersFromEnv := os.Getenv(kafka.ENV_KAFKA_WORKERS) if kafkaWorkersFromEnv != "" { @@ -364,7 +376,7 @@ func main() { if *serverType == "kafka" { logger.Info("Starting kafka server") - kafkaServer, err := kafka.NewKafkaServer(*kafkaFullGraph, *kafkaWorkers, *sdepName, *namespace, *protocol, *transport, annotations, serverUrl, predictor, *kafkaBroker, *kafkaTopicIn, *kafkaTopicOut, logger, *fullHealthChecks) + kafkaServer, err := kafka.NewKafkaServer(*kafkaFullGraph, *kafkaWorkers, *sdepName, *namespace, *protocol, *transport, annotations, serverUrl, predictor, *kafkaBroker, *kafkaTopicIn, *kafkaTopicOut, logger, *fullHealthChecks, *kafkaAutoCommit) if err != nil { log.Fatalf("Failed to create kafka server: %v", err) }