cubefs

Форк
0
132 строки · 3.1 Кб
1
// Copyright 2016 The Go Authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style
3
// license that can be found in the LICENSE file.
4

5
// Package errgroup provides synchronization, error propagation, and Context
6
// cancelation for groups of goroutines working on subtasks of a common task.
7
package errgroup
8

9
import (
10
	"context"
11
	"fmt"
12
	"sync"
13
)
14

15
type token struct{}
16

17
// A Group is a collection of goroutines working on subtasks that are part of
18
// the same overall task.
19
//
20
// A zero Group is valid, has no limit on the number of active goroutines,
21
// and does not cancel on error.
22
type Group struct {
23
	cancel func()
24

25
	wg sync.WaitGroup
26

27
	sem chan token
28

29
	errOnce sync.Once
30
	err     error
31
}
32

33
func (g *Group) done() {
34
	if g.sem != nil {
35
		<-g.sem
36
	}
37
	g.wg.Done()
38
}
39

40
// WithContext returns a new Group and an associated Context derived from ctx.
41
//
42
// The derived Context is canceled the first time a function passed to Go
43
// returns a non-nil error or the first time Wait returns, whichever occurs
44
// first.
45
func WithContext(ctx context.Context) (*Group, context.Context) {
46
	ctx, cancel := context.WithCancel(ctx)
47
	return &Group{cancel: cancel}, ctx
48
}
49

50
// Wait blocks until all function calls from the Go method have returned, then
51
// returns the first non-nil error (if any) from them.
52
func (g *Group) Wait() error {
53
	g.wg.Wait()
54
	if g.cancel != nil {
55
		g.cancel()
56
	}
57
	return g.err
58
}
59

60
// Go calls the given function in a new goroutine.
61
// It blocks until the new goroutine can be added without the number of
62
// active goroutines in the group exceeding the configured limit.
63
//
64
// The first call to return a non-nil error cancels the group's context, if the
65
// group was created by calling WithContext. The error will be returned by Wait.
66
func (g *Group) Go(f func() error) {
67
	if g.sem != nil {
68
		g.sem <- token{}
69
	}
70

71
	g.wg.Add(1)
72
	go func() {
73
		defer g.done()
74

75
		if err := f(); err != nil {
76
			g.errOnce.Do(func() {
77
				g.err = err
78
				if g.cancel != nil {
79
					g.cancel()
80
				}
81
			})
82
		}
83
	}()
84
}
85

86
// TryGo calls the given function in a new goroutine only if the number of
87
// active goroutines in the group is currently below the configured limit.
88
//
89
// The return value reports whether the goroutine was started.
90
func (g *Group) TryGo(f func() error) bool {
91
	if g.sem != nil {
92
		select {
93
		case g.sem <- token{}:
94
			// Note: this allows barging iff channels in general allow barging.
95
		default:
96
			return false
97
		}
98
	}
99

100
	g.wg.Add(1)
101
	go func() {
102
		defer g.done()
103

104
		if err := f(); err != nil {
105
			g.errOnce.Do(func() {
106
				g.err = err
107
				if g.cancel != nil {
108
					g.cancel()
109
				}
110
			})
111
		}
112
	}()
113
	return true
114
}
115

116
// SetLimit limits the number of active goroutines in this group to at most n.
117
// A negative value indicates no limit.
118
//
119
// Any subsequent call to the Go method will block until it can add an active
120
// goroutine without exceeding the configured limit.
121
//
122
// The limit must not be modified while any goroutines in the group are active.
123
func (g *Group) SetLimit(n int) {
124
	if n < 0 {
125
		g.sem = nil
126
		return
127
	}
128
	if len(g.sem) != 0 {
129
		panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
130
	}
131
	g.sem = make(chan token, n)
132
}
133

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.