Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use separate event stream per namespace #1388

Merged
merged 27 commits into from
Mar 4, 2024

Conversation

nguyer
Copy link
Contributor

@nguyer nguyer commented Aug 10, 2023

This PR resolves #1381 and #1382

Because we are moving from a single event stream to one per namespace, existing subscriptions will be recreated and events will be replayed on a new event stream the first time a FireFly Core node starts with these changes. Otherwise there is no user impacting change in this PR.

Event streams now follow the format <plugin_topic_name>/<namespace_name>. For example, event stream for the default namespace with a plugin topic of 0 would now be: 0/default.

There is still outstanding work to do in the tokens plugin and tokens connectors, but I'm splitting those out into separate PRs.

Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
@nguyer nguyer requested a review from a team as a code owner August 10, 2023 20:53
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
@codecov-commenter
Copy link

codecov-commenter commented Aug 30, 2023

Codecov Report

All modified and coverable lines are covered by tests ✅

Comparison is base (b1fd7dc) 99.99% compared to head (7972561) 99.99%.
Report is 20 commits behind head on main.

❗ Current head 7972561 differs from pull request most recent head ba212a9. Consider uploading reports for the commit ba212a9 to get more accurate results

Additional details and impacted files
@@           Coverage Diff            @@
##             main    #1388    +/-   ##
========================================
  Coverage   99.99%   99.99%            
========================================
  Files         322      322            
  Lines       23301    23407   +106     
========================================
+ Hits        23299    23405   +106     
  Misses          1        1            
  Partials        1        1            

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@nguyer
Copy link
Contributor Author

nguyer commented Aug 31, 2023

If we have multiple namespaces using the same token connector (which I think is a valid configuration) with these changes FireFly would create multiple WebSocket connections to that one token connector which may not work correctly until the token connectors themselves are updated. I'm not sure if that's an issue or not.

Copy link
Contributor

@peterbroadhurst peterbroadhurst left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a great piece of work @nguyer 👍

A few comments/requests to think through.


return nil
func (e *Ethereum) getTopic(namespace string) string {
return fmt.Sprintf("%s/%s", e.pluginTopic, namespace)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think it's important we document this, in the docs for the configuration of the EthconnectConfigTopic config entry

@@ -279,7 +282,7 @@ func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace *core.N
return "", err
}

sub, err := e.streams.ensureFireFlySubscription(ctx, namespace.Name, version, ethLocation.Address, contract.FirstEvent, e.streamID, batchPinEventABI)
sub, err := e.streams.ensureFireFlySubscription(ctx, namespace.Name, version, ethLocation.Address, contract.FirstEvent, e.streamID[namespace.Name], batchPinEventABI)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a check that the map entry is set (to avoid a nil panic)?

Or is the dynamic config loading threading model such that we know it's impossible for an API call to come down to this layer to create a subscription for a new listener, after the stream has been cleaned up?

defer close(e.closed)
func (e *Ethereum) eventLoop(namespace string) {
topic := e.getTopic(namespace)
wsconn := e.wsconn[namespace]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost certain not to be a problem, but technically I think this should be passed in on the creation of the go-routine, as there's no locking around the map. So the starting routine sets this before kicking off the eventLoop and then we're querying the map in that newly kicked off routine.

return fmt.Sprintf("%s/%s", f.pluginTopic, namespace)
}

func (f *Fabric) StartNamespace(ctx context.Context, namespace string) (err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a recollection that FabConnect currently has a different restriction around how multiple event streams work, to EthConnect. I wonder if it is possible that this PR will change behavior for Fabric in a way that needs to be documented, particularly for multiple namespace scenarios.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good call out. I'm investigating this...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Circling back to the feedback on this PR. The conclusion of this thread was that Fabconnect has never properly supported multiple namespaces using the same contract on the same runtime. These changes do not alter this, but now they will likely result in an error, rather than silently going into completely unpredictable behavior.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like an important callout in the migration docs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return err
}
case pluginCategoryDataexchange:
if plugin.category == pluginCategoryDataexchange {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I observe that the same issue addressed in this PR, exists in the connections to DataExchange.

I am not proposing any change is made under this PR to create separate per-namespace event listeners to DataExchange, but on the journey to full completion active/active (which this PR is a step towards) there should be a work item tracked to investigate if a similar code change is required there to have safe operation.

nmm.mdx.On("Start", mock.Anything).Return(nil)
nmm.mti[0].On("Start", mock.Anything).Return(nil)
nmm.mti[1].On("Start", mock.Anything).Return(nil)
// nmm.mti[0].On("StartNamespace", mock.Anything, "default").Return(nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably just delete these lines rather than commenting them out

func TestStartTokensFail(t *testing.T) {
nm, nmm, cleanup := newTestNamespaceManager(t, true)
defer cleanup()
// func TestStartTokensFail(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

@@ -249,6 +249,16 @@ func (or *orchestrator) Init() (err error) {
func Purge(ctx context.Context, ns *core.Namespace, plugins *Plugins, dxNodeName string) {
// Clear all handlers on all plugins, as this namespace is never coming back
setHandlers(ctx, plugins, ns, dxNodeName, nil, nil)
err := plugins.Blockchain.Plugin.StopNamespace(ctx, ns.Name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain why this one is in Purge (guaranteed never to come back) rather than Stop (we might just be reloading).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably just in the wrong spot. I'll move it. Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at this one again... it doesn't look like Stop gets called from Purge. Should it? Or does something else call both of them?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is only safe to make the tokens change, in coordination with a change to the token connectors themselves to have a separate event stream for each namespace. Because otherwise, there will still only be one ack stream, so whichever namespace receives/acks the events from the token connector will "eat" those events preventing them being dispatched to other namespaces.

Maybe the best thing is to split the token connector work out into another PR that can be coordinated with a change to the token connectors themselves?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I had originally intended to make this two PRs. But it sat open long enough I kept tacking on more commits 🙂 I'll split out the tokens changes as a separate branch/PR that we can hold until the token connectors are ready.

Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
@nguyer nguyer marked this pull request as draft September 5, 2023 13:56
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
@nguyer nguyer marked this pull request as ready for review February 2, 2024 18:34
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Copy link
Contributor

@peterbroadhurst peterbroadhurst left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a few minor additional comments from me to think about @nguyer, but none are a blocker to getting this important piece of work into mainline and progressing forwards to a release.

if err != nil {
return err
}
// Otherwise, make sure that our event stream is in place
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise feels superfluous here

log.L(e.ctx).Infof("Event stream: %s (topic=%s)", e.streamID, e.topic)
e.closed[namespace] = make(chan struct{})

go e.eventLoop(namespace, e.wsconn[namespace], e.closed[namespace])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably worth including the namespace in the logging context for this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Added below in the event loop

func (e *Ethereum) StopNamespace(ctx context.Context, namespace string) (err error) {
wsconn, ok := e.wsconn[namespace]
if ok {
wsconn.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like we should wait for the eventLoop to close

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we have e.closed[namespace] for this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. I added a wait for this here, but it then gets into a deadlock trying to stop the namespace

if ok {
    <-e.closed[namespace]
    wsconn.Close()
}

@@ -127,7 +127,7 @@ func (s *streamManager) updateEventStream(ctx context.Context, topic string, bat
return stream, nil
}

func (s *streamManager) ensureEventStream(ctx context.Context, topic string) (*eventStream, error) {
func (s *streamManager) ensureEventStream(ctx context.Context, topic, pluginTopic string) (*eventStream, error) {
existingStreams, err := s.getEventStreams(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like we should be able to query EVMConnect with a query parameter (which EthConnect might ignore) to scope this down.

Understand this isn't new, but previously it would be much less likely to have many event streams, because there's just one per core runtime. Now we might have 100s... in fact with FFTM I suspect there's a query-limit that means we might miss them after we get above a certain page size.

@@ -140,10 +140,29 @@ func (s *streamManager) ensureEventStream(ctx context.Context, topic string) (*e
}
return stream, nil
}
if stream.Name == pluginTopic {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note comment above would have to be an or select

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I'm not sure I understand this comment. Do you mean the comment on line 144?

return fmt.Sprintf("%s/%s", f.pluginTopic, namespace)
}

func (f *Fabric) StartNamespace(ctx context.Context, namespace string) (err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like an important callout in the migration docs

Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
@nguyer
Copy link
Contributor Author

nguyer commented Feb 21, 2024

I am also noting that the Tezos plugin was developed in parallel to this PR so it does not have these changes. However, TezosConnect does not yet have evenstream support implemented. Once Tezosconnect supports eventstreams we should circle back and make these same changes there too. If I were to do the changes as part of this PR today I wouldn't be able to test them, so I think it's best to hold off for now.

Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
@nguyer nguyer merged commit 5964d51 into hyperledger:main Mar 4, 2024
14 checks passed
@nguyer nguyer deleted the eventstreams branch March 4, 2024 15:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Re-work event streams in blockchain plugins to create one eventstream per namespace
3 participants