// Package workerpool provides a fixed-size pool of goroutines that execute
// queued jobs.
package workerpool

import (
	"context"
	"sync"
)

// WorkerPool runs queued jobs on a fixed number of worker goroutines.
//
// A WorkerPool must be created with NewWorkerPool and must not be copied
// after creation, because it contains sync primitives.
type WorkerPool struct {
	workers int

	// ctx is cancelled by Stop; it is handed to every job and is what tells
	// the workers to exit.
	ctx    context.Context
	cancel context.CancelFunc

	// wg tracks the running worker goroutines.
	wg sync.WaitGroup

	queue chan func(context.Context)

	// qg counts jobs that were accepted by QueueJob and have not yet
	// finished (or been discarded by Stop).
	qg sync.WaitGroup

	// mu lets Stop wait for in-flight QueueJob calls before it drains the
	// queue, so that no job can slip into the queue after the final drain.
	mu sync.RWMutex
}

// NewWorkerPool creates a pool and immediately starts workers goroutines that
// pull jobs from a queue buffered to queueSize entries.
//
// If workers is zero or negative no goroutines are started and queued jobs
// never run. A negative queueSize makes the underlying make call panic.
func NewWorkerPool(workers int, queueSize int) *WorkerPool {
	ctx, cancel := context.WithCancel(context.Background())
	wp := &WorkerPool{
		workers: workers,
		ctx:     ctx,
		cancel:  cancel,
		queue:   make(chan func(context.Context), queueSize),
	}

	for i := 0; i < wp.workers; i++ {
		wp.wg.Add(1)
		go wp.work(wp.ctx)
	}

	return wp
}

// Stop cancels the pool's context, waits for every worker goroutine to
// return, and then discards the jobs that are still sitting in the queue so
// that WaitForEmptyQueue cannot block on work that will never run.
//
// A job that is already executing is not interrupted; it only observes the
// cancellation through the context it was given. Stop may be called more than
// once. It must not be called from inside a job, because it waits for the
// worker running that job.
func (wp *WorkerPool) Stop() {
	wp.cancel()
	wp.wg.Wait()

	// Cancellation has already released every producer blocked in QueueJob,
	// so taking the write lock only waits for them to leave. Once it is held,
	// new QueueJob calls see the cancelled context and return early, which
	// makes the drain below final.
	wp.mu.Lock()
	defer wp.mu.Unlock()

	for {
		select {
		case <-wp.queue:
			// The job will never run; balance the Add done in QueueJob.
			wp.qg.Done()
		default:
			return
		}
	}
}

// QueueJob hands job to the pool, blocking while the queue is full.
//
// A nil job is ignored. Once Stop has been called the job is dropped without
// being run, and a call that was blocked on a full queue returns instead of
// waiting forever.
func (wp *WorkerPool) QueueJob(job func(context.Context)) {
	if job == nil {
		// Calling a nil func would panic inside a worker goroutine.
		return
	}

	wp.mu.RLock()
	defer wp.mu.RUnlock()

	if wp.ctx.Err() != nil {
		return
	}

	wp.qg.Add(1)
	select {
	case wp.queue <- job:
	case <-wp.ctx.Done():
		// The pool was stopped while waiting for room; the job was never
		// enqueued, so undo the pending count.
		wp.qg.Done()
	}
}

// WaitForEmptyQueue blocks until every job accepted by QueueJob has either
// finished running or been discarded by Stop.
func (wp *WorkerPool) WaitForEmptyQueue() {
	wp.qg.Wait()
}

// QueueLength returns the number of jobs currently buffered in the queue and
// not yet picked up by a worker.
func (wp *WorkerPool) QueueLength() int {
	return len(wp.queue)
}

// work is the body of one worker goroutine: it runs jobs from the queue until
// ctx is cancelled.
func (wp *WorkerPool) work(ctx context.Context) {
	defer wp.wg.Done()

	for {
		select {
		case <-ctx.Done():
			return
		case job := <-wp.queue:
			wp.run(ctx, job)
		}
	}
}

// run executes a single job and marks it finished, even if the job panics.
func (wp *WorkerPool) run(ctx context.Context, job func(context.Context)) {
	defer wp.qg.Done()
	job(ctx)
}