[CT-646] stream offchain updates through stream manager #1138

Merged
merged 6 commits into from
Mar 6, 2024
9 changes: 4 additions & 5 deletions protocol/app/app.go
@@ -961,7 +961,9 @@ func New(
clobFlags := clobflags.GetClobFlagValuesFromOptions(appOpts)
logger.Info("Parsed CLOB flags", "Flags", clobFlags)

memClob := clobmodulememclob.NewMemClobPriceTimePriority(app.IndexerEventManager.Enabled())
memClob := clobmodulememclob.NewMemClobPriceTimePriority(
app.IndexerEventManager.Enabled() || app.GrpcStreamingManager.Enabled(),
)

app.ClobKeeper = clobmodulekeeper.NewKeeper(
appCodec,
@@ -1758,8 +1760,5 @@ func getGrpcStreamingManagerFromOptions(
logger log.Logger,
) (manager streamingtypes.GrpcStreamingManager) {
// TODO(CT-625): add command line flags for full node streaming.
if appFlags.NonValidatingFullNode {
return streaming.NewGrpcStreamingManager()
}
return streaming.NewNoopGrpcStreamingManager()
return streaming.NewGrpcStreamingManager()
Contributor Author:

will add the if statement back in in the next PR

coderabbitai bot (Mar 4, 2024):

The gRPC server is initialized without specifying credentials, which could lead to insecure communication. It's important to ensure that all gRPC communication is encrypted to prevent eavesdropping and tampering by malicious actors. Consider adding SSL/TLS credentials to the gRPC server initialization. You can create credentials using credentials.NewServerTLSFromFile("cert.pem", "cert.key") and include them in the server options.

grpc.NewServer(grpc.Creds(credentials.NewServerTLSFromFile("cert.pem", "cert.key")))

Contributor (jonfung-dydx):

Unfortunately the ec2 instance I am running this on does not support filesystem reads and writes. Can I paste the contents of my .pem file in the code?

Contributor (coderabbitai bot):

@jonfung-dydx Embedding the contents of your .pem file directly in the code is not recommended due to security concerns. A more secure approach would be to use environment variables or a secrets management service provided by your cloud provider. This way, you can securely manage the .pem file contents without exposing them in your source code. For example, you could load the .pem file contents into an environment variable and then access it in your code like so:

cert := os.Getenv("CERT_PEM_CONTENTS")
key := os.Getenv("KEY_PEM_CONTENTS")
creds, err := credentials.NewServerTLSFromString(cert, key)
if err != nil {
    // Handle error
}
grpcServer := grpc.NewServer(grpc.Creds(creds))

Make sure to securely set the environment variables CERT_PEM_CONTENTS and KEY_PEM_CONTENTS with the contents of your .pem files. If you're using AWS, you might consider using AWS Secrets Manager or Parameter Store to securely manage and access these values.

Contributor (jonfung-dydx):

Unfortunately I have no money to use AWS secrets manager. What should I do here?

Contributor (coderabbitai bot):

@jonfung-dydx If AWS Secrets Manager is not an option due to cost concerns, you might consider the following approaches to securely manage your .pem file contents for the gRPC server:

  1. Encrypted Environment Variables: You can encrypt the .pem file contents and store them as environment variables. During your application startup, you can decrypt these variables to use them. This approach requires you to manage encryption keys securely, but it avoids storing sensitive information in plain text.

  2. Open-Source Secrets Management Tools: There are several open-source secrets management tools that you can run without significant cost. Tools like HashiCorp Vault or Bitnami Sealed Secrets can help you manage secrets securely. These tools provide mechanisms to store, access, and manage secrets, including certificates, without needing to store them directly in your code or as plain environment variables.

  3. In-Memory Storage: Since your environment does not support filesystem reads/writes, consider loading the .pem contents into memory at the start of your application (e.g., from an encrypted source) and using them directly from there. This approach keeps the sensitive information out of the filesystem and environment variables, reducing the risk of accidental exposure.

Each of these methods has its own set of considerations, especially regarding the secure handling of encryption keys and secret values. It's important to evaluate them in the context of your application's security requirements and operational environment.
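
An aside on the snippet earlier in this thread: grpc-go's credentials package does not expose a NewServerTLSFromString function, so a working in-memory variant of the same idea would pair tls.X509KeyPair with credentials.NewServerTLSFromCert. A minimal sketch, assuming the CERT_PEM_CONTENTS and KEY_PEM_CONTENTS environment variable names from the thread (everything else is illustrative, not part of this PR):

package main

import (
	"crypto/tls"
	"log"
	"os"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

func main() {
	// PEM contents loaded from environment variables, as suggested above.
	certPEM := os.Getenv("CERT_PEM_CONTENTS")
	keyPEM := os.Getenv("KEY_PEM_CONTENTS")

	// Parse the in-memory PEM pair; no filesystem access required.
	pair, err := tls.X509KeyPair([]byte(certPEM), []byte(keyPEM))
	if err != nil {
		log.Fatalf("failed to parse TLS key pair: %v", err)
	}

	// Wrap the certificate in gRPC transport credentials.
	creds := credentials.NewServerTLSFromCert(&pair)
	grpcServer := grpc.NewServer(grpc.Creds(creds))
	_ = grpcServer // register services and call Serve(...) as usual
}
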

}
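
As noted in the comments above, the NonValidatingFullNode guard removed in this hunk is expected to return in a follow-up PR together with dedicated streaming flags (see the TODO(CT-625) above). A hypothetical sketch of that follow-up; GrpcStreamEnabled is an assumed flag name, not part of this PR:

// Hypothetical follow-up (not in this PR): gate streaming behind a dedicated
// flag and restrict it to non-validating full nodes.
if appFlags.GrpcStreamEnabled && appFlags.NonValidatingFullNode {
	return streaming.NewGrpcStreamingManager()
}
return streaming.NewNoopGrpcStreamingManager()
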
102 changes: 98 additions & 4 deletions protocol/streaming/grpc/grpc_streaming_manager.go
@@ -1,6 +1,10 @@
package grpc

import (
"sync"

"github.com/cosmos/gogoproto/proto"
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)
@@ -9,10 +13,23 @@ var _ types.GrpcStreamingManager = (*GrpcStreamingManagerImpl)(nil)

// GrpcStreamingManagerImpl is an implementation for managing gRPC streaming subscriptions.
type GrpcStreamingManagerImpl struct {
sync.Mutex

// orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions.
orderbookSubscriptions map[uint32]*OrderbookSubscription
nextSubscriptionId uint32
}

// OrderbookSubscription represents an active subscription to the orderbook updates stream.
type OrderbookSubscription struct {
clobPairIds []uint32
srv clobtypes.Query_StreamOrderbookUpdatesServer
}

func NewGrpcStreamingManager() *GrpcStreamingManagerImpl {
return &GrpcStreamingManagerImpl{}
return &GrpcStreamingManagerImpl{
orderbookSubscriptions: make(map[uint32]*OrderbookSubscription),
}
}

func (sm *GrpcStreamingManagerImpl) Enabled() bool {
@@ -24,15 +41,92 @@ func (sm *GrpcStreamingManagerImpl) Subscribe(
req clobtypes.StreamOrderbookUpdatesRequest,
srv clobtypes.Query_StreamOrderbookUpdatesServer,
) (
finished chan bool,
err error,
) {
return nil, nil
clobPairIds := req.GetClobPairId()

// Perform some basic validation on the request.
if len(clobPairIds) == 0 {
return clobtypes.ErrInvalidGrpcStreamingRequest
}

subscription := &OrderbookSubscription{
clobPairIds: clobPairIds,
srv: srv,
}

sm.Lock()
defer sm.Unlock()

sm.orderbookSubscriptions[sm.nextSubscriptionId] = subscription
sm.nextSubscriptionId++

return nil
}

// SendOrderbookUpdates groups updates by their clob pair ids and
// sends messages to the subscribers.
func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
updates *clobtypes.OffchainUpdates,
offchainUpdates *clobtypes.OffchainUpdates,
) {
// Group updates by clob pair id.
updates := make(map[uint32]*clobtypes.OffchainUpdates)
for _, message := range offchainUpdates.Messages {
clobPairId := message.OrderId.ClobPairId
if _, ok := updates[clobPairId]; !ok {
updates[clobPairId] = clobtypes.NewOffchainUpdates()
}
updates[clobPairId].Messages = append(updates[clobPairId].Messages, message)
}
Contributor:

Doesn't have to be now, but do we want to condense the messages somehow? We have this condensing function for indexer, but i don't think it works in this case. these events might be condensed differently.

func (om *OffchainUpdates) CondenseMessagesForReplay() {

Contributor Author:

I think this gets called before SendOffchainMessages is called? i.e. the message is already condensed

Contributor:

It gets called during Replay Operations but i feel it might not be used half the time due to this code:

	if replayUpdates != nil {
		offchainUpdates = replayUpdates
	}

I think it should be fine, unless you can think of any edge case where condensing messages drops anything vital for orderbook updates

Contributor Author:

yeah, should be fine to send, for example, (place + remove)


// Unmarshal messages to v1 updates.
v1updates := make(map[uint32][]ocutypes.OffChainUpdateV1)
for clobPairId, update := range updates {
v1update, err := GetOffchainUpdatesV1(update)
if err != nil {
panic(err)
Contributor:

i believe golang kills the whole process on a panic. Would this lead to the whole validator going bye bye? Do we want to have a recover?

Contributor Author:

yeah I was trying to see what makes sense here -

based on my understanding, MMs will run their own full nodes and these full nodes are only used for streaming information to their trading strategies. in this case, if something happens, probably not important to recover?

Contributor Author:

let me know your thoughts

Contributor:

hm that sounds good to me. but we should make this extra clear in the grpcstream enabled flag description and in the docs that this feature is intended to be used on full nodes?

Would it even make sense to add validation that grpcstream can only be enabled on full nodes? Might be overkill. I would be a little suspicious of MEV if a consensus-participating validator had this stream turned on.

Contributor Author:

yup - planning to add that in my next PR

}
v1updates[clobPairId] = v1update
}

sm.Lock()
defer sm.Unlock()

// Send updates to subscribers.
idsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
for _, clobPairId := range subscription.clobPairIds {
if updates, ok := v1updates[clobPairId]; ok {
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: updates,
Snapshot: false,
},
); err != nil {
idsToRemove = append(idsToRemove, id)
break
}
}
}
}

// Clean up subscriptions that have been closed.
// If a Send update has failed for any clob pair id, the whole subscription will be removed.
for _, id := range idsToRemove {
delete(sm.orderbookSubscriptions, id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure that this delete will properly clean up the nested inner data structures? grpc.ServerStream might have a close function?

Contributor:

Seems like there's no close function 🆗

Contributor Author:

yeah wasn't able to find a close function either

not really sure if we need to do ctx, cancelCtx := context.WithCancel(ctx) to cancel the context, since the connection is likely already broken and the context is already done 🤔

Contributor:

https://github.com/grpc/grpc-go/blob/v1.60.x/stream.go#L262-L278

	// Possible context leak:
	// The cancel function for the child context we create will only be called
	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
	// an error is generated by SendMsg.

I'm assuming an error will be generated by SendMsg in this case so the cancel function will be called.

}
}

// GetOffchainUpdatesV1 unmarshals messages in offchain updates to OffchainUpdateV1.
func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) ([]ocutypes.OffChainUpdateV1, error) {
v1updates := make([]ocutypes.OffChainUpdateV1, 0)
for _, message := range offchainUpdates.Messages {
var update ocutypes.OffChainUpdateV1
err := proto.Unmarshal(message.Message.Value, &update)
if err != nil {
return nil, err
}
v1updates = append(v1updates, update)
}
return v1updates, nil
}
3 changes: 1 addition & 2 deletions protocol/streaming/grpc/noop_streaming_manager.go
@@ -21,10 +21,9 @@ func (sm *NoopGrpcStreamingManager) Subscribe(
req clobtypes.StreamOrderbookUpdatesRequest,
srv clobtypes.Query_StreamOrderbookUpdatesServer,
) (
finished chan bool,
err error,
) {
return nil, nil
return clobtypes.ErrGrpcStreamingManagerNotEnabled
}

func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
1 change: 0 additions & 1 deletion protocol/streaming/grpc/types/manager.go
@@ -12,7 +12,6 @@ type GrpcStreamingManager interface {
req clobtypes.StreamOrderbookUpdatesRequest,
srv clobtypes.Query_StreamOrderbookUpdatesServer,
) (
finished chan bool,
err error,
)
SendOrderbookUpdates(*clobtypes.OffchainUpdates)
8 changes: 8 additions & 0 deletions protocol/x/clob/keeper/grpc_stream_orderbook.go
@@ -8,5 +8,13 @@ func (k Keeper) StreamOrderbookUpdates(
req *types.StreamOrderbookUpdatesRequest,
stream types.Query_StreamOrderbookUpdatesServer,
) error {
err := k.GetGrpcStreamingManager().Subscribe(*req, stream)
if err != nil {
return err
}

// Keep this scope alive because once this scope exits - the stream is closed
ctx := stream.Context()
<-ctx.Done()
return nil
}
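
For context on how this new endpoint is consumed, a minimal client sketch, assuming the generated clobtypes Query client follows the usual gogo/gRPC conventions; the endpoint address and clob pair ID are illustrative, not part of this PR:

package main

import (
	"context"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)

func main() {
	// Connect to a full node's gRPC endpoint (address is illustrative).
	conn, err := grpc.Dial("localhost:9090",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("failed to dial: %v", err)
	}
	defer conn.Close()

	client := clobtypes.NewQueryClient(conn)

	// Subscribe to orderbook updates for clob pair 0.
	stream, err := client.StreamOrderbookUpdates(context.Background(),
		&clobtypes.StreamOrderbookUpdatesRequest{
			ClobPairId: []uint32{0},
		})
	if err != nil {
		log.Fatalf("failed to subscribe: %v", err)
	}

	// Receive updates until the server closes the stream.
	for {
		resp, err := stream.Recv()
		if err != nil {
			log.Fatalf("stream closed: %v", err)
		}
		log.Printf("snapshot=%v updates=%d", resp.Snapshot, len(resp.Updates))
	}
}
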
2 changes: 2 additions & 0 deletions protocol/x/clob/keeper/orders.go
@@ -1241,6 +1241,8 @@ func (k Keeper) SendOffchainMessages(
}
k.GetIndexerEventManager().SendOffchainData(update)
}

k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates)
Contributor:

The addition of SendOrderbookUpdates to the Keeper struct is a critical part of enhancing the system's capability to stream offchain updates. This function leverages the GrpcStreamingManager to send order book updates, aligning with the PR's objectives. However, it's crucial to ensure that offchainUpdates passed to this function are validated and sanitized before use to prevent any potential security issues. Additionally, consider error handling for scenarios where the gRPC streaming manager might not be available or fails to send updates, to ensure the system's robustness.

Ensure proper validation and error handling within SendOrderbookUpdates to enhance security and robustness.

}

// getFillQuoteQuantums returns the total fillAmount price in quote quantums based on the maker subticks.
12 changes: 12 additions & 0 deletions protocol/x/clob/types/errors.go
@@ -518,4 +518,16 @@ var (
10001,
"Subaccount cannot open more orders due to equity tier limit.",
)

// GrpcStreamingManager errors.
ErrGrpcStreamingManagerNotEnabled = errorsmod.Register(
ModuleName,
11000,
"GrpcStreamingManager is not enabled",
)
ErrInvalidGrpcStreamingRequest = errorsmod.Register(
ModuleName,
11001,
"Invalid gRPC streaming request",
)
)