Skip to content

Commit

Permalink
✨ feat: added kafka pre-service #50
Browse files Browse the repository at this point in the history
  • Loading branch information
pnguyen215 committed Dec 4, 2023
1 parent 6aebc86 commit d1dcea1
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 3 deletions.
5 changes: 3 additions & 2 deletions queues/apache_kafka_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type ClusterMultiTenantKafkaConfig struct {
}

type KafkaPublisherRequest struct {
Topic string `json:"topic" binding:"required" yaml:"topic"`
Data interface{} `json:"data" yaml:"data"`
TopicKey string `json:"topic_key" binding:"required" yaml:"topic_key"`
TenantKey string `json:"tenant_key" binding:"required" yaml:"tenant_key"`
Payload map[string]interface{} `json:"payload" yaml:"payload"`
}
39 changes: 38 additions & 1 deletion queues/apache_kafka_producer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package queues

import "github.com/sivaosorg/govm/utils"
import (
"fmt"

"github.com/sivaosorg/govm/utils"
)

func NewKafkaProducerConfig() *KafkaProducerConfig {
return &KafkaProducerConfig{
Expand Down Expand Up @@ -52,3 +56,36 @@ func GetKafkaProducerConfigSample() *KafkaProducerConfig {
AppendProperty("retries", 3).AppendProperty(Bootstrap_Servers, "kafka-broker-3:9092")
return k
}

func NewKafkaPublisherRequest() *KafkaPublisherRequest {
return &KafkaPublisherRequest{}
}

func (k *KafkaPublisherRequest) SetTopicKey(value string) *KafkaPublisherRequest {
k.TopicKey = value
return k
}

func (k *KafkaPublisherRequest) SetTenantKey(value string) *KafkaPublisherRequest {
k.TenantKey = value
return k
}

func (k *KafkaPublisherRequest) SetPayload(value map[string]interface{}) *KafkaPublisherRequest {
k.Payload = value
return k
}

func (k *KafkaPublisherRequest) Json() string {
return utils.ToJson(k)
}

func KafkaPublisherRequestValidator(k KafkaPublisherRequest) error {
if utils.IsEmpty(k.TopicKey) {
return fmt.Errorf("Topic key is required")
}
if utils.IsEmpty(k.TenantKey) {
return fmt.Errorf("Tenant key is required")
}
return nil
}
52 changes: 52 additions & 0 deletions queues/apache_kafka_service.go
Original file line number Diff line number Diff line change
@@ -1 +1,53 @@
package queues

import (
"github.com/sivaosorg/govm/apix"
)

type KafkaService interface {
}

type kafkaServiceImpl struct {
conf []apix.ApiRequestConfig
}

func NewKafkaService(conf []apix.ApiRequestConfig) KafkaService {
return &kafkaServiceImpl{
conf: conf,
}
}

func NewKafkaServiceSlices(conf ...apix.ApiRequestConfig) KafkaService {
return &kafkaServiceImpl{
conf: conf,
}
}

// func (s *kafkaServiceImpl) ProduceCallback(request KafkaPublisherRequest) (*restify.Response, error) {
// err := KafkaPublisherRequestValidator(request)
// if err != nil {
// return nil, err
// }
// if len(s.conf) == 0 {
// return nil, fmt.Errorf("API Conf is required")
// }
// configs, ok := apix.Get(s.conf, request.TenantKey)
// if !ok {
// return nil, fmt.Errorf("Tenant Key undefined: %v", request.TenantKey)
// }
// endpoint, err := configs.GetEndpoint(request.TopicKey)
// if err != nil {
// return nil, err
// }
// if len(request.Payload) > 0 {
// for k, v := range request.Payload {
// endpoint.AppendBodyWith(k, v)
// }
// }
// svc := apix.NewApiService(configs)
// return svc.Do(nil, endpoint)
// }

// func (s *kafkaServiceImpl) ProduceCallbackNoneWait(request KafkaPublisherRequest) {
// go s.ProduceCallback(request)
// }

0 comments on commit d1dcea1

Please sign in to comment.