Skip to content

Commit

Permalink
Greatly improve logging behavior of the various source implementations
Browse files Browse the repository at this point in the history
This should greatly improved debuggability of the library. All source must now implement
a `SetLogger` to set the expectation correctly that source should use this logger for
all its logging purposes.

Removed a lot of leftover & dead code along the way.
  • Loading branch information
Matthieu Vachon committed Jul 7, 2020
1 parent 64b1d5e commit 64e1a90
Show file tree
Hide file tree
Showing 30 changed files with 526 additions and 620 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,22 @@
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
## Unreleased

### Added

- `Tracker` object to streamline queries about different targets (like network head, database lib, relayer blockstream head, whatever other BlockRef tags), ask the question about them being near one another (to select between live mode or catch-up mode). Also streamlines the requests of a start block, with a bunch of different backend implementations that can answer to the questions regarding where to start.
- `JoiningSourceWithTracker` to avoid joining to live when live and file sources are very far apart.
- `HeadBlockRefGetter` and `LIBBlockRefGetter` that targets a `HeadInfo` service, and satisfies the `Tracker` _BlockRefGetter_ func signature.

### Changed

- Greatly improve logging behavior of the various source implementations, this should greatly improved debuggability of the library.
- **BREAKING** All `Source` must now implement a `SetLogger(logger *zap.Logger)` method.
- **BREAKING** Removed all `Name`, `SetName`, and `*Name` options on all source and across `bstream`. Replaced by a proper `*zap.Logger`
instance instead. Re-configure using the logger, you can use `SetLogger(zlog.With("name", "my-source-name"))` to emulate
the old behavior.

## [v0.0.1] - 2020-06-22

### Added
Expand Down
39 changes: 26 additions & 13 deletions blockstream/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ import (
"google.golang.org/grpc/status"
)

type ServerOption func(s *Server)

func ServerOptionWithLogger(logger *zap.Logger) ServerOption {
return func(s *Server) {
s.logger = logger
}
}

type Server struct {
headInfo *headInfo
buffer *bstream.Buffer
Expand All @@ -41,18 +49,25 @@ type Server struct {
grpcServer *grpc.Server

lock sync.RWMutex

logger *zap.Logger
}

func NewBufferedServer(server *grpc.Server, size int) *Server {
bs := NewServer(server)
bs.buffer = bstream.NewBuffer("blockserver")
func NewBufferedServer(server *grpc.Server, size int, opts ...ServerOption) *Server {
bs := NewServer(server, opts...)
bs.buffer = bstream.NewBuffer("blockserver", bs.logger.Named("buffer"))
bs.bufferSize = size
return bs
}

func NewServer(server *grpc.Server) *Server {
func NewServer(server *grpc.Server, opts ...ServerOption) *Server {
s := &Server{
grpcServer: server,
logger: zlog,
}

for _, opt := range opts {
opt(s)
}

pbheadinfo.RegisterHeadInfoServer(s.grpcServer, s)
Expand Down Expand Up @@ -82,7 +97,7 @@ func (s *Server) GetHeadInfo(ctx context.Context, req *pbheadinfo.HeadInfoReques
}

func (s *Server) Blocks(r *pbbstream.BlockRequest, stream pbbstream.BlockStream_BlocksServer) error {
zlog.Info("receive block request", zap.String("requester", r.Requester), zap.Reflect("request", r))
s.logger.Info("receive block request", zap.String("requester", r.Requester), zap.Reflect("request", r))
subscription := s.subscribe(int(r.Burst), r.Requester)
if subscription == nil {
return fmt.Errorf("failed to create subscription for subscriber: %s", r.Requester)
Expand Down Expand Up @@ -158,7 +173,7 @@ func (s *Server) PushBlock(blk *bstream.Block) error {

for _, sub := range s.subscriptions {
if sub.closed {
zlog.Info("not pushing block to a closed subscription", zap.String("subscriber", sub.subscriber))
sub.logger.Info("not pushing block to a closed subscription")
continue
}
sub.Push(blk)
Expand All @@ -185,21 +200,19 @@ func (s *Server) subscribe(requestedBurst int, subscriber string) *subscription
}
}

sub := newSubscription(chanSize)
sub.SetSubscriber(subscriber)

zlog.Info("sending burst", zap.Int("busrt_size", len(blocks)), zap.String("subscriber", subscriber))
sub := newSubscription(chanSize, s.logger.Named("sub").Named(subscriber))

sub.logger.Info("sending burst", zap.Int("busrt_size", len(blocks)))
for _, blk := range blocks {
if sub.closed {
zlog.Info("subscription closed during burst", zap.Int("busrt_size", len(blocks)), zap.String("subscriber", subscriber))
sub.logger.Info("subscription closed during burst", zap.Int("busrt_size", len(blocks)))
return nil
}
sub.Push(blk.(*bstream.Block))
}

s.subscriptions = append(s.subscriptions, sub)
zlog.Info("subscribed", zap.Int("new_length", len(s.subscriptions)), zap.String("subscriber", subscriber))
s.logger.Info("subscribed", zap.Int("new_length", len(s.subscriptions)), zap.String("subscriber", subscriber))

return sub
}
Expand All @@ -216,5 +229,5 @@ func (s *Server) unsubscribe(toRemove *subscription) {
}

s.subscriptions = newListeners
zlog.Info("unsubscribed", zap.Int("new_length", len(s.subscriptions)))
s.logger.Info("unsubscribed", zap.Int("new_length", len(s.subscriptions)))
}
63 changes: 32 additions & 31 deletions blockstream/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
"fmt"
"time"

pbbstream "github.com/dfuse-io/pbgo/dfuse/bstream/v1"
"github.com/dfuse-io/shutter"
"github.com/dfuse-io/bstream"
"github.com/dfuse-io/dgrpc"
pbbstream "github.com/dfuse-io/pbgo/dfuse/bstream/v1"
"github.com/dfuse-io/shutter"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand All @@ -38,23 +38,34 @@ type Source struct {
preprocThreads int
gator bstream.Gator

name string
requester string
logger *zap.Logger
}

type SourceOption = func(s *Source)

func WithRequester(requester string) SourceOption {
return func(s *Source) {
s.requester = requester
}
}

func WithLogger(logger *zap.Logger) SourceOption {
return func(s *Source) {
s.logger = logger
}
}

func WithTimeThresholdGator(threshold time.Duration) SourceOption {
return func(s *Source) {
zlog.Info("setting time gator",
zap.Duration("threshold", threshold),
)
s.logger.Info("setting time gator", zap.Duration("threshold", threshold))
s.gator = bstream.NewTimeThresholdGator(threshold)
}
}

func WithNumGator(blockNum uint64, exclusive bool) SourceOption {
return func(s *Source) {
zlog.Info("setting num gator", zap.Uint64("block_num", blockNum), zap.Bool("exclusive", exclusive))
s.logger.Info("setting num gator", zap.Uint64("block_num", blockNum), zap.Bool("exclusive", exclusive))
if exclusive {
s.gator = bstream.NewExclusiveBlockNumberGator(blockNum)
} else {
Expand All @@ -63,13 +74,6 @@ func WithNumGator(blockNum uint64, exclusive bool) SourceOption {
}
}

func WithName(name string) SourceOption {
return func(s *Source) {
s.name = name
}

}

func WithParallelPreproc(f bstream.PreprocessFunc, threads int) SourceOption {
return func(s *Source) {
s.preprocFunc = f
Expand All @@ -90,16 +94,18 @@ func NewSource(
burst: burst,
handler: h,
Shutter: shutter.New(),
name: "default",
logger: zlog,
}

for _, option := range options {
option(s)
}

return s
}

func (s *Source) SetName(name string) {
s.name = name
func (s *Source) SetLogger(logger *zap.Logger) {
s.logger = logger
}

func (s *Source) SetParallelPreproc(f bstream.PreprocessFunc, threads int) {
Expand All @@ -108,8 +114,6 @@ func (s *Source) SetParallelPreproc(f bstream.PreprocessFunc, threads int) {
}

func (s *Source) Run() {
zlogger := zlog.With(zap.String("subscriber", s.name))

var transport *grpc.ClientConn
err := s.LockedInit(func() error {
var err error
Expand All @@ -120,7 +124,7 @@ func (s *Source) Run() {

s.OnTerminating(func(_ error) {
if err := transport.Close(); err != nil {
zlogger.Info("failed closing client transport on shutdown", zap.Error(err))
s.logger.Info("failed closing client transport on shutdown", zap.Error(err))
}
})

Expand All @@ -137,26 +141,23 @@ func (s *Source) Run() {
}

func (s *Source) run(client pbbstream.BlockStreamClient) (err error) {
zlogger := zlog.With(zap.String("subscriber", s.name))

blocksStreamer, err := client.Blocks(s.ctx, &pbbstream.BlockRequest{
Burst: s.burst,
Requester: s.name,
Requester: s.requester,
})
if err != nil {
return fmt.Errorf("failed to strart block source streamer: %s", err)
}

zlogger.Info("starting block source consumption")

s.readStream(zlogger, blocksStreamer)
s.logger.Info("starting block source consumption")
s.readStream(blocksStreamer)
s.logger.Info("source shutting down", zap.Error(s.Err()))

zlogger.Info("source shutting down", zap.Error(s.Err()))
return s.Err()
}

func (s *Source) readStream(zlogger *zap.Logger, client pbbstream.BlockStream_BlocksClient) {
zlogger.Info("block stream source reading messages")
func (s *Source) readStream(client pbbstream.BlockStream_BlocksClient) {
s.logger.Info("block stream source reading messages")

blkchan := make(chan chan *bstream.PreprocessedBlock, s.preprocThreads)
go func() {
Expand All @@ -169,12 +170,12 @@ func (s *Source) readStream(zlogger *zap.Logger, client pbbstream.BlockStream_Bl

blk, err := bstream.BlockFromProto(response)
if err != nil {
s.Shutdown(fmt.Errorf("unable to transform StreamableBlock to bstream.Block: %s", err))
s.Shutdown(fmt.Errorf("unable to transform to bstream.Block: %w", err))
return
}

if s.gator != nil && !s.gator.Pass(blk) {
zlog.Debug("gator not passed dropping block")
s.logger.Debug("gator not passed dropping block")
continue
}

Expand Down
8 changes: 5 additions & 3 deletions blockstream/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (
"testing"
"time"

"github.com/dfuse-io/bstream"
pbbstream "github.com/dfuse-io/pbgo/dfuse/bstream/v1"
"github.com/dfuse-io/shutter"
"github.com/dfuse-io/bstream"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
Expand Down Expand Up @@ -177,9 +177,10 @@ func TestSourcePreprocessShutdown(t *testing.T) {
Shutter: shutter.New(),
ctx: context.Background(),
handler: test.handler,
name: "testRunClient",
preprocFunc: test.preprocFunc,
preprocThreads: test.preprocThreads,
logger: zlog,
requester: "testRunClient",
}

time.AfterFunc(time.Millisecond*2, func() {
Expand Down Expand Up @@ -261,9 +262,10 @@ func TestSourceRunPreprocess(t *testing.T) {
Shutter: shutter.New(),
ctx: context.Background(),
handler: bstream.HandlerFunc(procFunc),
name: "testRunClient",
preprocFunc: test.preprocFunc,
preprocThreads: test.preprocThreads,
logger: zlog,
requester: "testRunClient",
}

go func() {
Expand Down
16 changes: 9 additions & 7 deletions blockstream/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
"go.uber.org/zap"
)

func newSubscription(chanSize int) (out *subscription) {
func newSubscription(chanSize int, logger *zap.Logger) (out *subscription) {
return &subscription{
incomingBlock: make(chan *bstream.Block, chanSize),
logger: logger,
}
}

Expand All @@ -32,28 +33,29 @@ type subscription struct {
closed bool

incomingBlock chan *bstream.Block
subscriber string

logger *zap.Logger
}

func (s *subscription) SetSubscriber(subscriber string) {
s.subscriber = subscriber
func (s *subscription) SetLogger(logger *zap.Logger) {
s.logger = logger
}

func (s *subscription) Push(blk *bstream.Block) {
if len(s.incomingBlock) == cap(s.incomingBlock) {
s.quitOnce.Do(func() {
zlog.Info("reach max buffer size for subcription, closing channel", zap.String("subscriber", s.subscriber), zap.Int("capacity", cap(s.incomingBlock)))
s.logger.Info("reach max buffer size for subcription, closing channel", zap.Int("capacity", cap(s.incomingBlock)))
s.closed = true
close(s.incomingBlock)
})
return
}

if s.closed {
zlog.Info("Warning: Pushing block in a close subscription", zap.String("subscriber", s.subscriber), zap.Int("capacity", cap(s.incomingBlock)))
s.logger.Info("Warning: Pushing block in a close subscription", zap.Int("capacity", cap(s.incomingBlock)))
return
}

zlog.Debug("subscription writing accepted block", zap.String("subscriber", s.subscriber), zap.String("subscriber", s.subscriber), zap.Int("channel_len", len(s.incomingBlock)))
s.logger.Debug("subscription writing accepted block", zap.Int("channel_len", len(s.incomingBlock)))
s.incomingBlock <- blk
}
14 changes: 7 additions & 7 deletions blockstream/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

func NewTestSubscription(chanSize int) *subscription {
return newSubscription(chanSize)
return newSubscription(chanSize, zlog)
}

func TestNewSubscription(t *testing.T) {
Expand All @@ -35,14 +35,14 @@ func TestNewSubscription(t *testing.T) {
name: "sunny path",
subscriptionBufferSize: 3,
pushedMessages: []*bstream.Block{
bstream.TestBlock("00000003a","00000002a"),
bstream.TestBlock("00000001a","00000000a"),
bstream.TestBlock("00000002a","00000001a"),
bstream.TestBlock("00000003a", "00000002a"),
bstream.TestBlock("00000001a", "00000000a"),
bstream.TestBlock("00000002a", "00000001a"),
},
expectedMessages: []*bstream.Block{
bstream.TestBlock("00000001a","00000000a"),
bstream.TestBlock("00000002a","00000001a"),
bstream.TestBlock("00000003a","00000002a"),
bstream.TestBlock("00000001a", "00000000a"),
bstream.TestBlock("00000002a", "00000001a"),
bstream.TestBlock("00000003a", "00000002a"),
},
},
}
Expand Down
Loading

0 comments on commit 64e1a90

Please sign in to comment.