Browse Source

#835: Realtime webhooks

Unknwon 9 years ago
parent
commit
fa298a2c30

+ 7 - 0
cmd/serve.go

@@ -16,6 +16,7 @@ import (
 	"github.com/codegangsta/cli"
 
 	"github.com/gogits/gogs/models"
+	"github.com/gogits/gogs/modules/httplib"
 	"github.com/gogits/gogs/modules/log"
 	"github.com/gogits/gogs/modules/setting"
 	"github.com/gogits/gogs/modules/uuid"
@@ -193,6 +194,12 @@ func runServ(c *cli.Context) {
 		}
 	}
 
+	// Send deliver hook request.
+	resp, err := httplib.Head(setting.AppUrl + setting.AppSubUrl + repoUserName + "/" + repoName + "/hooks/trigger").Response()
+	if err == nil {
+		resp.Body.Close()
+	}
+
 	// Update key activity.
 	key, err := models.GetPublicKeyById(keyId)
 	if err != nil {

+ 1 - 0
cmd/web.go

@@ -451,6 +451,7 @@ func runWeb(ctx *cli.Context) {
 		m.Get("/archive/*", repo.Download)
 		m.Get("/pulls2/", repo.PullRequest2)
 		m.Get("/milestone2/", repo.Milestones2)
+		m.Head("/hooks/trigger", repo.TriggerHook)
 
 		m.Group("", func() {
 			m.Get("/src/*", repo.Home)

+ 2 - 2
conf/app.ini

@@ -91,8 +91,8 @@ ENABLE_REVERSE_PROXY_AUTO_REGISTRATION = false
 DISABLE_MINIMUM_KEY_SIZE_CHECK = false
 
 [webhook]
-; Cron task interval in minutes
-TASK_INTERVAL = 1
+; Hook task queue length
+QUEUE_LENGTH = 1000
 ; Deliver timeout in seconds
 DELIVER_TIMEOUT = 5
 ; Allow insecure certification

+ 1 - 1
gogs.go

@@ -17,7 +17,7 @@ import (
 	"github.com/gogits/gogs/modules/setting"
 )
 
-const APP_VER = "0.6.2.0725 Beta"
+const APP_VER = "0.6.3.0725 Beta"
 
 func init() {
 	runtime.GOMAXPROCS(runtime.NumCPU())

+ 2 - 0
models/action.go

@@ -431,6 +431,8 @@ func CommitRepoAction(userId, repoUserId int64, userName, actEmail string,
 		}
 
 		if err = CreateHookTask(&HookTask{
+			RepoID:      repo.Id,
+			HookID:      w.Id,
 			Type:        w.HookTaskType,
 			Url:         w.Url,
 			BasePayload: payload,

+ 113 - 59
models/webhook.go

@@ -9,6 +9,7 @@ import (
 	"encoding/json"
 	"errors"
 	"io/ioutil"
+	"sync"
 	"time"
 
 	"github.com/gogits/gogs/modules/httplib"
@@ -259,7 +260,9 @@ func (p Payload) GetJSONPayload() ([]byte, error) {
 
 // HookTask represents a hook task.
 type HookTask struct {
-	Id             int64
+	ID             int64 `xorm:"pk autoincr"`
+	RepoID         int64 `xorm:"INDEX"`
+	HookID         int64
 	Uuid           string
 	Type           HookTaskType
 	Url            string
@@ -269,6 +272,7 @@ type HookTask struct {
 	EventType      HookEventType
 	IsSsl          bool
 	IsDelivered    bool
+	Delivered      int64
 	IsSucceed      bool
 }
 
@@ -287,87 +291,137 @@ func CreateHookTask(t *HookTask) error {
 
 // UpdateHookTask updates information of hook task.
 func UpdateHookTask(t *HookTask) error {
-	_, err := x.Id(t.Id).AllCols().Update(t)
+	_, err := x.Id(t.ID).AllCols().Update(t)
 	return err
 }
 
-var (
-	// Prevent duplicate deliveries.
-	// This happens with massive hook tasks cannot finish delivering
-	// before next shooting starts.
-	isShooting = false
-)
+type hookQueue struct {
+	// Make sure one repository only occur once in the queue.
+	lock    sync.Mutex
+	repoIDs map[int64]bool
 
-// DeliverHooks checks and delivers undelivered hooks.
-// FIXME: maybe can use goroutine to shoot a number of them at same time?
-func DeliverHooks() {
-	if isShooting {
+	queue chan int64
+}
+
+func (q *hookQueue) removeRepoID(id int64) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	delete(q.repoIDs, id)
+}
+
+func (q *hookQueue) addRepoID(id int64) {
+	q.lock.Lock()
+	if q.repoIDs[id] {
+		q.lock.Unlock()
 		return
 	}
-	isShooting = true
-	defer func() { isShooting = false }()
+	q.repoIDs[id] = true
+	q.lock.Unlock()
+	q.queue <- id
+}
 
-	tasks := make([]*HookTask, 0, 10)
+// AddRepoID adds repository ID to hook delivery queue.
+func (q *hookQueue) AddRepoID(id int64) {
+	go q.addRepoID(id)
+}
+
+var HookQueue *hookQueue
+
+func deliverHook(t *HookTask) {
 	timeout := time.Duration(setting.Webhook.DeliverTimeout) * time.Second
-	x.Where("is_delivered=?", false).Iterate(new(HookTask),
-		func(idx int, bean interface{}) error {
-			t := bean.(*HookTask)
-			req := httplib.Post(t.Url).SetTimeout(timeout, timeout).
-				Header("X-Gogs-Delivery", t.Uuid).
-				Header("X-Gogs-Event", string(t.EventType)).
-				SetTLSClientConfig(&tls.Config{InsecureSkipVerify: setting.Webhook.SkipTLSVerify})
-
-			switch t.ContentType {
-			case JSON:
-				req = req.Header("Content-Type", "application/json").Body(t.PayloadContent)
-			case FORM:
-				req.Param("payload", t.PayloadContent)
-			}
+	req := httplib.Post(t.Url).SetTimeout(timeout, timeout).
+		Header("X-Gogs-Delivery", t.Uuid).
+		Header("X-Gogs-Event", string(t.EventType)).
+		SetTLSClientConfig(&tls.Config{InsecureSkipVerify: setting.Webhook.SkipTLSVerify})
 
-			t.IsDelivered = true
+	switch t.ContentType {
+	case JSON:
+		req = req.Header("Content-Type", "application/json").Body(t.PayloadContent)
+	case FORM:
+		req.Param("payload", t.PayloadContent)
+	}
 
-			// FIXME: record response.
-			switch t.Type {
-			case GOGS:
-				{
-					if _, err := req.Response(); err != nil {
-						log.Error(5, "Delivery: %v", err)
+	t.IsDelivered = true
+
+	// FIXME: record response.
+	switch t.Type {
+	case GOGS:
+		{
+			if resp, err := req.Response(); err != nil {
+				log.Error(5, "Delivery: %v", err)
+			} else {
+				resp.Body.Close()
+				t.IsSucceed = true
+			}
+		}
+	case SLACK:
+		{
+			if resp, err := req.Response(); err != nil {
+				log.Error(5, "Delivery: %v", err)
+			} else {
+				defer resp.Body.Close()
+				contents, err := ioutil.ReadAll(resp.Body)
+				if err != nil {
+					log.Error(5, "%s", err)
+				} else {
+					if string(contents) != "ok" {
+						log.Error(5, "slack failed with: %s", string(contents))
 					} else {
 						t.IsSucceed = true
 					}
 				}
-			case SLACK:
-				{
-					if res, err := req.Response(); err != nil {
-						log.Error(5, "Delivery: %v", err)
-					} else {
-						defer res.Body.Close()
-						contents, err := ioutil.ReadAll(res.Body)
-						if err != nil {
-							log.Error(5, "%s", err)
-						} else {
-							if string(contents) != "ok" {
-								log.Error(5, "slack failed with: %s", string(contents))
-							} else {
-								t.IsSucceed = true
-							}
-						}
-					}
-				}
 			}
+		}
+	}
 
-			tasks = append(tasks, t)
+	t.Delivered = time.Now().UTC().UnixNano()
+	if t.IsSucceed {
+		log.Trace("Hook delivered(%s): %s", t.Uuid, t.PayloadContent)
+	}
+}
 
-			if t.IsSucceed {
-				log.Trace("Hook delivered(%s): %s", t.Uuid, t.PayloadContent)
-			}
+// DeliverHooks checks and delivers undelivered hooks.
+func DeliverHooks() {
+	tasks := make([]*HookTask, 0, 10)
+	x.Where("is_delivered=?", false).Iterate(new(HookTask),
+		func(idx int, bean interface{}) error {
+			t := bean.(*HookTask)
+			deliverHook(t)
+			tasks = append(tasks, t)
 			return nil
 		})
 
 	// Update hook task status.
 	for _, t := range tasks {
 		if err := UpdateHookTask(t); err != nil {
-			log.Error(4, "UpdateHookTask(%d): %v", t.Id, err)
+			log.Error(4, "UpdateHookTask(%d): %v", t.ID, err)
+		}
+	}
+
+	HookQueue = &hookQueue{
+		lock:    sync.Mutex{},
+		repoIDs: make(map[int64]bool),
+		queue:   make(chan int64, setting.Webhook.QueueLength),
+	}
+
+	// Start listening on new hook requests.
+	for repoID := range HookQueue.queue {
+		HookQueue.removeRepoID(repoID)
+
+		tasks = make([]*HookTask, 0, 5)
+		if err := x.Where("repo_id=? AND is_delivered=?", repoID, false).Find(&tasks); err != nil {
+			log.Error(4, "Get repository(%d) hook tasks: %v", repoID, err)
+			continue
+		}
+		for _, t := range tasks {
+			deliverHook(t)
+			if err := UpdateHookTask(t); err != nil {
+				log.Error(4, "UpdateHookTask(%d): %v", t.ID, err)
+			}
 		}
 	}
 }
+
+func InitDeliverHooks() {
+	go DeliverHooks()
+}

File diff suppressed because it is too large
+ 0 - 0
modules/bindata/bindata.go


+ 0 - 1
modules/cron/manager.go

@@ -15,7 +15,6 @@ var c = New()
 
 func NewCronContext() {
 	c.AddFunc("Update mirrors", "@every 1h", models.MirrorUpdate)
-	c.AddFunc("Deliver hooks", fmt.Sprintf("@every %dm", setting.Webhook.TaskInterval), models.DeliverHooks)
 	if setting.Git.Fsck.Enable {
 		c.AddFunc("Repository health check", fmt.Sprintf("@every %dh", setting.Git.Fsck.Interval), models.GitFsck)
 	}

+ 2 - 2
modules/setting/setting.go

@@ -76,7 +76,7 @@ var (
 
 	// Webhook settings.
 	Webhook struct {
-		TaskInterval   int
+		QueueLength    int
 		DeliverTimeout int
 		SkipTLSVerify  bool
 	}
@@ -555,7 +555,7 @@ func newNotifyMailService() {
 
 func newWebhookService() {
 	sec := Cfg.Section("webhook")
-	Webhook.TaskInterval = sec.Key("TASK_INTERVAL").MustInt(1)
+	Webhook.QueueLength = sec.Key("QUEUE_LENGTH").MustInt(1000)
 	Webhook.DeliverTimeout = sec.Key("DELIVER_TIMEOUT").MustInt(5)
 	Webhook.SkipTLSVerify = sec.Key("SKIP_TLS_VERIFY").MustBool()
 }

+ 1 - 0
routers/install.go

@@ -68,6 +68,7 @@ func GlobalInit() {
 
 		models.HasEngine = true
 		cron.NewCronContext()
+		models.InitDeliverHooks()
 		log.NewGitLogger(path.Join(setting.LogRootPath, "http.log"))
 	}
 	if models.EnableSQLite3 {

+ 4 - 1
routers/repo/http.go

@@ -190,7 +190,10 @@ func Http(ctx *middleware.Context) {
 						refName := fields[2]
 
 						// FIXME: handle error.
-						models.Update(refName, oldCommitId, newCommitId, authUsername, username, reponame, authUser.Id)
+						if err = models.Update(refName, oldCommitId, newCommitId, authUsername, username, reponame, authUser.Id); err == nil {
+							models.HookQueue.AddRepoID(repo.Id)
+						}
+
 					}
 					lastLine = lastLine + size
 				} else {

+ 4 - 0
routers/repo/setting.go

@@ -634,3 +634,7 @@ func GitHooksEditPost(ctx *middleware.Context) {
 	}
 	ctx.Redirect(ctx.Repo.RepoLink + "/settings/hooks/git")
 }
+
+func TriggerHook(ctx *middleware.Context) {
+	models.HookQueue.AddRepoID(ctx.Repo.Repository.Id)
+}

+ 1 - 1
templates/.VERSION

@@ -1 +1 @@
-0.6.2.0725 Beta
+0.6.3.0725 Beta

Some files were not shown because too many files changed in this diff