feat(cloudevents-server): add dead letter topic support (#168)
wuhuizuo authored Sep 9, 2024
1 parent 211b251 commit eadd400
Showing 5 changed files with 88 additions and 25 deletions.
16 changes: 16 additions & 0 deletions cloudevents-server/configs/example-config-sqlite3.yaml
@@ -21,3 +21,19 @@ tekton:
  - event_type: dev.tekton.event.pipelinerun.failed.v1
    event_subject_reg: ^xxx-from-.*
    receivers: [failure-receiver]
+kafka:
+  brokers:
+    - broker1:9092
+  client_id: cloudevents-server
+  # authentication:
+  #   mechanism: SCRAM-SHA-256
+  #   user: username
+  #   password: password
+  producer:
+    topic_mapping: {}
+    default_topic: test-topic
+  consumer:
+    group_id: consumer-group-1
+    topic_mapping:
+      '*': test-topic
+    dead_letter_topic: test-topic-dead-letter
16 changes: 16 additions & 0 deletions cloudevents-server/configs/example-config.yaml
@@ -21,3 +21,19 @@ tekton:
  - event_type: dev.tekton.event.pipelinerun.failed.v1
    event_subject_reg: ^xxx-from-.*
    receivers: [failure-receiver]
+kafka:
+  brokers:
+    - broker1:9092
+  client_id: cloudevents-server
+  # authentication:
+  #   mechanism: SCRAM-SHA-256
+  #   user: username
+  #   password: password
+  producer:
+    topic_mapping: {}
+    default_topic: test-topic
+  consumer:
+    group_id: consumer-group-1
+    topic_mapping:
+      '*': test-topic
+    dead_letter_topic: test-topic-dead-letter
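
For context, this is what the new consumer.dead_letter_topic setting drives at runtime: when a consumed event is not ACKed by its handler, the original Kafka message is forwarded unchanged to the dead letter topic (see the handler change in pkg/events/handler/kafka.go below). A minimal, hypothetical sketch using github.com/segmentio/kafka-go, which matches the kafka.Writer and kafka.Message usage in this commit; the broker address, topic name, and function names are illustrative values taken from the example config, not code from the repository.

package main

import (
    "context"

    "github.com/segmentio/kafka-go"
)

// forwardToDeadLetter re-publishes a failed message, keeping its key and value.
func forwardToDeadLetter(ctx context.Context, w *kafka.Writer, m kafka.Message) error {
    // The writer has no fixed topic, so the topic is chosen per message,
    // mirroring the fault writer that is created with an empty topic below.
    return w.WriteMessages(ctx, kafka.Message{
        Topic: "test-topic-dead-letter", // consumer.dead_letter_topic from the config above
        Key:   m.Key,
        Value: m.Value,
    })
}

func main() {
    w := &kafka.Writer{
        Addr:     kafka.TCP("broker1:9092"), // kafka.brokers value from the config above
        Balancer: &kafka.LeastBytes{},
    }
    defer w.Close()

    _ = forwardToDeadLetter(context.Background(), w,
        kafka.Message{Key: []byte("event-id"), Value: []byte(`{"specversion":"1.0"}`)})
}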
46 changes: 38 additions & 8 deletions cloudevents-server/main.go
@@ -4,6 +4,10 @@ import (
"context"
"flag"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/gin-gonic/gin"
"github.com/rs/zerolog"
@@ -42,10 +46,10 @@ func main() {
    }

    gin.SetMode(ginMode)
-   r := gin.Default()
-   _ = r.SetTrustedProxies(nil)
+   ginEngine := gin.Default()
+   _ = ginEngine.SetTrustedProxies(nil)

-   setRouters(r, cfg)
+   setRouters(ginEngine, cfg)

    hd, err := newCloudEventsHandler(cfg)
    if err != nil {
@@ -57,13 +61,39 @@
    if err != nil {
        log.Fatal().Err(err).Msg("failed to create consumer group")
    }
-   defer cg.Close()
-   go cg.Start(context.Background())

-   log.Info().Str("address", serveAddr).Msg("server started.")
-   if err := http.ListenAndServe(serveAddr, r); err != nil {
-       log.Fatal().Err(err).Send()
+   srv := &http.Server{Addr: serveAddr, Handler: ginEngine}
+   startServices(srv, cg)
}

+func startServices(srv *http.Server, cg handler.EventConsumerGroup) {
+   sigChan := make(chan os.Signal, 1)
+   signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+
+   go func() {
+       if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+           log.Fatal().Err(err).Msg("server error")
+       }
+   }()
+   log.Info().Str("address", srv.Addr).Msg("server started.")
+   cgCtx, cgCancel := context.WithCancel(context.Background())
+   go cg.Start(cgCtx)
+
+   // Wait for interrupt signal to gracefully shutdown
+   sig := <-sigChan
+   log.Warn().Str("signal", sig.String()).Msg("signal received")
+
+   // shutdown consumer group.
+   cgCancel()
+
+   // shutdown http server.
+   shutdownSrvCtx, shutdownSrvCancel := context.WithTimeout(context.Background(), 10*time.Second)
+   defer shutdownSrvCancel()
+   if err := srv.Shutdown(shutdownSrvCtx); err != nil {
+       log.Error().Err(err).Msg("server shutdown error")
+   }
+
+   log.Info().Msg("server gracefully stopped")
+}

func setRouters(r gin.IRoutes, cfg *config.Config) {
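The startServices function above wires the shutdown by hand: a signal channel, a cancelable context for the consumer group, and http.Server.Shutdown with a timeout. For comparison only, a hypothetical condensed sketch of the same sequence using the standard library's signal.NotifyContext (Go 1.16+); the types and names here are illustrative, not the project's.

package main

import (
    "context"
    "log"
    "net/http"
    "os/signal"
    "syscall"
    "time"
)

func run(srv *http.Server, startConsumers func(context.Context)) {
    // ctx is canceled on SIGINT or SIGTERM; stop releases the signal registration.
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    go startConsumers(ctx) // consumers exit when ctx is canceled

    go func() {
        if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("server error: %v", err)
        }
    }()

    <-ctx.Done() // block until a signal arrives

    shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    if err := srv.Shutdown(shutdownCtx); err != nil {
        log.Printf("server shutdown error: %v", err)
    }
}

func main() {
    srv := &http.Server{Addr: ":8080"}
    run(srv, func(ctx context.Context) { <-ctx.Done() })
}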
30 changes: 15 additions & 15 deletions cloudevents-server/pkg/events/handler/kafka.go
@@ -2,10 +2,7 @@ package handler

import (
    "context"
-   "os"
-   "os/signal"
    "sync"
-   "syscall"

    cloudevents "github.com/cloudevents/sdk-go/v2"
    "github.com/rs/zerolog/log"
@@ -28,25 +25,32 @@ func NewEventProducer(cfg config.Kafka) (*EventProducer, error) {
    }, nil
}

-func NewEventConsumer(cfg config.Kafka, topic string, hander EventHandler) (*EventConsumer, error) {
+func NewEventConsumer(cfg config.Kafka, topic string, hander EventHandler, faultWriter *kafka.Writer) (*EventConsumer, error) {
    reader, err := skakfa.NewReader(cfg.Authentication, cfg.Brokers, topic, cfg.Consumer.GroupID, cfg.ClientID)
    if err != nil {
        return nil, err
    }

    return &EventConsumer{
-       reader:  reader,
-       handler: hander,
+       reader:     reader,
+       handler:    hander,
+       writer:     faultWriter,
+       faultTopic: cfg.Consumer.DeadLetterTopic,
    }, nil
}

func NewEventConsumerGroup(cfg config.Kafka, hander EventHandler) (EventConsumerGroup, error) {
+   faultWriter, err := skakfa.NewWriter(cfg.Authentication, cfg.Brokers, "", cfg.ClientID)
+   if err != nil {
+       return nil, err
+   }
+
    consumerGroup := make(EventConsumerGroup)
    for _, topic := range cfg.Consumer.TopicMapping {
        if consumerGroup[topic] != nil {
            continue
        }
-       consumer, err := NewEventConsumer(cfg, topic, hander)
+       consumer, err := NewEventConsumer(cfg, topic, hander, faultWriter)
        if err != nil {
            return nil, err
        }
@@ -106,6 +110,9 @@ func (ecs EventConsumerGroup) Close() {
    }
}

+// Start runs the EventConsumerGroup in parallel, starting each EventConsumer
+// in a separate goroutine. It waits for all EventConsumers to finish before
+// returning.
func (ecs EventConsumerGroup) Start(ctx context.Context) {
    wg := new(sync.WaitGroup)
    for _, ec := range ecs {
@@ -129,19 +136,12 @@ type EventConsumer struct {

// consumer workers
func (ec *EventConsumer) Start(ctx context.Context) error {
-   sigterm := make(chan os.Signal, 1)
-   signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
-
    defer ec.Close()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
-       case <-sigterm:
-           // When SIGTERM received, try to flush remaining messages
-           // and exit gracefully
-           return nil
        default:
            m, err := ec.reader.ReadMessage(ctx)
            if err != nil {
@@ -157,7 +157,7 @@ func (ec *EventConsumer) Start(ctx context.Context) error {
            result := ec.handler.Handle(event)
            if !cloudevents.IsACK(result) {
                log.Error().Err(err).Msg("error handling event")
-               // ec.writer.WriteMessages(ctx, kafka.Message{Topic: ec.faultTopic, Key: m.Key, Value: m.Value})
+               ec.writer.WriteMessages(ctx, kafka.Message{Topic: ec.faultTopic, Key: m.Key, Value: m.Value})
            }
        }
    }
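With the previously commented-out write now enabled, messages whose handler result is not ACKed end up on the configured dead letter topic. A hypothetical sketch of inspecting (or replaying) them with github.com/segmentio/kafka-go; the broker and topic names are the illustrative values from the example config, and the consumer group id is made up so the normal consumers' offsets are untouched.

package main

import (
    "context"
    "fmt"

    "github.com/segmentio/kafka-go"
)

func main() {
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"broker1:9092"},
        GroupID: "dead-letter-inspector", // separate group, does not disturb the main consumers
        Topic:   "test-topic-dead-letter",
    })
    defer r.Close()

    ctx := context.Background()
    for i := 0; i < 10; i++ { // read a handful of dead-lettered events
        m, err := r.ReadMessage(ctx)
        if err != nil {
            break
        }
        // The handler forwards the original Key/Value unchanged, so the payload
        // is still the CloudEvent that failed to be handled.
        fmt.Printf("offset=%d key=%s value=%s\n", m.Offset, m.Key, m.Value)
    }
}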
5 changes: 3 additions & 2 deletions cloudevents-server/pkg/kafka/types.go
@@ -6,6 +6,7 @@ type Producer struct {
}

type Consumer struct {
-   GroupID      string            `yaml:"group_id,omitempty" json:"group_id,omitempty"`
-   TopicMapping map[string]string `yaml:"topic_mapping,omitempty" json:"topic_mapping,omitempty"` // event type to topic.
+   GroupID         string            `yaml:"group_id,omitempty" json:"group_id,omitempty"`
+   TopicMapping    map[string]string `yaml:"topic_mapping,omitempty" json:"topic_mapping,omitempty"` // event type to topic.
+   DeadLetterTopic string            `yaml:"dead_letter_topic,omitempty" json:"dead_letter_topic,omitempty"`
}
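
A quick way to see the new yaml tags in action: decoding the consumer block from the example config into a local mirror of this Consumer struct. A hypothetical, self-contained sketch using gopkg.in/yaml.v3; the real type lives in the project's pkg/kafka package and may be loaded differently.

package main

import (
    "fmt"

    "gopkg.in/yaml.v3"
)

// consumer mirrors the Consumer struct shown above, with the same yaml tags.
type consumer struct {
    GroupID         string            `yaml:"group_id,omitempty"`
    TopicMapping    map[string]string `yaml:"topic_mapping,omitempty"`
    DeadLetterTopic string            `yaml:"dead_letter_topic,omitempty"`
}

func main() {
    raw := []byte(`
group_id: consumer-group-1
topic_mapping:
  '*': test-topic
dead_letter_topic: test-topic-dead-letter
`)
    var c consumer
    if err := yaml.Unmarshal(raw, &c); err != nil {
        panic(err)
    }
    fmt.Printf("%+v\n", c) // {GroupID:consumer-group-1 TopicMapping:map[*:test-topic] DeadLetterTopic:test-topic-dead-letter}
}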
