// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package distribution

import (
	"context"
	"fmt"
	"sync"
	"time"

	"gopkg.in/yaml.v2"
	corev1 "k8s.io/api/core/v1"
	kerrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	v1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/utils/clock"

	"istio.io/istio/pilot/pkg/status"
	"istio.io/istio/pilot/pkg/xds"
	"istio.io/istio/pkg/config"
	"istio.io/istio/pkg/ledger"
	"istio.io/istio/pkg/util/sets"
)

func GenStatusReporterMapKey(conID string, distributionType xds.EventType) string {
	key := conID + "~" + distributionType
	return key
}
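
// For illustration only (the connection ID below is made up): a CDS ack from connection
// "sidecar~10.0.0.1~pod.ns~ns.svc.cluster.local-7" would be tracked under a key of the form
// "sidecar~10.0.0.1~pod.ns~ns.svc.cluster.local-7~<CDS EventType string>", where the
// EventType strings themselves are defined by the xds package.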

type inProgressEntry struct {
	// the resource, including resourceVersion, we are currently tracking
	status.Resource
	// the number of reports we have written with this resource at 100%
	completedIterations int
}

type Reporter struct {
	mu sync.RWMutex
	// map from connection id to latest nonce
	status map[string]string
	// map from nonce to connection ids for which it is current
	// using map[string]struct to approximate a hashset
	reverseStatus          map[string]sets.String
	inProgressResources    map[string]*inProgressEntry
	client                 v1.ConfigMapInterface
	cm                     *corev1.ConfigMap
	UpdateInterval         time.Duration
	PodName                string
	clock                  clock.Clock
	ledger                 ledger.Ledger
	distributionEventQueue chan distributionEvent
	controller             *Controller
}

var _ xds.DistributionStatusCache = &Reporter{}

const (
	labelKey  = "internal.istio.io/distribution-report"
	dataField = "distribution-report"
)
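
// Illustrative only, not part of the original file: another component (for example the
// status leader or istioctl wait) could locate every per-pod distribution report via the
// label above. The helper name below is an assumption made for this sketch.
func exampleListDistributionReports(ctx context.Context, cms v1.ConfigMapInterface) ([]corev1.ConfigMap, error) {
	// select only the ConfigMaps carrying the distribution-report label written by Start below
	list, err := cms.List(ctx, metav1.ListOptions{LabelSelector: labelKey + "=true"})
	if err != nil {
		return nil, err
	}
	return list.Items, nil
}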

// Init starts all the read-only features of the reporter, used for nonce generation
// and for responding to istioctl wait.
func (r *Reporter) Init(ledger ledger.Ledger, stop <-chan struct{}) {
	r.ledger = ledger
	if r.clock == nil {
		r.clock = clock.RealClock{}
	}
	r.distributionEventQueue = make(chan distributionEvent, 100_000)
	r.status = make(map[string]string)
	r.reverseStatus = make(map[string]sets.String)
	r.inProgressResources = make(map[string]*inProgressEntry)
	go r.readFromEventQueue(stop)
}

// Start starts the reporter, which watches dataplane acks and resource changes so that it
// can update the status leader with distribution information.
func (r *Reporter) Start(clientSet kubernetes.Interface, namespace string, podname string, stop <-chan struct{}) {
	scope.Info("Starting status follower controller")
	r.client = clientSet.CoreV1().ConfigMaps(namespace)
	r.cm = &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:   r.PodName + "-distribution",
			Labels: map[string]string{labelKey: "true"},
		},
		Data: make(map[string]string),
	}
	t := r.clock.Tick(r.UpdateInterval)
	ctx := status.NewIstioContext(stop)
	x, err := clientSet.CoreV1().Pods(namespace).Get(ctx, podname, metav1.GetOptions{})
	if err != nil {
		scope.Errorf("can't identify pod %s context: %s", podname, err)
	} else {
		r.cm.OwnerReferences = []metav1.OwnerReference{
			*metav1.NewControllerRef(x, corev1.SchemeGroupVersion.WithKind("Pod")),
		}
	}
	go func() {
		for {
			select {
			case <-ctx.Done():
				if r.cm != nil {
					// TODO: is the use of a cancelled context here a problem? Maybe set a short timeout context?
					if err := r.client.Delete(context.Background(), r.cm.Name, metav1.DeleteOptions{}); err != nil {
						scope.Errorf("failed to properly clean up distribution report: %v", err)
					}
				}
				close(r.distributionEventQueue)
				return
			case <-t:
				// TODO: check if a report is necessary? May already be handled by the client.
				r.writeReport(ctx)
			}
		}
	}()
}
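
// Illustrative wiring, not part of the original file. The fake clientset, namespace, pod
// name, retention, and interval below are assumptions, and ledger.Make is assumed to be
// the ledger package's constructor. Init must run before Start so that the event queue
// and status maps exist by the time acks arrive:
//
//	stop := make(chan struct{})
//	r := &Reporter{PodName: "istiod-6d5f7", UpdateInterval: 500 * time.Millisecond}
//	r.Init(ledger.Make(time.Minute), stop)                              // queue, maps, ledger
//	r.Start(fake.NewSimpleClientset(), "istio-system", r.PodName, stop) // periodic ConfigMap writes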

// build a distribution report to send to the status leader
func (r *Reporter) buildReport() (Report, []status.Resource) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	var finishedResources []status.Resource
	out := Report{
		Reporter:            r.PodName,
		DataPlaneCount:      len(r.status),
		InProgressResources: map[string]int{},
	}
	// for every resource in flight
	for _, ipr := range r.inProgressResources {
		res := ipr.Resource
		key := res.String()
		// for every version (nonce) of the config currently in play
		for nonce, dataplanes := range r.reverseStatus {
			// check to see if this version of the config contains this version of the resource
			// it might be more optimal to provide for a full dump of the config at a certain version?
			dpVersion, err := r.ledger.GetPreviousValue(nonce, res.ToModelKey())
			if err == nil && dpVersion == res.Generation {
				if _, ok := out.InProgressResources[key]; !ok {
					out.InProgressResources[key] = len(dataplanes)
				} else {
					out.InProgressResources[key] += len(dataplanes)
				}
			} else if err != nil {
				scope.Errorf("Encountered error retrieving version %s of key %s from Store: %v", nonce, key, err)
				continue
			} else if nonce == r.ledger.RootHash() {
				scope.Warnf("Cache appears to be missing latest version of %s", key)
			}
			if out.InProgressResources[key] >= out.DataPlaneCount {
				// if this resource is done reconciling, let's not worry about it anymore
				finishedResources = append(finishedResources, res)
				// deleting it here doesn't work because we have a read lock and are inside an iterator.
				// TODO: this will leak when a resource never reaches 100% before it is replaced.
				// TODO: do deletes propagate through this thing?
			}
		}
	}
	return out, finishedResources
}

// For efficiency, we don't want to be checking on resources that have already reached 100% distribution.
// When this happens, we remove them from our watch list.
func (r *Reporter) removeCompletedResource(completedResources []status.Resource) {
	r.mu.Lock()
	defer r.mu.Unlock()
	var toDelete []status.Resource
	for _, item := range completedResources {
		// TODO: handle cache miss
		// on a cache miss we need to skip the current iteration, otherwise it will cause errors like
		// invalid memory address or nil pointer dereference
		if _, ok := r.inProgressResources[item.ToModelKey()]; !ok {
			continue
		}
		total := r.inProgressResources[item.ToModelKey()].completedIterations + 1
		// only drop a resource after it has stayed at 100% for roughly one minute of report cycles
		// (e.g. ~120 consecutive completed iterations with a 500ms UpdateInterval)
		if int64(total) > (time.Minute.Milliseconds() / r.UpdateInterval.Milliseconds()) {
			// remove from inProgressResources
			// TODO: cleanup completedResources
			toDelete = append(toDelete, item)
		} else {
			r.inProgressResources[item.ToModelKey()].completedIterations = total
		}
	}
	for _, resource := range toDelete {
		delete(r.inProgressResources, resource.ToModelKey())
	}
}

// AddInProgressResource must be called every time a resource change is detected by pilot. This allows us to look up
// only the resources we expect to be in flight, not the ones that have already been distributed.
func (r *Reporter) AddInProgressResource(res config.Config) {
	tryLedgerPut(r.ledger, res)
	myRes := status.ResourceFromModelConfig(res)
	if myRes == (status.Resource{}) {
		scope.Errorf("Unable to locate schema for %v, will not update status.", res)
		return
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.inProgressResources[myRes.ToModelKey()] = &inProgressEntry{
		Resource:            myRes,
		completedIterations: 0,
	}
}

// DeleteInProgressResource stops tracking a resource that has been removed from the config store.
func (r *Reporter) DeleteInProgressResource(res config.Config) {
	tryLedgerDelete(r.ledger, res)
	if r.controller != nil {
		r.controller.configDeleted(res)
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.inProgressResources, res.Key())
}

// generate a distribution report and write it to a ConfigMap for the leader to read.
func (r *Reporter) writeReport(ctx context.Context) {
	report, finishedResources := r.buildReport()
	go r.removeCompletedResource(finishedResources)
	// write to kubernetes here.
	reportbytes, err := yaml.Marshal(report)
	if err != nil {
		scope.Errorf("Error serializing Distribution Report: %v", err)
		return
	}
	r.cm.Data[dataField] = string(reportbytes)
	// TODO: short circuit this write in the leader
	_, err = CreateOrUpdateConfigMap(ctx, r.cm, r.client)
	if err != nil {
		scope.Errorf("Error writing Distribution Report: %v", err)
	}
}
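
// For orientation, with illustrative values only (the YAML field names inside the report
// come from the Report type's tags, which are defined elsewhere): the ConfigMap written
// above is shaped roughly like
//
//	apiVersion: v1
//	kind: ConfigMap
//	metadata:
//	  name: istiod-6d5f7-distribution
//	  labels:
//	    internal.istio.io/distribution-report: "true"
//	data:
//	  distribution-report: |
//	    <yaml-serialized Report: reporter pod, dataplane count, in-progress resource counts>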

// CreateOrUpdateConfigMap is lifted with a few modifications from kubeadm's apiclient.
func CreateOrUpdateConfigMap(ctx context.Context, cm *corev1.ConfigMap, client v1.ConfigMapInterface) (res *corev1.ConfigMap, err error) {
	if res, err = client.Create(ctx, cm, metav1.CreateOptions{}); err != nil {
		if !kerrors.IsAlreadyExists(err) {
			scope.Errorf("%v", err)
			return nil, fmt.Errorf("unable to create ConfigMap: %w", err)
		}

		// the ConfigMap already exists, so fall back to updating it with the caller's context
		if res, err = client.Update(ctx, cm, metav1.UpdateOptions{}); err != nil {
			return nil, fmt.Errorf("unable to update ConfigMap: %w", err)
		}
	}
	return res, nil
}

type distributionEvent struct {
	conID            string
	distributionType xds.EventType
	nonce            string
}

func (r *Reporter) QueryLastNonce(conID string, distributionType xds.EventType) (noncePrefix string) {
	key := GenStatusReporterMapKey(conID, distributionType)
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.status[key]
}

// RegisterEvent records that a dataplane has acknowledged a new version of the config.
// Theoretically, we could use the ads connections themselves to harvest this data,
// but the mutex there is pretty hot, and it seems best to trade memory for time.
func (r *Reporter) RegisterEvent(conID string, distributionType xds.EventType, nonce string) {
	if nonce == "" {
		return
	}
	// Skip unsupported event types. This ensures we do not leak memory for types
	// which may not be handled properly. For example, a type not in AllTrackingEventTypes
	// will not be properly unregistered.
	if _, f := xds.AllTrackingEventTypes[distributionType]; !f {
		return
	}
	d := distributionEvent{nonce: nonce, distributionType: distributionType, conID: conID}
	select {
	case r.distributionEventQueue <- d:
		return
	default:
		scope.Errorf("Distribution Event Queue overwhelmed, status will be invalid.")
	}
}

func (r *Reporter) readFromEventQueue(stop <-chan struct{}) {
	for {
		select {
		case ev := <-r.distributionEventQueue:
			// TODO might need to batch this to prevent lock contention
			r.processEvent(ev.conID, ev.distributionType, ev.nonce)
		case <-stop:
			return
		}
	}
}

func (r *Reporter) processEvent(conID string, distributionType xds.EventType, nonce string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	key := GenStatusReporterMapKey(conID, distributionType)
	r.deleteKeyFromReverseMap(key)
	var version string
	if len(nonce) > 12 {
		// nonces longer than the version prefix are trimmed to xds.VersionLen characters
		version = nonce[:xds.VersionLen]
	} else {
		version = nonce
	}
	// record the latest acked version for this connection/type in both the forward and reverse maps
	r.status[key] = version
	sets.InsertOrNew(r.reverseStatus, version, key)
}

// deleteKeyFromReverseMap keeps our reverseStatus map in step with status.
// The caller must hold the write lock.
func (r *Reporter) deleteKeyFromReverseMap(key string) {
	if old, ok := r.status[key]; ok {
		sets.DeleteCleanupLast(r.reverseStatus, old, key)
	}
}

// RegisterDisconnect: when a dataplane disconnects, we should no longer count it, nor expect it to ack config.
func (r *Reporter) RegisterDisconnect(conID string, types sets.Set[xds.EventType]) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for xdsType := range types {
		key := GenStatusReporterMapKey(conID, xdsType)
		r.deleteKeyFromReverseMap(key)
		delete(r.status, key)
	}
}

func (r *Reporter) SetController(controller *Controller) {
	r.controller = controller
}