67 lines
1.1 KiB
Go
67 lines
1.1 KiB
Go
package workerpool
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
)
|
|
|
|
// WorkerPool runs queued jobs on a fixed set of worker goroutines.
//
// Create instances with NewWorkerPool. The struct embeds sync.WaitGroup
// values, so it must be used through a pointer and never copied.
type WorkerPool struct {
	// workers is the number of worker goroutines NewWorkerPool starts.
	workers int

	// queue carries pending jobs from QueueJob to the workers.
	queue chan func(context.Context)

	// ctx is passed to every job; cancel (invoked by Stop) cancels it,
	// which also tells the workers to exit.
	ctx    context.Context
	cancel context.CancelFunc

	// wg tracks running worker goroutines; Stop waits on it.
	wg sync.WaitGroup
	// qg tracks jobs that were queued but have not finished yet;
	// WaitForEmptyQueue waits on it.
	qg sync.WaitGroup
}
|
|
|
|
func NewWorkerPool(workers int, queueSize int) *WorkerPool {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
queue := make(chan func(context.Context), queueSize)
|
|
|
|
workerPool := WorkerPool{
|
|
workers: workers,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
queue: queue,
|
|
}
|
|
|
|
for i := 0; i < workerPool.workers; i++ {
|
|
workerPool.wg.Add(1)
|
|
go workerPool.work(workerPool.ctx)
|
|
}
|
|
|
|
return &workerPool
|
|
}
|
|
|
|
func (wp *WorkerPool) Stop() {
|
|
wp.cancel()
|
|
wp.wg.Wait()
|
|
}
|
|
|
|
func (wp *WorkerPool) QueueJob(job func(context.Context)) {
|
|
wp.qg.Add(1)
|
|
wp.queue <- job
|
|
}
|
|
|
|
func (wp *WorkerPool) WaitForEmptyQueue() {
|
|
wp.qg.Wait()
|
|
}
|
|
|
|
func (wp *WorkerPool) QueueLength() int {
|
|
return len(wp.queue)
|
|
}
|
|
|
|
func (wp *WorkerPool) work(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
wp.wg.Done()
|
|
return
|
|
case job := <-wp.queue:
|
|
func() {
|
|
defer wp.qg.Done()
|
|
job(ctx)
|
|
}()
|
|
}
|
|
}
|
|
}
|