cubefs

Форк
0
205 строк · 5.2 Кб
1
// Copyright 2013 The Go Authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style
3
// license that can be found in the LICENSE file.
4

5
// Package singleflight provides a duplicate function call suppression
6
// mechanism.
7
package singleflight // import "golang.org/x/sync/singleflight"
8

9
import (
10
	"bytes"
11
	"errors"
12
	"fmt"
13
	"runtime"
14
	"runtime/debug"
15
	"sync"
16
)
17

18
// errGoexit indicates the runtime.Goexit was called in
19
// the user given function.
20
var errGoexit = errors.New("runtime.Goexit was called")
21

22
// A panicError is an arbitrary value recovered from a panic
23
// with the stack trace during the execution of given function.
24
type panicError struct {
25
	value interface{}
26
	stack []byte
27
}
28

29
// Error implements error interface.
30
func (p *panicError) Error() string {
31
	return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
32
}
33

34
func newPanicError(v interface{}) error {
35
	stack := debug.Stack()
36

37
	// The first line of the stack trace is of the form "goroutine N [status]:"
38
	// but by the time the panic reaches Do the goroutine may no longer exist
39
	// and its status will have changed. Trim out the misleading line.
40
	if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
41
		stack = stack[line+1:]
42
	}
43
	return &panicError{value: v, stack: stack}
44
}
45

46
// call is an in-flight or completed singleflight.Do call
47
type call struct {
48
	wg sync.WaitGroup
49

50
	// These fields are written once before the WaitGroup is done
51
	// and are only read after the WaitGroup is done.
52
	val interface{}
53
	err error
54

55
	// These fields are read and written with the singleflight
56
	// mutex held before the WaitGroup is done, and are read but
57
	// not written after the WaitGroup is done.
58
	dups  int
59
	chans []chan<- Result
60
}
61

62
// Group represents a class of work and forms a namespace in
63
// which units of work can be executed with duplicate suppression.
64
type Group struct {
65
	mu sync.Mutex       // protects m
66
	m  map[string]*call // lazily initialized
67
}
68

69
// Result holds the results of Do, so they can be passed
70
// on a channel.
71
type Result struct {
72
	Val    interface{}
73
	Err    error
74
	Shared bool
75
}
76

77
// Do executes and returns the results of the given function, making
78
// sure that only one execution is in-flight for a given key at a
79
// time. If a duplicate comes in, the duplicate caller waits for the
80
// original to complete and receives the same results.
81
// The return value shared indicates whether v was given to multiple callers.
82
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
83
	g.mu.Lock()
84
	if g.m == nil {
85
		g.m = make(map[string]*call)
86
	}
87
	if c, ok := g.m[key]; ok {
88
		c.dups++
89
		g.mu.Unlock()
90
		c.wg.Wait()
91

92
		if e, ok := c.err.(*panicError); ok {
93
			panic(e)
94
		} else if c.err == errGoexit {
95
			runtime.Goexit()
96
		}
97
		return c.val, c.err, true
98
	}
99
	c := new(call)
100
	c.wg.Add(1)
101
	g.m[key] = c
102
	g.mu.Unlock()
103

104
	g.doCall(c, key, fn)
105
	return c.val, c.err, c.dups > 0
106
}
107

108
// DoChan is like Do but returns a channel that will receive the
109
// results when they are ready.
110
//
111
// The returned channel will not be closed.
112
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
113
	ch := make(chan Result, 1)
114
	g.mu.Lock()
115
	if g.m == nil {
116
		g.m = make(map[string]*call)
117
	}
118
	if c, ok := g.m[key]; ok {
119
		c.dups++
120
		c.chans = append(c.chans, ch)
121
		g.mu.Unlock()
122
		return ch
123
	}
124
	c := &call{chans: []chan<- Result{ch}}
125
	c.wg.Add(1)
126
	g.m[key] = c
127
	g.mu.Unlock()
128

129
	go g.doCall(c, key, fn)
130

131
	return ch
132
}
133

134
// doCall handles the single call for a key.
135
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
136
	normalReturn := false
137
	recovered := false
138

139
	// use double-defer to distinguish panic from runtime.Goexit,
140
	// more details see https://golang.org/cl/134395
141
	defer func() {
142
		// the given function invoked runtime.Goexit
143
		if !normalReturn && !recovered {
144
			c.err = errGoexit
145
		}
146

147
		g.mu.Lock()
148
		defer g.mu.Unlock()
149
		c.wg.Done()
150
		if g.m[key] == c {
151
			delete(g.m, key)
152
		}
153

154
		if e, ok := c.err.(*panicError); ok {
155
			// In order to prevent the waiting channels from being blocked forever,
156
			// needs to ensure that this panic cannot be recovered.
157
			if len(c.chans) > 0 {
158
				go panic(e)
159
				select {} // Keep this goroutine around so that it will appear in the crash dump.
160
			} else {
161
				panic(e)
162
			}
163
		} else if c.err == errGoexit {
164
			// Already in the process of goexit, no need to call again
165
		} else {
166
			// Normal return
167
			for _, ch := range c.chans {
168
				ch <- Result{c.val, c.err, c.dups > 0}
169
			}
170
		}
171
	}()
172

173
	func() {
174
		defer func() {
175
			if !normalReturn {
176
				// Ideally, we would wait to take a stack trace until we've determined
177
				// whether this is a panic or a runtime.Goexit.
178
				//
179
				// Unfortunately, the only way we can distinguish the two is to see
180
				// whether the recover stopped the goroutine from terminating, and by
181
				// the time we know that, the part of the stack trace relevant to the
182
				// panic has been discarded.
183
				if r := recover(); r != nil {
184
					c.err = newPanicError(r)
185
				}
186
			}
187
		}()
188

189
		c.val, c.err = fn()
190
		normalReturn = true
191
	}()
192

193
	if !normalReturn {
194
		recovered = true
195
	}
196
}
197

198
// Forget tells the singleflight to forget about a key.  Future calls
199
// to Do for this key will call the function rather than waiting for
200
// an earlier call to complete.
201
func (g *Group) Forget(key string) {
202
	g.mu.Lock()
203
	delete(g.m, key)
204
	g.mu.Unlock()
205
}
206

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.