Skip to content

Commit

Permalink
flush updates before snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
jayy04 committed Jun 20, 2024
1 parent 6d2d529 commit 3126ae3
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ func NewGrpcStreamingManager(
for {
select {
case <-grpcStreamingManager.ticker.C:
grpcStreamingManager.Lock()
grpcStreamingManager.FlushStreamUpdates()
grpcStreamingManager.Unlock()
case <-grpcStreamingManager.done:
grpcStreamingManager.logger.Info(
"GRPC Stream poller goroutine shutting down",
Expand Down Expand Up @@ -205,9 +207,9 @@ func (sm *GrpcStreamingManagerImpl) Stop() {
sm.done <- true
}

// SendSnapshot groups updates by their clob pair ids and
// sends messages to the subscribers. It groups out updates differently
// and bypasses the buffer.
// SendSnapshot sends messages to a particular subscriber without buffering.
// Note this method requires the lock and assumes that the lock has already been
// acquired by the caller.
func (sm *GrpcStreamingManagerImpl) SendSnapshot(
offchainUpdates *clobtypes.OffchainUpdates,
subscriptionId uint32,
Expand Down Expand Up @@ -388,16 +390,15 @@ func (sm *GrpcStreamingManagerImpl) AddUpdatesToCache(
}

// FlushStreamUpdates takes in a map of clob pair id to stream updates and emits them to subscribers.
// Note this method requires the lock and assumes that the lock has already been
// acquired by the caller.
func (sm *GrpcStreamingManagerImpl) FlushStreamUpdates() {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcFlushUpdatesLatency,
time.Now(),
)

sm.Lock()
defer sm.Unlock()

// Non-blocking send updates through subscriber's buffered channel.
// If the buffer is full, drop the subscription.
idsToRemove := make([]uint32, 0)
Expand Down Expand Up @@ -446,6 +447,10 @@ func (sm *GrpcStreamingManagerImpl) InitializeNewGrpcStreams(
sm.Lock()
defer sm.Unlock()

// Flush any pending updates before sending the snapshot to avoid
// race conditions with the snapshot.
sm.FlushStreamUpdates()

updatesByClobPairId := make(map[uint32]*clobtypes.OffchainUpdates)
for subscriptionId, subscription := range sm.orderbookSubscriptions {
subscription.initialize.Do(
Expand Down

0 comments on commit 3126ae3

Please sign in to comment.