mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2024-11-24 06:19:51 +00:00
Remove transaction for archive download (#32186)
Since there is a status column in the database, the transaction is unnecessary when downloading an archive. The transaction is blocking database operations, especially with SQLite. Replace #27563 (cherry picked from commit e1b269e956e955dd1dfb012f40270d73f8329092)
This commit is contained in:
parent
96ee0f5647
commit
a8f2002a9b
3 changed files with 19 additions and 29 deletions
|
@ -289,9 +289,6 @@ code.gitea.io/gitea/services/pull
|
||||||
code.gitea.io/gitea/services/repository
|
code.gitea.io/gitea/services/repository
|
||||||
IsErrForkAlreadyExist
|
IsErrForkAlreadyExist
|
||||||
|
|
||||||
code.gitea.io/gitea/services/repository/archiver
|
|
||||||
ArchiveRepository
|
|
||||||
|
|
||||||
code.gitea.io/gitea/services/repository/files
|
code.gitea.io/gitea/services/repository/files
|
||||||
ContentType.String
|
ContentType.String
|
||||||
GetFileResponseFromCommit
|
GetFileResponseFromCommit
|
||||||
|
|
|
@ -69,7 +69,7 @@ func (e RepoRefNotFoundError) Is(err error) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRequest creates an archival request, based on the URI. The
|
// NewRequest creates an archival request, based on the URI. The
|
||||||
// resulting ArchiveRequest is suitable for being passed to ArchiveRepository()
|
// resulting ArchiveRequest is suitable for being passed to Await()
|
||||||
// if it's determined that the request still needs to be satisfied.
|
// if it's determined that the request still needs to be satisfied.
|
||||||
func NewRequest(ctx context.Context, repoID int64, repo *git.Repository, uri string) (*ArchiveRequest, error) {
|
func NewRequest(ctx context.Context, repoID int64, repo *git.Repository, uri string) (*ArchiveRequest, error) {
|
||||||
r := &ArchiveRequest{
|
r := &ArchiveRequest{
|
||||||
|
@ -168,13 +168,14 @@ func (aReq *ArchiveRequest) Await(ctx context.Context) (*repo_model.RepoArchiver
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// doArchive satisfies the ArchiveRequest being passed in. Processing
|
||||||
|
// will occur in a separate goroutine, as this phase may take a while to
|
||||||
|
// complete. If the archive already exists, doArchive will not do
|
||||||
|
// anything. In all cases, the caller should be examining the *ArchiveRequest
|
||||||
|
// being returned for completion, as it may be different than the one they passed
|
||||||
|
// in.
|
||||||
func doArchive(ctx context.Context, r *ArchiveRequest) (*repo_model.RepoArchiver, error) {
|
func doArchive(ctx context.Context, r *ArchiveRequest) (*repo_model.RepoArchiver, error) {
|
||||||
txCtx, committer, err := db.TxContext(ctx)
|
ctx, _, finished := process.GetManager().AddContext(ctx, fmt.Sprintf("ArchiveRequest[%d]: %s", r.RepoID, r.GetArchiveName()))
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer committer.Close()
|
|
||||||
ctx, _, finished := process.GetManager().AddContext(txCtx, fmt.Sprintf("ArchiveRequest[%d]: %s", r.RepoID, r.GetArchiveName()))
|
|
||||||
defer finished()
|
defer finished()
|
||||||
|
|
||||||
archiver, err := repo_model.GetRepoArchiver(ctx, r.RepoID, r.Type, r.CommitID)
|
archiver, err := repo_model.GetRepoArchiver(ctx, r.RepoID, r.Type, r.CommitID)
|
||||||
|
@ -209,7 +210,7 @@ func doArchive(ctx context.Context, r *ArchiveRequest) (*repo_model.RepoArchiver
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return archiver, committer.Commit()
|
return archiver, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if !errors.Is(err, os.ErrNotExist) {
|
if !errors.Is(err, os.ErrNotExist) {
|
||||||
|
@ -278,17 +279,7 @@ func doArchive(ctx context.Context, r *ArchiveRequest) (*repo_model.RepoArchiver
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return archiver, committer.Commit()
|
return archiver, nil
|
||||||
}
|
|
||||||
|
|
||||||
// ArchiveRepository satisfies the ArchiveRequest being passed in. Processing
|
|
||||||
// will occur in a separate goroutine, as this phase may take a while to
|
|
||||||
// complete. If the archive already exists, ArchiveRepository will not do
|
|
||||||
// anything. In all cases, the caller should be examining the *ArchiveRequest
|
|
||||||
// being returned for completion, as it may be different than the one they passed
|
|
||||||
// in.
|
|
||||||
func ArchiveRepository(ctx context.Context, request *ArchiveRequest) (*repo_model.RepoArchiver, error) {
|
|
||||||
return doArchive(ctx, request)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var archiverQueue *queue.WorkerPoolQueue[*ArchiveRequest]
|
var archiverQueue *queue.WorkerPoolQueue[*ArchiveRequest]
|
||||||
|
@ -298,8 +289,10 @@ func Init(ctx context.Context) error {
|
||||||
handler := func(items ...*ArchiveRequest) []*ArchiveRequest {
|
handler := func(items ...*ArchiveRequest) []*ArchiveRequest {
|
||||||
for _, archiveReq := range items {
|
for _, archiveReq := range items {
|
||||||
log.Trace("ArchiverData Process: %#v", archiveReq)
|
log.Trace("ArchiverData Process: %#v", archiveReq)
|
||||||
if _, err := doArchive(ctx, archiveReq); err != nil {
|
if archiver, err := doArchive(ctx, archiveReq); err != nil {
|
||||||
log.Error("Archive %v failed: %v", archiveReq, err)
|
log.Error("Archive %v failed: %v", archiveReq, err)
|
||||||
|
} else {
|
||||||
|
log.Trace("ArchiverData Success: %#v", archiver)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -81,13 +81,13 @@ func TestArchive_Basic(t *testing.T) {
|
||||||
inFlight[1] = tgzReq
|
inFlight[1] = tgzReq
|
||||||
inFlight[2] = secondReq
|
inFlight[2] = secondReq
|
||||||
|
|
||||||
ArchiveRepository(db.DefaultContext, zipReq)
|
doArchive(db.DefaultContext, zipReq)
|
||||||
ArchiveRepository(db.DefaultContext, tgzReq)
|
doArchive(db.DefaultContext, tgzReq)
|
||||||
ArchiveRepository(db.DefaultContext, secondReq)
|
doArchive(db.DefaultContext, secondReq)
|
||||||
|
|
||||||
// Make sure sending an unprocessed request through doesn't affect the queue
|
// Make sure sending an unprocessed request through doesn't affect the queue
|
||||||
// count.
|
// count.
|
||||||
ArchiveRepository(db.DefaultContext, zipReq)
|
doArchive(db.DefaultContext, zipReq)
|
||||||
|
|
||||||
// Sleep two seconds to make sure the queue doesn't change.
|
// Sleep two seconds to make sure the queue doesn't change.
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
|
@ -102,7 +102,7 @@ func TestArchive_Basic(t *testing.T) {
|
||||||
// We still have the other three stalled at completion, waiting to remove
|
// We still have the other three stalled at completion, waiting to remove
|
||||||
// from archiveInProgress. Try to submit this new one before its
|
// from archiveInProgress. Try to submit this new one before its
|
||||||
// predecessor has cleared out of the queue.
|
// predecessor has cleared out of the queue.
|
||||||
ArchiveRepository(db.DefaultContext, zipReq2)
|
doArchive(db.DefaultContext, zipReq2)
|
||||||
|
|
||||||
// Now we'll submit a request and TimedWaitForCompletion twice, before and
|
// Now we'll submit a request and TimedWaitForCompletion twice, before and
|
||||||
// after we release it. We should trigger both the timeout and non-timeout
|
// after we release it. We should trigger both the timeout and non-timeout
|
||||||
|
@ -110,7 +110,7 @@ func TestArchive_Basic(t *testing.T) {
|
||||||
timedReq, err := NewRequest(ctx, ctx.Repo.Repository.ID, ctx.Repo.GitRepo, secondCommit+".tar.gz")
|
timedReq, err := NewRequest(ctx, ctx.Repo.Repository.ID, ctx.Repo.GitRepo, secondCommit+".tar.gz")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.NotNil(t, timedReq)
|
assert.NotNil(t, timedReq)
|
||||||
ArchiveRepository(db.DefaultContext, timedReq)
|
doArchive(db.DefaultContext, timedReq)
|
||||||
|
|
||||||
zipReq2, err = NewRequest(ctx, ctx.Repo.Repository.ID, ctx.Repo.GitRepo, firstCommit+".zip")
|
zipReq2, err = NewRequest(ctx, ctx.Repo.Repository.ID, ctx.Repo.GitRepo, firstCommit+".zip")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
Loading…
Reference in a new issue