Skip to content

Commit

Permalink
Remove transaction for archive download (#32186) (#32520)
Browse files Browse the repository at this point in the history
Backport #32186 by @lunny

Since there is a status column in the database, the transaction is
unnecessary when downloading an archive. The transaction is blocking
database operations, especially with SQLite.

Replace #27563

Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
  • Loading branch information
GiteaBot and lunny authored Nov 15, 2024
1 parent 257ce61 commit d03dd04
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 26 deletions.
33 changes: 13 additions & 20 deletions services/repository/archiver/archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (e RepoRefNotFoundError) Is(err error) bool {
}

// NewRequest creates an archival request, based on the URI. The
// resulting ArchiveRequest is suitable for being passed to ArchiveRepository()
// resulting ArchiveRequest is suitable for being passed to Await()
// if it's determined that the request still needs to be satisfied.
func NewRequest(repoID int64, repo *git.Repository, uri string) (*ArchiveRequest, error) {
r := &ArchiveRequest{
Expand Down Expand Up @@ -151,13 +151,14 @@ func (aReq *ArchiveRequest) Await(ctx context.Context) (*repo_model.RepoArchiver
}
}

// doArchive satisfies the ArchiveRequest being passed in. Processing
// will occur in a separate goroutine, as this phase may take a while to
// complete. If the archive already exists, doArchive will not do
// anything. In all cases, the caller should be examining the *ArchiveRequest
// being returned for completion, as it may be different than the one they passed
// in.
func doArchive(ctx context.Context, r *ArchiveRequest) (*repo_model.RepoArchiver, error) {
txCtx, committer, err := db.TxContext(ctx)
if err != nil {
return nil, err
}
defer committer.Close()
ctx, _, finished := process.GetManager().AddContext(txCtx, fmt.Sprintf("ArchiveRequest[%d]: %s", r.RepoID, r.GetArchiveName()))
ctx, _, finished := process.GetManager().AddContext(ctx, fmt.Sprintf("ArchiveRequest[%d]: %s", r.RepoID, r.GetArchiveName()))
defer finished()

archiver, err := repo_model.GetRepoArchiver(ctx, r.RepoID, r.Type, r.CommitID)
Expand Down Expand Up @@ -192,7 +193,7 @@ func doArchive(ctx context.Context, r *ArchiveRequest) (*repo_model.RepoArchiver
return nil, err
}
}
return archiver, committer.Commit()
return archiver, nil
}

if !errors.Is(err, os.ErrNotExist) {
Expand Down Expand Up @@ -261,17 +262,7 @@ func doArchive(ctx context.Context, r *ArchiveRequest) (*repo_model.RepoArchiver
}
}

return archiver, committer.Commit()
}

// ArchiveRepository satisfies the ArchiveRequest being passed in. Processing
// will occur in a separate goroutine, as this phase may take a while to
// complete. If the archive already exists, ArchiveRepository will not do
// anything. In all cases, the caller should be examining the *ArchiveRequest
// being returned for completion, as it may be different than the one they passed
// in.
func ArchiveRepository(ctx context.Context, request *ArchiveRequest) (*repo_model.RepoArchiver, error) {
return doArchive(ctx, request)
return archiver, nil
}

var archiverQueue *queue.WorkerPoolQueue[*ArchiveRequest]
Expand All @@ -281,8 +272,10 @@ func Init(ctx context.Context) error {
handler := func(items ...*ArchiveRequest) []*ArchiveRequest {
for _, archiveReq := range items {
log.Trace("ArchiverData Process: %#v", archiveReq)
if _, err := doArchive(ctx, archiveReq); err != nil {
if archiver, err := doArchive(ctx, archiveReq); err != nil {
log.Error("Archive %v failed: %v", archiveReq, err)
} else {
log.Trace("ArchiverData Success: %#v", archiver)
}
}
return nil
Expand Down
12 changes: 6 additions & 6 deletions services/repository/archiver/archiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ func TestArchive_Basic(t *testing.T) {
inFlight[1] = tgzReq
inFlight[2] = secondReq

ArchiveRepository(db.DefaultContext, zipReq)
ArchiveRepository(db.DefaultContext, tgzReq)
ArchiveRepository(db.DefaultContext, secondReq)
doArchive(db.DefaultContext, zipReq)
doArchive(db.DefaultContext, tgzReq)
doArchive(db.DefaultContext, secondReq)

// Make sure sending an unprocessed request through doesn't affect the queue
// count.
ArchiveRepository(db.DefaultContext, zipReq)
doArchive(db.DefaultContext, zipReq)

// Sleep two seconds to make sure the queue doesn't change.
time.Sleep(2 * time.Second)
Expand All @@ -101,15 +101,15 @@ func TestArchive_Basic(t *testing.T) {
// We still have the other three stalled at completion, waiting to remove
// from archiveInProgress. Try to submit this new one before its
// predecessor has cleared out of the queue.
ArchiveRepository(db.DefaultContext, zipReq2)
doArchive(db.DefaultContext, zipReq2)

// Now we'll submit a request and TimedWaitForCompletion twice, before and
// after we release it. We should trigger both the timeout and non-timeout
// cases.
timedReq, err := NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, secondCommit+".tar.gz")
assert.NoError(t, err)
assert.NotNil(t, timedReq)
ArchiveRepository(db.DefaultContext, timedReq)
doArchive(db.DefaultContext, timedReq)

zipReq2, err = NewRequest(ctx.Repo.Repository.ID, ctx.Repo.GitRepo, firstCommit+".zip")
assert.NoError(t, err)
Expand Down

0 comments on commit d03dd04

Please sign in to comment.