Reduce integration test overhead (#32475)

While profiling integration tests, I found a couple of places where per-test
overhead could be reduced:

* Avoid disk IO by synchronizing the test Git repository data instead of
deleting and copying it. This saves ~100ms per test on my machine.
* When flushing queues in `PrintCurrentTest`, invoke `FlushWithContext`
in parallel (see the sketch below).
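
The `PrintCurrentTest` change itself is not in the hunks quoted below. As a rough sketch of the "flush in parallel" idea, assuming only the `FlushWithContext` method shown in the patch (the `flusher` interface and the `flushInParallel` helper are invented here for illustration):

```go
package flushsketch

import (
	"context"
	"errors"
	"sync"
	"time"
)

// flusher captures the single method this sketch needs from ManagedWorkerPoolQueue.
type flusher interface {
	FlushWithContext(ctx context.Context, timeout time.Duration) error
}

// flushInParallel flushes every queue in its own goroutine instead of one after
// another, then joins whatever errors were collected.
func flushInParallel(ctx context.Context, queues []flusher, timeout time.Duration) error {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error
	)
	for _, q := range queues {
		wg.Add(1)
		go func(q flusher) {
			defer wg.Done()
			if err := q.FlushWithContext(ctx, timeout); err != nil {
				mu.Lock()
				errs = append(errs, err)
				mu.Unlock()
			}
		}(q)
	}
	wg.Wait()
	return errors.Join(errs...) // nil when every flush succeeded
}
```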

---------

Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
Rowan Bohde 2024-11-14 13:28:46 -06:00 committed by GitHub
parent 249e67672a
commit 68731c07c5
102 changed files with 104 additions and 465 deletions


@@ -5,6 +5,7 @@ package queue
 import (
 	"context"
+	"errors"
 	"sync"
 	"time"
@@ -32,6 +33,7 @@ type ManagedWorkerPoolQueue interface {
 	// FlushWithContext tries to make the handler process all items in the queue synchronously.
 	// It is for testing purpose only. It's not designed to be used in a cluster.
+	// Negative timeout means discarding all items in the queue.
 	FlushWithContext(ctx context.Context, timeout time.Duration) error
 	// RemoveAllItems removes all items in the base queue (on-the-fly items are not affected)
@@ -76,15 +78,16 @@ func (m *Manager) ManagedQueues() map[int64]ManagedWorkerPoolQueue {
 // FlushAll tries to make all managed queues process all items synchronously, until timeout or the queue is empty.
 // It is for testing purpose only. It's not designed to be used in a cluster.
+// Negative timeout means discarding all items in the queue.
 func (m *Manager) FlushAll(ctx context.Context, timeout time.Duration) error {
-	var finalErr error
+	var finalErrors []error
 	qs := m.ManagedQueues()
 	for _, q := range qs {
 		if err := q.FlushWithContext(ctx, timeout); err != nil {
-			finalErr = err // TODO: in Go 1.20: errors.Join
+			finalErrors = append(finalErrors, err)
 		}
 	}
-	return finalErr
+	return errors.Join(finalErrors...)
 }
 // CreateSimpleQueue creates a simple queue from global setting config provider by name
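
For context on the `errors.Join` call above (a standalone illustration, not part of the patch): joining an empty or all-nil slice yields nil, so `FlushAll` still returns nil when every queue flushes cleanly, while multiple failures are all preserved instead of only the last one.

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	var errs []error
	fmt.Println(errors.Join(errs...)) // <nil>: nothing collected, the caller sees success

	errs = append(errs, errors.New("queue A failed"), errors.New("queue B failed"))
	joined := errors.Join(errs...)
	fmt.Println(joined)                     // both messages, newline-separated
	fmt.Println(errors.Is(joined, errs[1])) // true: each wrapped error is still matchable
}
```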


@@ -197,15 +197,30 @@ func (q *WorkerPoolQueue[T]) doFlush(wg *workerGroup[T], flush flushType) {
 	defer log.Debug("Queue %q finishes flushing", q.GetName())
 	// stop all workers, and prepare a new worker context to start new workers
 	wg.ctxWorkerCancel()
 	wg.wg.Wait()
 	defer func() {
-		close(flush)
+		close(flush.c)
 		wg.doPrepareWorkerContext()
 	}()
+	if flush.timeout < 0 {
+		// discard everything
+		wg.batchBuffer = nil
+		for {
+			select {
+			case <-wg.popItemChan:
+			case <-wg.popItemErr:
+			case <-q.batchChan:
+			case <-q.ctxRun.Done():
+				return
+			default:
+				return
+			}
+		}
+	}
 	// drain the batch channel first
 loop:
 	for {
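
The negative-timeout branch above discards pending work with a non-blocking drain: every `select` case that is immediately ready consumes one buffered item, and `default` returns as soon as nothing is left. A standalone sketch of that pattern (the channel and function names here are made up, not the queue's):

```go
package main

import "fmt"

// drain discards everything currently buffered in ch without ever blocking:
// each iteration either consumes one ready item or hits default and stops.
func drain(ch chan int) {
	for {
		select {
		case v := <-ch:
			fmt.Println("discarded", v)
		default:
			return // channel is empty right now
		}
	}
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	drain(ch)
	fmt.Println("remaining buffered items:", len(ch)) // 0
}
```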


@@ -42,7 +42,10 @@ type WorkerPoolQueue[T any] struct {
 	workerNumMu sync.Mutex
 }
-type flushType chan struct{}
+type flushType struct {
+	timeout time.Duration
+	c       chan struct{}
+}
 var _ ManagedWorkerPoolQueue = (*WorkerPoolQueue[any])(nil)
@@ -104,12 +107,12 @@ func (q *WorkerPoolQueue[T]) FlushWithContext(ctx context.Context, timeout time.
 	if timeout > 0 {
 		after = time.After(timeout)
 	}
-	c := make(flushType)
+	flush := flushType{timeout: timeout, c: make(chan struct{})}
 	// send flush request
 	// if it blocks, it means that there is a flush in progress or the queue hasn't been started yet
 	select {
-	case q.flushChan <- c:
+	case q.flushChan <- flush:
 	case <-ctx.Done():
 		return ctx.Err()
 	case <-q.ctxRun.Done():
@@ -120,7 +123,7 @@ func (q *WorkerPoolQueue[T]) FlushWithContext(ctx context.Context, timeout time.
 	// wait for flush to finish
 	select {
-	case <-c:
+	case <-flush.c:
 		return nil
 	case <-ctx.Done():
 		return ctx.Err()
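
With the timeout now carried inside `flushType`, a test can either wait for pending items to be processed or drop them outright. A hypothetical caller might look like this (the helper name, package, and `*testing.T` wiring are assumptions; only `FlushWithContext` and the timeout semantics come from the patch):

```go
package integration

import (
	"context"
	"testing"
	"time"

	"code.gitea.io/gitea/modules/queue"
)

// flushOrDiscard is a hypothetical helper showing the two flush modes the patch
// documents: a non-negative timeout waits for pending items to be processed,
// a negative timeout discards them.
func flushOrDiscard(t *testing.T, q queue.ManagedWorkerPoolQueue, discard bool) {
	t.Helper()
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	timeout := time.Duration(0) // 0: no extra timeout, wait until ctx is done
	if discard {
		timeout = -1 // negative timeout: drop everything still queued
	}
	if err := q.FlushWithContext(ctx, timeout); err != nil {
		t.Fatalf("FlushWithContext: %v", err)
	}
}
```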