istio

Форк
0
371 строка · 13.1 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package helmreconciler
16

17
import (
18
	"context"
19
	"fmt"
20
	"sort"
21
	"strings"
22
	"time"
23

24
	appsv1 "k8s.io/api/apps/v1"
25
	corev1 "k8s.io/api/core/v1"
26
	apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
27
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28
	"k8s.io/apimachinery/pkg/labels"
29
	"k8s.io/apimachinery/pkg/util/wait"
30
	"k8s.io/client-go/kubernetes"
31
	kctldeployment "k8s.io/kubectl/pkg/util/deployment"
32

33
	"istio.io/istio/operator/pkg/name"
34
	"istio.io/istio/operator/pkg/object"
35
	"istio.io/istio/operator/pkg/util/progress"
36
	"istio.io/istio/pkg/kube"
37
)
38

39
const (
40
	// defaultWaitResourceTimeout is the maximum wait time for all resources(namespace/deployment/pod) to be created.
41
	defaultWaitResourceTimeout = 300 * time.Second
42
	// cRDPollInterval is how often the state of CRDs is polled when waiting for their creation.
43
	cRDPollInterval = 500 * time.Millisecond
44
	// cRDPollTimeout is the maximum wait time for all CRDs to be created.
45
	cRDPollTimeout = 60 * time.Second
46
)
47

48
// deployment holds associated replicaSets for a deployment
49
type deployment struct {
50
	replicaSets *appsv1.ReplicaSet
51
	deployment  *appsv1.Deployment
52
}
53

54
// WaitForResources polls to get the current status of all pods, PVCs, and Services
55
// until all are ready or a timeout is reached
56
func WaitForResources(objects object.K8sObjects, client kube.Client,
57
	waitTimeout time.Duration, dryRun bool, l *progress.ManifestLog,
58
) error {
59
	if dryRun || TestMode {
60
		return nil
61
	}
62

63
	if err := waitForCRDs(objects, client); err != nil {
64
		return err
65
	}
66

67
	var notReady []string
68
	var debugInfo map[string]string
69

70
	// Check if we are ready immediately, to avoid the 2s delay below when we are already ready
71
	if ready, _, _, err := waitForResources(objects, client.Kube(), l); err == nil && ready {
72
		return nil
73
	}
74

75
	errPoll := wait.PollUntilContextTimeout(context.Background(), 2*time.Second, waitTimeout, false, func(context.Context) (bool, error) {
76
		isReady, notReadyObjects, debugInfoObjects, err := waitForResources(objects, client.Kube(), l)
77
		notReady = notReadyObjects
78
		debugInfo = debugInfoObjects
79
		return isReady, err
80
	})
81

82
	messages := []string{}
83
	for _, id := range notReady {
84
		debug, f := debugInfo[id]
85
		if f {
86
			messages = append(messages, fmt.Sprintf("  %s (%s)", id, debug))
87
		} else {
88
			messages = append(messages, fmt.Sprintf("  %s", debug))
89
		}
90
	}
91
	if errPoll != nil {
92
		msg := fmt.Sprintf("resources not ready after %v: %v\n%s", waitTimeout, errPoll, strings.Join(messages, "\n"))
93
		return fmt.Errorf(msg)
94
	}
95
	return nil
96
}
97

98
func waitForResources(objects object.K8sObjects, cs kubernetes.Interface, l *progress.ManifestLog) (bool, []string, map[string]string, error) {
99
	pods := []corev1.Pod{}
100
	deployments := []deployment{}
101
	daemonsets := []*appsv1.DaemonSet{}
102
	statefulsets := []*appsv1.StatefulSet{}
103
	namespaces := []corev1.Namespace{}
104

105
	for _, o := range objects {
106
		kind := o.GroupVersionKind().Kind
107
		switch kind {
108
		case name.NamespaceStr:
109
			namespace, err := cs.CoreV1().Namespaces().Get(context.TODO(), o.Name, metav1.GetOptions{})
110
			if err != nil {
111
				return false, nil, nil, err
112
			}
113
			namespaces = append(namespaces, *namespace)
114
		case name.DeploymentStr:
115
			currentDeployment, err := cs.AppsV1().Deployments(o.Namespace).Get(context.TODO(), o.Name, metav1.GetOptions{})
116
			if err != nil {
117
				return false, nil, nil, err
118
			}
119
			_, _, newReplicaSet, err := kctldeployment.GetAllReplicaSets(currentDeployment, cs.AppsV1())
120
			if err != nil || newReplicaSet == nil {
121
				return false, nil, nil, err
122
			}
123
			newDeployment := deployment{
124
				newReplicaSet,
125
				currentDeployment,
126
			}
127
			deployments = append(deployments, newDeployment)
128
		case name.DaemonSetStr:
129
			ds, err := cs.AppsV1().DaemonSets(o.Namespace).Get(context.TODO(), o.Name, metav1.GetOptions{})
130
			if err != nil {
131
				return false, nil, nil, err
132
			}
133
			daemonsets = append(daemonsets, ds)
134
		case name.StatefulSetStr:
135
			sts, err := cs.AppsV1().StatefulSets(o.Namespace).Get(context.TODO(), o.Name, metav1.GetOptions{})
136
			if err != nil {
137
				return false, nil, nil, err
138
			}
139
			statefulsets = append(statefulsets, sts)
140
		}
141
	}
142

143
	resourceDebugInfo := map[string]string{}
144
	dr, dnr := deploymentsReady(cs, deployments, resourceDebugInfo)
145
	dsr, dsnr := daemonsetsReady(daemonsets)
146
	stsr, stsnr := statefulsetsReady(statefulsets)
147
	nsr, nnr := namespacesReady(namespaces)
148
	pr, pnr := podsReady(pods)
149
	isReady := dr && nsr && dsr && stsr && pr
150
	notReady := append(append(append(append(nnr, dnr...), pnr...), dsnr...), stsnr...)
151
	if !isReady {
152
		l.ReportWaiting(notReady)
153
	}
154
	return isReady, notReady, resourceDebugInfo, nil
155
}
156

157
func waitForCRDs(objects object.K8sObjects, client kube.Client) error {
158
	var crdNames []string
159
	for _, o := range object.KindObjects(objects, name.CRDStr) {
160
		crdNames = append(crdNames, o.Name)
161
	}
162
	if len(crdNames) == 0 {
163
		return nil
164
	}
165

166
	errPoll := wait.PollUntilContextTimeout(context.Background(), cRDPollInterval, cRDPollTimeout, false, func(context.Context) (bool, error) {
167
	descriptor:
168
		for _, crdName := range crdNames {
169
			crd, errGet := client.Ext().ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), crdName, metav1.GetOptions{})
170
			if errGet != nil {
171
				return false, errGet
172
			}
173
			for _, cond := range crd.Status.Conditions {
174
				switch cond.Type {
175
				case apiextensions.Established:
176
					if cond.Status == apiextensions.ConditionTrue {
177
						scope.Infof("established CRD %s", crdName)
178
						continue descriptor
179
					}
180
				case apiextensions.NamesAccepted:
181
					if cond.Status == apiextensions.ConditionFalse {
182
						scope.Warnf("name conflict for %v: %v", crdName, cond.Reason)
183
					}
184
				}
185
			}
186
			scope.Infof("missing status condition for %q", crdName)
187
			return false, nil
188
		}
189
		return true, nil
190
	})
191

192
	if errPoll != nil {
193
		scope.Errorf("failed to verify CRD creation; %s", errPoll)
194
		return fmt.Errorf("failed to verify CRD creation: %s", errPoll)
195
	}
196

197
	scope.Info("Finished applying CRDs.")
198
	return nil
199
}
200

201
func getPods(client kubernetes.Interface, namespace string, selector labels.Selector) ([]corev1.Pod, error) {
202
	list, err := client.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
203
		LabelSelector: selector.String(),
204
	})
205
	return list.Items, err
206
}
207

208
func namespacesReady(namespaces []corev1.Namespace) (bool, []string) {
209
	var notReady []string
210
	for _, namespace := range namespaces {
211
		if namespace.Status.Phase != corev1.NamespaceActive {
212
			notReady = append(notReady, "Namespace/"+namespace.Name)
213
		}
214
	}
215
	return len(notReady) == 0, notReady
216
}
217

218
func podsReady(pods []corev1.Pod) (bool, []string) {
219
	var notReady []string
220
	for _, pod := range pods {
221
		if !isPodReady(&pod) {
222
			notReady = append(notReady, "Pod/"+pod.Namespace+"/"+pod.Name)
223
		}
224
	}
225
	return len(notReady) == 0, notReady
226
}
227

228
func isPodReady(pod *corev1.Pod) bool {
229
	if len(pod.Status.Conditions) > 0 {
230
		for _, condition := range pod.Status.Conditions {
231
			if condition.Type == corev1.PodReady &&
232
				condition.Status == corev1.ConditionTrue {
233
				return true
234
			}
235
		}
236
	}
237
	return false
238
}
239

240
func deploymentsReady(cs kubernetes.Interface, deployments []deployment, info map[string]string) (bool, []string) {
241
	var notReady []string
242
	for _, v := range deployments {
243
		if v.replicaSets.Status.ReadyReplicas >= *v.deployment.Spec.Replicas {
244
			// Ready
245
			continue
246
		}
247
		id := "Deployment/" + v.deployment.Namespace + "/" + v.deployment.Name
248
		notReady = append(notReady, id)
249
		failure := extractPodFailureReason(cs, v.deployment.Namespace, v.deployment.Spec.Selector)
250
		if failure != "" {
251
			info[id] = failure
252
		}
253
	}
254
	return len(notReady) == 0, notReady
255
}
256

257
func extractPodFailureReason(client kubernetes.Interface, namespace string, selector *metav1.LabelSelector) string {
258
	sel, err := metav1.LabelSelectorAsSelector(selector)
259
	if err != nil {
260
		return fmt.Sprintf("failed to get label selector: %v", err)
261
	}
262
	pods, err := getPods(client, namespace, sel)
263
	if err != nil {
264
		return fmt.Sprintf("failed to fetch pods: %v", err)
265
	}
266
	sort.Slice(pods, func(i, j int) bool {
267
		return pods[i].CreationTimestamp.After(pods[j].CreationTimestamp.Time)
268
	})
269
	for _, pod := range pods {
270
		for _, cs := range pod.Status.ContainerStatuses {
271
			if cs.State.Waiting != nil {
272
				return fmt.Sprintf("container failed to start: %v: %v", cs.State.Waiting.Reason, cs.State.Waiting.Message)
273
			}
274
		}
275
		if c := getCondition(pod.Status.Conditions, corev1.PodReady); c != nil && c.Status == corev1.ConditionFalse {
276
			return fmt.Sprintf(c.Message)
277
		}
278
	}
279
	return ""
280
}
281

282
func getCondition(conditions []corev1.PodCondition, condition corev1.PodConditionType) *corev1.PodCondition {
283
	for _, cond := range conditions {
284
		if cond.Type == condition {
285
			return &cond
286
		}
287
	}
288
	return nil
289
}
290

291
func daemonsetsReady(daemonsets []*appsv1.DaemonSet) (bool, []string) {
292
	var notReady []string
293
	for _, ds := range daemonsets {
294
		// Check if the wanting generation is same as the observed generation
295
		// Only when the observed generation is the same as the generation,
296
		// other checks will make sense. If not the same, daemon set is not
297
		// ready
298
		if ds.Status.ObservedGeneration != ds.Generation {
299
			scope.Infof("DaemonSet is not ready: %s/%s. Observed generation: %d expected generation: %d",
300
				ds.Namespace, ds.Name, ds.Status.ObservedGeneration, ds.Generation)
301
			notReady = append(notReady, "DaemonSet/"+ds.Namespace+"/"+ds.Name)
302
		} else {
303
			// Make sure all the updated pods have been scheduled
304
			if ds.Spec.UpdateStrategy.Type == appsv1.OnDeleteDaemonSetStrategyType &&
305
				ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled {
306
				scope.Infof("DaemonSet is not ready: %s/%s. %d out of %d expected pods have been scheduled",
307
					ds.Namespace, ds.Name, ds.Status.UpdatedNumberScheduled, ds.Status.DesiredNumberScheduled)
308
				notReady = append(notReady, "DaemonSet/"+ds.Namespace+"/"+ds.Name)
309
			}
310
			if ds.Spec.UpdateStrategy.Type == appsv1.RollingUpdateDaemonSetStrategyType {
311
				if ds.Status.DesiredNumberScheduled <= 0 {
312
					// If DesiredNumberScheduled less then or equal 0, there some cases:
313
					// 1) daemonset is just created
314
					// 2) daemonset desired no pod
315
					// 3) somebody changed it manually
316
					// All the case is not a ready signal
317
					scope.Infof("DaemonSet is not ready: %s/%s. Initializing, no pods is running",
318
						ds.Namespace, ds.Name)
319
					notReady = append(notReady, "DaemonSet/"+ds.Namespace+"/"+ds.Name)
320
				} else if ds.Status.NumberReady < ds.Status.DesiredNumberScheduled {
321
					// Make sure every node has a ready pod
322
					scope.Infof("DaemonSet is not ready: %s/%s. %d out of %d expected pods are ready",
323
						ds.Namespace, ds.Name, ds.Status.NumberReady, ds.Status.UpdatedNumberScheduled)
324
					notReady = append(notReady, "DaemonSet/"+ds.Namespace+"/"+ds.Name)
325
				} else if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled {
326
					// Make sure all the updated pods have been scheduled
327
					scope.Infof("DaemonSet is not ready: %s/%s. %d out of %d expected pods have been scheduled",
328
						ds.Namespace, ds.Name, ds.Status.UpdatedNumberScheduled, ds.Status.DesiredNumberScheduled)
329
					notReady = append(notReady, "DaemonSet/"+ds.Namespace+"/"+ds.Name)
330
				}
331
			}
332
		}
333
	}
334
	return len(notReady) == 0, notReady
335
}
336

337
func statefulsetsReady(statefulsets []*appsv1.StatefulSet) (bool, []string) {
338
	var notReady []string
339
	for _, sts := range statefulsets {
340
		// Make sure all the updated pods have been scheduled
341
		if sts.Spec.UpdateStrategy.Type == appsv1.OnDeleteStatefulSetStrategyType &&
342
			sts.Status.UpdatedReplicas != sts.Status.Replicas {
343
			scope.Infof("StatefulSet is not ready: %s/%s. %d out of %d expected pods have been scheduled",
344
				sts.Namespace, sts.Name, sts.Status.UpdatedReplicas, sts.Status.Replicas)
345
			notReady = append(notReady, "StatefulSet/"+sts.Namespace+"/"+sts.Name)
346
		}
347
		if sts.Spec.UpdateStrategy.Type == appsv1.RollingUpdateStatefulSetStrategyType {
348
			// Dereference all the pointers because StatefulSets like them
349
			var partition int
350
			// default replicas for sts is 1
351
			replicas := 1
352
			// the rollingUpdate field can be nil even if the update strategy is a rolling update.
353
			if sts.Spec.UpdateStrategy.RollingUpdate != nil &&
354
				sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
355
				partition = int(*sts.Spec.UpdateStrategy.RollingUpdate.Partition)
356
			}
357
			if sts.Spec.Replicas != nil {
358
				replicas = int(*sts.Spec.Replicas)
359
			}
360
			expectedReplicas := replicas - partition
361
			// Make sure all the updated pods have been scheduled
362
			if int(sts.Status.UpdatedReplicas) != expectedReplicas {
363
				scope.Infof("StatefulSet is not ready: %s/%s. %d out of %d expected pods have been scheduled",
364
					sts.Namespace, sts.Name, sts.Status.UpdatedReplicas, expectedReplicas)
365
				notReady = append(notReady, "StatefulSet/"+sts.Namespace+"/"+sts.Name)
366
				continue
367
			}
368
		}
369
	}
370
	return len(notReady) == 0, notReady
371
}
372

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.