Skip to content

Commit

Permalink
fix prod pubsub, collect errors.
Browse files Browse the repository at this point in the history
Signed-off-by: Scott Nichols <nicholss@google.com>
  • Loading branch information
Scott Nichols committed Aug 12, 2019
1 parent bba4603 commit 7b61368
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 46 deletions.
3 changes: 2 additions & 1 deletion cmd/samples/pubsub/multisender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"os"
"strings"

"github.com/kelseyhightower/envconfig"

"github.com/cloudevents/sdk-go"
cecontext "github.com/cloudevents/sdk-go/pkg/cloudevents/context"
cloudeventspubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub"
"github.com/kelseyhightower/envconfig"
)

type envConfig struct {
Expand Down
117 changes: 72 additions & 45 deletions pkg/cloudevents/transport/pubsub/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package pubsub

import (
"context"
"errors"
"fmt"
"sync"

"go.uber.org/zap"
"strings"
"sync"

"cloud.google.com/go/pubsub"

"github.com/cloudevents/sdk-go/pkg/cloudevents"
cecontext "github.com/cloudevents/sdk-go/pkg/cloudevents/context"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport"
Expand Down Expand Up @@ -211,64 +213,89 @@ func (t *Transport) HasConverter() bool {
return t.Converter != nil
}

func (t *Transport) startSubscriber(ctx context.Context, sub subscriptionWithTopic, done func(error)) {
logger := cecontext.LoggerFrom(ctx)
logger.Infof("starting subscriber for Topic %q, Subscription %q", sub.topicID, sub.subscriptionID)
conn := t.getOrCreateConnection(ctx, sub.topicID, sub.subscriptionID)

logger.Info("conn is", conn)
if conn == nil {
err := fmt.Errorf("failed to find connection for Topic: %q, Subscription: %q", sub.topicID, sub.subscriptionID)
done(err)
return
}
// Ok, ready to start pulling.
err := conn.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
logger.Info("got an event!")
msg := &Message{
Attributes: m.Attributes,
Data: m.Data,
}
event, err := t.codec.Decode(ctx, msg)
// If codec returns and error, try with the converter if it is set.
if err != nil && t.HasConverter() {
event, err = t.Converter.Convert(ctx, msg, err)
}
if err != nil {
logger.Errorw("failed to decode message", zap.Error(err))
m.Nack()
return
}

if err := t.Receiver.Receive(ctx, *event, nil); err != nil {
logger.Warnw("pubsub receiver return err", zap.Error(err))
m.Nack()
return
}
m.Ack()
})
done(err)
}

// StartReceiver implements Transport.StartReceiver
// NOTE: This is a blocking call.
func (t *Transport) StartReceiver(ctx context.Context) error {
logger := cecontext.LoggerFrom(ctx)

// Load the codec.
if ok := t.loadCodec(ctx); !ok {
return fmt.Errorf("unknown encoding set on transport: %d", t.Encoding)
}

//cctx, cancel := context.WithCancel(ctx)
//defer cancel()
// TODO: real way is to setup a inner context with cancel and then call cancel
// when the parent called done down. but for now, hack hack hack.
cctx, cancel := context.WithCancel(ctx)
n := len(t.subscriptions)

// TODO: This is demo code.
var err error
// Make the channels for quit and errors.
quit := make(chan struct{}, n)
errc := make(chan error, n)

startSubscriber := func(sub subscriptionWithTopic) {
logger.Infof("starting subscriber for Topic %q, Subscription %q", sub.topicID, sub.subscriptionID)
conn := t.getOrCreateConnection(ctx, sub.topicID, sub.subscriptionID)

logger.Info("conn is", conn)
if conn == nil {
err = fmt.Errorf("failed to find connection for Topic: %q, Subscription: %q", sub.topicID, sub.subscriptionID)
return
}
// Ok, ready to start pulling.
err = conn.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
logger.Info("got an event!")
msg := &Message{
Attributes: m.Attributes,
Data: m.Data,
}
event, err := t.codec.Decode(ctx, msg)
// If codec returns and error, try with the converter if it is set.
if err != nil && t.HasConverter() {
event, err = t.Converter.Convert(ctx, msg, err)
}
// Start up each subscription.
for _, sub := range t.subscriptions {
go t.startSubscriber(cctx, sub, func(err error) {
if err != nil {
logger.Errorw("failed to decode message", zap.Error(err))
m.Nack()
return
errc <- err
} else {
quit <- struct{}{}
}

if err := t.Receiver.Receive(ctx, *event, nil); err != nil {
logger.Warnw("pubsub receiver return err", zap.Error(err))
m.Nack()
return
}
m.Ack()
})
}

for _, sub := range t.subscriptions {
go startSubscriber(sub)
// Block for parent context to finish.
<-ctx.Done()
cancel()

// Collect errors and done calls until we have n of them.
errs := []string(nil)
for success := 0; success < n; success++ {
var err error
select {
case err = <-errc:
case <-quit:
}
if err != nil {
errs = append(errs, err.Error())
}
}
close(quit)
close(errc)

<-ctx.Done()
return err
return errors.New(strings.Join(errs, "\n"))
}

0 comments on commit 7b61368

Please sign in to comment.