format with gofumpt (#18184)
* gofumpt -w -l . * gofumpt -w -l -extra . * Add linter * manual fix * change make fmt
This commit is contained in:
parent
1d98d205f5
commit
54e9ee37a7
423 changed files with 1585 additions and 1758 deletions
|
@ -109,8 +109,8 @@ func GetManager() *Manager {
|
|||
func (m *Manager) Add(managed interface{},
|
||||
t Type,
|
||||
configuration,
|
||||
exemplar interface{}) int64 {
|
||||
|
||||
exemplar interface{},
|
||||
) int64 {
|
||||
cfg, _ := json.Marshal(configuration)
|
||||
mq := &ManagedQueue{
|
||||
Type: t,
|
||||
|
@ -141,7 +141,6 @@ func (m *Manager) Remove(qid int64) {
|
|||
delete(m.Queues, qid)
|
||||
m.mutex.Unlock()
|
||||
log.Trace("Queue Manager removed: QID: %d", qid)
|
||||
|
||||
}
|
||||
|
||||
// GetManagedQueue by qid
|
||||
|
@ -225,7 +224,6 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
|
|||
wg.Wait()
|
||||
}
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
// ManagedQueues returns the managed queues
|
||||
|
|
|
@ -39,7 +39,7 @@ type Data interface{}
|
|||
type HandlerFunc func(...Data)
|
||||
|
||||
// NewQueueFunc is a function that creates a queue
|
||||
type NewQueueFunc func(handler HandlerFunc, config interface{}, exemplar interface{}) (Queue, error)
|
||||
type NewQueueFunc func(handler HandlerFunc, config, exemplar interface{}) (Queue, error)
|
||||
|
||||
// Shutdownable represents a queue that can be shutdown
|
||||
type Shutdownable interface {
|
||||
|
@ -70,8 +70,7 @@ func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, erro
|
|||
}
|
||||
|
||||
// DummyQueue represents an empty queue
|
||||
type DummyQueue struct {
|
||||
}
|
||||
type DummyQueue struct{}
|
||||
|
||||
// Run does nothing
|
||||
func (*DummyQueue) Run(_, _ func(func())) {}
|
||||
|
|
|
@ -195,9 +195,11 @@ loop:
|
|||
}
|
||||
}
|
||||
|
||||
var errQueueEmpty = fmt.Errorf("empty queue")
|
||||
var errEmptyBytes = fmt.Errorf("empty bytes")
|
||||
var errUnmarshal = fmt.Errorf("failed to unmarshal")
|
||||
var (
|
||||
errQueueEmpty = fmt.Errorf("empty queue")
|
||||
errEmptyBytes = fmt.Errorf("empty bytes")
|
||||
errUnmarshal = fmt.Errorf("failed to unmarshal")
|
||||
)
|
||||
|
||||
func (q *ByteFIFOQueue) doPop() error {
|
||||
q.lock.Lock()
|
||||
|
|
|
@ -173,7 +173,6 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
|
|||
q.internal.(*LevelQueue).Shutdown()
|
||||
GetManager().Remove(q.internal.(*LevelQueue).qid)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Flush flushes the queue and blocks till the queue is empty
|
||||
|
|
|
@ -188,5 +188,4 @@ func TestPersistableChannelQueue(t *testing.T) {
|
|||
for _, callback := range callbacks {
|
||||
callback()
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@ func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc
|
|||
for q.internal == nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
var cfg = q.cfg
|
||||
cfg := q.cfg
|
||||
if s, ok := cfg.([]byte); ok {
|
||||
cfg = string(s)
|
||||
}
|
||||
|
|
|
@ -197,7 +197,6 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())
|
|||
q.internal.(*LevelUniqueQueue).Shutdown()
|
||||
GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Flush flushes the queue
|
||||
|
|
|
@ -341,7 +341,7 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
|
|||
|
||||
func (p *WorkerPool) doWork(ctx context.Context) {
|
||||
delay := time.Millisecond * 300
|
||||
var data = make([]Data, 0, p.batchLength)
|
||||
data := make([]Data, 0, p.batchLength)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue