diff --git a/api/v1/server/handlers/monitoring/probe.go b/api/v1/server/handlers/monitoring/probe.go index ec33adaf5..a9da8c542 100644 --- a/api/v1/server/handlers/monitoring/probe.go +++ b/api/v1/server/handlers/monitoring/probe.go @@ -70,7 +70,7 @@ func (m *MonitoringService) MonitoringPostRunProbe(ctx echo.Context, request gen cleanup, err := m.run(cancellableContext, cf, m.workflowName, events, stepChan, errorChan) if err != nil { - m.l.Error().Msgf("error running probe: %s", err) + m.l.Error().Msgf("probe: error running probe: %s", err) return nil, err } @@ -85,26 +85,29 @@ func (m *MonitoringService) MonitoringPostRunProbe(ctx echo.Context, request gen case <-cancellableContext.Done(): if cancellableContext.Err() == context.DeadlineExceeded { - m.l.Error().Msg("timed out waiting for probe to complete") - return nil, fmt.Errorf("timed out waiting for probe to complete") + m.l.Error().Msg("probe: timed out waiting for probe to complete") + return nil, fmt.Errorf("probe: timed out waiting for probe to complete") } case err := <-errorChan: - m.l.Error().Msgf("error during probe: %s", err) + m.l.Error().Msgf("probe: error during probe: %s", err) return nil, err case e := <-events: + m.l.Debug().Msgf("probe: received event: %s", e) if !strings.HasPrefix(e, messages[messageIndex]) { - return nil, fmt.Errorf("expected message %s, to start with %s", messages[messageIndex], e) + m.l.Error().Msgf("probe: expected message %s, to start with %s", e, messages[messageIndex]) + return nil, fmt.Errorf("probe: expected message %s, to start with %s", messages[messageIndex], e) } + m.l.Debug().Msgf("probe: received expected message: %s", e) messageIndex++ if messageIndex == len(messages) { for i := range stepMessages { stepMessage := <-stepChan if stepMessage != stepMessages[i] { - - return nil, fmt.Errorf("probe did not complete successfully - step messages failed") + m.l.Error().Msgf("probe: expected step message %s, got %s", stepMessages[i], stepMessage) + return nil, fmt.Errorf("probe: probe did not complete successfully - step messages failed") } } return nil, nil @@ -214,15 +217,18 @@ func (m *MonitoringService) run(ctx context.Context, cf clientconfig.ClientConfi go func() { err = c.Subscribe().StreamByAdditionalMetadata(ctx, streamKey, streamValue, func(event client.StreamEvent) error { + m.l.Info().Msgf("probe: received stream event: %s", string(event.Message)) events <- string(event.Message) return nil }) if err != nil { + m.l.Error().Msgf("error subscribing to stream: %s", err) errors <- fmt.Errorf("error subscribing to stream: %w", err) } }() + time.Sleep(100 * time.Millisecond) go func() { testEvent := probeEvent{ UniqueStreamId: streamValue, @@ -239,6 +245,7 @@ func (m *MonitoringService) run(ctx context.Context, cf clientconfig.ClientConfi ) if err != nil { + m.l.Error().Msgf("error pushing event: %s", err) errors <- fmt.Errorf("error pushing event: %w", err) } }() @@ -246,6 +253,7 @@ func (m *MonitoringService) run(ctx context.Context, cf clientconfig.ClientConfi cleanupWorker, err := w.Start() if err != nil { + m.l.Error().Msgf("error starting worker: %s", err) return nil, fmt.Errorf("error starting worker: %w", err) }