Skip to content

Commit

Permalink
feat: Add PubSub configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
Neurostep committed Mar 23, 2022
1 parent 9bd7855 commit 166fd7e
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 0 deletions.
55 changes: 55 additions & 0 deletions pkg/pubsub/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package pubsub

import (
"fmt"
"time"

cbuilder "github.com/scribd/go-sdk/internal/pkg/configuration/builder"
)

type (
Config struct {
Kafka Kafka `mapstructure:"kafka"`
}

Publisher struct {
MaxAttempts int `mapstructure:"max_attempts"`
WriteTimeout time.Duration `mapstructure:"write_timeout"`
Topic string `mapstructure:"topic"`
Enabled bool `mapstructure:"enabled"`
}

Subscriber struct {
Topic string `mapstructure:"topic"`
GroupId string `mapstructure:"group_id"`
Enabled bool `mapstructure:"enabled"`
}

Kafka struct {
BrokerUrls []string `mapstructure:"broker_urls"`
ClientId string `mapstructure:"client_id"`
Cert string `mapstructure:"cert_pem"`
CertKey string `mapstructure:"cert_pem_key"`
SecurityProtocol string `mapstructure:"security_protocol"`
Publisher Publisher `mapstructure:"publisher"`
Subscriber Subscriber `mapstructure:"subscriber"`
SSLVerificationEnabled bool `mapstructure:"ssl_verification_enabled"`
}
)

// NewConfig returns a new Config instance.
func NewConfig() (*Config, error) {
config := &Config{}
viperBuilder := cbuilder.New("pubsub")

vConf, err := viperBuilder.Build()
if err != nil {
return config, err
}

if err = vConf.Unmarshal(config); err != nil {
return config, fmt.Errorf("unable to decode into struct: %s", err.Error())
}

return config, nil
}
77 changes: 77 additions & 0 deletions pkg/pubsub/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package pubsub

import (
"os"
"path/filepath"
"runtime"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewConfig(t *testing.T) {
testCases := []struct {
name string
wantError bool
}{
{
name: "NewWithoutConfigFileFails",
wantError: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
_, err := NewConfig()

gotError := err != nil
assert.Equal(t, gotError, tc.wantError)
})
}
}

func TestNewConfigWithAppRoot(t *testing.T) {
testCases := []struct {
name string
env string
kafka Kafka
}{
{
name: "NewWithConfigFileWorks",
env: "test",
kafka: Kafka{
BrokerUrls: []string{"localhost:9092"},
ClientId: "test-app",
Cert: "pem string",
CertKey: "pem key",
SecurityProtocol: "ssl",
Publisher: Publisher{
MaxAttempts: 3,
WriteTimeout: 10 * time.Second,
Topic: "test-topic",
},
Subscriber: Subscriber{
Topic: "test-topic",
},
},
},
}

currentAppRoot := os.Getenv("APP_ROOT")
defer os.Setenv("APP_ROOT", currentAppRoot)

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
_, filename, _, _ := runtime.Caller(0)
tmpRootParent := filepath.Dir(filename)
os.Setenv("APP_ROOT", filepath.Join(tmpRootParent, "testdata"))

c, err := NewConfig()
require.Nil(t, err)

assert.Equal(t, c.Kafka, tc.kafka)
})
}
}
28 changes: 28 additions & 0 deletions pkg/pubsub/testdata/config/pubsub.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
common: &common
kafka:

test: &test
<<: *common
kafka:
# Set by APP_PUBSUB_KAFKA_BROKER_URLS env variable
broker_urls:
- "localhost:9092"
# Set by APP_PUBSUB_KAFKA_CLIENT_ID env variable
client_id: "test-app"
# Set by APP_PUBSUB_KAFKA_CERT_PEM env variable
cert_pem: "pem string"
# Set by APP_PUBSUB_KAFKA_CERT_PEM_KEY env variable
cert_pem_key: "pem key"
# Set by APP_PUBSUB_KAFKA_MAX_ATTEMPTS env variable
security_protocol: "ssl"
ssl_verification_enabled: true
publisher:
max_attempts: 3
write_timeout: "10s"
topic: "test-topic"
subscriber:
topic: "test-topic"
group_id: ""

development:
<<: *test

0 comments on commit 166fd7e

Please sign in to comment.