Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add a bunch of logging and a little delay so that we have time to set… #1210

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions api/v1/server/handlers/monitoring/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (m *MonitoringService) MonitoringPostRunProbe(ctx echo.Context, request gen

cleanup, err := m.run(cancellableContext, cf, m.workflowName, events, stepChan, errorChan)
if err != nil {
m.l.Error().Msgf("error running probe: %s", err)
m.l.Error().Msgf("probe: error running probe: %s", err)
return nil, err
}

Expand All @@ -85,26 +85,29 @@ func (m *MonitoringService) MonitoringPostRunProbe(ctx echo.Context, request gen
case <-cancellableContext.Done():

if cancellableContext.Err() == context.DeadlineExceeded {
m.l.Error().Msg("timed out waiting for probe to complete")
return nil, fmt.Errorf("timed out waiting for probe to complete")
m.l.Error().Msg("probe: timed out waiting for probe to complete")
return nil, fmt.Errorf("probe: timed out waiting for probe to complete")
}

case err := <-errorChan:
m.l.Error().Msgf("error during probe: %s", err)
m.l.Error().Msgf("probe: error during probe: %s", err)
return nil, err

case e := <-events:
m.l.Debug().Msgf("probe: received event: %s", e)
if !strings.HasPrefix(e, messages[messageIndex]) {
return nil, fmt.Errorf("expected message %s, to start with %s", messages[messageIndex], e)
m.l.Error().Msgf("probe: expected message %s, to start with %s", e, messages[messageIndex])
return nil, fmt.Errorf("probe: expected message %s, to start with %s", messages[messageIndex], e)
}
m.l.Debug().Msgf("probe: received expected message: %s", e)
messageIndex++

if messageIndex == len(messages) {
for i := range stepMessages {
stepMessage := <-stepChan
if stepMessage != stepMessages[i] {

return nil, fmt.Errorf("probe did not complete successfully - step messages failed")
m.l.Error().Msgf("probe: expected step message %s, got %s", stepMessages[i], stepMessage)
return nil, fmt.Errorf("probe: probe did not complete successfully - step messages failed")
}
}
return nil, nil
Expand Down Expand Up @@ -214,15 +217,18 @@ func (m *MonitoringService) run(ctx context.Context, cf clientconfig.ClientConfi

go func() {
err = c.Subscribe().StreamByAdditionalMetadata(ctx, streamKey, streamValue, func(event client.StreamEvent) error {
m.l.Info().Msgf("probe: received stream event: %s", string(event.Message))
events <- string(event.Message)

return nil
})
if err != nil {
m.l.Error().Msgf("error subscribing to stream: %s", err)
errors <- fmt.Errorf("error subscribing to stream: %w", err)
}
}()

time.Sleep(100 * time.Millisecond)
go func() {
testEvent := probeEvent{
UniqueStreamId: streamValue,
Expand All @@ -239,13 +245,15 @@ func (m *MonitoringService) run(ctx context.Context, cf clientconfig.ClientConfi
)

if err != nil {
m.l.Error().Msgf("error pushing event: %s", err)
errors <- fmt.Errorf("error pushing event: %w", err)
}
}()

cleanupWorker, err := w.Start()

if err != nil {
m.l.Error().Msgf("error starting worker: %s", err)
return nil, fmt.Errorf("error starting worker: %w", err)
}

Expand Down
Loading