Pond
🔘 Minimalistic and High-performance goroutine worker pool written in Go
Install / Use
/learn @alitto/PondREADME
<a title="Codecov" target="_blank" href="https://github.com/alitto/pond/actions"><img alt="Build status" src="https://github.com/alitto/pond/actions/workflows/main.yml/badge.svg?branch=main&event=push"/></a> <a title="Codecov" target="_blank" href="https://app.codecov.io/gh/alitto/pond/tree/main"><img src="https://codecov.io/gh/alitto/pond/branch/main/graph/badge.svg"/></a> <a title="Release" target="_blank" href="https://github.com/alitto/pond/releases"><img src="https://img.shields.io/github/v/release/alitto/pond"/></a> <a title="Go Report Card" target="_blank" href="https://goreportcard.com/report/github.com/alitto/pond/v2"><img src="https://goreportcard.com/badge/github.com/alitto/pond/v2"/></a>
<img src="./docs/logo.svg" height="100" />pond is a minimalistic and high-performance Go library designed to elegantly manage concurrent tasks.
Motivation
This library is meant to provide a simple and idiomatic way to manage concurrency in Go programs. Based on the Worker Pool pattern, it allows running a large number of tasks concurrently while limiting the number of goroutines that are running at the same time. This is useful when you need to limit the number of concurrent operations to avoid resource exhaustion or hitting rate limits.
Some common use cases include:
- Processing a large number of tasks concurrently
- Limiting the number of concurrent HTTP requests
- Limiting the number of concurrent database connections
- Sending HTTP requests to a rate-limited API
Features:
- Zero dependencies
- Create pools of goroutines that scale automatically based on the number of tasks submitted
- Limit the number of concurrent tasks running at the same time
- Worker goroutines are only created when needed and immediately removed when idle (scale to zero)
- Minimalistic and fluent APIs for:
- Creating worker pools with maximum number of workers
- Submitting tasks to a pool and waiting for them to complete
- Submitting tasks to a pool in a fire-and-forget fashion
- Submitting a group of tasks and waiting for them to complete or the first error to occur
- Stopping a worker pool
- Monitoring pool metrics such as number of running workers, tasks waiting in the queue, etc.
- Very high performance and efficient resource usage under heavy workloads, even outperforming unbounded goroutines in some scenarios
- Complete pool metrics such as number of running workers, tasks waiting in the queue and more
- Configurable parent context to stop all workers when it is cancelled
- New features in v2:
- Bounded or Unbounded task queues
- Submission of tasks that return results
- Awaitable task completion
- Type safe APIs for tasks that return errors or results
- Panics recovery (panics are captured and returned as errors)
- Subpools with a fraction of the parent pool's maximum number of workers
- Blocking and non-blocking submission of tasks when the queue is full
- Dynamic resizing of the pool
- API reference
Installation
go get -u github.com/alitto/pond/v2
Usage
Submitting tasks to a pool with limited concurrency
package main
import (
"fmt"
"github.com/alitto/pond/v2"
)
func main() {
// Create a pool with limited concurrency
pool := pond.NewPool(100)
// Submit 1000 tasks
for i := 0; i < 1000; i++ {
i := i
pool.Submit(func() {
fmt.Printf("Running task #%d\n", i)
})
}
// Stop the pool and wait for all submitted tasks to complete
pool.StopAndWait()
}
Submitting tasks that return errors
This feature allows you to submit tasks that return an error. This is useful when you need to handle errors that occur during the execution of a task.
// Create a pool with limited concurrency
pool := pond.NewPool(100)
// Submit a task that returns an error
task := pool.SubmitErr(func() error {
return errors.New("An error occurred")
})
// Wait for the task to complete and get the error
err := task.Wait()
Submitting tasks that return results
This feature allows you to submit tasks that return a value. This is useful when you need to process the result of a task.
// Create a pool that accepts tasks that return a string and an error
pool := pond.NewResultPool[string](10)
// Submit a task that returns a string
task := pool.Submit(func() (string) {
return "Hello, World!"
})
// Wait for the task to complete and get the result
result, err := task.Wait()
// result = "Hello, World!" and err = nil
Submitting tasks that return results or errors
This feature allows you to submit tasks that return a value and an error. This is useful when you need to handle errors that occur during the execution of a task.
// Create a concurrency limited pool that accepts tasks that return a string
pool := pond.NewResultPool[string](10)
// Submit a task that returns a string value or an error
task := pool.SubmitErr(func() (string, error) {
return "Hello, World!", nil
})
// Wait for the task to complete and get the result
result, err := task.Wait()
// result = "Hello, World!" and err = nil
Submitting tasks associated with a context
If you need to submit a task that is associated with a context, you can pass the context directly to the task function.
// Create a pool with limited concurrency
pool := pond.NewPool(10)
// Create a context that can be cancelled
ctx, cancel := context.WithCancel(context.Background())
// Submit a task that is associated with a context
task := pool.SubmitErr(func() error {
return doSomethingWithCtx(ctx) // Pass the context to the task directly
})
// Wait for the task to complete and get the error.
// If the context is cancelled, the task is stopped and an error is returned.
err := task.Wait()
Submitting a group of related tasks
You can submit a group of tasks that are related to each other. This is useful when you need to execute a group of tasks concurrently and wait for all of them to complete.
// Create a pool with limited concurrency
pool := pond.NewPool(10)
// Create a task group
group := pool.NewGroup()
// Submit a group of tasks
for i := 0; i < 20; i++ {
i := i
group.Submit(func() {
fmt.Printf("Running group task #%d\n", i)
})
}
// Wait for all tasks in the group to complete
err := group.Wait()
Submitting a group of related tasks associated with a context
You can submit a group of tasks that are linked to a context. This is useful when you need to execute a group of tasks concurrently and stop them when the context is cancelled (e.g. when the parent task is cancelled or times out).
// Create a pool with limited concurrency
pool := pond.NewPool(10)
// Create a context with a 5s timeout
timeout, _ := context.WithTimeout(context.Background(), 5*time.Second)
// Create a task group with a context
group := pool.NewGroupContext(timeout)
// Submit a group of tasks
for i := 0; i < 20; i++ {
i := i
group.Submit(func() {
fmt.Printf("Running group task #%d\n", i)
})
}
// Wait for all tasks in the group to complete or the timeout to occur, whichever comes first
err := group.Wait()
Submitting a group of related tasks and waiting for the first error
You can submit a group of tasks that are related to each other and wait for the first error to occur. This is useful when you need to execute a group of tasks concurrently and stop the execution if an error occurs.
// Create a pool with limited concurrency
pool := pond.NewPool(10)
// Create a task group
group := pool.NewGroup()
// Submit a group of tasks
for i := 0; i < 20; i++ {
i := i
group.SubmitErr(func() error {
return doSomethingThatCanFail()
})
}
// Wait for all tasks in the group to complete or the first error to occur
err := group.Wait()
Cancelling tasks immediately
When the first error occurs, tasks that are in the queue will be aborted but any running task will not be disrupted. The call to group.Wait() will not wait for these "in-flight" tasks to complete but they will continue running until completion nonetheless.
If you also need to stop these "in-flight" tasks when the first error occurs, you can reference the group's context (accessible via group.Context()) from any long-running operation carried out within these tasks. Here's an example:
// Create a pool with limited concurrency
pool := pond.NewPool(10)
// Create a task group
group := pool.NewGroup()
// Submit a group of tasks
for i := 0; i < 20; i++ {
i := i
group.SubmitErr(func() error {
// Cancel all in-flight tasks when the first error occurs
return doSomethingThatCanFailInContext(group.Context())
})
}
// Wait for all tasks in the group to complete or the first error to occur
err := group.Wait()
Submitting a group of related tasks that return results
You can submit a group of tasks that are related to each other and return results. This is useful when you need to execute a group of tasks concurrently and process the results. Results are returned in the order they were submitted.
// Create a pool with limited concurrency
pool := pond.NewResultPool[string](10)
// Create a task group
group := pool.NewGroup()
// Submit a group of tasks
for i := 0; i < 20; i++ {
i := i
group.Submit(func() string {
return fmt.Sprintf("Task #%d", i)
})
}
// Wait for all tasks in the group to complete
results, err := group.Wait()
// results = ["Task #0", "Task #1", ..., "Task #19"] and err = nil
Stopping a group of tasks when a context is cancelled
If you need to submit a group of tasks that are associated with a context and stop them when the context is cancelled, you can pass the context directly to the task function.
// Create a pool with limited concurrency
pool := pond.NewPool(10)
// Create a context that can be cancelled
ctx, cancel := context.WithCancel(context.Background())
// Create a task group
group := po
Related Skills
node-connect
339.3kDiagnose OpenClaw node connection and pairing failures for Android, iOS, and macOS companion apps
xurl
339.3kA CLI tool for making authenticated requests to the X (Twitter) API. Use this skill when you need to post tweets, reply, quote, search, read posts, manage followers, send DMs, upload media, or interact with any X API v2 endpoint.
frontend-design
83.9kCreate distinctive, production-grade frontend interfaces with high design quality. Use this skill when the user asks to build web components, pages, or applications. Generates creative, polished code that avoids generic AI aesthetics.
openai-whisper-api
339.3kTranscribe audio via OpenAI Audio Transcriptions API (Whisper).
