...
Package errgroup
Package errgroup provides synchronization, error propagation, and Context
cancelation for groups of goroutines working on subtasks of a common task.
A Group is a collection of goroutines working on subtasks that are part of
the same overall task.
A zero Group is valid and does not cancel on error.
type Group struct {
}
▾ Example (JustErrors)
JustErrors illustrates the use of a Group in place of a sync.WaitGroup to
simplify goroutine counting and error handling. This example is derived from
the sync.WaitGroup example at https://golang.org/pkg/sync/#example_WaitGroup.
Code:
var g errgroup.Group
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for _, url := range urls {
url := url
g.Go(func() error {
resp, err := http.Get(url)
if err == nil {
resp.Body.Close()
}
return err
})
}
if err := g.Wait(); err == nil {
fmt.Println("Successfully fetched all URLs.")
}
▾ Example (Parallel)
Parallel illustrates the use of a Group for synchronizing a simple parallel
task: the "Google Search 2.0" function from
https://talks.golang.org/2012/concurrency.slide#46, augmented with a Context
and error-handling.
Code:
// Google runs every search backend concurrently for query and returns their
// results in backend order, or the first error reported by any backend.
Google := func(ctx context.Context, query string) ([]Result, error) {
	backends := []Search{Web, Image, Video}
	hits := make([]Result, len(backends))

	// gctx is canceled the first time a backend returns an error, or when
	// Wait returns, whichever happens first.
	group, gctx := errgroup.WithContext(ctx)
	for idx := range backends {
		// Per-iteration copies keep each closure bound to its own slot.
		slot, run := idx, backends[idx]
		group.Go(func() error {
			hit, err := run(gctx, query)
			if err != nil {
				return err
			}
			// Each goroutine writes a distinct index of hits.
			hits[slot] = hit
			return nil
		})
	}

	if err := group.Wait(); err != nil {
		return nil, err
	}
	return hits, nil
}

found, err := Google(context.Background(), "golang")
if err != nil {
	fmt.Fprintln(os.Stderr, err)
	return
}
for i := range found {
	fmt.Println(found[i])
}
Output:
web result for "golang"
image result for "golang"
video result for "golang"
▾ Example (Pipeline)
Pipeline demonstrates the use of a Group to implement a multi-stage
pipeline: a version of the MD5All function with bounded parallelism from
https://blog.golang.org/pipelines.
Code:
package errgroup_test
import (
"context"
"crypto/md5"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"golang.org/x/sync/errgroup"
)
// ExampleGroup_pipeline digests every regular file under the current
// directory with MD5All and prints one "path:<TAB>hex-sum" line per file.
// The order of the lines is unspecified because it follows map iteration.
func ExampleGroup_pipeline() {
	sums, err := MD5All(context.Background(), ".")
	if err != nil {
		log.Fatal(err)
	}

	for path := range sums {
		fmt.Printf("%s:\t%x\n", path, sums[path])
	}
}
// result pairs a file path with the MD5 sum of that file's contents. It is
// the value sent from the digester goroutines to the collector in MD5All.
type result struct {
// path is the path of the file that was read.
path string
// sum is the MD5 sum of the file's data.
sum [md5.Size]byte
}
// MD5All walks the file tree rooted at root, reads every regular file, and
// returns a map from file path to the MD5 sum of that file's contents. If the
// walk or any read fails, or ctx is canceled while a stage is blocked on a
// send, it returns a nil map and the first error reported by the group.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
// From here on ctx is the group's derived Context: it is canceled the first
// time a function passed to g.Go returns a non-nil error, or when g.Wait
// returns, whichever occurs first.
g, ctx := errgroup.WithContext(ctx)
paths := make(chan string)
// Stage 1: walk the tree and emit the path of each regular file on paths.
g.Go(func() error {
// Closing paths is what ends the digesters' range loops below.
defer close(paths)
return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
// Skip directories and other non-regular files.
if !info.Mode().IsRegular() {
return nil
}
// Stop walking instead of blocking on the send once ctx is canceled.
select {
case paths <- path:
case <-ctx.Done():
return ctx.Err()
}
return nil
})
})
// Stage 2: a fixed number of goroutines read and digest files from paths.
c := make(chan result)
const numDigesters = 20
for i := 0; i < numDigesters; i++ {
g.Go(func() error {
for path := range paths {
data, err := ioutil.ReadFile(path)
if err != nil {
return err
}
// Give up on delivering the digest once ctx is canceled.
select {
case c <- result{path, md5.Sum(data)}:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})
}
// Close c once every stage has returned so the collection loop below ends.
// The error from this Wait is deliberately dropped; it is checked by the
// Wait call that follows the loop.
go func() {
g.Wait()
close(c)
}()
// Stage 3: gather the digests into the result map until c is closed.
m := make(map[string][md5.Size]byte)
for r := range c {
m[r.path] = r.sum
}
// c is closed only after g.Wait has returned, so every goroutine has
// finished by now; this call reports whether any of them failed.
if err := g.Wait(); err != nil {
return nil, err
}
return m, nil
}
func WithContext(ctx context.Context) (*Group, context.Context)
WithContext returns a new Group and an associated Context derived from ctx.
The derived Context is canceled the first time a function passed to Go
returns a non-nil error or the first time Wait returns, whichever occurs
first.
func (*Group) Go
func (g *Group) Go(f func() error)
Go calls the given function in a new goroutine.
The first call to return a non-nil error cancels the group's Context, if the
Group was created by WithContext; its error will be returned by Wait.
func (*Group) Wait
func (g *Group) Wait() error
Wait blocks until all function calls from the Go method have returned, then
returns the first non-nil error (if any) from them.