Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CT-646] stream offchain updates through stream manager #1138

Merged
merged 6 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,9 @@ func New(
clobFlags := clobflags.GetClobFlagValuesFromOptions(appOpts)
logger.Info("Parsed CLOB flags", "Flags", clobFlags)

memClob := clobmodulememclob.NewMemClobPriceTimePriority(app.IndexerEventManager.Enabled())
memClob := clobmodulememclob.NewMemClobPriceTimePriority(
app.IndexerEventManager.Enabled() || app.GrpcStreamingManager.Enabled(),
)

app.ClobKeeper = clobmodulekeeper.NewKeeper(
appCodec,
Expand Down Expand Up @@ -1758,8 +1760,5 @@ func getGrpcStreamingManagerFromOptions(
logger log.Logger,
) (manager streamingtypes.GrpcStreamingManager) {
// TODO(CT-625): add command line flags for full node streaming.
if appFlags.NonValidatingFullNode {
return streaming.NewGrpcStreamingManager()
}
return streaming.NewNoopGrpcStreamingManager()
return streaming.NewGrpcStreamingManager()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will add the if statement back in in the next PR

Copy link
Contributor

@coderabbitai coderabbitai bot Mar 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The gRPC server is initialized without specifying credentials, which could lead to insecure communication. It's important to ensure that all gRPC communication is encrypted to prevent eavesdropping and tampering by malicious actors. Consider adding SSL/TLS credentials to the gRPC server initialization. You can create credentials using credentials.NewServerTLSFromFile("cert.pem", "cert.key") and include them in the server options.

grpc.NewServer(grpc.Creds(credentials.NewServerTLSFromFile("cert.pem", "cert.key")))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately the ec2 instance I am running this on does not support filesystem reads and writes. Can I paste the contents of my .pem file in the code?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jonfung-dydx Embedding the contents of your .pem file directly in the code is not recommended due to security concerns. A more secure approach would be to use environment variables or a secrets management service provided by your cloud provider. This way, you can securely manage the .pem file contents without exposing them in your source code. For example, you could load the .pem file contents into an environment variable and then access it in your code like so:

cert := os.Getenv("CERT_PEM_CONTENTS")
key := os.Getenv("KEY_PEM_CONTENTS")
creds, err := credentials.NewServerTLSFromString(cert, key)
if err != nil {
    // Handle error
}
grpcServer := grpc.NewServer(grpc.Creds(creds))

Make sure to securely set the environment variables CERT_PEM_CONTENTS and KEY_PEM_CONTENTS with the contents of your .pem files. If you're using AWS, you might consider using AWS Secrets Manager or Parameter Store to securely manage and access these values.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately I have no money to use AWS secrets manager. What should I do here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jonfung-dydx If AWS Secrets Manager is not an option due to cost concerns, you might consider the following approaches to securely manage your .pem file contents for the gRPC server:

  1. Encrypted Environment Variables: You can encrypt the .pem file contents and store them as environment variables. During your application startup, you can decrypt these variables to use them. This approach requires you to manage encryption keys securely, but it avoids storing sensitive information in plain text.

  2. Open-Source Secrets Management Tools: There are several open-source secrets management tools that you can run without significant cost. Tools like HashiCorp Vault or Bitnami Sealed Secrets can help you manage secrets securely. These tools provide mechanisms to store, access, and manage secrets, including certificates, without needing to store them directly in your code or as plain environment variables.

  3. In-Memory Storage: Since your environment does not support filesystem reads/writes, consider loading the .pem contents into memory at the start of your application (e.g., from an encrypted source) and using them directly from there. This approach keeps the sensitive information out of the filesystem and environment variables, reducing the risk of accidental exposure.

Each of these methods has its own set of considerations, especially regarding the secure handling of encryption keys and secret values. It's important to evaluate them in the context of your application's security requirements and operational environment.

}
91 changes: 88 additions & 3 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package grpc

import (
"sync"

"github.com/cosmos/gogoproto/proto"
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)
Expand All @@ -9,30 +13,111 @@ var _ types.GrpcStreamingManager = (*GrpcStreamingManagerImpl)(nil)

// GrpcStreamingManagerImpl is an implementation for managing gRPC streaming subscriptions.
type GrpcStreamingManagerImpl struct {
sync.Mutex
jonfung-dydx marked this conversation as resolved.
Show resolved Hide resolved

// orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions.
orderbookSubscriptions map[uint32]*OrderbookSubscription
nextId uint32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: nextSubscriptionId

}

// OrderbookSubscription represents a active subscription to the orderbook updates stream.
type OrderbookSubscription struct {
clobPairIds []uint32
srv clobtypes.Query_StreamOrderbookUpdatesServer
finished chan bool
}

func NewGrpcStreamingManager() *GrpcStreamingManagerImpl {
return &GrpcStreamingManagerImpl{}
return &GrpcStreamingManagerImpl{
orderbookSubscriptions: make(map[uint32]*OrderbookSubscription),
}
}

func (sm *GrpcStreamingManagerImpl) Enabled() bool {
return true
}

// Subscribe subscribes to the orderbook updates stream.
// This function returns a channel that is used to signal termination when an error occurs.
jayy04 marked this conversation as resolved.
Show resolved Hide resolved
func (sm *GrpcStreamingManagerImpl) Subscribe(
req clobtypes.StreamOrderbookUpdatesRequest,
srv clobtypes.Query_StreamOrderbookUpdatesServer,
) (
finished chan bool,
err error,
) {
return nil, nil
finished = make(chan bool)
subscription := &OrderbookSubscription{
clobPairIds: req.GetClobPairId(),
srv: srv,
finished: finished,
}

sm.Lock()
defer sm.Unlock()

sm.orderbookSubscriptions[sm.nextId] = subscription
sm.nextId++
Copy link
Contributor

@jonfung-dydx jonfung-dydx Mar 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for my understanding, what is the point of the nextId variable if we are just using this as a unique key variable for a map? seems like we don't expose it anywhere here, where are we gonna expose subscription key? functionally this is the same as an append only array

EDIT: i see, it is for removal


return finished, nil
}

// SendOrderbookUpdates groups updates by their clob pair ids and
// sends messages to the subscribers.
func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
updates *clobtypes.OffchainUpdates,
offchainUpdates *clobtypes.OffchainUpdates,
) {
// Group updates by clob pair id.
updates := make(map[uint32]*clobtypes.OffchainUpdates)
for _, message := range offchainUpdates.Messages {
clobPairId := message.OrderId.ClobPairId
if _, ok := updates[clobPairId]; !ok {
updates[clobPairId] = clobtypes.NewOffchainUpdates()
}
updates[clobPairId].Messages = append(updates[clobPairId].Messages, message)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't have to be now, but do we want to condense the messages somehow? We have this condensing function for indexer, but i don't think it works in this case. these events might be condensed differently.

func (om *OffchainUpdates) CondenseMessagesForReplay() {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this gets called before SendOffchainMessages is called? i.e. the message is already condensed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It gets called during Replay Operations but i feel it might not be used half the time due to this code:

	if replayUpdates != nil {
		offchainUpdates = replayUpdates
	}

I think it should be fine, unless you can think of any edge case where condensing messages drops anything vital for orderbook updates

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah should be fine to send for example (place + remove) for example


// Unmarshal messages to v1 updates.
v1updates := make(map[uint32][]ocutypes.OffChainUpdateV1)
for clobPairId, update := range updates {
v1updates[clobPairId] = GetOffchainUpdatesV1(update)
}

sm.Lock()
defer sm.Unlock()

// Send updates to subscribers.
idsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
jonfung-dydx marked this conversation as resolved.
Show resolved Hide resolved
for _, clobPairId := range subscription.clobPairIds {
if updates, ok := v1updates[clobPairId]; ok {
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: updates,
Snapshot: false,
},
); err != nil {
idsToRemove = append(idsToRemove, id)
break
}
}
}
}

// Clean up subscriptions that have been closed.
jayy04 marked this conversation as resolved.
Show resolved Hide resolved
for _, id := range idsToRemove {
sm.orderbookSubscriptions[id].finished <- true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close the channel if it's done and this is the only goroutine that sends stuff into this channel?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually I think we can just use ctx.Done()? no need to have this extra channel

delete(sm.orderbookSubscriptions, id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure that this delete will properly clean up the nested inner data structures? grpc.ServerStream might have a close function?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like there's no close function 🆗

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah wasn't able to find a close function either

not really sure if we need to do ctx, cancelCtx := context.WithCancel(ctx) to cancel the context, since the connection is likely already broken and the context is already done 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/grpc/grpc-go/blob/v1.60.x/stream.go#L262-L278

	// Possible context leak:
	// The cancel function for the child context we create will only be called
	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
	// an error is generated by SendMsg.

I'm assuming an error will be generated by SendMsg in this case so the cancel function will be called.

}
}

// GetOffchainUpdatesV1 unmarshals messages in offchain updates to OffchainUpdateV1.
func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) []ocutypes.OffChainUpdateV1 {
v1updates := make([]ocutypes.OffChainUpdateV1, 0)
for _, message := range offchainUpdates.Messages {
var update ocutypes.OffChainUpdateV1
proto.Unmarshal(message.Message.Value, &update)
v1updates = append(v1updates, update)
}
return v1updates
}
21 changes: 20 additions & 1 deletion protocol/x/clob/keeper/grpc_stream_orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,24 @@ func (k Keeper) StreamOrderbookUpdates(
req *types.StreamOrderbookUpdatesRequest,
stream types.Query_StreamOrderbookUpdatesServer,
) error {
return nil
grpcStreamingManager := k.GetGrpcStreamingManager()
if !grpcStreamingManager.Enabled() {
return types.ErrGrpcStreamingManagerNotEnabled
}

finished, err := grpcStreamingManager.Subscribe(*req, stream)
if err != nil {
return err
}

// Keep this scope alive because once this scope exits - the stream is closed
ctx := stream.Context()
for {
select {
case <-finished:
return nil
case <-ctx.Done():
return nil
}
}
}
2 changes: 2 additions & 0 deletions protocol/x/clob/keeper/orders.go
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,8 @@ func (k Keeper) SendOffchainMessages(
}
k.GetIndexerEventManager().SendOffchainData(update)
}

k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The addition of SendOrderbookUpdates to the Keeper struct is a critical part of enhancing the system's capability to stream offchain updates. This function leverages the GrpcStreamingManager to send order book updates, aligning with the PR's objectives. However, it's crucial to ensure that offchainUpdates passed to this function are validated and sanitized before use to prevent any potential security issues. Additionally, consider error handling for scenarios where the gRPC streaming manager might not be available or fails to send updates, to ensure the system's robustness.

Ensure proper validation and error handling within SendOrderbookUpdates to enhance security and robustness.

}

// getFillQuoteQuantums returns the total fillAmount price in quote quantums based on the maker subticks.
Expand Down
6 changes: 6 additions & 0 deletions protocol/x/clob/types/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,4 +518,10 @@ var (
10001,
"Subaccount cannot open more orders due to equity tier limit.",
)

ErrGrpcStreamingManagerNotEnabled = errorsmod.Register(
jayy04 marked this conversation as resolved.
Show resolved Hide resolved
ModuleName,
11000,
"GrpcStreamingManager is not enabled",
)
)
Loading