diff --git a/go.mod b/go.mod index 4f67d59259..58676302b0 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/mattn/go-runewidth v0.0.16 // indirect github.com/muesli/termenv v0.16.0 // indirect github.com/rivo/uniseg v0.4.7 // indirect + github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/pflag v1.0.7 // indirect github.com/stretchr/testify v1.10.0 // indirect github.com/thlib/go-timezone-local v0.0.7 // indirect diff --git a/go.sum b/go.sum index 68f8f0ac7e..53ed3a1ca6 100644 --- a/go.sum +++ b/go.sum @@ -66,6 +66,8 @@ github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/f github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 h1:KRzFb2m7YtdldCEkzs6KqmJw4nqEVZGK7IN2kJkjTuQ= github.com/santhosh-tekuri/jsonschema/v6 v6.0.2/go.mod h1:JXeL+ps8p7/KNMjDQk3TCwPpBy0wYklyWTfbkIzdIFU= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/spf13/cobra v1.9.1 h1:CXSaggrXdbHK9CF+8ywj8Amf7PBRmPCOJugH954Nnlo= github.com/spf13/cobra v1.9.1/go.mod h1:nDyEzZ8ogv936Cinf6g1RU9MRY64Ir93oCnqb9wxYW0= github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= @@ -95,5 +97,6 @@ golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/cli/logs.go b/pkg/cli/logs.go index 09b392dc2e..47942c0019 100644 --- a/pkg/cli/logs.go +++ b/pkg/cli/logs.go @@ -15,6 +15,7 @@ import ( "github.com/githubnext/gh-aw/pkg/console" "github.com/githubnext/gh-aw/pkg/constants" "github.com/githubnext/gh-aw/pkg/workflow" + "github.com/sourcegraph/conc/pool" "github.com/spf13/cobra" ) @@ -46,14 +47,27 @@ type LogMetrics = workflow.LogMetrics // ErrNoArtifacts indicates that a workflow run has no artifacts var ErrNoArtifacts = errors.New("no artifacts found for this run") +// DownloadResult represents the result of downloading artifacts for a single run +type DownloadResult struct { + Run WorkflowRun + Metrics LogMetrics + Error error + Skipped bool + LogsPath string +} + // Constants for the iterative algorithm const ( // MaxIterations limits how many batches we fetch to prevent infinite loops - MaxIterations = 10 + MaxIterations = 20 // BatchSize is the number of runs to fetch in each iteration - BatchSize = 50 + BatchSize = 100 // BatchSizeForAllWorkflows is the larger batch size when searching for agentic workflows - BatchSizeForAllWorkflows = 100 + // There can be a really large number of workflow runs in a repository, so + // we are generous in the batch size when used without qualification. + BatchSizeForAllWorkflows = 250 + // MaxConcurrentDownloads limits the number of parallel artifact downloads + MaxConcurrentDownloads = 10 ) // NewLogsCommand creates the logs command @@ -185,44 +199,33 @@ func DownloadWorkflowLogs(workflowName string, count int, startDate, endDate, ou // Process each run in this batch batchProcessed := 0 - for _, run := range runs { + downloadResults := downloadRunArtifactsConcurrent(runs, outputDir, verbose, count-len(processedRuns)) + + for _, result := range downloadResults { // Stop if we've reached our target count if len(processedRuns) >= count { break } - if verbose { - fmt.Println(console.FormatInfoMessage(fmt.Sprintf("Processing run %d (%s)...", run.DatabaseID, run.Status))) - } - - // Download artifacts and logs for this run - runOutputDir := filepath.Join(outputDir, fmt.Sprintf("run-%d", run.DatabaseID)) - if err := downloadRunArtifacts(run.DatabaseID, runOutputDir, verbose); err != nil { - // Check if this is a "no artifacts" case - skip silently for cancelled/failed runs - if errors.Is(err, ErrNoArtifacts) { - if verbose { - fmt.Println(console.FormatWarningMessage(fmt.Sprintf("Skipping run %d: %v", run.DatabaseID, err))) + if result.Skipped { + if verbose { + if result.Error != nil { + fmt.Println(console.FormatWarningMessage(fmt.Sprintf("Skipping run %d: %v", result.Run.DatabaseID, result.Error))) } - continue } - fmt.Println(console.FormatWarningMessage(fmt.Sprintf("Failed to download artifacts for run %d: %v", run.DatabaseID, err))) continue } - // Extract metrics from logs - metrics, err := extractLogMetrics(runOutputDir, verbose) - if err != nil { - if verbose { - fmt.Println(console.FormatWarningMessage(fmt.Sprintf("Failed to extract metrics for run %d: %v", run.DatabaseID, err))) - } + if result.Error != nil { + fmt.Println(console.FormatWarningMessage(fmt.Sprintf("Failed to download artifacts for run %d: %v", result.Run.DatabaseID, result.Error))) + continue } // Update run with metrics and path - // Note: Duration is calculated from GitHub API timestamps (StartedAt/UpdatedAt), - // not parsed from log files for accuracy and consistency - run.TokenUsage = metrics.TokenUsage - run.EstimatedCost = metrics.EstimatedCost - run.LogsPath = runOutputDir + run := result.Run + run.TokenUsage = result.Metrics.TokenUsage + run.EstimatedCost = result.Metrics.EstimatedCost + run.LogsPath = result.LogsPath // Always use GitHub API timestamps for duration calculation if !run.StartedAt.IsZero() && !run.UpdatedAt.IsZero() { @@ -271,6 +274,83 @@ func DownloadWorkflowLogs(workflowName string, count int, startDate, endDate, ou return nil } +// downloadRunArtifactsConcurrent downloads artifacts for multiple workflow runs concurrently +func downloadRunArtifactsConcurrent(runs []WorkflowRun, outputDir string, verbose bool, maxRuns int) []DownloadResult { + if len(runs) == 0 { + return []DownloadResult{} + } + + // Limit the number of runs to process if maxRuns is specified + actualRuns := runs + if maxRuns > 0 && len(runs) > maxRuns { + actualRuns = runs[:maxRuns] + } + + if verbose { + fmt.Println(console.FormatInfoMessage(fmt.Sprintf("Processing %d runs in parallel...", len(actualRuns)))) + } + + // Use conc pool for controlled concurrency with results + p := pool.NewWithResults[DownloadResult]().WithMaxGoroutines(MaxConcurrentDownloads) + + // Process each run concurrently + for _, run := range actualRuns { + run := run // capture loop variable + p.Go(func() DownloadResult { + if verbose { + fmt.Println(console.FormatInfoMessage(fmt.Sprintf("Processing run %d (%s)...", run.DatabaseID, run.Status))) + } + + // Download artifacts and logs for this run + runOutputDir := filepath.Join(outputDir, fmt.Sprintf("run-%d", run.DatabaseID)) + err := downloadRunArtifacts(run.DatabaseID, runOutputDir, verbose) + + result := DownloadResult{ + Run: run, + LogsPath: runOutputDir, + } + + if err != nil { + // Check if this is a "no artifacts" case - mark as skipped for cancelled/failed runs + if errors.Is(err, ErrNoArtifacts) { + result.Skipped = true + result.Error = err + } else { + result.Error = err + } + } else { + // Extract metrics from logs + metrics, metricsErr := extractLogMetrics(runOutputDir, verbose) + if metricsErr != nil { + if verbose { + fmt.Println(console.FormatWarningMessage(fmt.Sprintf("Failed to extract metrics for run %d: %v", run.DatabaseID, metricsErr))) + } + // Don't fail the whole download for metrics errors + metrics = LogMetrics{} + } + result.Metrics = metrics + } + + return result + }) + } + + // Wait for all downloads to complete and collect results + results := p.Wait() + + if verbose { + successCount := 0 + for _, result := range results { + if result.Error == nil && !result.Skipped { + successCount++ + } + } + fmt.Println(console.FormatSuccessMessage(fmt.Sprintf("Completed parallel processing: %d successful, %d total", successCount, len(results)))) + } + + return results +} + // listWorkflowRunsWithPagination fetches workflow runs from GitHub with pagination support func listWorkflowRunsWithPagination(workflowName string, count int, startDate, endDate, beforeDate string, verbose bool) ([]WorkflowRun, error) { args := []string{"run", "list", "--json", "databaseId,number,url,status,conclusion,workflowName,createdAt,startedAt,updatedAt,event,headBranch,headSha,displayTitle"} diff --git a/pkg/cli/logs_parallel_test.go b/pkg/cli/logs_parallel_test.go new file mode 100644 index 0000000000..13e7a2dc87 --- /dev/null +++ b/pkg/cli/logs_parallel_test.go @@ -0,0 +1,145 @@ +package cli + +import ( + "fmt" + "strings" + "testing" + "time" +) + +func TestDownloadRunArtifactsParallel(t *testing.T) { + // Test with empty runs slice + results := downloadRunArtifactsConcurrent([]WorkflowRun{}, "./test-logs", false, 5) + if len(results) != 0 { + t.Errorf("Expected 0 results for empty runs, got %d", len(results)) + } + + // Test with mock runs + runs := []WorkflowRun{ + { + DatabaseID: 12345, + Number: 1, + Status: "completed", + Conclusion: "success", + WorkflowName: "Test Workflow", + CreatedAt: time.Now().Add(-1 * time.Hour), + StartedAt: time.Now().Add(-55 * time.Minute), + UpdatedAt: time.Now().Add(-50 * time.Minute), + }, + { + DatabaseID: 12346, + Number: 2, + Status: "completed", + Conclusion: "failure", + WorkflowName: "Test Workflow", + CreatedAt: time.Now().Add(-2 * time.Hour), + StartedAt: time.Now().Add(-115 * time.Minute), + UpdatedAt: time.Now().Add(-110 * time.Minute), + }, + } + + // This will fail since we don't have real GitHub CLI access, + // but we can verify the structure and that no panics occur + results = downloadRunArtifactsConcurrent(runs, "./test-logs", false, 5) + + // We expect 2 results even if they fail + if len(results) != 2 { + t.Errorf("Expected 2 results, got %d", len(results)) + } + + // Verify we have results for all our runs (order may vary due to parallel execution) + foundRuns := make(map[int64]bool) + for _, result := range results { + foundRuns[result.Run.DatabaseID] = true + + // Verify the LogsPath follows the expected pattern (normalize path separators) + expectedSuffix := fmt.Sprintf("run-%d", result.Run.DatabaseID) + if !strings.Contains(result.LogsPath, expectedSuffix) { + t.Errorf("Expected LogsPath to contain %s, got %s", expectedSuffix, result.LogsPath) + } + } + + // Verify we processed all the runs we sent + for _, run := range runs { + if !foundRuns[run.DatabaseID] { + t.Errorf("Missing result for run %d", run.DatabaseID) + } + } +} + +func TestDownloadRunArtifactsParallelMaxRuns(t *testing.T) { + // Test maxRuns parameter + runs := []WorkflowRun{ + {DatabaseID: 1, Status: "completed"}, + {DatabaseID: 2, Status: "completed"}, + {DatabaseID: 3, Status: "completed"}, + {DatabaseID: 4, Status: "completed"}, + {DatabaseID: 5, Status: "completed"}, + } + + // Limit to 3 runs + results := downloadRunArtifactsConcurrent(runs, "./test-logs", false, 3) + + if len(results) != 3 { + t.Errorf("Expected 3 results when maxRuns=3, got %d", len(results)) + } + + // Verify we got exactly 3 results from the first 3 runs (order may vary due to parallel execution) + expectedIDs := map[int64]bool{1: false, 2: false, 3: false} + for _, result := range results { + if _, expected := expectedIDs[result.Run.DatabaseID]; expected { + expectedIDs[result.Run.DatabaseID] = true + } else { + t.Errorf("Got unexpected DatabaseID %d", result.Run.DatabaseID) + } + } + + // Verify all expected IDs were found + for id, found := range expectedIDs { + if !found { + t.Errorf("Missing expected DatabaseID %d", id) + } + } +} + +func TestDownloadResult(t *testing.T) { + // Test DownloadResult structure + run := WorkflowRun{ + DatabaseID: 12345, + Status: "completed", + } + + result := DownloadResult{ + Run: run, + LogsPath: "./test-path", + Skipped: false, + Error: nil, + } + + if result.Run.DatabaseID != 12345 { + t.Errorf("Expected DatabaseID 12345, got %d", result.Run.DatabaseID) + } + + if result.LogsPath != "./test-path" { + t.Errorf("Expected LogsPath './test-path', got %s", result.LogsPath) + } + + if result.Skipped { + t.Error("Expected Skipped to be false") + } + + if result.Error != nil { + t.Errorf("Expected Error to be nil, got %v", result.Error) + } +} + +func TestMaxConcurrentDownloads(t *testing.T) { + // Test that MaxConcurrentDownloads constant is properly defined + if MaxConcurrentDownloads <= 0 { + t.Errorf("MaxConcurrentDownloads should be positive, got %d", MaxConcurrentDownloads) + } + + if MaxConcurrentDownloads > 20 { + t.Errorf("MaxConcurrentDownloads should be reasonable (<=20), got %d", MaxConcurrentDownloads) + } +}