Skip to content

Commit 1576521

Browse files
committed
feat: Add PubSub configuration
1 parent 9bd7855 commit 1576521

File tree

3 files changed

+161
-0
lines changed

3 files changed

+161
-0
lines changed

pkg/pubsub/config.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package pubsub
2+
3+
import (
4+
"fmt"
5+
"time"
6+
7+
cbuilder "github.com/scribd/go-sdk/internal/pkg/configuration/builder"
8+
)
9+
10+
type (
11+
Config struct {
12+
Kafka Kafka `mapstructure:"kafka"`
13+
}
14+
15+
Publisher struct {
16+
MaxAttempts int `mapstructure:"max_attempts"`
17+
WriteTimeout time.Duration `mapstructure:"write_timeout"`
18+
Topic string `mapstructure:"topic"`
19+
Enabled bool `mapstructure:"enabled"`
20+
}
21+
22+
Subscriber struct {
23+
Topic string `mapstructure:"topic"`
24+
GroupId string `mapstructure:"group_id"`
25+
Enabled bool `mapstructure:"enabled"`
26+
}
27+
28+
Kafka struct {
29+
BrokerUrls []string `mapstructure:"broker_urls"`
30+
ClientId string `mapstructure:"client_id"`
31+
Cert string `mapstructure:"cert_pem"`
32+
CertKey string `mapstructure:"cert_pem_key"`
33+
SecurityProtocol string `mapstructure:"security_protocol"`
34+
Publisher Publisher `mapstructure:"publisher"`
35+
Subscriber Subscriber `mapstructure:"subscriber"`
36+
SSLVerificationEnabled bool `mapstructure:"ssl_verification_enabled"`
37+
}
38+
)
39+
40+
// NewConfig returns a new Config instance.
41+
func NewConfig() (*Config, error) {
42+
config := &Config{}
43+
viperBuilder := cbuilder.New("pubsub")
44+
45+
vConf, err := viperBuilder.Build()
46+
if err != nil {
47+
return config, err
48+
}
49+
50+
if err = vConf.Unmarshal(config); err != nil {
51+
return config, fmt.Errorf("unable to decode into struct: %s", err.Error())
52+
}
53+
54+
return config, nil
55+
}

pkg/pubsub/config_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package pubsub
2+
3+
import (
4+
"os"
5+
"path/filepath"
6+
"runtime"
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
func TestNewConfig(t *testing.T) {
15+
testCases := []struct {
16+
name string
17+
wantError bool
18+
}{
19+
{
20+
name: "NewWithoutConfigFileFails",
21+
wantError: true,
22+
},
23+
}
24+
25+
for _, tc := range testCases {
26+
t.Run(tc.name, func(t *testing.T) {
27+
_, err := NewConfig()
28+
29+
gotError := err != nil
30+
assert.Equal(t, gotError, tc.wantError)
31+
})
32+
}
33+
}
34+
35+
func TestNewConfigWithAppRoot(t *testing.T) {
36+
testCases := []struct {
37+
name string
38+
env string
39+
kafka Kafka
40+
}{
41+
{
42+
name: "NewWithConfigFileWorks",
43+
env: "test",
44+
kafka: Kafka{
45+
BrokerUrls: []string{"localhost:9092"},
46+
ClientId: "test-app",
47+
Cert: "pem string",
48+
CertKey: "pem key",
49+
SecurityProtocol: "ssl",
50+
Publisher: Publisher{
51+
MaxAttempts: 3,
52+
WriteTimeout: 10 * time.Second,
53+
Topic: "test-topic",
54+
},
55+
Subscriber: Subscriber{
56+
Topic: "test-topic",
57+
},
58+
SSLVerificationEnabled: true,
59+
},
60+
},
61+
}
62+
63+
currentAppRoot := os.Getenv("APP_ROOT")
64+
defer os.Setenv("APP_ROOT", currentAppRoot)
65+
66+
for _, tc := range testCases {
67+
t.Run(tc.name, func(t *testing.T) {
68+
_, filename, _, _ := runtime.Caller(0)
69+
tmpRootParent := filepath.Dir(filename)
70+
os.Setenv("APP_ROOT", filepath.Join(tmpRootParent, "testdata"))
71+
72+
c, err := NewConfig()
73+
require.Nil(t, err)
74+
75+
assert.Equal(t, c.Kafka, tc.kafka)
76+
})
77+
}
78+
}

pkg/pubsub/testdata/config/pubsub.yml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
common: &common
2+
kafka:
3+
4+
test: &test
5+
<<: *common
6+
kafka:
7+
# Set by APP_PUBSUB_KAFKA_BROKER_URLS env variable
8+
broker_urls:
9+
- "localhost:9092"
10+
# Set by APP_PUBSUB_KAFKA_CLIENT_ID env variable
11+
client_id: "test-app"
12+
# Set by APP_PUBSUB_KAFKA_CERT_PEM env variable
13+
cert_pem: "pem string"
14+
# Set by APP_PUBSUB_KAFKA_CERT_PEM_KEY env variable
15+
cert_pem_key: "pem key"
16+
security_protocol: "ssl"
17+
ssl_verification_enabled: true
18+
publisher:
19+
# Set by APP_PUBSUB_KAFKA_PUBLISHER_MAX_ATTEMPTS env variable
20+
max_attempts: 3
21+
write_timeout: "10s"
22+
topic: "test-topic"
23+
subscriber:
24+
topic: "test-topic"
25+
group_id: ""
26+
27+
development:
28+
<<: *test

0 commit comments

Comments
 (0)