istio

Форк
0
261 строка · 7.7 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package distribution
16

17
import (
18
	"fmt"
19
	"strings"
20
	"sync"
21
	"time"
22

23
	"google.golang.org/protobuf/types/known/timestamppb"
24
	v1 "k8s.io/api/core/v1"
25
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26
	"k8s.io/apimachinery/pkg/labels"
27
	"k8s.io/client-go/dynamic"
28
	"k8s.io/client-go/informers"
29
	"k8s.io/client-go/kubernetes"
30
	"k8s.io/client-go/rest"
31
	"k8s.io/client-go/tools/cache"
32
	"k8s.io/utils/clock"
33

34
	"istio.io/api/meta/v1alpha1"
35
	"istio.io/istio/pilot/pkg/features"
36
	"istio.io/istio/pilot/pkg/model"
37
	"istio.io/istio/pilot/pkg/status"
38
	"istio.io/istio/pkg/config"
39
	"istio.io/istio/pkg/log"
40
)
41

42
var scope = log.RegisterScope("status",
43
	"CRD distribution status debugging")
44

45
// Progress counts how many instances have acknowledged a resource out of the
// total number of instances that were counted.
//
// The field order is significant: positional literals such as Progress{a, b}
// assign AckedInstances first and TotalInstances second.
type Progress struct {
	AckedInstances, TotalInstances int
}

// PlusEquals adds the counters of p2 to p in place.
func (p *Progress) PlusEquals(p2 Progress) {
	p.AckedInstances += p2.AckedInstances
	p.TotalInstances += p2.TotalInstances
}
54

55
type Controller struct {
56
	configStore     model.ConfigStore
57
	mu              sync.RWMutex
58
	CurrentState    map[status.Resource]map[string]Progress
59
	ObservationTime map[string]time.Time
60
	UpdateInterval  time.Duration
61
	dynamicClient   dynamic.Interface
62
	clock           clock.Clock
63
	workers         *status.Controller
64
	StaleInterval   time.Duration
65
	cmInformer      cache.SharedIndexInformer
66
	cmHandle        cache.ResourceEventHandlerRegistration
67
}
68

69
func NewController(restConfig *rest.Config, namespace string, cs model.ConfigStore, m *status.Manager) *Controller {
70
	c := &Controller{
71
		CurrentState:    make(map[status.Resource]map[string]Progress),
72
		ObservationTime: make(map[string]time.Time),
73
		UpdateInterval:  200 * time.Millisecond,
74
		StaleInterval:   time.Minute,
75
		clock:           clock.RealClock{},
76
		configStore:     cs,
77
		workers: m.CreateIstioStatusController(func(status *v1alpha1.IstioStatus, context any) *v1alpha1.IstioStatus {
78
			if status == nil {
79
				return nil
80
			}
81
			distributionState := context.(Progress)
82
			if needsReconcile, desiredStatus := ReconcileStatuses(status, distributionState); needsReconcile {
83
				return desiredStatus
84
			}
85
			return status
86
		}),
87
	}
88

89
	// client-go defaults to 5 QPS, with 10 Boost, which is insufficient for updating status on all the config
90
	// in the mesh.  These values can be configured using environment variables for tuning (see pilot/pkg/features)
91
	restConfig.QPS = float32(features.StatusQPS)
92
	restConfig.Burst = features.StatusBurst
93
	var err error
94
	if c.dynamicClient, err = dynamic.NewForConfig(restConfig); err != nil {
95
		scope.Fatalf("Could not connect to kubernetes: %s", err)
96
	}
97

98
	// configmap informer
99
	i := informers.NewSharedInformerFactoryWithOptions(kubernetes.NewForConfigOrDie(restConfig), 1*time.Minute,
100
		informers.WithNamespace(namespace),
101
		informers.WithTweakListOptions(func(listOptions *metav1.ListOptions) {
102
			listOptions.LabelSelector = labels.Set(map[string]string{labelKey: "true"}).AsSelector().String()
103
		})).
104
		Core().V1().ConfigMaps()
105
	c.cmInformer = i.Informer()
106
	c.cmHandle, _ = c.cmInformer.AddEventHandler(&DistroReportHandler{dc: c})
107

108
	return c
109
}
110

111
func (c *Controller) Start(stop <-chan struct{}) {
112
	scope.Info("Starting status leader controller")
113

114
	// this will list all existing configmaps, as well as updates, right?
115
	go c.cmInformer.Run(stop)
116

117
	//  create Status Writer
118
	t := c.clock.Tick(c.UpdateInterval)
119
	for {
120
		select {
121
		case <-stop:
122
			_ = c.cmInformer.RemoveEventHandler(c.cmHandle)
123
			return
124
		case <-t:
125
			staleReporters := c.writeAllStatus()
126
			if len(staleReporters) > 0 {
127
				c.removeStaleReporters(staleReporters)
128
			}
129
		}
130
	}
131
}
132

133
// handleReport merges a distribution report into CurrentState and records
// the time at which its reporter was last heard from.
//
// For every resource listed in d.InProgressResources, the reporter's entry
// is replaced with the acked count from the report and d.DataPlaneCount as
// the total. Resource keys that cannot be parsed are skipped with a warning.
func (c *Controller) handleReport(d Report) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for resstr, acked := range d.InProgressResources {
		// ResourceFromString returns a pointer; guard against nil
		// (presumably returned for a malformed key) rather than panicking
		// inside the informer callback.
		parsed := status.ResourceFromString(resstr)
		if parsed == nil {
			scope.Warnf("ignoring unparseable resource %q in report from %v", resstr, d.Reporter)
			continue
		}
		res := *parsed
		byReporter, ok := c.CurrentState[res]
		if !ok {
			byReporter = make(map[string]Progress)
			c.CurrentState[res] = byReporter
		}
		byReporter[d.Reporter] = Progress{AckedInstances: acked, TotalInstances: d.DataPlaneCount}
	}
	c.ObservationTime[d.Reporter] = c.clock.Now()
}
145

146
func (c *Controller) writeAllStatus() (staleReporters []string) {
147
	defer c.mu.RUnlock()
148
	c.mu.RLock()
149
	for config, fractions := range c.CurrentState {
150
		if !strings.HasSuffix(config.Group, "istio.io") {
151
			// don't try to write status for non-istio types
152
			continue
153
		}
154
		var distributionState Progress
155
		for reporter, w := range fractions {
156
			// check for stale data here
157
			if c.clock.Since(c.ObservationTime[reporter]) > c.StaleInterval {
158
				scope.Warnf("Status reporter %s has not been heard from since %v, deleting report.",
159
					reporter, c.ObservationTime[reporter])
160
				staleReporters = append(staleReporters, reporter)
161
			} else {
162
				distributionState.PlusEquals(w)
163
			}
164
		}
165
		if distributionState.TotalInstances > 0 { // this is necessary when all reports are stale.
166
			c.queueWriteStatus(config, distributionState)
167
		}
168
	}
169
	return
170
}
171

172
// removeStaleReporters drops all state held for the given reporters: their
// per-resource progress in CurrentState and their ObservationTime entry.
//
// Staleness is re-checked under the write lock, because the list was
// computed earlier under a different lock acquisition; a reporter that has
// delivered a fresh report in the meantime is kept.
func (c *Controller) removeStaleReporters(staleReporters []string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Deduplicate and keep only reporters that are still stale right now.
	confirmed := make(map[string]struct{}, len(staleReporters))
	for _, reporter := range staleReporters {
		if c.clock.Since(c.ObservationTime[reporter]) > c.StaleInterval {
			confirmed[reporter] = struct{}{}
		}
	}
	if len(confirmed) == 0 {
		return
	}

	for _, fractions := range c.CurrentState {
		for reporter := range confirmed {
			delete(fractions, reporter)
		}
	}
	// Forget the observation time as well, so reporters that never come
	// back do not accumulate forever.
	for reporter := range confirmed {
		delete(c.ObservationTime, reporter)
	}
}
182

183
// queueWriteStatus enqueues a status update for config on the worker
// controller, passing state along as the update's context.
func (c *Controller) queueWriteStatus(config status.Resource, state Progress) {
	c.workers.EnqueueStatusUpdateResource(state, config)
}
186

187
// configDeleted tells the worker controller to delete the resource that
// corresponds to the given config.
func (c *Controller) configDeleted(res config.Config) {
	c.workers.Delete(status.ResourceFromModelConfig(res))
}
191

192
// boolToConditionStatus renders b as a condition status string: "True" when
// b is true, "False" otherwise.
func boolToConditionStatus(b bool) string {
	switch b {
	case true:
		return "True"
	default:
		return "False"
	}
}
198

199
func ReconcileStatuses(current *v1alpha1.IstioStatus, desired Progress) (bool, *v1alpha1.IstioStatus) {
200
	needsReconcile := false
201
	desiredCondition := v1alpha1.IstioCondition{
202
		Type:               "Reconciled",
203
		Status:             boolToConditionStatus(desired.AckedInstances == desired.TotalInstances),
204
		LastProbeTime:      timestamppb.Now(),
205
		LastTransitionTime: timestamppb.Now(),
206
		Message:            fmt.Sprintf("%d/%d proxies up to date.", desired.AckedInstances, desired.TotalInstances),
207
	}
208
	current = current.DeepCopy()
209
	var currentCondition *v1alpha1.IstioCondition
210
	conditionIndex := -1
211
	for i, c := range current.Conditions {
212
		if c.Type == "Reconciled" {
213
			currentCondition = current.Conditions[i]
214
			conditionIndex = i
215
			break
216
		}
217
	}
218
	if currentCondition == nil ||
219
		currentCondition.Message != desiredCondition.Message ||
220
		currentCondition.Status != desiredCondition.Status {
221
		needsReconcile = true
222
	}
223
	if conditionIndex > -1 {
224
		current.Conditions[conditionIndex] = &desiredCondition
225
	} else {
226
		current.Conditions = append(current.Conditions, &desiredCondition)
227
	}
228
	return needsReconcile, current
229
}
230

231
type DistroReportHandler struct {
232
	dc *Controller
233
}
234

235
// OnAdd processes an added object through HandleNew. The boolean argument
// is ignored.
func (drh *DistroReportHandler) OnAdd(obj any, _ bool) {
	drh.HandleNew(obj)
}
238

239
// OnUpdate processes the new version of an updated object through
// HandleNew; the previous version is not looked at.
func (drh *DistroReportHandler) OnUpdate(_, newObj any) {
	drh.HandleNew(newObj)
}
242

243
// HandleNew extracts the distribution report stored under dataField in a
// ConfigMap and passes it to the controller. Objects that are not
// ConfigMaps, and reports that fail to parse, are logged and discarded.
func (drh *DistroReportHandler) HandleNew(obj any) {
	cm, isConfigMap := obj.(*v1.ConfigMap)
	if !isConfigMap {
		scope.Warnf("expected configmap, but received %v, discarding", obj)
		return
	}

	raw := cm.Data[dataField]
	scope.Debugf("using report: %s", raw)

	report, err := ReportFromYaml([]byte(raw))
	if err != nil {
		scope.Warnf("received malformed distributionReport %s, discarding: %v", cm.Name, err)
		return
	}
	drh.dc.handleReport(report)
}
258

259
// OnDelete is intentionally a no-op: deleted objects are currently ignored.
func (drh *DistroReportHandler) OnDelete(_ any) {
	// TODO: what do we do here?  will these ever be deleted?
}
262

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.