Skip to content

Commit

Permalink
new implementation to fix the deadlock issue
Browse files Browse the repository at this point in the history
  • Loading branch information
ClairePhi committed Dec 17, 2024
1 parent f394dbd commit 6c797e5
Showing 1 changed file with 77 additions and 46 deletions.
123 changes: 77 additions & 46 deletions cmd/clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"context"
"errors"
"fmt"
"log"
"math"
"runtime"
"sync"

"github.com/cupcicm/opp/core"
"github.com/go-git/go-git/v5/plumbing"
"github.com/urfave/cli/v3"
"golang.org/x/sync/semaphore"
)

func CleanCommand(repo *core.Repo, gh func(context.Context) core.Gh) *cli.Command {
Expand Down Expand Up @@ -45,10 +45,7 @@ func (c cleaner) Clean(ctx context.Context) error {
defer cancel()

// results channel will receive the results of each pr cleaning operation
results, err := c.cleaningPipeline(ctx)
if err != nil {
return err
}
results := c.cleaningPipeline(ctx)

for result := range results {
if result.err != nil {
Expand All @@ -60,58 +57,92 @@ func (c cleaner) Clean(ctx context.Context) error {
return nil
}

func (c cleaner) cleaningPipeline(ctx context.Context) (chan cleanResult, error) {
results := make(chan cleanResult)

// The semaphore will be used to limit the number of goroutines that can be launched in parallel.
maxNumberOfGoroutines := int64(runtime.GOMAXPROCS(0))
sem := semaphore.NewWeighted(maxNumberOfGoroutines)
func pushLocalPrsToChannel(ctx context.Context, prs ...core.LocalPr) <-chan core.LocalPr {
out := make(chan core.LocalPr)

// Wait for the semaphore to be fully released before closing the results channel.
defer func() {
go func() {
err := sem.Acquire(ctx, maxNumberOfGoroutines)
if err != nil && ctx.Err() == nil {
log.Panicf("What is the error if not the context error? Error: %s.", err)
go func() {
defer close(out)
for _, pr := range prs {
select {
case out <- pr:
case <-ctx.Done():
return
}
close(results)
}()
}

}()
return out
}

cleanPr := func(pr core.LocalPr) {
if err := sem.Acquire(ctx, 1); err != nil {
results <- cleanResult{
err: fmt.Errorf("cannot acquire semaphore: %w", err),
func (c cleaner) processCleanPRFromChannel(ctx context.Context, in <-chan core.LocalPr) <-chan cleanResult {
out := make(chan cleanResult)
go func() {
defer close(out)
for pr := range in {
select {
case out <- cleanResult{c.cleanPR(ctx, pr), pr}:
case <-ctx.Done():
return
}
}
defer sem.Release(1)
_, err := c.repo.GetRemoteTip(&pr)
if errors.Is(err, plumbing.ErrReferenceNotFound) {
// The remote tip does not exist anymore : it has been deleted on the github repo.
// Probably because the PR is either abandonned or merged.
}()
return out
}

func (c cleaner) cleanPR(ctx context.Context, pr core.LocalPr) error {
_, err := c.repo.GetRemoteTip(&pr)
if errors.Is(err, plumbing.ErrReferenceNotFound) {
// The remote tip does not exist anymore : it has been deleted on the github repo.
// Probably because the PR is either abandonned or merged.
c.repo.CleanupAfterMerge(ctx, &pr)
} else {
githubPr, _, err := c.pullRequests.Get(ctx, core.GetGithubOwner(), core.GetGithubRepoName(), pr.PrNumber)
if err != nil {
return err
}
if *githubPr.State == "closed" {
c.repo.CleanupAfterMerge(ctx, &pr)
} else {
githubPr, _, err := c.pullRequests.Get(ctx, core.GetGithubOwner(), core.GetGithubRepoName(), pr.PrNumber)
if err != nil {
select {
case results <- cleanResult{err, pr}:
case <-ctx.Done():
}
}
}
return nil
}

func mergeResultChannels(ctx context.Context, cs ...<-chan cleanResult) <-chan cleanResult {
out := make(chan cleanResult)
var wg sync.WaitGroup

output := func(c <-chan cleanResult) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-ctx.Done():
return
}
if *githubPr.State == "closed" {
c.repo.CleanupAfterMerge(ctx, &pr)
}
}
select {
case results <- cleanResult{nil, pr}:
case <-ctx.Done():
}
}

for _, pr := range c.localPrs {
go cleanPr(pr)
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}

go func() {
wg.Wait()
close(out)
}()
return out
}

func (c cleaner) cleaningPipeline(ctx context.Context) <-chan cleanResult {
in := pushLocalPrsToChannel(ctx, c.localPrs...)

resultChannelNb := int(math.Min(float64(runtime.GOMAXPROCS(0)), float64(len(c.localPrs))))

resultChannels := make([]<-chan cleanResult, 0)
for i := 1; i <= resultChannelNb; i++ {
resultChannels = append(resultChannels, c.processCleanPRFromChannel(ctx, in))
}

return results, nil
return mergeResultChannels(ctx, resultChannels...)
}

0 comments on commit 6c797e5

Please sign in to comment.