Store webhook event in database (#29145)
Refactor the webhook logic, to have the type-dependent processing happen only in one place. --- ## Current webhook flow 1. An event happens 2. It is pre-processed (depending on the webhook type) and its body is added to a task queue 3. When the task is processed, some more logic (depending on the webhook type as well) is applied to make an HTTP request This means that webhook-type dependant logic is needed in step 2 and 3. This is cumbersome and brittle to maintain. Updated webhook flow with this PR: 1. An event happens 2. It is stored as-is and added to a task queue 3. When the task is processed, the event is processed (depending on the webhook type) to make an HTTP request So the only webhook-type dependent logic happens in one place (step 3) which should be much more robust. ## Consequences of the refactor - the raw event must be stored in the hooktask (until now, the pre-processed body was stored) - to ensure that previous hooktasks are correctly sent, a `payload_version` is added (version 1: the body has already been pre-process / version 2: the body is the raw event) So future webhook additions will only have to deal with creating an http.Request based on the raw event (no need to adjust the code in multiple places, like currently). Moreover since this processing happens when fetching from the task queue, it ensures that the queuing of new events (upon a `git push` for instance) does not get slowed down by a slow webhook. As a concrete example, the PR #19307 for custom webhooks, should be substantially smaller: - no need to change `services/webhook/deliver.go` - minimal change in `services/webhook/webhook.go` (add the new webhook to the map) - no need to change all the individual webhook files (since with this refactor the `*webhook_model.Webhook` is provided as argument)
This commit is contained in:
parent
45277486c2
commit
26653b196b
28 changed files with 1686 additions and 1518 deletions
|
@ -4,11 +4,12 @@
|
|||
package webhook
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/sha1"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"html"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
@ -23,6 +24,37 @@ import (
|
|||
webhook_module "code.gitea.io/gitea/modules/webhook"
|
||||
)
|
||||
|
||||
func newMatrixRequest(ctx context.Context, w *webhook_model.Webhook, t *webhook_model.HookTask) (*http.Request, []byte, error) {
|
||||
meta := &MatrixMeta{}
|
||||
if err := json.Unmarshal([]byte(w.Meta), meta); err != nil {
|
||||
return nil, nil, fmt.Errorf("GetMatrixPayload meta json: %w", err)
|
||||
}
|
||||
mc := matrixConvertor{
|
||||
MsgType: messageTypeText[meta.MessageType],
|
||||
}
|
||||
payload, err := newPayload(mc, []byte(t.PayloadContent), t.EventType)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
body, err := json.MarshalIndent(payload, "", " ")
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
txnID, err := getMatrixTxnID(body)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
req, err := http.NewRequest(http.MethodPut, w.URL+"/"+txnID, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
return req, body, addDefaultHeaders(req, []byte(w.Secret), t, body) // likely useless, but has always been sent historially
|
||||
}
|
||||
|
||||
const matrixPayloadSizeLimit = 1024 * 64
|
||||
|
||||
// MatrixMeta contains the Matrix metadata
|
||||
|
@ -46,8 +78,6 @@ func GetMatrixHook(w *webhook_model.Webhook) *MatrixMeta {
|
|||
return s
|
||||
}
|
||||
|
||||
var _ PayloadConvertor = &MatrixPayload{}
|
||||
|
||||
// MatrixPayload contains payload for a Matrix room
|
||||
type MatrixPayload struct {
|
||||
Body string `json:"body"`
|
||||
|
@ -57,90 +87,79 @@ type MatrixPayload struct {
|
|||
Commits []*api.PayloadCommit `json:"io.gitea.commits,omitempty"`
|
||||
}
|
||||
|
||||
// JSONPayload Marshals the MatrixPayload to json
|
||||
func (m *MatrixPayload) JSONPayload() ([]byte, error) {
|
||||
data, err := json.MarshalIndent(m, "", " ")
|
||||
if err != nil {
|
||||
return []byte{}, err
|
||||
}
|
||||
return data, nil
|
||||
var _ payloadConvertor[MatrixPayload] = matrixConvertor{}
|
||||
|
||||
type matrixConvertor struct {
|
||||
MsgType string
|
||||
}
|
||||
|
||||
// MatrixLinkFormatter creates a link compatible with Matrix
|
||||
func MatrixLinkFormatter(url, text string) string {
|
||||
return fmt.Sprintf(`<a href="%s">%s</a>`, html.EscapeString(url), html.EscapeString(text))
|
||||
func (m matrixConvertor) newPayload(text string, commits ...*api.PayloadCommit) (MatrixPayload, error) {
|
||||
return MatrixPayload{
|
||||
Body: getMessageBody(text),
|
||||
MsgType: m.MsgType,
|
||||
Format: "org.matrix.custom.html",
|
||||
FormattedBody: text,
|
||||
Commits: commits,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// MatrixLinkToRef Matrix-formatter link to a repo ref
|
||||
func MatrixLinkToRef(repoURL, ref string) string {
|
||||
refName := git.RefName(ref).ShortName()
|
||||
switch {
|
||||
case strings.HasPrefix(ref, git.BranchPrefix):
|
||||
return MatrixLinkFormatter(repoURL+"/src/branch/"+util.PathEscapeSegments(refName), refName)
|
||||
case strings.HasPrefix(ref, git.TagPrefix):
|
||||
return MatrixLinkFormatter(repoURL+"/src/tag/"+util.PathEscapeSegments(refName), refName)
|
||||
default:
|
||||
return MatrixLinkFormatter(repoURL+"/src/commit/"+util.PathEscapeSegments(refName), refName)
|
||||
}
|
||||
}
|
||||
|
||||
// Create implements PayloadConvertor Create method
|
||||
func (m *MatrixPayload) Create(p *api.CreatePayload) (api.Payloader, error) {
|
||||
repoLink := MatrixLinkFormatter(p.Repo.HTMLURL, p.Repo.FullName)
|
||||
// Create implements payloadConvertor Create method
|
||||
func (m matrixConvertor) Create(p *api.CreatePayload) (MatrixPayload, error) {
|
||||
repoLink := htmlLinkFormatter(p.Repo.HTMLURL, p.Repo.FullName)
|
||||
refLink := MatrixLinkToRef(p.Repo.HTMLURL, p.Ref)
|
||||
text := fmt.Sprintf("[%s:%s] %s created by %s", repoLink, refLink, p.RefType, p.Sender.UserName)
|
||||
|
||||
return getMatrixPayload(text, nil, m.MsgType), nil
|
||||
return m.newPayload(text)
|
||||
}
|
||||
|
||||
// Delete composes Matrix payload for delete a branch or tag.
|
||||
func (m *MatrixPayload) Delete(p *api.DeletePayload) (api.Payloader, error) {
|
||||
func (m matrixConvertor) Delete(p *api.DeletePayload) (MatrixPayload, error) {
|
||||
refName := git.RefName(p.Ref).ShortName()
|
||||
repoLink := MatrixLinkFormatter(p.Repo.HTMLURL, p.Repo.FullName)
|
||||
repoLink := htmlLinkFormatter(p.Repo.HTMLURL, p.Repo.FullName)
|
||||
text := fmt.Sprintf("[%s:%s] %s deleted by %s", repoLink, refName, p.RefType, p.Sender.UserName)
|
||||
|
||||
return getMatrixPayload(text, nil, m.MsgType), nil
|
||||
return m.newPayload(text)
|
||||
}
|
||||
|
||||
// Fork composes Matrix payload for forked by a repository.
|
||||
func (m *MatrixPayload) Fork(p *api.ForkPayload) (api.Payloader, error) {
|
||||
baseLink := MatrixLinkFormatter(p.Forkee.HTMLURL, p.Forkee.FullName)
|
||||
forkLink := MatrixLinkFormatter(p.Repo.HTMLURL, p.Repo.FullName)
|
||||
func (m matrixConvertor) Fork(p *api.ForkPayload) (MatrixPayload, error) {
|
||||
baseLink := htmlLinkFormatter(p.Forkee.HTMLURL, p.Forkee.FullName)
|
||||
forkLink := htmlLinkFormatter(p.Repo.HTMLURL, p.Repo.FullName)
|
||||
text := fmt.Sprintf("%s is forked to %s", baseLink, forkLink)
|
||||
|
||||
return getMatrixPayload(text, nil, m.MsgType), nil
|
||||
return m.newPayload(text)
|
||||
}
|
||||
|
||||
// Issue implements PayloadConvertor Issue method
|
||||
func (m *MatrixPayload) Issue(p *api.IssuePayload) (api.Payloader, error) {
|
||||
text, _, _, _ := getIssuesPayloadInfo(p, MatrixLinkFormatter, true)
|
||||
// Issue implements payloadConvertor Issue method
|
||||
func (m matrixConvertor) Issue(p *api.IssuePayload) (MatrixPayload, error) {
|
||||
text, _, _, _ := getIssuesPayloadInfo(p, htmlLinkFormatter, true)
|
||||
|
||||
return getMatrixPayload(text, nil, m.MsgType), nil
|
||||
return m.newPayload(text)
|
||||
}
|
||||
|
||||
// IssueComment implements PayloadConvertor IssueComment method
|
||||
func (m *MatrixPayload) IssueComment(p *api.IssueCommentPayload) (api.Payloader, error) {
|
||||
text, _, _ := getIssueCommentPayloadInfo(p, MatrixLinkFormatter, true)
|
||||
// IssueComment implements payloadConvertor IssueComment method
|
||||
func (m matrixConvertor) IssueComment(p *api.IssueCommentPayload) (MatrixPayload, error) {
|
||||
text, _, _ := getIssueCommentPayloadInfo(p, htmlLinkFormatter, true)
|
||||
|
||||
return getMatrixPayload(text, nil, m.MsgType), nil
|
||||
return m.newPayload(text)
|
||||
}
|
||||
|
||||
// Wiki implements PayloadConvertor Wiki method
|
||||
func (m *MatrixPayload) Wiki(p *api.WikiPayload) (api.Payloader, error) {
|
||||
text, _, _ := getWikiPayloadInfo(p, MatrixLinkFormatter, true)
|
||||
// Wiki implements payloadConvertor Wiki method
|
||||
func (m matrixConvertor) Wiki(p *api.WikiPayload) (MatrixPayload, error) {
|
||||
text, _, _ := getWikiPayloadInfo(p, htmlLinkFormatter, true)
|
||||
|
||||
return getMatrixPayload(text, nil, m.MsgType), nil
|
||||
return m.newPayload(text)
|
||||
}
|
||||
|
||||
// Release implements PayloadConvertor Release method
|
||||
func (m *MatrixPayload) Release(p *api.ReleasePayload) (api.Payloader, error) {
|
||||
text, _ := getReleasePayloadInfo(p, MatrixLinkFormatter, true)
|
||||
// Release implements payloadConvertor Release method
|
||||
func (m matrixConvertor) Release(p *api.ReleasePayload) (MatrixPayload, error) {
|
||||
text, _ := getReleasePayloadInfo(p, htmlLinkFormatter, true)
|
||||
|
||||
return getMatrixPayload(text, nil, m.MsgType), nil
|
||||
return m.newPayload(text)
|
||||
}
|
||||
|
||||
// Push implements PayloadConvertor Push method
|
||||
func (m *MatrixPayload) Push(p *api.PushPayload) (api.Payloader, error) {
|
||||
// Push implements payloadConvertor Push method
|
||||
func (m matrixConvertor) Push(p *api.PushPayload) (MatrixPayload, error) {
|
||||
var commitDesc string
|
||||
|
||||
if p.TotalCommits == 1 {
|
||||
|
@ -149,13 +168,13 @@ func (m *MatrixPayload) Push(p *api.PushPayload) (api.Payloader, error) {
|
|||
commitDesc = fmt.Sprintf("%d commits", p.TotalCommits)
|
||||
}
|
||||
|
||||
repoLink := MatrixLinkFormatter(p.Repo.HTMLURL, p.Repo.FullName)
|
||||
repoLink := htmlLinkFormatter(p.Repo.HTMLURL, p.Repo.FullName)
|
||||
branchLink := MatrixLinkToRef(p.Repo.HTMLURL, p.Ref)
|
||||
text := fmt.Sprintf("[%s] %s pushed %s to %s:<br>", repoLink, p.Pusher.UserName, commitDesc, branchLink)
|
||||
|
||||
// for each commit, generate a new line text
|
||||
for i, commit := range p.Commits {
|
||||
text += fmt.Sprintf("%s: %s - %s", MatrixLinkFormatter(commit.URL, commit.ID[:7]), commit.Message, commit.Author.Name)
|
||||
text += fmt.Sprintf("%s: %s - %s", htmlLinkFormatter(commit.URL, commit.ID[:7]), commit.Message, commit.Author.Name)
|
||||
// add linebreak to each commit but the last
|
||||
if i < len(p.Commits)-1 {
|
||||
text += "<br>"
|
||||
|
@ -163,41 +182,41 @@ func (m *MatrixPayload) Push(p *api.PushPayload) (api.Payloader, error) {
|
|||
|
||||
}
|
||||
|
||||
return getMatrixPayload(text, p.Commits, m.MsgType), nil
|
||||
return m.newPayload(text, p.Commits...)
|
||||
}
|
||||
|
||||
// PullRequest implements PayloadConvertor PullRequest method
|
||||
func (m *MatrixPayload) PullRequest(p *api.PullRequestPayload) (api.Payloader, error) {
|
||||
text, _, _, _ := getPullRequestPayloadInfo(p, MatrixLinkFormatter, true)
|
||||
// PullRequest implements payloadConvertor PullRequest method
|
||||
func (m matrixConvertor) PullRequest(p *api.PullRequestPayload) (MatrixPayload, error) {
|
||||
text, _, _, _ := getPullRequestPayloadInfo(p, htmlLinkFormatter, true)
|
||||
|
||||
return getMatrixPayload(text, nil, m.MsgType), nil
|
||||
return m.newPayload(text)
|
||||
}
|
||||
|
||||
// Review implements PayloadConvertor Review method
|
||||
func (m *MatrixPayload) Review(p *api.PullRequestPayload, event webhook_module.HookEventType) (api.Payloader, error) {
|
||||
senderLink := MatrixLinkFormatter(setting.AppURL+url.PathEscape(p.Sender.UserName), p.Sender.UserName)
|
||||
// Review implements payloadConvertor Review method
|
||||
func (m matrixConvertor) Review(p *api.PullRequestPayload, event webhook_module.HookEventType) (MatrixPayload, error) {
|
||||
senderLink := htmlLinkFormatter(setting.AppURL+url.PathEscape(p.Sender.UserName), p.Sender.UserName)
|
||||
title := fmt.Sprintf("#%d %s", p.Index, p.PullRequest.Title)
|
||||
titleLink := MatrixLinkFormatter(p.PullRequest.HTMLURL, title)
|
||||
repoLink := MatrixLinkFormatter(p.Repository.HTMLURL, p.Repository.FullName)
|
||||
titleLink := htmlLinkFormatter(p.PullRequest.HTMLURL, title)
|
||||
repoLink := htmlLinkFormatter(p.Repository.HTMLURL, p.Repository.FullName)
|
||||
var text string
|
||||
|
||||
switch p.Action {
|
||||
case api.HookIssueReviewed:
|
||||
action, err := parseHookPullRequestEventType(event)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return MatrixPayload{}, err
|
||||
}
|
||||
|
||||
text = fmt.Sprintf("[%s] Pull request review %s: %s by %s", repoLink, action, titleLink, senderLink)
|
||||
}
|
||||
|
||||
return getMatrixPayload(text, nil, m.MsgType), nil
|
||||
return m.newPayload(text)
|
||||
}
|
||||
|
||||
// Repository implements PayloadConvertor Repository method
|
||||
func (m *MatrixPayload) Repository(p *api.RepositoryPayload) (api.Payloader, error) {
|
||||
senderLink := MatrixLinkFormatter(setting.AppURL+p.Sender.UserName, p.Sender.UserName)
|
||||
repoLink := MatrixLinkFormatter(p.Repository.HTMLURL, p.Repository.FullName)
|
||||
// Repository implements payloadConvertor Repository method
|
||||
func (m matrixConvertor) Repository(p *api.RepositoryPayload) (MatrixPayload, error) {
|
||||
senderLink := htmlLinkFormatter(setting.AppURL+p.Sender.UserName, p.Sender.UserName)
|
||||
repoLink := htmlLinkFormatter(p.Repository.HTMLURL, p.Repository.FullName)
|
||||
var text string
|
||||
|
||||
switch p.Action {
|
||||
|
@ -206,13 +225,12 @@ func (m *MatrixPayload) Repository(p *api.RepositoryPayload) (api.Payloader, err
|
|||
case api.HookRepoDeleted:
|
||||
text = fmt.Sprintf("[%s] Repository deleted by %s", repoLink, senderLink)
|
||||
}
|
||||
|
||||
return getMatrixPayload(text, nil, m.MsgType), nil
|
||||
return m.newPayload(text)
|
||||
}
|
||||
|
||||
func (m *MatrixPayload) Package(p *api.PackagePayload) (api.Payloader, error) {
|
||||
senderLink := MatrixLinkFormatter(setting.AppURL+p.Sender.UserName, p.Sender.UserName)
|
||||
packageLink := MatrixLinkFormatter(p.Package.HTMLURL, p.Package.Name)
|
||||
func (m matrixConvertor) Package(p *api.PackagePayload) (MatrixPayload, error) {
|
||||
senderLink := htmlLinkFormatter(setting.AppURL+p.Sender.UserName, p.Sender.UserName)
|
||||
packageLink := htmlLinkFormatter(p.Package.HTMLURL, p.Package.Name)
|
||||
var text string
|
||||
|
||||
switch p.Action {
|
||||
|
@ -222,31 +240,7 @@ func (m *MatrixPayload) Package(p *api.PackagePayload) (api.Payloader, error) {
|
|||
text = fmt.Sprintf("[%s] Package deleted by %s", packageLink, senderLink)
|
||||
}
|
||||
|
||||
return getMatrixPayload(text, nil, m.MsgType), nil
|
||||
}
|
||||
|
||||
// GetMatrixPayload converts a Matrix webhook into a MatrixPayload
|
||||
func GetMatrixPayload(p api.Payloader, event webhook_module.HookEventType, meta string) (api.Payloader, error) {
|
||||
s := new(MatrixPayload)
|
||||
|
||||
matrix := &MatrixMeta{}
|
||||
if err := json.Unmarshal([]byte(meta), &matrix); err != nil {
|
||||
return s, errors.New("GetMatrixPayload meta json:" + err.Error())
|
||||
}
|
||||
|
||||
s.MsgType = messageTypeText[matrix.MessageType]
|
||||
|
||||
return convertPayloader(s, p, event)
|
||||
}
|
||||
|
||||
func getMatrixPayload(text string, commits []*api.PayloadCommit, msgType string) *MatrixPayload {
|
||||
p := MatrixPayload{}
|
||||
p.FormattedBody = text
|
||||
p.Body = getMessageBody(text)
|
||||
p.Format = "org.matrix.custom.html"
|
||||
p.MsgType = msgType
|
||||
p.Commits = commits
|
||||
return &p
|
||||
return m.newPayload(text)
|
||||
}
|
||||
|
||||
var urlRegex = regexp.MustCompile(`<a [^>]*?href="([^">]*?)">(.*?)</a>`)
|
||||
|
@ -271,3 +265,16 @@ func getMatrixTxnID(payload []byte) (string, error) {
|
|||
|
||||
return hex.EncodeToString(h.Sum(nil)), nil
|
||||
}
|
||||
|
||||
// MatrixLinkToRef Matrix-formatter link to a repo ref
|
||||
func MatrixLinkToRef(repoURL, ref string) string {
|
||||
refName := git.RefName(ref).ShortName()
|
||||
switch {
|
||||
case strings.HasPrefix(ref, git.BranchPrefix):
|
||||
return htmlLinkFormatter(repoURL+"/src/branch/"+util.PathEscapeSegments(refName), refName)
|
||||
case strings.HasPrefix(ref, git.TagPrefix):
|
||||
return htmlLinkFormatter(repoURL+"/src/tag/"+util.PathEscapeSegments(refName), refName)
|
||||
default:
|
||||
return htmlLinkFormatter(repoURL+"/src/commit/"+util.PathEscapeSegments(refName), refName)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue