backend: list message concurrent stream send fix (#1076)
* backend: in stream reporter, use a mutex to guard against concurrent writes to the stream's Send()

* frontend: fix commented-out timeout code

* backend: go format

* frontend: fix debug logs

* frontend: remove extra empty line after req
bojand committed Feb 5, 2024 · 1 parent fde9a36 · commit 8a44126
Showing 4 changed files with 67 additions and 32 deletions.
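
The central fix: connect-go server streams are not safe for concurrent use, and the progress reporter writes to the stream from more than one goroutine (the periodic progress ticker plus the consumer callbacks), so every write is now serialized through a single mutex. A minimal standalone sketch of the pattern, using illustrative names rather than the commit's actual types:

package main

import (
	"fmt"
	"sync"
)

// fakeStream stands in for connect-go's *connect.ServerStream, whose
// Send method must not be called from multiple goroutines at once.
type fakeStream struct{}

func (fakeStream) Send(msg string) error {
	fmt.Println("sent:", msg)
	return nil
}

// guardedReporter mirrors the writeMutex idea from this commit: every
// stream write takes the mutex first, so concurrent callers cannot
// interleave sends.
type guardedReporter struct {
	writeMutex sync.Mutex
	stream     fakeStream
}

func (r *guardedReporter) send(msg string) error {
	r.writeMutex.Lock()
	defer r.writeMutex.Unlock()
	return r.stream.Send(msg)
}

func main() {
	r := &guardedReporter{}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			_ = r.send(fmt.Sprintf("progress %d", n))
		}(i)
	}
	wg.Wait()
}

Funneling all sends through one dedicated writer goroutine and a channel would also work; a mutex is the smaller change and keeps each callback's error handling local.
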
22 changes: 11 additions & 11 deletions backend/pkg/api/connect/service/console/service.go
@@ -52,17 +52,6 @@ func NewService(logger *zap.Logger,
 
 // ListMessages consumes a Kafka topic and streams the Kafka records back.
 func (api *Service) ListMessages(ctx context.Context, req *connect.Request[v1alpha.ListMessagesRequest], stream *connect.ServerStream[v1alpha.ListMessagesResponse]) error {
-	timeout := 35 * time.Second
-	if req.Msg.GetFilterInterpreterCode() != "" || req.Msg.GetStartOffset() == console.StartOffsetNewest {
-		// Push-down filters and StartOffset = Newest may be long-running streams.
-		// There's already a client-side provided timeout which we usually trust.
-		// But additionally we want to ensure it never takes much longer than that.
-		timeout = 31 * time.Minute
-	}
-
-	ctx, cancel := context.WithTimeout(ctx, timeout)
-	defer cancel()
-
 	lmq := httptypes.ListMessagesRequest{
 		TopicName:   req.Msg.GetTopic(),
 		StartOffset: req.Msg.GetStartOffset(),
@@ -122,6 +111,17 @@ func (api *Service) ListMessages(ctx context.Context, req *connect.Request[v1alp
 
 	api.authHooks.PrintListMessagesAuditLog(ctx, req, &listReq)
 
+	timeout := 35 * time.Second
+	if req.Msg.GetFilterInterpreterCode() != "" || req.Msg.GetStartOffset() == console.StartOffsetNewest {
+		// Push-down filters and StartOffset = Newest may be long-running streams.
+		// There's already a client-side provided timeout which we usually trust.
+		// But additionally we want to ensure it never takes much longer than that.
+		timeout = 31 * time.Minute
+	}
+
+	ctx, cancel := context.WithTimeoutCause(ctx, timeout, errors.New("list fetch timeout"))
+	defer cancel()
+
 	progress := &streamProgressReporter{
 		ctx:    ctx,
 		logger: api.logger,
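
Note the relocated block also swaps context.WithTimeout for context.WithTimeoutCause (available since Go 1.21), so when this server-side deadline fires, downstream code can tell it apart from other cancellations. A standalone sketch of the difference, not taken from the repository:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

func main() {
	// WithTimeoutCause behaves like WithTimeout, but when the deadline
	// fires, context.Cause(ctx) returns the supplied error instead of
	// only the bare context.DeadlineExceeded.
	ctx, cancel := context.WithTimeoutCause(
		context.Background(),
		10*time.Millisecond,
		errors.New("list fetch timeout"),
	)
	defer cancel()

	<-ctx.Done()
	fmt.Println(ctx.Err())          // context deadline exceeded
	fmt.Println(context.Cause(ctx)) // list fetch timeout
}
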
58 changes: 46 additions & 12 deletions backend/pkg/api/connect/service/console/stream_progress_reporter.go
@@ -11,6 +11,7 @@ package console
 
 import (
 	"context"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -31,6 +32,8 @@ type streamProgressReporter struct {
 
 	messagesConsumed atomic.Int64
 	bytesConsumed    atomic.Int64
+
+	writeMutex sync.Mutex
 }
 
 func (p *streamProgressReporter) Start() {
@@ -44,41 +47,53 @@ func (p *streamProgressReporter) Start() {
 	// topic it may take some time until there are messages. This go routine is in charge of keeping the user up to
 	// date about the progress Kowl made streaming the topic
 	go func() {
+		ticker := time.NewTicker(1 * time.Second)
+
 		for {
 			select {
 			case <-p.ctx.Done():
+				ticker.Stop()
 				return
-			default:
+			case <-ticker.C:
 				p.reportProgress()
 			}
-			time.Sleep(1 * time.Second)
 		}
 	}()
 }
 
 func (p *streamProgressReporter) reportProgress() {
+	p.writeMutex.Lock()
+	defer p.writeMutex.Unlock()
+
 	msg := &v1alpha.ListMessagesResponse_ProgressMessage{
 		MessagesConsumed: p.messagesConsumed.Load(),
 		BytesConsumed:    p.bytesConsumed.Load(),
 	}
 
-	p.stream.Send(&v1alpha.ListMessagesResponse{
+	if err := p.stream.Send(&v1alpha.ListMessagesResponse{
 		ControlMessage: &v1alpha.ListMessagesResponse_Progress{
 			Progress: msg,
 		},
-	})
+	}); err != nil {
+		p.logger.Error("send error in stream reportProgress", zap.Error(err))
+	}
 }
 
 func (p *streamProgressReporter) OnPhase(name string) {
+	p.writeMutex.Lock()
+	defer p.writeMutex.Unlock()
+
 	msg := &v1alpha.ListMessagesResponse_PhaseMessage{
 		Phase: name,
 	}
 
-	p.stream.Send(&v1alpha.ListMessagesResponse{
+	if err := p.stream.Send(&v1alpha.ListMessagesResponse{
 		ControlMessage: &v1alpha.ListMessagesResponse_Phase{
 			Phase: msg,
 		},
-	})
+	}); err != nil {
+		p.logger.Error("send error in stream OnPhase", zap.Error(err))
+	}
 }
 
 func (p *streamProgressReporter) OnMessageConsumed(size int64) {
@@ -87,6 +102,13 @@ func (p *streamProgressReporter) OnMessageConsumed(size int64) {
 }
 
 func (p *streamProgressReporter) OnMessage(message *kafka.TopicMessage) {
+	if message == nil {
+		return
+	}
+
+	p.writeMutex.Lock()
+	defer p.writeMutex.Unlock()
+
 	headers := make([]*v1alpha.KafkaRecordHeader, 0, len(message.Headers))
 
 	for _, mh := range message.Headers {
@@ -164,36 +186,48 @@ func (p *streamProgressReporter) OnMessage(message *kafka.TopicMessage) {
 		})
 	}
 
-	p.stream.Send(&v1alpha.ListMessagesResponse{
+	if err := p.stream.Send(&v1alpha.ListMessagesResponse{
 		ControlMessage: &v1alpha.ListMessagesResponse_Data{
 			Data: data,
 		},
-	})
+	}); err != nil {
+		p.logger.Error("send error in stream OnMessage", zap.Error(err))
+	}
 }
 
 func (p *streamProgressReporter) OnComplete(elapsedMs int64, isCancelled bool) {
+	p.writeMutex.Lock()
+	defer p.writeMutex.Unlock()
+
 	msg := &v1alpha.ListMessagesResponse_StreamCompletedMessage{
 		ElapsedMs:        elapsedMs,
 		IsCancelled:      isCancelled,
 		MessagesConsumed: p.messagesConsumed.Load(),
 		BytesConsumed:    p.bytesConsumed.Load(),
 	}
 
-	p.stream.Send(&v1alpha.ListMessagesResponse{
+	if err := p.stream.Send(&v1alpha.ListMessagesResponse{
 		ControlMessage: &v1alpha.ListMessagesResponse_Done{
 			Done: msg,
 		},
-	})
+	}); err != nil {
+		p.logger.Error("send error in stream OnComplete", zap.Error(err))
+	}
 }
 
 func (p *streamProgressReporter) OnError(message string) {
+	p.writeMutex.Lock()
+	defer p.writeMutex.Unlock()
+
 	msg := &v1alpha.ListMessagesResponse_ErrorMessage{
 		Message: message,
 	}
 
-	p.stream.Send(&v1alpha.ListMessagesResponse{
+	if err := p.stream.Send(&v1alpha.ListMessagesResponse{
 		ControlMessage: &v1alpha.ListMessagesResponse_Error{
 			Error: msg,
 		},
-	})
+	}); err != nil {
+		p.logger.Error("send error in stream OnError", zap.Error(err))
+	}
 }
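
Beyond the mutex, the progress goroutine's loop changes shape: the old select had a default branch followed by time.Sleep, so it polled once a second and could only notice cancellation between sleeps; the new loop blocks on a time.Ticker and on ctx.Done(), reacting to either immediately. A standalone sketch of the new loop shape, with illustrative names rather than the commit's code:

package main

import (
	"context"
	"fmt"
	"time"
)

// tickUntilDone reports once per second and returns as soon as the
// context is cancelled. Unlike a select with a default branch plus
// time.Sleep, it blocks until something actually happens and never
// waits out a sleep after cancellation. (The diff calls ticker.Stop()
// explicitly before returning; a defer is equivalent.)
func tickUntilDone(ctx context.Context, report func()) {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			report()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3500*time.Millisecond)
	defer cancel()
	tickUntilDone(ctx, func() { fmt.Println("progress tick") })
}
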
4 changes: 2 additions & 2 deletions backend/pkg/kafka/consumer.go
@@ -115,8 +115,8 @@ func (s *Service) FetchMessages(ctx context.Context, progress IListMessagesProgr
 	// 2. Create consumer workers
 	jobs := make(chan *kgo.Record, 100)
 	resultsCh := make(chan *TopicMessage, 100)
-	workerCtx, cancel := context.WithCancel(ctx)
-	defer cancel()
+	workerCtx, cancel := context.WithCancelCause(ctx)
+	defer cancel(errors.New("worker cancel"))
 
 	wg := sync.WaitGroup{}
 
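
The worker context gets the same cause-carrying treatment with context.WithCancelCause (Go 1.20+): the cancel function now takes the error that context.Cause will report to anyone inspecting the cancelled context. A minimal standalone sketch, not taken from the repository:

package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	ctx, cancel := context.WithCancelCause(context.Background())

	// The cancel function carries the reason; ctx.Err() is still the
	// generic context.Canceled, but context.Cause preserves why.
	cancel(errors.New("worker cancel"))

	<-ctx.Done()
	fmt.Println(ctx.Err())          // context canceled
	fmt.Println(context.Cause(ctx)) // worker cancel
}
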
15 changes: 8 additions & 7 deletions frontend/src/state/backendApi.ts
@@ -245,6 +245,13 @@ const addBearerTokenInterceptor: ConnectRpcInterceptor = (next) => async (req: U
 };
 
 
+const transport = createConnectTransport({
+    baseUrl: appConfig.grpcBase,
+    interceptors: [addBearerTokenInterceptor]
+});
+
+const consoleClient = createPromiseClient(ConsoleService, transport);
+
 let messageSearchAbortController: AbortController | null = null;
 
 //
@@ -372,12 +379,6 @@ const apiStore = {
 
         // do it
         const abortController = messageSearchAbortController = new AbortController();
-        const transport = createConnectTransport({
-            baseUrl: appConfig.grpcBase,
-            interceptors: [addBearerTokenInterceptor]
-        });
-
-        const client = createPromiseClient(ConsoleService, transport);
 
         const req = new ListMessagesRequest();
         req.topic = searchRequest.topicName;
@@ -399,7 +400,7 @@ const apiStore = {
         }
 
         try {
-            for await (const res of await client.listMessages(req, { signal: abortController.signal, timeoutMs })) {
+            for await (const res of await consoleClient.listMessages(req, { signal: abortController.signal, timeoutMs })) {
                 if (abortController.signal.aborted)
                     break;
 
