Make LFS http_client parallel within a batch. (#32369)
Signed-off-by: Royce Remer <royceremer@gmail.com> Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
This commit is contained in:
parent
f2a6df03d9
commit
54146e62c0
7 changed files with 183 additions and 153 deletions
|
@ -17,6 +17,8 @@ import (
|
|||
"code.gitea.io/gitea/modules/log"
|
||||
"code.gitea.io/gitea/modules/proxy"
|
||||
"code.gitea.io/gitea/modules/setting"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// HTTPClient is used to communicate with the LFS server
|
||||
|
@ -113,6 +115,7 @@ func (c *HTTPClient) Upload(ctx context.Context, objects []Pointer, callback Upl
|
|||
return c.performOperation(ctx, objects, nil, callback)
|
||||
}
|
||||
|
||||
// performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch
|
||||
func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc DownloadCallback, uc UploadCallback) error {
|
||||
if len(objects) == 0 {
|
||||
return nil
|
||||
|
@ -133,71 +136,87 @@ func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc
|
|||
return fmt.Errorf("TransferAdapter not found: %s", result.Transfer)
|
||||
}
|
||||
|
||||
errGroup, groupCtx := errgroup.WithContext(ctx)
|
||||
errGroup.SetLimit(setting.LFSClient.BatchOperationConcurrency)
|
||||
for _, object := range result.Objects {
|
||||
if object.Error != nil {
|
||||
log.Trace("Error on object %v: %v", object.Pointer, object.Error)
|
||||
if uc != nil {
|
||||
if _, err := uc(object.Pointer, object.Error); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := dc(object.Pointer, nil, object.Error); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if uc != nil {
|
||||
if len(object.Actions) == 0 {
|
||||
log.Trace("%v already present on server", object.Pointer)
|
||||
continue
|
||||
}
|
||||
|
||||
link, ok := object.Actions["upload"]
|
||||
if !ok {
|
||||
log.Debug("%+v", object)
|
||||
return errors.New("missing action 'upload'")
|
||||
}
|
||||
|
||||
content, err := uc(object.Pointer, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = transferAdapter.Upload(ctx, link, object.Pointer, content)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
link, ok = object.Actions["verify"]
|
||||
if ok {
|
||||
if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
link, ok := object.Actions["download"]
|
||||
if !ok {
|
||||
// no actions block in response, try legacy response schema
|
||||
link, ok = object.Links["download"]
|
||||
}
|
||||
if !ok {
|
||||
log.Debug("%+v", object)
|
||||
return errors.New("missing action 'download'")
|
||||
}
|
||||
|
||||
content, err := transferAdapter.Download(ctx, link)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := dc(object.Pointer, content, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
errGroup.Go(func() error {
|
||||
return performSingleOperation(groupCtx, object, dc, uc, transferAdapter)
|
||||
})
|
||||
}
|
||||
|
||||
// only the first error is returned, preserving legacy behavior before concurrency
|
||||
return errGroup.Wait()
|
||||
}
|
||||
|
||||
// performSingleOperation performs an LFS upload or download operation on a single object
|
||||
func performSingleOperation(ctx context.Context, object *ObjectResponse, dc DownloadCallback, uc UploadCallback, transferAdapter TransferAdapter) error {
|
||||
// the response from a lfs batch api request for this specific object id contained an error
|
||||
if object.Error != nil {
|
||||
log.Trace("Error on object %v: %v", object.Pointer, object.Error)
|
||||
|
||||
// this was an 'upload' request inside the batch request
|
||||
if uc != nil {
|
||||
if _, err := uc(object.Pointer, object.Error); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// this was NOT an 'upload' request inside the batch request, meaning it must be a 'download' request
|
||||
if err := dc(object.Pointer, nil, object.Error); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// if the callback returns no err, then the error could be ignored, and the operations should continue
|
||||
return nil
|
||||
}
|
||||
|
||||
// the response from an lfs batch api request contained necessary upload/download fields to act upon
|
||||
if uc != nil {
|
||||
if len(object.Actions) == 0 {
|
||||
log.Trace("%v already present on server", object.Pointer)
|
||||
return nil
|
||||
}
|
||||
|
||||
link, ok := object.Actions["upload"]
|
||||
if !ok {
|
||||
return errors.New("missing action 'upload'")
|
||||
}
|
||||
|
||||
content, err := uc(object.Pointer, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = transferAdapter.Upload(ctx, link, object.Pointer, content)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
link, ok = object.Actions["verify"]
|
||||
if ok {
|
||||
if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
link, ok := object.Actions["download"]
|
||||
if !ok {
|
||||
// no actions block in response, try legacy response schema
|
||||
link, ok = object.Links["download"]
|
||||
}
|
||||
if !ok {
|
||||
log.Debug("%+v", object)
|
||||
return errors.New("missing action 'download'")
|
||||
}
|
||||
|
||||
content, err := transferAdapter.Download(ctx, link)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := dc(object.Pointer, content, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue