This repository has been archived by the owner on Nov 3, 2024. It is now read-only.
forked from streamingfast/bstream
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathblockstream.go
119 lines (102 loc) · 3.25 KB
/
blockstream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package hub
import (
"context"
"fmt"
"github.com/streamingfast/bstream"
ggrpcserver "github.com/streamingfast/dgrpc/server"
"github.com/streamingfast/logging"
pbbstream "github.com/streamingfast/pbgo/sf/bstream/v1"
pbheadinfo "github.com/streamingfast/pbgo/sf/headinfo/v1"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
)
// NewBlockstreamServer builds a BlockstreamServer backed by this hub and
// registers it on dgrpcServer as the implementation of both the HeadInfo and
// the BlockStream gRPC services. Nothing is started here; call Launch on the
// returned server to begin serving.
func (h *ForkableHub) NewBlockstreamServer(dgrpcServer ggrpcserver.Server) *BlockstreamServer {
	server := new(BlockstreamServer)
	server.hub = h
	server.dgrpcServer = dgrpcServer

	// The same value serves both services, so it is registered once per service.
	pbheadinfo.RegisterHeadInfoServer(dgrpcServer.ServiceRegistrar(), server)
	pbbstream.RegisterBlockStreamServer(dgrpcServer.ServiceRegistrar(), server)

	return server
}
// BlockstreamServer exposes a ForkableHub over gRPC. It implements the
// HeadInfo service (GetHeadInfo) and the BlockStream service (Blocks).
type BlockstreamServer struct {
	// hub is the block source queried for head information and used to
	// create the per-request block sources.
	hub *ForkableHub

	// dgrpcServer is the underlying gRPC server that Launch starts and
	// Close shuts down.
	dgrpcServer ggrpcserver.Server
}
// Launch blocks until the hub's Ready channel yields, then starts the gRPC
// server on serverAddr in a separate goroutine and returns. It does not wait
// for the gRPC server to finish.
func (s *BlockstreamServer) Launch(serverAddr string) {
	// Do not accept requests before the hub can actually serve blocks.
	<-s.hub.Ready

	addrField := zap.String("server_addr", serverAddr)
	zlog.Info("blockstream server hub ready, launching", addrField)

	go s.dgrpcServer.Launch(serverAddr)
}
// Close shuts down the underlying gRPC server, passing 0 to its Shutdown
// method.
// NOTE(review): 0 presumably means "no grace period" — confirm against the
// dgrpc server implementation.
func (s *BlockstreamServer) Close() {
	s.dgrpcServer.Shutdown(0)
}
// GetHeadInfo implements the HeadInfo gRPC service. It reports the hub's
// current head block (number, ID and time) together with its LIB number.
// Any error returned by the hub is passed through unchanged. The ctx and req
// arguments are not used.
func (s *BlockstreamServer) GetHeadInfo(ctx context.Context, req *pbheadinfo.HeadInfoRequest) (*pbheadinfo.HeadInfoResponse, error) {
	headNum, headID, headTime, libNum, err := s.hub.HeadInfo()
	if err != nil {
		return nil, err
	}

	return &pbheadinfo.HeadInfoResponse{
		LibNum:   libNum,
		HeadNum:  headNum,
		HeadID:   headID,
		HeadTime: timestamppb.New(headTime),
	}, nil
}
// Blocks implements the BlockStream gRPC service. It picks a start block from
// r.Burst, creates a hub source (with forks) from that block, and runs it
// until it terminates, forwarding every block to stream.
//
// The start block is chosen as follows:
//   - Burst == -1: the hub's current LIB number.
//   - Burst < -1: block number -Burst, raised to the hub's lowest block
//     number when that is higher.
//   - otherwise: Burst blocks behind the current head, never below
//     bstream.GetProtocolFirstStreamableBlock, and then raised to the hub's
//     lowest block number when that is higher.
//
// It returns the error from the hub's HeadInfo call, an error when the hub
// yields no source, or the source's terminal error (nil on clean termination).
func (s *BlockstreamServer) Blocks(r *pbbstream.BlockRequest, stream pbbstream.BlockStream_BlocksServer) error {
	logger := logging.Logger(stream.Context(), zlog).Named("sub").Named(r.Requester)
	logger.Info("receive block request", zap.Reflect("request", r))

	handler := streamHandler(stream, logger)

	var startBlock uint64
	switch {
	case r.Burst == -1:
		_, _, _, libNum, err := s.hub.HeadInfo()
		if err != nil {
			return err
		}
		// The LIB is used as-is; it is not clamped to the hub's lowest block.
		startBlock = libNum

	case r.Burst < -1:
		// A negative burst other than -1 encodes an absolute block number.
		startBlock = uint64(-r.Burst)
		if lowest := s.hub.LowestBlockNum(); lowest > startBlock {
			startBlock = lowest
		}

	default:
		headNum, _, _, _, err := s.hub.HeadInfo()
		if err != nil {
			return err
		}

		// Go back `burst` blocks from head, guarding against unsigned
		// underflow and against going below the first streamable block.
		burst := uint64(r.Burst)
		startBlock = bstream.GetProtocolFirstStreamableBlock
		if burst <= headNum && headNum-burst >= bstream.GetProtocolFirstStreamableBlock {
			startBlock = headNum - burst
		}

		if lowest := s.hub.LowestBlockNum(); lowest > startBlock {
			startBlock = lowest
		}
	}

	var source bstream.Source = s.hub.SourceFromBlockNumWithForks(startBlock, handler)
	if source == nil {
		return fmt.Errorf("cannot get source for request %+v", r)
	}

	source.Run()
	<-source.Terminated()

	return source.Err()
}
// streamHandler returns a bstream.Handler that converts each incoming block to
// its protobuf form and sends it on stream.
//
// The handler returns an error, rather than panicking, when the block cannot
// be converted: one unconvertible block then ends only the stream being
// served instead of risking the whole process. The error from stream.Send is
// returned as-is. Every send attempt is logged at debug level on logger,
// including the send error when there is one.
func streamHandler(stream pbbstream.BlockStream_BlocksServer, logger *zap.Logger) bstream.Handler {
	return bstream.HandlerFunc(
		func(blk *bstream.Block, _ interface{}) error {
			block, err := blk.ToProto()
			if err != nil {
				return fmt.Errorf("unable to transform from bstream.Block to StreamableBlock: %w", err)
			}

			err = stream.Send(block)
			logger.Debug("block sent to stream", zap.Stringer("block", blk), zap.Error(err))
			return err
		})
}