Fix SSH LFS memory usage (#33455)

Fix #33448
This commit is contained in:
wxiaoguang 2025-01-31 19:05:48 +08:00 committed by GitHub
parent 4f3cc26b4e
commit 0e8738b4b6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 77 additions and 103 deletions

View file

@ -4,7 +4,6 @@
package backend
import (
"bytes"
"context"
"encoding/base64"
"fmt"
@ -29,7 +28,7 @@ var Capabilities = []string{
"locking",
}
var _ transfer.Backend = &GiteaBackend{}
var _ transfer.Backend = (*GiteaBackend)(nil)
// GiteaBackend is an adapter between git-lfs-transfer library and Gitea's internal LFS API
type GiteaBackend struct {
@ -78,17 +77,17 @@ func (g *GiteaBackend) Batch(_ string, pointers []transfer.BatchItem, args trans
headerAccept: mimeGitLFS,
headerContentType: mimeGitLFS,
}
req := newInternalRequest(g.ctx, url, http.MethodPost, headers, bodyBytes)
req := newInternalRequestLFS(g.ctx, url, http.MethodPost, headers, bodyBytes)
resp, err := req.Response()
if err != nil {
g.logger.Log("http request error", err)
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
g.logger.Log("http statuscode error", resp.StatusCode, statusCodeToErr(resp.StatusCode))
return nil, statusCodeToErr(resp.StatusCode)
}
defer resp.Body.Close()
respBytes, err := io.ReadAll(resp.Body)
if err != nil {
g.logger.Log("http read error", err)
@ -158,8 +157,7 @@ func (g *GiteaBackend) Batch(_ string, pointers []transfer.BatchItem, args trans
return pointers, nil
}
// Download implements transfer.Backend. The returned reader must be closed by the
// caller.
// Download implements transfer.Backend. The returned reader must be closed by the caller.
func (g *GiteaBackend) Download(oid string, args transfer.Args) (io.ReadCloser, int64, error) {
idMapStr, exists := args[argID]
if !exists {
@ -187,25 +185,25 @@ func (g *GiteaBackend) Download(oid string, args transfer.Args) (io.ReadCloser,
headerGiteaInternalAuth: g.internalAuth,
headerAccept: mimeOctetStream,
}
req := newInternalRequest(g.ctx, url, http.MethodGet, headers, nil)
req := newInternalRequestLFS(g.ctx, url, http.MethodGet, headers, nil)
resp, err := req.Response()
if err != nil {
return nil, 0, err
return nil, 0, fmt.Errorf("failed to get response: %w", err)
}
// no need to close the body here by "defer resp.Body.Close()", see below
if resp.StatusCode != http.StatusOK {
return nil, 0, statusCodeToErr(resp.StatusCode)
}
defer resp.Body.Close()
respBytes, err := io.ReadAll(resp.Body)
respSize, err := strconv.ParseInt(resp.Header.Get("X-Gitea-LFS-Content-Length"), 10, 64)
if err != nil {
return nil, 0, err
return nil, 0, fmt.Errorf("failed to parse content length: %w", err)
}
respSize := int64(len(respBytes))
respBuf := io.NopCloser(bytes.NewBuffer(respBytes))
return respBuf, respSize, nil
// transfer.Backend will check io.Closer interface and close this Body reader
return resp.Body, respSize, nil
}
// StartUpload implements transfer.Backend.
// Upload implements transfer.Backend.
func (g *GiteaBackend) Upload(oid string, size int64, r io.Reader, args transfer.Args) error {
idMapStr, exists := args[argID]
if !exists {
@ -234,15 +232,14 @@ func (g *GiteaBackend) Upload(oid string, size int64, r io.Reader, args transfer
headerContentType: mimeOctetStream,
headerContentLength: strconv.FormatInt(size, 10),
}
reqBytes, err := io.ReadAll(r)
if err != nil {
return err
}
req := newInternalRequest(g.ctx, url, http.MethodPut, headers, reqBytes)
req := newInternalRequestLFS(g.ctx, url, http.MethodPut, headers, nil)
req.Body(r)
resp, err := req.Response()
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return statusCodeToErr(resp.StatusCode)
}
@ -284,11 +281,12 @@ func (g *GiteaBackend) Verify(oid string, size int64, args transfer.Args) (trans
headerAccept: mimeGitLFS,
headerContentType: mimeGitLFS,
}
req := newInternalRequest(g.ctx, url, http.MethodPost, headers, bodyBytes)
req := newInternalRequestLFS(g.ctx, url, http.MethodPost, headers, bodyBytes)
resp, err := req.Response()
if err != nil {
return transfer.NewStatus(transfer.StatusInternalServerError), err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return transfer.NewStatus(uint32(resp.StatusCode), http.StatusText(resp.StatusCode)), statusCodeToErr(resp.StatusCode)
}

View file

@ -50,7 +50,7 @@ func (g *giteaLockBackend) Create(path, refname string) (transfer.Lock, error) {
headerAccept: mimeGitLFS,
headerContentType: mimeGitLFS,
}
req := newInternalRequest(g.ctx, url, http.MethodPost, headers, bodyBytes)
req := newInternalRequestLFS(g.ctx, url, http.MethodPost, headers, bodyBytes)
resp, err := req.Response()
if err != nil {
g.logger.Log("http request error", err)
@ -102,7 +102,7 @@ func (g *giteaLockBackend) Unlock(lock transfer.Lock) error {
headerAccept: mimeGitLFS,
headerContentType: mimeGitLFS,
}
req := newInternalRequest(g.ctx, url, http.MethodPost, headers, bodyBytes)
req := newInternalRequestLFS(g.ctx, url, http.MethodPost, headers, bodyBytes)
resp, err := req.Response()
if err != nil {
g.logger.Log("http request error", err)
@ -185,7 +185,7 @@ func (g *giteaLockBackend) queryLocks(v url.Values) ([]transfer.Lock, string, er
headerAccept: mimeGitLFS,
headerContentType: mimeGitLFS,
}
req := newInternalRequest(g.ctx, url, http.MethodGet, headers, nil)
req := newInternalRequestLFS(g.ctx, url, http.MethodGet, headers, nil)
resp, err := req.Response()
if err != nil {
g.logger.Log("http request error", err)

View file

@ -5,15 +5,12 @@ package backend
import (
"context"
"crypto/tls"
"fmt"
"net"
"io"
"net/http"
"time"
"code.gitea.io/gitea/modules/httplib"
"code.gitea.io/gitea/modules/proxyprotocol"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/private"
"github.com/charmbracelet/git-lfs-transfer/transfer"
)
@ -89,53 +86,19 @@ func statusCodeToErr(code int) error {
}
}
func newInternalRequest(ctx context.Context, url, method string, headers map[string]string, body []byte) *httplib.Request {
req := httplib.NewRequest(url, method).
SetContext(ctx).
SetTimeout(10*time.Second, 60*time.Second).
SetTLSClientConfig(&tls.Config{
InsecureSkipVerify: true,
})
if setting.Protocol == setting.HTTPUnix {
req.SetTransport(&http.Transport{
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
var d net.Dialer
conn, err := d.DialContext(ctx, "unix", setting.HTTPAddr)
if err != nil {
return conn, err
}
if setting.LocalUseProxyProtocol {
if err = proxyprotocol.WriteLocalHeader(conn); err != nil {
_ = conn.Close()
return nil, err
}
}
return conn, err
},
})
} else if setting.LocalUseProxyProtocol {
req.SetTransport(&http.Transport{
DialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
var d net.Dialer
conn, err := d.DialContext(ctx, network, address)
if err != nil {
return conn, err
}
if err = proxyprotocol.WriteLocalHeader(conn); err != nil {
_ = conn.Close()
return nil, err
}
return conn, err
},
})
}
func newInternalRequestLFS(ctx context.Context, url, method string, headers map[string]string, body any) *httplib.Request {
req := private.NewInternalRequest(ctx, url, method)
for k, v := range headers {
req.Header(k, v)
}
req.Body(body)
switch body := body.(type) {
case nil: // do nothing
case []byte:
req.Body(body) // []byte
case io.Reader:
req.Body(body) // io.Reader or io.ReadCloser
default:
panic(fmt.Sprintf("unsupported request body type %T", body))
}
return req
}