Skip to content

Commit

Permalink
[CT-644] instantiate grpc stream manager (#1134)
Browse files Browse the repository at this point in the history
* [CT-644] instantiate grpc stream manager

* update type

* update channel type
  • Loading branch information
jayy04 authored Mar 4, 2024
1 parent 7c6b3d9 commit 5e0d678
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 11 deletions.
27 changes: 25 additions & 2 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ import (
"github.com/dydxprotocol/v4-chain/protocol/indexer"
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/indexer/msgsender"

// Grpc Streaming
streaming "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc"
streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
)

var (
Expand Down Expand Up @@ -298,8 +302,9 @@ type App struct {
// module configurator
configurator module.Configurator

IndexerEventManager indexer_manager.IndexerEventManager
Server *daemonserver.Server
IndexerEventManager indexer_manager.IndexerEventManager
GrpcStreamingManager streamingtypes.GrpcStreamingManager
Server *daemonserver.Server

// startDaemons encapsulates the logic that starts all daemons and daemon services. This function contains a
// closure of all relevant data structures that are shared with various keepers. Daemon services startup is
Expand Down Expand Up @@ -679,6 +684,9 @@ func New(
tkeys[indexer_manager.TransientStoreKey],
indexerFlags.SendOffchainData,
)

app.GrpcStreamingManager = getGrpcStreamingManagerFromOptions(appFlags, appOpts, logger)

timeProvider := &timelib.TimeProviderImpl{}

app.EpochsKeeper = *epochsmodulekeeper.NewKeeper(
Expand Down Expand Up @@ -976,6 +984,7 @@ func New(
app.StatsKeeper,
app.RewardsKeeper,
app.IndexerEventManager,
app.GrpcStreamingManager,
txConfig.TxDecoder(),
clobFlags,
rate_limit.NewPanicRateLimiter[*clobmoduletypes.MsgPlaceOrder](),
Expand Down Expand Up @@ -1740,3 +1749,17 @@ func getIndexerFromOptions(
}
return indexerMessageSender, indexerFlags
}

// getGrpcStreamingManagerFromOptions returns an instance of a streamingtypes.GrpcStreamingManager from the specified
// options. This function will default to returning a no-op instance.
func getGrpcStreamingManagerFromOptions(
appFlags flags.Flags,
appOpts servertypes.AppOptions,
logger log.Logger,
) (manager streamingtypes.GrpcStreamingManager) {
// TODO(CT-625): add command line flags for full node streaming.
if appFlags.NonValidatingFullNode {
return streaming.NewGrpcStreamingManager()
}
return streaming.NewNoopGrpcStreamingManager()
}
38 changes: 38 additions & 0 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package grpc

import (
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)

var _ types.GrpcStreamingManager = (*GrpcStreamingManagerImpl)(nil)

// GrpcStreamingManagerImpl is an implementation for managing gRPC streaming subscriptions.
type GrpcStreamingManagerImpl struct {
}

func NewGrpcStreamingManager() *GrpcStreamingManagerImpl {
return &GrpcStreamingManagerImpl{}
}

func (sm *GrpcStreamingManagerImpl) Enabled() bool {
return true
}

// Subscribe subscribes to the orderbook updates stream.
func (sm *GrpcStreamingManagerImpl) Subscribe(
req clobtypes.StreamOrderbookUpdatesRequest,
srv clobtypes.Query_StreamOrderbookUpdatesServer,
) (
finished chan bool,
err error,
) {
return nil, nil
}

// SendOrderbookUpdates groups updates by their clob pair ids and
// sends messages to the subscribers.
func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
updates *clobtypes.OffchainUpdates,
) {
}
33 changes: 33 additions & 0 deletions protocol/streaming/grpc/noop_streaming_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package grpc

import (
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)

var _ types.GrpcStreamingManager = (*NoopGrpcStreamingManager)(nil)

type NoopGrpcStreamingManager struct{}

func NewNoopGrpcStreamingManager() *NoopGrpcStreamingManager {
return &NoopGrpcStreamingManager{}
}

func (sm *NoopGrpcStreamingManager) Enabled() bool {
return false
}

func (sm *NoopGrpcStreamingManager) Subscribe(
req clobtypes.StreamOrderbookUpdatesRequest,
srv clobtypes.Query_StreamOrderbookUpdatesServer,
) (
finished chan bool,
err error,
) {
return nil, nil
}

func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
updates *clobtypes.OffchainUpdates,
) {
}
19 changes: 19 additions & 0 deletions protocol/streaming/grpc/types/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package types

import (
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)

type GrpcStreamingManager interface {
Enabled() bool

// L3+ Orderbook updates.
Subscribe(
req clobtypes.StreamOrderbookUpdatesRequest,
srv clobtypes.Query_StreamOrderbookUpdatesServer,
) (
finished chan bool,
err error,
)
SendOrderbookUpdates(*clobtypes.OffchainUpdates)
}
2 changes: 2 additions & 0 deletions protocol/testutil/keeper/clob.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/mocks"
streaming "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc"
clobtest "github.com/dydxprotocol/v4-chain/protocol/testutil/clob"
"github.com/dydxprotocol/v4-chain/protocol/testutil/constants"
asskeeper "github.com/dydxprotocol/v4-chain/protocol/x/assets/keeper"
Expand Down Expand Up @@ -214,6 +215,7 @@ func createClobKeeper(
statsKeeper,
rewardsKeeper,
indexerEventManager,
streaming.NewNoopGrpcStreamingManager(),
constants.TestEncodingCfg.TxConfig.TxDecoder(),
flags.GetDefaultClobFlags(),
rate_limit.NewNoOpRateLimiter[*types.MsgPlaceOrder](),
Expand Down
27 changes: 18 additions & 9 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
flags "github.com/dydxprotocol/v4-chain/protocol/x/clob/flags"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/rate_limit"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
Expand All @@ -31,16 +32,18 @@ type (
UntriggeredConditionalOrders map[types.ClobPairId]*UntriggeredConditionalOrders
PerpetualIdToClobPairId map[uint32][]types.ClobPairId

subaccountsKeeper types.SubaccountsKeeper
assetsKeeper types.AssetsKeeper
bankKeeper types.BankKeeper
blockTimeKeeper types.BlockTimeKeeper
feeTiersKeeper types.FeeTiersKeeper
perpetualsKeeper types.PerpetualsKeeper
pricesKeeper types.PricesKeeper
statsKeeper types.StatsKeeper
rewardsKeeper types.RewardsKeeper
subaccountsKeeper types.SubaccountsKeeper
assetsKeeper types.AssetsKeeper
bankKeeper types.BankKeeper
blockTimeKeeper types.BlockTimeKeeper
feeTiersKeeper types.FeeTiersKeeper
perpetualsKeeper types.PerpetualsKeeper
pricesKeeper types.PricesKeeper
statsKeeper types.StatsKeeper
rewardsKeeper types.RewardsKeeper

indexerEventManager indexer_manager.IndexerEventManager
streamingManager streamingtypes.GrpcStreamingManager

memStoreInitialized *atomic.Bool

Expand Down Expand Up @@ -82,6 +85,7 @@ func NewKeeper(
statsKeeper types.StatsKeeper,
rewardsKeeper types.RewardsKeeper,
indexerEventManager indexer_manager.IndexerEventManager,
grpcStreamingManager streamingtypes.GrpcStreamingManager,
txDecoder sdk.TxDecoder,
clobFlags flags.ClobFlags,
placeOrderRateLimiter rate_limit.RateLimiter[*types.MsgPlaceOrder],
Expand All @@ -107,6 +111,7 @@ func NewKeeper(
statsKeeper: statsKeeper,
rewardsKeeper: rewardsKeeper,
indexerEventManager: indexerEventManager,
streamingManager: grpcStreamingManager,
memStoreInitialized: &atomic.Bool{},
txDecoder: txDecoder,
mevTelemetryConfig: MevTelemetryConfig{
Expand Down Expand Up @@ -136,6 +141,10 @@ func (k Keeper) GetIndexerEventManager() indexer_manager.IndexerEventManager {
return k.indexerEventManager
}

func (k Keeper) GetGrpcStreamingManager() streamingtypes.GrpcStreamingManager {
return k.streamingManager
}

func (k Keeper) Logger(ctx sdk.Context) log.Logger {
return ctx.Logger().With(
log.ModuleKey, "x/clob",
Expand Down

0 comments on commit 5e0d678

Please sign in to comment.