Skip to content

Commit

Permalink
Merge pull request #999 from kaleido-io/topic
Browse files Browse the repository at this point in the history
Do not rely on "topic" to be present in event streams
  • Loading branch information
peterbroadhurst authored Aug 23, 2022
2 parents 63f24a6 + 05bc5c7 commit 6a18d08
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 44 deletions.
18 changes: 9 additions & 9 deletions internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func TestStreamUpdateError(t *testing.T) {
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams",
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}}))
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}}))
httpmock.RegisterResponder("PATCH", "http://localhost:12345/eventstreams/es12345",
httpmock.NewStringResponder(500, `pop`))

Expand All @@ -468,13 +468,13 @@ func TestInitAllExistingStreams(t *testing.T) {
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams",
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}}))
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}}))
httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, []subscription{
{ID: "sub12345", Stream: "es12345", Name: "ns1_BatchPin_3078373143373635" /* this is the subname for our combo of instance path and BatchPin */},
}))
httpmock.RegisterResponder("PATCH", "http://localhost:12345/eventstreams/es12345",
httpmock.NewJsonResponderOrPanic(200, &eventStream{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}))
httpmock.NewJsonResponderOrPanic(200, &eventStream{ID: "es12345", Name: "topic1"}))
httpmock.RegisterResponder("POST", "http://localhost:12345/", mockNetworkVersion(t, 2))
httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, subscription{}))
Expand Down Expand Up @@ -508,13 +508,13 @@ func TestInitAllExistingStreamsV1(t *testing.T) {
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams",
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}}))
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}}))
httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, []subscription{
{ID: "sub12345", Stream: "es12345", Name: "BatchPin_3078373143373635" /* this is the subname for our combo of instance path and BatchPin */},
}))
httpmock.RegisterResponder("PATCH", "http://localhost:12345/eventstreams/es12345",
httpmock.NewJsonResponderOrPanic(200, &eventStream{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}))
httpmock.NewJsonResponderOrPanic(200, &eventStream{ID: "es12345", Name: "topic1"}))
httpmock.RegisterResponder("POST", "http://localhost:12345/", mockNetworkVersion(t, 1))
httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, subscription{}))
Expand Down Expand Up @@ -548,13 +548,13 @@ func TestInitAllExistingStreamsOld(t *testing.T) {
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams",
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}}))
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}}))
httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, []subscription{
{ID: "sub12345", Stream: "es12345", Name: "BatchPin"},
}))
httpmock.RegisterResponder("PATCH", "http://localhost:12345/eventstreams/es12345",
httpmock.NewJsonResponderOrPanic(200, &eventStream{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}))
httpmock.NewJsonResponderOrPanic(200, &eventStream{ID: "es12345", Name: "topic1"}))
httpmock.RegisterResponder("POST", "http://localhost:12345/", mockNetworkVersion(t, 1))
httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, subscription{}))
Expand Down Expand Up @@ -588,13 +588,13 @@ func TestInitAllExistingStreamsInvalidName(t *testing.T) {
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams",
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}}))
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}}))
httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, []subscription{
{ID: "sub12345", Stream: "es12345", Name: "BatchPin_3078373143373635" /* this is the subname for our combo of instance path and BatchPin */},
}))
httpmock.RegisterResponder("PATCH", "http://localhost:12345/eventstreams/es12345",
httpmock.NewJsonResponderOrPanic(200, &eventStream{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}))
httpmock.NewJsonResponderOrPanic(200, &eventStream{ID: "es12345", Name: "topic1"}))
httpmock.RegisterResponder("POST", "http://localhost:12345/", mockNetworkVersion(t, 2))
httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, subscription{}))
Expand Down
38 changes: 18 additions & 20 deletions internal/blockchain/ethereum/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,46 +78,44 @@ func (s *streamManager) getEventStreams(ctx context.Context) (streams []*eventSt
return streams, nil
}

func (s *streamManager) createEventStream(ctx context.Context, topic string, batchSize, batchTimeout uint) (*eventStream, error) {
stream := eventStream{
func buildEventStream(topic string, batchSize, batchTimeout uint) *eventStream {
return &eventStream{
Name: topic,
ErrorHandling: "block",
BatchSize: batchSize,
BatchTimeoutMS: batchTimeout,
Type: "websocket",
WebSocket: eventStreamWebsocket{Topic: topic},
Timestamps: true,
// Some implementations require a "topic" to be set separately, while others rely only on the name.
// We set them to the same thing for cross compatibility.
WebSocket: eventStreamWebsocket{Topic: topic},
Timestamps: true,
}
}

func (s *streamManager) createEventStream(ctx context.Context, topic string, batchSize, batchTimeout uint) (*eventStream, error) {
stream := buildEventStream(topic, batchSize, batchTimeout)
res, err := s.client.R().
SetContext(ctx).
SetBody(&stream).
SetResult(&stream).
SetBody(stream).
SetResult(stream).
Post("/eventstreams")
if err != nil || !res.IsSuccess() {
return nil, ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgEthconnectRESTErr)
}
return &stream, nil
return stream, nil
}

func (s *streamManager) updateEventStream(ctx context.Context, topic string, batchSize, batchTimeout uint, eventStreamID string) (*eventStream, error) {
stream := eventStream{
Name: topic,
ErrorHandling: "block",
BatchSize: batchSize,
BatchTimeoutMS: batchTimeout,
Type: "websocket",
WebSocket: eventStreamWebsocket{Topic: topic},
Timestamps: true,
}
stream := buildEventStream(topic, batchSize, batchTimeout)
res, err := s.client.R().
SetContext(ctx).
SetBody(&stream).
SetResult(&stream).
SetBody(stream).
SetResult(stream).
Patch("/eventstreams/" + eventStreamID)
if err != nil || !res.IsSuccess() {
return nil, ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgEthconnectRESTErr)
}
return &stream, nil
return stream, nil
}

func (s *streamManager) ensureEventStream(ctx context.Context, topic string, batchSize, batchTimeout uint) (*eventStream, error) {
Expand All @@ -126,7 +124,7 @@ func (s *streamManager) ensureEventStream(ctx context.Context, topic string, bat
return nil, err
}
for _, stream := range existingStreams {
if stream.WebSocket.Topic == topic {
if stream.Name == topic {
stream, err = s.updateEventStream(ctx, topic, batchSize, batchTimeout, stream.ID)
if err != nil {
return nil, err
Expand Down
22 changes: 14 additions & 8 deletions internal/blockchain/fabric/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,25 +83,31 @@ func (s *streamManager) getEventStreams(ctx context.Context) (streams []*eventSt
return streams, nil
}

func (s *streamManager) createEventStream(ctx context.Context, topic string, batchSize, batchTimeout uint) (*eventStream, error) {
stream := eventStream{
func buildEventStream(topic string, batchSize, batchTimeout uint) *eventStream {
return &eventStream{
Name: topic,
ErrorHandling: "block",
BatchSize: batchSize,
BatchTimeoutMS: batchTimeout,
Type: "websocket",
WebSocket: eventStreamWebsocket{Topic: topic},
Timestamps: true,
// Some implementations require a "topic" to be set separately, while others rely only on the name.
// We set them to the same thing for cross compatibility.
WebSocket: eventStreamWebsocket{Topic: topic},
Timestamps: true,
}
}

func (s *streamManager) createEventStream(ctx context.Context, topic string, batchSize, batchTimeout uint) (*eventStream, error) {
stream := buildEventStream(topic, batchSize, batchTimeout)
res, err := s.client.R().
SetContext(ctx).
SetBody(&stream).
SetResult(&stream).
SetBody(stream).
SetResult(stream).
Post("/eventstreams")
if err != nil || !res.IsSuccess() {
return nil, ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgFabconnectRESTErr)
}
return &stream, nil
return stream, nil
}

func (s *streamManager) ensureEventStream(ctx context.Context, topic string, batchSize, batchTimeout uint) (*eventStream, error) {
Expand All @@ -110,7 +116,7 @@ func (s *streamManager) ensureEventStream(ctx context.Context, topic string, bat
return nil, err
}
for _, stream := range existingStreams {
if stream.WebSocket.Topic == topic {
if stream.Name == topic {
return stream, nil
}
}
Expand Down
14 changes: 7 additions & 7 deletions internal/blockchain/fabric/fabric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func TestInitAllExistingStreams(t *testing.T) {
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams",
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}}))
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}}))
httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, []subscription{
{ID: "sub12345", Stream: "es12345", Name: "ns1_BatchPin"},
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestInitAllExistingStreamsV1(t *testing.T) {
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams",
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}}))
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}}))
httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, []subscription{
{ID: "sub12345", Stream: "es12345", Name: "BatchPin"},
Expand Down Expand Up @@ -300,7 +300,7 @@ func TestAddFireflySubscriptionQuerySubsFail(t *testing.T) {
resetConf(e)

httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams",
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}}))
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}}))
httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(500, "pop"))
httpmock.RegisterResponder("POST", fmt.Sprintf("http://localhost:12345/query"),
Expand Down Expand Up @@ -334,7 +334,7 @@ func TestAddFireflySubscriptionGetVersionError(t *testing.T) {
resetConf(e)

httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams",
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}}))
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}}))
httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, []subscription{
{ID: "sub12345", Stream: "es12345", Name: "ns1_BatchPin"},
Expand Down Expand Up @@ -373,7 +373,7 @@ func TestAddAndRemoveFireflySubscriptionDeprecatedSubName(t *testing.T) {
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams",
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}}))
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}}))
httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, []subscription{
{ID: "sub12345", Stream: "es12345", Name: "BatchPin"},
Expand Down Expand Up @@ -416,7 +416,7 @@ func TestAddFireflySubscriptionInvalidSubName(t *testing.T) {
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams",
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}}))
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}}))
httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, []subscription{
{ID: "sub12345", Stream: "es12345", Name: "BatchPin"},
Expand Down Expand Up @@ -462,7 +462,7 @@ func TestInitNewConfig(t *testing.T) {
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams",
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", WebSocket: eventStreamWebsocket{Topic: "topic1"}}}))
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}}))

resetConf(e)
utFabconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345")
Expand Down

0 comments on commit 6a18d08

Please sign in to comment.