From eadd40002af5f713350630a977b56c1c725fce6c Mon Sep 17 00:00:00 2001
From: wuhuizuo
Date: Mon, 9 Sep 2024 13:48:06 +0800
Subject: [PATCH] feat(cloudevents-server): add dead letter topic support (#168)

---
 .../configs/example-config-sqlite3.yaml | 16 +++++++
 .../configs/example-config.yaml         | 16 +++++++
 cloudevents-server/main.go              | 46 +++++++++++++++----
 .../pkg/events/handler/kafka.go         | 30 ++++++------
 cloudevents-server/pkg/kafka/types.go   |  5 +-
 5 files changed, 88 insertions(+), 25 deletions(-)

diff --git a/cloudevents-server/configs/example-config-sqlite3.yaml b/cloudevents-server/configs/example-config-sqlite3.yaml
index 4411731..30ebf9d 100644
--- a/cloudevents-server/configs/example-config-sqlite3.yaml
+++ b/cloudevents-server/configs/example-config-sqlite3.yaml
@@ -21,3 +21,19 @@ tekton:
   - event_type: dev.tekton.event.pipelinerun.failed.v1
     event_subject_reg: ^xxx-from-.*
     receivers: [failure-receiver]
+kafka:
+  brokers:
+    - broker1:9092
+  client_id: cloudevents-server
+  # authentication:
+  #   mechanism: SCRAM-SHA-256
+  #   user: username
+  #   password: password
+  producer:
+    topic_mapping: {}
+    default_topic: test-topic
+  consumer:
+    group_id: consumer-group-1
+    topic_mapping:
+      '*': test-topic
+    dead_letter_topic: test-topic-dead-letter
\ No newline at end of file
diff --git a/cloudevents-server/configs/example-config.yaml b/cloudevents-server/configs/example-config.yaml
index 09f88e0..976c3ec 100644
--- a/cloudevents-server/configs/example-config.yaml
+++ b/cloudevents-server/configs/example-config.yaml
@@ -21,3 +21,19 @@ tekton:
   - event_type: dev.tekton.event.pipelinerun.failed.v1
     event_subject_reg: ^xxx-from-.*
     receivers: [failure-receiver]
+kafka:
+  brokers:
+    - broker1:9092
+  client_id: cloudevents-server
+  # authentication:
+  #   mechanism: SCRAM-SHA-256
+  #   user: username
+  #   password: password
+  producer:
+    topic_mapping: {}
+    default_topic: test-topic
+  consumer:
+    group_id: consumer-group-1
+    topic_mapping:
+      '*': test-topic
+    dead_letter_topic: test-topic-dead-letter
\ No newline at end of file
diff --git a/cloudevents-server/main.go b/cloudevents-server/main.go
index 4e97094..95c8107 100644
--- a/cloudevents-server/main.go
+++ b/cloudevents-server/main.go
@@ -4,6 +4,10 @@ import (
 	"context"
 	"flag"
 	"net/http"
+	"os"
+	"os/signal"
+	"syscall"
+	"time"
 
 	"github.com/gin-gonic/gin"
 	"github.com/rs/zerolog"
@@ -42,10 +46,10 @@ func main() {
 	}
 	gin.SetMode(ginMode)
 
-	r := gin.Default()
-	_ = r.SetTrustedProxies(nil)
+	ginEngine := gin.Default()
+	_ = ginEngine.SetTrustedProxies(nil)
 
-	setRouters(r, cfg)
+	setRouters(ginEngine, cfg)
 
 	hd, err := newCloudEventsHandler(cfg)
 	if err != nil {
@@ -57,13 +61,39 @@ func main() {
 	if err != nil {
 		log.Fatal().Err(err).Msg("failed to create consumer group")
 	}
 
-	defer cg.Close()
-	go cg.Start(context.Background())
-	log.Info().Str("address", serveAddr).Msg("server started.")
-	if err := http.ListenAndServe(serveAddr, r); err != nil {
-		log.Fatal().Err(err).Send()
+	srv := &http.Server{Addr: serveAddr, Handler: ginEngine}
+	startServices(srv, cg)
+}
+
+func startServices(srv *http.Server, cg handler.EventConsumerGroup) {
+	sigChan := make(chan os.Signal, 1)
+	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+
+	go func() {
+		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+			log.Fatal().Err(err).Msg("server error")
+		}
+	}()
+	log.Info().Str("address", srv.Addr).Msg("server started.")
+	cgCtx, cgCancel := context.WithCancel(context.Background())
+	go cg.Start(cgCtx)
+
+	// Wait for interrupt signal to gracefully shutdown
+	sig := <-sigChan
+	log.Warn().Str("signal", sig.String()).Msg("signal received")
+
+	// shutdown consumer group.
+	cgCancel()
+
+	// shutdown http server.
+	shutdownSrvCtx, shutdownSrvCancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer shutdownSrvCancel()
+	if err := srv.Shutdown(shutdownSrvCtx); err != nil {
+		log.Error().Err(err).Msg("server shutdown error")
 	}
+
+	log.Info().Msg("server gracefully stopped")
 }
 
 func setRouters(r gin.IRoutes, cfg *config.Config) {
diff --git a/cloudevents-server/pkg/events/handler/kafka.go b/cloudevents-server/pkg/events/handler/kafka.go
index 1214b3a..154cb58 100644
--- a/cloudevents-server/pkg/events/handler/kafka.go
+++ b/cloudevents-server/pkg/events/handler/kafka.go
@@ -2,10 +2,7 @@ package handler
 
 import (
 	"context"
-	"os"
-	"os/signal"
 	"sync"
-	"syscall"
 
 	cloudevents "github.com/cloudevents/sdk-go/v2"
 	"github.com/rs/zerolog/log"
@@ -28,25 +25,32 @@ func NewEventProducer(cfg config.Kafka) (*EventProducer, error) {
 	}, nil
 }
 
-func NewEventConsumer(cfg config.Kafka, topic string, hander EventHandler) (*EventConsumer, error) {
+func NewEventConsumer(cfg config.Kafka, topic string, hander EventHandler, faultWriter *kafka.Writer) (*EventConsumer, error) {
 	reader, err := skakfa.NewReader(cfg.Authentication, cfg.Brokers, topic, cfg.Consumer.GroupID, cfg.ClientID)
 	if err != nil {
 		return nil, err
 	}
 
 	return &EventConsumer{
-		reader:  reader,
-		handler: hander,
+		reader:     reader,
+		handler:    hander,
+		writer:     faultWriter,
+		faultTopic: cfg.Consumer.DeadLetterTopic,
 	}, nil
 }
 
 func NewEventConsumerGroup(cfg config.Kafka, hander EventHandler) (EventConsumerGroup, error) {
+	faultWriter, err := skakfa.NewWriter(cfg.Authentication, cfg.Brokers, "", cfg.ClientID)
+	if err != nil {
+		return nil, err
+	}
+
 	consumerGroup := make(EventConsumerGroup)
 	for _, topic := range cfg.Consumer.TopicMapping {
 		if consumerGroup[topic] != nil {
 			continue
 		}
-		consumer, err := NewEventConsumer(cfg, topic, hander)
+		consumer, err := NewEventConsumer(cfg, topic, hander, faultWriter)
 		if err != nil {
 			return nil, err
 		}
@@ -106,6 +110,9 @@ func (ecs EventConsumerGroup) Close() {
 	}
 }
 
+// Start runs the EventConsumerGroup in parallel, starting each EventConsumer
+// in a separate goroutine. It waits for all EventConsumers to finish before
+// returning.
 func (ecs EventConsumerGroup) Start(ctx context.Context) {
 	wg := new(sync.WaitGroup)
 	for _, ec := range ecs {
@@ -129,19 +136,12 @@ type EventConsumer struct {
 
 // consumer workers
 func (ec *EventConsumer) Start(ctx context.Context) error {
-	sigterm := make(chan os.Signal, 1)
-	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
-
 	defer ec.Close()
 
 	for {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
-		case <-sigterm:
-			// When SIGTERM received, try to flush remaining messages
-			// and exit gracefully
-			return nil
 		default:
 			m, err := ec.reader.ReadMessage(ctx)
 			if err != nil {
@@ -157,7 +157,7 @@ func (ec *EventConsumer) Start(ctx context.Context) error {
 			result := ec.handler.Handle(event)
 			if !cloudevents.IsACK(result) {
 				log.Error().Err(err).Msg("error handling event")
-				// ec.writer.WriteMessages(ctx, kafka.Message{Topic: ec.faultTopic, Key: m.Key, Value: m.Value})
+				ec.writer.WriteMessages(ctx, kafka.Message{Topic: ec.faultTopic, Key: m.Key, Value: m.Value})
 			}
 		}
 	}
diff --git a/cloudevents-server/pkg/kafka/types.go b/cloudevents-server/pkg/kafka/types.go
index 859a183..b5d78c2 100644
--- a/cloudevents-server/pkg/kafka/types.go
+++ b/cloudevents-server/pkg/kafka/types.go
@@ -6,6 +6,7 @@ type Producer struct {
 }
 
 type Consumer struct {
-	GroupID      string            `yaml:"group_id,omitempty" json:"group_id,omitempty"`
-	TopicMapping map[string]string `yaml:"topic_mapping,omitempty" json:"topic_mapping,omitempty"` // event type to topic.
+	GroupID         string            `yaml:"group_id,omitempty" json:"group_id,omitempty"`
+	TopicMapping    map[string]string `yaml:"topic_mapping,omitempty" json:"topic_mapping,omitempty"` // event type to topic.
+	DeadLetterTopic string            `yaml:"dead_letter_topic,omitempty" json:"dead_letter_topic,omitempty"`
 }
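
Below is a minimal, self-contained sketch of the dead-letter flow this patch wires up, written directly against segmentio/kafka-go (the library the kafka.Writer, kafka.Message, and WriteMessages calls in the diff point to) instead of the repo's skakfa wrapper. The broker address and topic names are the placeholders from the example config, and handleEvent is a hypothetical stand-in for the CloudEvents handler; unlike the one-line call in the patch, the sketch also checks the error returned by the dead-letter write.

package main

import (
	"context"
	"errors"
	"log"

	"github.com/segmentio/kafka-go"
)

// handleEvent is a hypothetical stand-in for the real CloudEvents handler.
func handleEvent(payload []byte) error {
	if len(payload) == 0 {
		return errors.New("empty payload")
	}
	return nil
}

func main() {
	brokers := []string{"broker1:9092"} // placeholder broker from the example config

	// Consume the primary topic as part of a consumer group.
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers: brokers,
		GroupID: "consumer-group-1",
		Topic:   "test-topic",
	})
	defer reader.Close()

	// Writer with no fixed Topic, so each message can carry its own Topic
	// field (here, the dead-letter topic). This mirrors the patch creating
	// the fault writer with an empty topic name.
	writer := &kafka.Writer{
		Addr:     kafka.TCP(brokers...),
		Balancer: &kafka.LeastBytes{},
	}
	defer writer.Close()

	ctx := context.Background()
	for {
		m, err := reader.ReadMessage(ctx)
		if err != nil {
			log.Printf("read error: %v", err)
			return
		}
		if err := handleEvent(m.Value); err != nil {
			// Forward the failed message to the dead-letter topic,
			// keeping the original key and payload.
			if dlErr := writer.WriteMessages(ctx, kafka.Message{
				Topic: "test-topic-dead-letter",
				Key:   m.Key,
				Value: m.Value,
			}); dlErr != nil {
				log.Printf("dead-letter write failed: %v", dlErr)
			}
		}
	}
}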
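
The main.go changes also replace the bare http.ListenAndServe call with coordinated shutdown. The stdlib-only sketch below reduces the same ordering to a standalone program: stop the background consumer first so nothing new is read mid-shutdown, then drain in-flight HTTP requests with a bounded timeout. The listen address, health-check route, and the placeholder consumer goroutine are illustrative only.

package main

import (
	"context"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	srv := &http.Server{Addr: ":8080", Handler: mux} // illustrative address

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	// Serve HTTP in the background; ErrServerClosed is the normal result
	// of a graceful Shutdown, not a failure.
	go func() {
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("server error: %v", err)
		}
	}()

	// A cancellable context stands in for the consumer group's lifetime.
	consumerCtx, stopConsumers := context.WithCancel(context.Background())
	go func() {
		<-consumerCtx.Done() // the consumer loop would run here until cancelled
	}()

	sig := <-sigChan
	log.Printf("signal received: %s", sig)

	// Stop the consumers first, then give the HTTP server a bounded
	// window to finish in-flight requests.
	stopConsumers()

	shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := srv.Shutdown(shutdownCtx); err != nil {
		log.Printf("server shutdown error: %v", err)
	}
	log.Println("server gracefully stopped")
}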