Skip to content

Commit

Permalink
Enable event flushing during consumption (#114)
Browse files Browse the repository at this point in the history
  • Loading branch information
sananguliyev authored Sep 15, 2024
1 parent ed53b15 commit e81409d
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 12 deletions.
2 changes: 1 addition & 1 deletion internal/impl/pure/processor_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1048,5 +1048,5 @@ workflow:
{Type: "CONSUME", Content: "hello world", Meta: map[string]interface{}{}},
{Type: "PRODUCE", Content: "{\"id\":\"HELLO WORLD\"}", Meta: map[string]interface{}{}},
},
}, tracer.ProcessorEvents())
}, tracer.ProcessorEvents(false))
}
4 changes: 2 additions & 2 deletions public/service/stream_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1259,7 +1259,7 @@ output:
require.NoError(t, strm.Run(tCtx))

eventKeys := map[string]map[string]struct{}{}
for k, v := range tracSum.InputEvents() {
for k, v := range tracSum.InputEvents(false) {
eMap := map[string]struct{}{}
for _, e := range v {
eMap[e.Content] = struct{}{}
Expand All @@ -1283,7 +1283,7 @@ output:
}, eventKeys)

eventKeys = map[string]map[string]struct{}{}
for k, v := range tracSum.OutputEvents() {
for k, v := range tracSum.OutputEvents(false) {
eMap := map[string]struct{}{}
for _, e := range v {
eMap[e.Content] = struct{}{}
Expand Down
12 changes: 6 additions & 6 deletions public/service/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ func (s *TracingSummary) TotalOutput() uint64 {
// execution of a stream pipeline.
//
// Experimental: This method may change outside of major version releases.
func (s *TracingSummary) InputEvents() map[string][]TracingEvent {
func (s *TracingSummary) InputEvents(flush bool) map[string][]TracingEvent {
m := map[string][]TracingEvent{}
for k, v := range s.summary.InputEvents(false) {
for k, v := range s.summary.InputEvents(flush) {
events := make([]TracingEvent, len(v))
for i, e := range v {
events[i] = TracingEvent{
Expand All @@ -100,9 +100,9 @@ func (s *TracingSummary) InputEvents() map[string][]TracingEvent {
// execution of a stream pipeline.
//
// Experimental: This method may change outside of major version releases.
func (s *TracingSummary) ProcessorEvents() map[string][]TracingEvent {
func (s *TracingSummary) ProcessorEvents(flush bool) map[string][]TracingEvent {
m := map[string][]TracingEvent{}
for k, v := range s.summary.ProcessorEvents(false) {
for k, v := range s.summary.ProcessorEvents(flush) {
events := make([]TracingEvent, len(v))
for i, e := range v {
events[i] = TracingEvent{
Expand All @@ -120,9 +120,9 @@ func (s *TracingSummary) ProcessorEvents() map[string][]TracingEvent {
// execution of a stream pipeline.
//
// Experimental: This method may change outside of major version releases.
func (s *TracingSummary) OutputEvents() map[string][]TracingEvent {
func (s *TracingSummary) OutputEvents(flush bool) map[string][]TracingEvent {
m := map[string][]TracingEvent{}
for k, v := range s.summary.OutputEvents(false) {
for k, v := range s.summary.OutputEvents(flush) {
events := make([]TracingEvent, len(v))
for i, e := range v {
events[i] = TracingEvent{
Expand Down
6 changes: 3 additions & 3 deletions public/service/tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ logger:
{Type: service.TracingEventProduce, Content: `{"id":4}`, Meta: tMap{}},
{Type: service.TracingEventProduce, Content: `{"id":5}`, Meta: tMap{}},
},
}, trace.InputEvents())
}, trace.InputEvents(false))

assert.Equal(t, map[string][]service.TracingEvent{
"root.pipeline.processors.0": {
Expand All @@ -129,7 +129,7 @@ logger:
{Type: service.TracingEventConsume, Content: `{"id":5}`, Meta: tMap{}},
{Type: service.TracingEventProduce, Content: `{"count":5}`, Meta: tMap{"foo": int64(5)}},
},
}, trace.ProcessorEvents())
}, trace.ProcessorEvents(false))

assert.Equal(t, map[string][]service.TracingEvent{
"root.output": {
Expand All @@ -139,7 +139,7 @@ logger:
{Type: service.TracingEventConsume, Content: `{"id":4}`, Meta: tMap{}},
{Type: service.TracingEventConsume, Content: `{"count":5}`, Meta: tMap{"foo": int64(5)}},
},
}, trace.OutputEvents())
}, trace.OutputEvents(false))
}

func BenchmarkStreamTracing(b *testing.B) {
Expand Down

0 comments on commit e81409d

Please sign in to comment.