Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added configurable Kafka retention time #244

Merged
merged 3 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/gobmp/gobmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
srcPort int
perfPort int
kafkaSrv string
kafkaTpRetnTimeMs string // Kafka topic retention time in ms
natsSrv string
intercept string
splitAF string
Expand All @@ -38,6 +39,7 @@ func init() {
flag.IntVar(&srcPort, "source-port", 5000, "port exposed to outside")
flag.IntVar(&dstPort, "destination-port", 5050, "port openBMP is listening")
flag.StringVar(&kafkaSrv, "kafka-server", "", "URL to access Kafka server")
flag.StringVar(&kafkaTpRetnTimeMs, "kafka-topic-retention-time-ms", "900000", "Kafka topic retention time in ms, default is 900000 ms i.e 15 minutes")
flag.StringVar(&natsSrv, "nats-server", "", "URL to access NATS server")
flag.StringVar(&intercept, "intercept", "false", "When intercept set \"true\", all incomming BMP messges will be copied to TCP port specified by destination-port, otherwise received BMP messages will be published to Kafka.")
flag.StringVar(&splitAF, "split-af", "true", "When set \"true\" (default) ipv4 and ipv6 will be published in separate topics. if set \"false\" the same topic will be used for both address families.")
Expand Down Expand Up @@ -79,7 +81,7 @@ func main() {
}
glog.V(5).Infof("NATS publisher has been successfully initialized.")
default:
publisher, err = kafka.NewKafkaPublisher(kafkaSrv)
publisher, err = kafka.NewKafkaPublisher(kafkaSrv, kafkaTpRetnTimeMs)
if err != nil {
glog.Errorf("failed to initialize Kafka publisher with error: %+v", err)
os.Exit(1)
Expand Down
4 changes: 3 additions & 1 deletion cmd/player/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ import (

var (
msgSrvAddr string
topicRetnTimeMs string
file string
delay int
iterations int
)

func init() {
flag.StringVar(&msgSrvAddr, "message-server", "", "URL to the messages supplying server")
flag.StringVar(&topicRetnTimeMs, "topic-retention-time-ms", "900000", "Kafka topic retention time in ms, default is 900000 ms i.e 15 minutes")
flag.StringVar(&file, "msg-file", "/tmp/messages.json", "File with the bmp messages to replay")
flag.IntVar(&delay, "delay", 0, "Delay in seconds to add between sending messages")
flag.IntVar(&iterations, "iterations", 1, "Number of iterations to replay messages")
Expand All @@ -44,7 +46,7 @@ func main() {
defer f.Close()

// Initializing publisher process
publisher, err := kafka.NewKafkaPublisher(msgSrvAddr)
publisher, err := kafka.NewKafkaPublisher(msgSrvAddr, topicRetnTimeMs)
if err != nil {
glog.Errorf("fail to initialize Kafka publisher with error: %+v", err)
os.Exit(1)
Expand Down
14 changes: 2 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -77,27 +77,17 @@ github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhe
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 h1:MGwJjxBy0HJshjDNfLsYO8xppfqWlA5ZT9OhtUUhTNw=
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200528225125-3c3fba18258b/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -107,8 +97,8 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw=
Expand Down
10 changes: 4 additions & 6 deletions pkg/kafka/kafka-publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ const (
var (
brockerConnectTimeout = 120 * time.Second
topicCreateTimeout = 1 * time.Second
// goBMP topic's retention timer is 15 minutes
topicRetention = "900000"
)

var (
Expand Down Expand Up @@ -144,7 +142,7 @@ func (p *publisher) Stop() {
}

// NewKafkaPublisher instantiates a new instance of a Kafka publisher
func NewKafkaPublisher(kafkaSrv string) (pub.Publisher, error) {
func NewKafkaPublisher(kafkaSrv, kafkaTpRetnTimeMs string) (pub.Publisher, error) {
glog.Infof("Initializing Kafka producer client")
if err := validator(kafkaSrv); err != nil {
glog.Errorf("Failed to validate Kafka server address %s with error: %+v", kafkaSrv, err)
Expand All @@ -169,7 +167,7 @@ func NewKafkaPublisher(kafkaSrv string) (pub.Publisher, error) {
glog.V(5).Infof("Connected to broker: %s id: %d\n", br.Addr(), br.ID())

for _, t := range topicNames {
if err := ensureTopic(br, topicCreateTimeout, t); err != nil {
if err := ensureTopic(br, topicCreateTimeout, t, kafkaTpRetnTimeMs); err != nil {
glog.Errorf("New Kafka publisher failed to ensure requested topics with error: %+v", err)
return nil, err
}
Expand Down Expand Up @@ -224,14 +222,14 @@ func validator(addr string) error {
return nil
}

func ensureTopic(br *sarama.Broker, timeout time.Duration, topicName string) error {
func ensureTopic(br *sarama.Broker, timeout time.Duration, topicName, kafkaTpRetnTimeMs string) error {
umkhan123 marked this conversation as resolved.
Show resolved Hide resolved
topic := &sarama.CreateTopicsRequest{
TopicDetails: map[string]*sarama.TopicDetail{
topicName: {
NumPartitions: 1,
ReplicationFactor: 1,
ConfigEntries: map[string]*string{
"retention.ms": &topicRetention,
"retention.ms": &kafkaTpRetnTimeMs,
},
},
},
Expand Down