Skip to content

Commit

Permalink
Merge pull request #1 from crberube/pubsub
Browse files Browse the repository at this point in the history
fix issues raised in checkr/flagr PR #209
  • Loading branch information
vic3lord authored Feb 5, 2019
2 parents a5aee07 + b8b5f06 commit 2ff5b65
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 24 deletions.
10 changes: 5 additions & 5 deletions pkg/config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ var Config = struct {
RecorderKinesisVerbose bool `env:"FLAGR_RECORDER_KINESIS_VERBOSE" envDefault:"false"`

// Pubsub related configurations for data records logging (Flagr Metrics)
RecorderPubsubProjectID string `env:"FLAGR_RECORDER_PUBSUB_PROJECT_ID" envDefault:""`
RecorderPubsubTopicName string `env:"FLAGR_RECORDER_PUBSUB_TOPIC_NAME" envDefault:"flagr-records"`
RecorderPubsubKeyFile string `env:"FLAGR_RECORDER_PUBSUB_KEYFILE" envDefault:""`
RecorderPubsubVerbose bool `env:"FLAGR_RECORDER_PUBSUB_VERBOSE" envDefault:"false"`
RecorderPubsubVerboseCancel time.Duration `env:"FLAGR_RECORDER_PUBSUB_VERBOSE_CANCEL" envDefault:"5s"`
RecorderPubsubProjectID string `env:"FLAGR_RECORDER_PUBSUB_PROJECT_ID" envDefault:""`
RecorderPubsubTopicName string `env:"FLAGR_RECORDER_PUBSUB_TOPIC_NAME" envDefault:"flagr-records"`
RecorderPubsubKeyFile string `env:"FLAGR_RECORDER_PUBSUB_KEYFILE" envDefault:""`
RecorderPubsubVerbose bool `env:"FLAGR_RECORDER_PUBSUB_VERBOSE" envDefault:"false"`
RecorderPubsubVerboseCancelTimeout time.Duration `env:"FLAGR_RECORDER_PUBSUB_VERBOSE_CANCEL_TIMEOUT" envDefault:"5s"`

/**
JWTAuthEnabled enables the JWT Auth
Expand Down
22 changes: 13 additions & 9 deletions pkg/handler/data_recorder_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,21 @@ type pubsubRecorder struct {
topic *pubsub.Topic
}

var (
pubsubClient = func() (*pubsub.Client, error) {
return pubsub.NewClient(
context.Background(),
config.Config.RecorderPubsubProjectID,
option.WithCredentialsFile(config.Config.RecorderPubsubKeyFile),
)
}
)

// NewPubsubRecorder creates a new Pubsub recorder
var NewPubsubRecorder = func() DataRecorder {
client, err := pubsub.NewClient(
context.Background(),
config.Config.RecorderPubsubProjectID,
option.WithCredentialsFile(config.Config.RecorderPubsubKeyFile),
)
client, err := pubsubClient()
if err != nil {
// TODO: use Fatal again after fixing the test expecting to not panic.
// logrus.WithField("pubsub_error", err).Fatal("error getting pubsub client")
logrus.WithField("pubsub_error", err).Error("error getting pubsub client")
logrus.WithField("pubsub_error", err).Fatal("error getting pubsub client")
}

return &pubsubRecorder{
Expand Down Expand Up @@ -66,7 +70,7 @@ func (p *pubsubRecorder) AsyncRecord(r *models.EvalResult) {
res := p.topic.Publish(ctx, &pubsub.Message{Data: message})
if config.Config.RecorderPubsubVerbose {
go func() {
ctx, cancel := context.WithTimeout(ctx, config.Config.RecorderPubsubVerboseCancel)
ctx, cancel := context.WithTimeout(ctx, config.Config.RecorderPubsubVerboseCancelTimeout)
defer cancel()
id, err := res.Get(ctx)
if err != nil {
Expand Down
39 changes: 29 additions & 10 deletions pkg/handler/data_recorder_pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/checkr/flagr/pkg/util"
"github.com/checkr/flagr/swagger_gen/models"
"github.com/prashantv/gostub"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -67,6 +68,15 @@ func TestPubsubEvalResult(t *testing.T) {

func TestNewPubsubRecorder(t *testing.T) {
t.Run("no panics", func(t *testing.T) {
client := mockClient(t)
defer client.Close()

defer gostub.StubFunc(
&pubsubClient,
client,
nil,
).Reset()

assert.NotPanics(t, func() { NewPubsubRecorder() })
})
}
Expand All @@ -81,17 +91,8 @@ func TestPubsubAsyncRecord(t *testing.T) {
})

t.Run("enabled and valid", func(t *testing.T) {
ctx := context.Background()
srv := pstest.NewServer()
defer srv.Close()
conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
if err != nil {
t.Fatal("cannot connect to mocked server")
}
defer conn.Close()
client, err := pubsub.NewClient(ctx, "project", option.WithGRPCConn(conn))
client := mockClient(t)
defer client.Close()
assert.NoError(t, err)
topic := client.Topic("test")
assert.NotPanics(t, func() {
pr := &pubsubRecorder{
Expand All @@ -115,3 +116,21 @@ func TestPubsubAsyncRecord(t *testing.T) {
})
})
}

func mockClient(t *testing.T) *pubsub.Client {
ctx := context.Background()
srv := pstest.NewServer()
defer srv.Close()
conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
if err != nil {
t.Fatal("cannot connect to mocked server")
}
defer conn.Close()
client, err := pubsub.NewClient(ctx, "project", option.WithGRPCConn(conn))

if err != nil {
t.Fatal("failed creating mock client", err)
}

return client
}

0 comments on commit 2ff5b65

Please sign in to comment.