diff --git a/internal/api/container_logs.go b/internal/api/container_logs.go index 6b66910106..d1266c8ec7 100644 --- a/internal/api/container_logs.go +++ b/internal/api/container_logs.go @@ -8,12 +8,13 @@ package api import ( "context" "fmt" - "github.com/vmware-tanzu/octant/internal/gvk" - "github.com/vmware-tanzu/octant/pkg/store" "strings" "sync" "time" + "github.com/vmware-tanzu/octant/internal/gvk" + "github.com/vmware-tanzu/octant/pkg/store" + "github.com/vmware-tanzu/octant/internal/config" "github.com/vmware-tanzu/octant/internal/modules/overview/container" "github.com/vmware-tanzu/octant/internal/octant" @@ -26,10 +27,6 @@ type logEntry struct { Message string `json:"message,omitempty"` } -type logResponse struct { - Entries []logEntry `json:"entries,omitempty"` -} - const ( RequestPodLogsSubscribe = "action.octant.dev/podLogs/subscribe" RequestPodLogsUnsubscribe = "action.octant.dev/podLogs/unsubscribe" @@ -67,7 +64,7 @@ func (s *podLogsStateManager) Handlers() []octant.ClientRequestHandler { } } -func (s *podLogsStateManager) StreamPodLogsSubscribe(state octant.State, payload action.Payload) error { +func (s *podLogsStateManager) StreamPodLogsSubscribe(_ octant.State, payload action.Payload) error { namespace, err := payload.String("namespace") if err != nil { return fmt.Errorf("getting namespace from payload: %w", err) @@ -107,7 +104,7 @@ func (s *podLogsStateManager) StreamPodLogsSubscribe(state octant.State, payload return nil } -func (s *podLogsStateManager) StreamPodLogsUnsubscribe(state octant.State, payload action.Payload) error { +func (s *podLogsStateManager) StreamPodLogsUnsubscribe(_ octant.State, payload action.Payload) error { namespace, err := payload.String("namespace") if err != nil { return fmt.Errorf("getting namespace from payload: %w", err) @@ -131,18 +128,17 @@ func (s *podLogsStateManager) StreamPodLogsUnsubscribe(state octant.State, paylo return nil } -func (s *podLogsStateManager) Start(ctx context.Context, state octant.State, client OctantClient) { +func (s *podLogsStateManager) Start(ctx context.Context, _ octant.State, client OctantClient) { s.client = client s.ctx = ctx } -func (s *podLogsStateManager) sendLogEvents(ctx context.Context, logEventType octant.EventType, logCh <-chan container.LogEntry) { - ctx, cancelFn := context.WithCancel(s.ctx) - for { +func (s *podLogsStateManager) streamEventsToClient(ctx context.Context, logEventType octant.EventType, logCh <-chan container.LogEntry) { + done := false + for !done { select { case <-ctx.Done(): - cancelFn() - return + done = true case entry, ok := <-logCh: if ok { le := newLogEntry(entry.Line(), entry.Container()) @@ -153,8 +149,7 @@ func (s *podLogsStateManager) sendLogEvents(ctx context.Context, logEventType oc } s.client.Send(logEvent) } else { - cancelFn() - return + done = true } } } @@ -165,7 +160,7 @@ func (s *podLogsStateManager) startStream(key store.Key, logStreamer container.L eventType := octant.NewLoggingEventType(key.Namespace, key.Name) logCh := make(chan container.LogEntry) - go s.sendLogEvents(ctx, eventType, logCh) + go s.streamEventsToClient(ctx, eventType, logCh) logStreamer.Stream(ctx, logCh) diff --git a/internal/api/container_logs_test.go b/internal/api/container_logs_test.go index bf52325914..46622968a8 100644 --- a/internal/api/container_logs_test.go +++ b/internal/api/container_logs_test.go @@ -2,15 +2,17 @@ package api import ( "context" + "sync" + "testing" + "time" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + configFake "github.com/vmware-tanzu/octant/internal/config/fake" "github.com/vmware-tanzu/octant/internal/modules/overview/container" "github.com/vmware-tanzu/octant/internal/octant" "github.com/vmware-tanzu/octant/pkg/store" - "sync" - "testing" - "time" ) func TestContainerLogs_NewLogEntry(t *testing.T) { @@ -49,7 +51,7 @@ func TestContainerLogs_SendLogEventsStops(t *testing.T) { var wg sync.WaitGroup wg.Add(1) go func() { - s.sendLogEvents(s.ctx, eventType, logCh) + s.streamEventsToClient(s.ctx, eventType, logCh) wg.Done() }() @@ -65,7 +67,7 @@ func TestContainerLogs_SendLogEventsClientSend(t *testing.T) { defer controller.Finish() dashConfig := configFake.NewMockDash(controller) - client := &octantClient{} + client := newOctantClient() key := store.Key{ Namespace: "test-ns", @@ -73,32 +75,47 @@ func TestContainerLogs_SendLogEventsClientSend(t *testing.T) { } eventType := octant.NewLoggingEventType(key.Namespace, key.Name) - logCh := make(chan container.LogEntry) + logCh := make(chan container.LogEntry, 1) s := NewPodLogsStateManager(dashConfig) - s.Start(context.Background(), nil, client) + ctx, cancel := context.WithCancel(context.Background()) + s.Start(ctx, nil, client) go func() { - s.sendLogEvents(s.ctx, eventType, logCh) + s.streamEventsToClient(ctx, eventType, logCh) }() le := container.NewLogEntry("container-a", "testing log line") logCh <- le + + <-client.ch + cancel() close(logCh) - assert.NotNil(t, client.sendCalledWith) - assert.Equal(t, eventType, client.sendCalledWith.Type) + if assert.NotNil(t, client.sendCalledWith) { + assert.Equal(t, eventType, client.sendCalledWith.Type) - clientLe, ok := client.sendCalledWith.Data.(logEntry) - assert.True(t, ok) - assert.Equal(t, "container-a", clientLe.Container) - assert.Equal(t, "testing log line", clientLe.Message) - assert.Nil(t, clientLe.Timestamp) + clientLe, ok := client.sendCalledWith.Data.(logEntry) + assert.True(t, ok) + assert.Equal(t, "container-a", clientLe.Container) + assert.Equal(t, "testing log line", clientLe.Message) + assert.Nil(t, clientLe.Timestamp) + } } type octantClient struct { sendCalledWith octant.Event + ch chan bool } -func (oc *octantClient) Send(event octant.Event) { oc.sendCalledWith = event } -func (oc *octantClient) ID() string { return "" } +func newOctantClient() *octantClient { + return &octantClient{ + ch: make(chan bool, 1), + } +} + +func (oc *octantClient) Send(event octant.Event) { + oc.sendCalledWith = event + oc.ch <- true +} +func (oc *octantClient) ID() string { return "" }