Tsk is a Go library for handling generic concurrent tasks.
It provides multiple packages that each implement a single pattern.
go get -u github.com/abevier/tsk
The futures package provides a wrapper around an asynchronous computation.
A channel and a goroutine are often used to achieve the same result, but there are differences:
- A Future can be read by any number of goroutines; a channel delivers its value to only one receiver, once
- A Future can be safely completed multiple times: only the first completion is used and subsequent completions are silently ignored, whereas writing to a closed channel panics
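For example, a future can be completed twice without a panic, and every reader observes the first value. A small sketch using only the calls shown in the examples below:

    fut := futures.New[int]()
    fut.Complete(1)
    fut.Complete(2) // silently ignored: the future already holds 1

    // Reading does not consume the value; both reads observe 1.
    a, _ := fut.Get(context.Background())
    b, _ := fut.Get(context.Background())
    fmt.Println(a, b) // prints: 1 1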
Basic Future Example:
    fut := futures.New[Response]()

    go func() {
        bgCtx, cancel := context.WithTimeout(context.Background(), 45*time.Second)
        defer cancel()

        resp, err := slowServiceClient.MakeRequest(bgCtx, req)
        if err != nil {
            fut.Fail(err)
            return
        }
        fut.Complete(resp)
    }()
// This will block until the slowServiceClient returns a response and the future is completed
resp, err := fut.Get(ctx)
Another basic Future example using FromFunc:
    fut := futures.FromFunc(func() (Response, error) {
        bgCtx, cancel := context.WithTimeout(context.Background(), 45*time.Second)
        defer cancel()

        resp, err := slowServiceClient.MakeRequest(bgCtx, req)
        return resp, err
    })
// This will block until the slowServiceClient returns a response and the future is completed
resp, err := fut.Get(ctx)
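Because Get blocks until the future completes, the caller can bound the wait with the context it passes in. A small sketch, assuming Get returns an error if its context is cancelled before the future completes:

    // Wait at most 5 seconds for the future to complete.
    getCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    resp, err := fut.Get(getCtx)
    if err != nil {
        // err is either the error the future failed with, or a context
        // error if the deadline expired before the future completed.
        log.Printf("could not get response: %v", err)
    }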
Simple Cache Using Futures Example
    type SimpleCache struct {
        m      sync.Mutex
        values map[string]*futures.Future[Response]
    }

    func NewCache() *SimpleCache {
        return &SimpleCache{
            values: make(map[string]*futures.Future[Response]),
        }
    }
    // Get returns the cached future for key if one exists; otherwise it creates a
    // new future, adds it to the cache, and returns it
    func (c *SimpleCache) Get(key string) *futures.Future[Response] {
        c.m.Lock()
        defer c.m.Unlock()

        f, ok := c.values[key]
        if !ok {
            f = futures.FromFunc(func() (Response, error) {
                bgCtx, cancel := context.WithTimeout(context.Background(), 45*time.Second)
                defer cancel()

                resp, err := slowServiceClient.MakeRequest(bgCtx, Request{Key: key})
                return resp, err
            })
            c.values[key] = f
        }
        return f
    }
    func main() {
        wg := sync.WaitGroup{}
        cache := NewCache()

        // All goroutines read the same result - only one slow request is made despite 100
        // concurrent requests for the same key
        for i := 0; i < 100; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                futureResp := cache.Get("the-key")
                resp, _ := futureResp.Get(context.TODO())
                log.Printf("response is: %v", resp)
            }()
        }
        wg.Wait()
    }
The batch package provides a batch executor that allows multiple goroutines to seamlessly batch tasks, which are then flushed to a user-defined executor function.
A common use case is combining database writes from multiple HTTP requests into a single database update. Another is publishing messages from multiple HTTP requests to AWS SQS, which allows up to 10 messages to be batched into a single request (see the sketch after the example below).
Batch Example
    var be = batch.New(batch.Opts{MaxSize: 10, MaxLinger: 250 * time.Millisecond}, runBatch)
    func runBatch(events []string) ([]results.Result[string], error) {
        serviceRequest := myservice.Request{items: events}
        resp, err := myservice.Write(serviceRequest)
        if err != nil {
            // All items in the batch will fail with this error
            return nil, err
        }

        // The run batch function must return a result for every item passed into the function
        var res []results.Result[string]
        for _, r := range resp.items {
            if r.Err != nil {
                res = append(res, results.Fail[string](r.Err))
                continue
            }
            res = append(res, results.Success(r.ID))
        }
        return res, nil
    }
    func eventHandler(w http.ResponseWriter, req *http.Request) {
        b, _ := io.ReadAll(req.Body)
        event := string(b)

        // Each request submits an individual item to the batch, which flushes after 10 items
        // or once the batch is 250ms old. Submit will not return until the batch flushes and
        // a result is returned from runBatch.
        id, err := be.Submit(req.Context(), event)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            return
        }
        w.Write([]byte(id))
    }
    func main() {
        http.HandleFunc("/event", eventHandler)
        http.ListenAndServe(":8080", nil)
    }
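As mentioned above, the same pattern maps cleanly onto AWS SQS, which accepts up to 10 messages per SendMessageBatch call. Below is a rough sketch of an SQS-flavored run function, assuming aws-sdk-go v1 and that results must line up one-to-one with the submitted items; sqsClient, queueURL, and runSQSBatch are illustrative names, not part of tsk:

    // Sketch only; assumes aws-sdk-go v1 ("github.com/aws/aws-sdk-go/aws" and
    // ".../service/sqs") plus a package-level sqsClient (*sqs.SQS) and queueURL.
    func runSQSBatch(msgs []string) ([]results.Result[string], error) {
        entries := make([]*sqs.SendMessageBatchRequestEntry, len(msgs))
        for i, m := range msgs {
            entries[i] = &sqs.SendMessageBatchRequestEntry{
                Id:          aws.String(strconv.Itoa(i)), // used to map results back to inputs
                MessageBody: aws.String(m),
            }
        }

        out, err := sqsClient.SendMessageBatch(&sqs.SendMessageBatchInput{
            QueueUrl: aws.String(queueURL),
            Entries:  entries,
        })
        if err != nil {
            // The whole batch failed; every submitter receives this error.
            return nil, err
        }

        // Build one result per input, in input order.
        res := make([]results.Result[string], len(msgs))
        for _, s := range out.Successful {
            i, _ := strconv.Atoi(aws.StringValue(s.Id))
            res[i] = results.Success(aws.StringValue(s.MessageId))
        }
        for _, f := range out.Failed {
            i, _ := strconv.Atoi(aws.StringValue(f.Id))
            res[i] = results.Fail[string](fmt.Errorf("sqs send failed: %s", aws.StringValue(f.Message)))
        }
        return res, nil
    }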
The ratelimiter package provides a rate limiter that uses the Token Bucket algorithm to limit the rate at which a function is invoked. A common use case is preventing a service from overwhelming other systems.
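For intuition: a token bucket holds up to Burst tokens and refills at Limit tokens per second, and each invocation consumes one token, waiting if none is available. The standard golang.org/x/time/rate package (not part of tsk) implements the same algorithm and makes the behavior easy to see:

    // A minimal token-bucket sketch using golang.org/x/time/rate (not part of tsk),
    // shown only to illustrate the algorithm's semantics.
    package main

    import (
        "context"
        "log"
        "time"

        "golang.org/x/time/rate"
    )

    func main() {
        // Refill 10 tokens per second and hold at most 1 token (no bursting),
        // mirroring Limit: 10, Burst: 1 in the example below.
        limiter := rate.NewLimiter(rate.Limit(10), 1)

        start := time.Now()
        for i := 0; i < 5; i++ {
            // Wait blocks until a token is available or the context is cancelled.
            if err := limiter.Wait(context.Background()); err != nil {
                log.Fatal(err)
            }
            log.Printf("call %d at %v", i, time.Since(start))
        }
    }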
Rate Limiter Example
    var opts = ratelimiter.Opts{
        Limit:             10,
        Burst:             1,
        MaxQueueDepth:     100,
        FullQueueStrategy: ratelimiter.ErrorWhenFull,
    }

    var rl = ratelimiter.New(opts, do)
    // This function will be called up to 10 times per second
    func do(ctx context.Context, request string) (string, error) {
        resp, err := myservice.RateLimitedQuery(ctx, request)
        return resp, err
    }
    func requestHandler(w http.ResponseWriter, req *http.Request) {
        b, _ := io.ReadAll(req.Body)
        request := string(b)

        // Each call to Submit will block until the rate limited function is invoked and returns a result.
        // If the number of pending requests is too high, ErrQueueFull is returned and the server responds
        // with an HTTP 429.
        resp, err := rl.Submit(req.Context(), request)
        if err != nil {
            if errors.Is(err, ratelimiter.ErrQueueFull) {
                w.WriteHeader(http.StatusTooManyRequests)
            } else {
                w.WriteHeader(http.StatusInternalServerError)
            }
            return
        }
        w.Write([]byte(resp))
    }
    func main() {
        http.HandleFunc("/request", requestHandler)
        http.ListenAndServe(":8080", nil)
    }
The taskqueue package provides a task queue that schedules work on a pool of goroutines, limiting the number of concurrent invocations of a function. Much like the rate limiter, a common use case is preventing one service from overwhelming another.
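For intuition, the underlying pattern is a fixed pool of workers pulling tasks from a shared queue, so no more than MaxWorkers tasks ever run at once. A minimal hand-rolled sketch of that pattern (not the tsk implementation):

    // A minimal worker-pool sketch (not the tsk implementation) showing how a
    // fixed number of goroutines bounds concurrency.
    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        const maxWorkers = 3
        tasks := make(chan int)
        var wg sync.WaitGroup

        // Start exactly maxWorkers goroutines; at most 3 tasks run concurrently.
        for w := 0; w < maxWorkers; w++ {
            wg.Add(1)
            go func(worker int) {
                defer wg.Done()
                for t := range tasks {
                    fmt.Printf("worker %d handling task %d\n", worker, t)
                }
            }(w)
        }

        for t := 0; t < 10; t++ {
            tasks <- t
        }
        close(tasks)
        wg.Wait()
    }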
Task Queue Example
    var opts = taskqueue.Opts{
        MaxWorkers:        3,
        MaxQueueDepth:     100,
        FullQueueStrategy: taskqueue.ErrorWhenFull,
    }

    var tq = taskqueue.New(opts, do)
    // This function will never be invoked more than 3 times concurrently when called through the TaskQueue
    func do(ctx context.Context, request string) (string, error) {
        resp, err := myservice.ConcurrencyLimitedQuery(ctx, request)
        return resp, err
    }
    func requestHandler(w http.ResponseWriter, req *http.Request) {
        b, _ := io.ReadAll(req.Body)
        request := string(b)

        // Each call to Submit will block until the concurrency limited function is invoked and returns a result.
        // If the number of pending requests is too high, ErrQueueFull is returned and the server responds
        // with an HTTP 429.
        resp, err := tq.Submit(req.Context(), request)
        if err != nil {
            if errors.Is(err, taskqueue.ErrQueueFull) {
                w.WriteHeader(http.StatusTooManyRequests)
            } else {
                w.WriteHeader(http.StatusInternalServerError)
            }
            return
        }
        w.Write([]byte(resp))
    }
    func main() {
        http.HandleFunc("/request", requestHandler)
        http.ListenAndServe(":8080", nil)
    }
I wrote this library because I found myself repeating these patterns in multiple places. My preference is to keep application business logic and task-plumbing logic separate whenever possible, so I try to avoid exposing channels and other concurrency primitives to business services. Many of the concurrency bugs I've encountered in Go have come from improperly sending to or receiving from unbuffered channels, or from forgetting to stop work when a context is cancelled.