1// Copyright Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package helmreconciler
16
17import (
18"context"
19"fmt"
20"sort"
21"strings"
22"time"
23
24appsv1 "k8s.io/api/apps/v1"
25corev1 "k8s.io/api/core/v1"
26apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
27metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28"k8s.io/apimachinery/pkg/labels"
29"k8s.io/apimachinery/pkg/util/wait"
30"k8s.io/client-go/kubernetes"
31kctldeployment "k8s.io/kubectl/pkg/util/deployment"
32
33"istio.io/istio/operator/pkg/name"
34"istio.io/istio/operator/pkg/object"
35"istio.io/istio/operator/pkg/util/progress"
36"istio.io/istio/pkg/kube"
37)
38
const (
	// defaultWaitResourceTimeout is the maximum wait time for all resources (namespace/deployment/pod) to be created.
	defaultWaitResourceTimeout = 300 * time.Second
	// cRDPollInterval is how often the state of CRDs is polled when waiting for their creation.
	cRDPollInterval = 500 * time.Millisecond
	// cRDPollTimeout is the maximum wait time for all CRDs to be created.
	cRDPollTimeout = 60 * time.Second
)
47
// deployment pairs a Deployment with the newest ReplicaSet generated from it,
// so readiness can be judged against the most recent rollout.
type deployment struct {
	// replicaSets is the latest ReplicaSet owned by the deployment — a single
	// object despite the plural name (see waitForResources, which assigns the
	// newReplicaSet returned by kctldeployment.GetAllReplicaSets).
	replicaSets *appsv1.ReplicaSet
	deployment *appsv1.Deployment
}
53
54// WaitForResources polls to get the current status of all pods, PVCs, and Services
55// until all are ready or a timeout is reached
56func WaitForResources(objects object.K8sObjects, client kube.Client,
57waitTimeout time.Duration, dryRun bool, l *progress.ManifestLog,
58) error {
59if dryRun || TestMode {
60return nil
61}
62
63if err := waitForCRDs(objects, client); err != nil {
64return err
65}
66
67var notReady []string
68var debugInfo map[string]string
69
70// Check if we are ready immediately, to avoid the 2s delay below when we are already ready
71if ready, _, _, err := waitForResources(objects, client.Kube(), l); err == nil && ready {
72return nil
73}
74
75errPoll := wait.PollUntilContextTimeout(context.Background(), 2*time.Second, waitTimeout, false, func(context.Context) (bool, error) {
76isReady, notReadyObjects, debugInfoObjects, err := waitForResources(objects, client.Kube(), l)
77notReady = notReadyObjects
78debugInfo = debugInfoObjects
79return isReady, err
80})
81
82messages := []string{}
83for _, id := range notReady {
84debug, f := debugInfo[id]
85if f {
86messages = append(messages, fmt.Sprintf(" %s (%s)", id, debug))
87} else {
88messages = append(messages, fmt.Sprintf(" %s", debug))
89}
90}
91if errPoll != nil {
92msg := fmt.Sprintf("resources not ready after %v: %v\n%s", waitTimeout, errPoll, strings.Join(messages, "\n"))
93return fmt.Errorf(msg)
94}
95return nil
96}
97
98func waitForResources(objects object.K8sObjects, cs kubernetes.Interface, l *progress.ManifestLog) (bool, []string, map[string]string, error) {
99pods := []corev1.Pod{}
100deployments := []deployment{}
101daemonsets := []*appsv1.DaemonSet{}
102statefulsets := []*appsv1.StatefulSet{}
103namespaces := []corev1.Namespace{}
104
105for _, o := range objects {
106kind := o.GroupVersionKind().Kind
107switch kind {
108case name.NamespaceStr:
109namespace, err := cs.CoreV1().Namespaces().Get(context.TODO(), o.Name, metav1.GetOptions{})
110if err != nil {
111return false, nil, nil, err
112}
113namespaces = append(namespaces, *namespace)
114case name.DeploymentStr:
115currentDeployment, err := cs.AppsV1().Deployments(o.Namespace).Get(context.TODO(), o.Name, metav1.GetOptions{})
116if err != nil {
117return false, nil, nil, err
118}
119_, _, newReplicaSet, err := kctldeployment.GetAllReplicaSets(currentDeployment, cs.AppsV1())
120if err != nil || newReplicaSet == nil {
121return false, nil, nil, err
122}
123newDeployment := deployment{
124newReplicaSet,
125currentDeployment,
126}
127deployments = append(deployments, newDeployment)
128case name.DaemonSetStr:
129ds, err := cs.AppsV1().DaemonSets(o.Namespace).Get(context.TODO(), o.Name, metav1.GetOptions{})
130if err != nil {
131return false, nil, nil, err
132}
133daemonsets = append(daemonsets, ds)
134case name.StatefulSetStr:
135sts, err := cs.AppsV1().StatefulSets(o.Namespace).Get(context.TODO(), o.Name, metav1.GetOptions{})
136if err != nil {
137return false, nil, nil, err
138}
139statefulsets = append(statefulsets, sts)
140}
141}
142
143resourceDebugInfo := map[string]string{}
144dr, dnr := deploymentsReady(cs, deployments, resourceDebugInfo)
145dsr, dsnr := daemonsetsReady(daemonsets)
146stsr, stsnr := statefulsetsReady(statefulsets)
147nsr, nnr := namespacesReady(namespaces)
148pr, pnr := podsReady(pods)
149isReady := dr && nsr && dsr && stsr && pr
150notReady := append(append(append(append(nnr, dnr...), pnr...), dsnr...), stsnr...)
151if !isReady {
152l.ReportWaiting(notReady)
153}
154return isReady, notReady, resourceDebugInfo, nil
155}
156
157func waitForCRDs(objects object.K8sObjects, client kube.Client) error {
158var crdNames []string
159for _, o := range object.KindObjects(objects, name.CRDStr) {
160crdNames = append(crdNames, o.Name)
161}
162if len(crdNames) == 0 {
163return nil
164}
165
166errPoll := wait.PollUntilContextTimeout(context.Background(), cRDPollInterval, cRDPollTimeout, false, func(context.Context) (bool, error) {
167descriptor:
168for _, crdName := range crdNames {
169crd, errGet := client.Ext().ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), crdName, metav1.GetOptions{})
170if errGet != nil {
171return false, errGet
172}
173for _, cond := range crd.Status.Conditions {
174switch cond.Type {
175case apiextensions.Established:
176if cond.Status == apiextensions.ConditionTrue {
177scope.Infof("established CRD %s", crdName)
178continue descriptor
179}
180case apiextensions.NamesAccepted:
181if cond.Status == apiextensions.ConditionFalse {
182scope.Warnf("name conflict for %v: %v", crdName, cond.Reason)
183}
184}
185}
186scope.Infof("missing status condition for %q", crdName)
187return false, nil
188}
189return true, nil
190})
191
192if errPoll != nil {
193scope.Errorf("failed to verify CRD creation; %s", errPoll)
194return fmt.Errorf("failed to verify CRD creation: %s", errPoll)
195}
196
197scope.Info("Finished applying CRDs.")
198return nil
199}
200
201func getPods(client kubernetes.Interface, namespace string, selector labels.Selector) ([]corev1.Pod, error) {
202list, err := client.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
203LabelSelector: selector.String(),
204})
205return list.Items, err
206}
207
208func namespacesReady(namespaces []corev1.Namespace) (bool, []string) {
209var notReady []string
210for _, namespace := range namespaces {
211if namespace.Status.Phase != corev1.NamespaceActive {
212notReady = append(notReady, "Namespace/"+namespace.Name)
213}
214}
215return len(notReady) == 0, notReady
216}
217
218func podsReady(pods []corev1.Pod) (bool, []string) {
219var notReady []string
220for _, pod := range pods {
221if !isPodReady(&pod) {
222notReady = append(notReady, "Pod/"+pod.Namespace+"/"+pod.Name)
223}
224}
225return len(notReady) == 0, notReady
226}
227
228func isPodReady(pod *corev1.Pod) bool {
229if len(pod.Status.Conditions) > 0 {
230for _, condition := range pod.Status.Conditions {
231if condition.Type == corev1.PodReady &&
232condition.Status == corev1.ConditionTrue {
233return true
234}
235}
236}
237return false
238}
239
240func deploymentsReady(cs kubernetes.Interface, deployments []deployment, info map[string]string) (bool, []string) {
241var notReady []string
242for _, v := range deployments {
243if v.replicaSets.Status.ReadyReplicas >= *v.deployment.Spec.Replicas {
244// Ready
245continue
246}
247id := "Deployment/" + v.deployment.Namespace + "/" + v.deployment.Name
248notReady = append(notReady, id)
249failure := extractPodFailureReason(cs, v.deployment.Namespace, v.deployment.Spec.Selector)
250if failure != "" {
251info[id] = failure
252}
253}
254return len(notReady) == 0, notReady
255}
256
257func extractPodFailureReason(client kubernetes.Interface, namespace string, selector *metav1.LabelSelector) string {
258sel, err := metav1.LabelSelectorAsSelector(selector)
259if err != nil {
260return fmt.Sprintf("failed to get label selector: %v", err)
261}
262pods, err := getPods(client, namespace, sel)
263if err != nil {
264return fmt.Sprintf("failed to fetch pods: %v", err)
265}
266sort.Slice(pods, func(i, j int) bool {
267return pods[i].CreationTimestamp.After(pods[j].CreationTimestamp.Time)
268})
269for _, pod := range pods {
270for _, cs := range pod.Status.ContainerStatuses {
271if cs.State.Waiting != nil {
272return fmt.Sprintf("container failed to start: %v: %v", cs.State.Waiting.Reason, cs.State.Waiting.Message)
273}
274}
275if c := getCondition(pod.Status.Conditions, corev1.PodReady); c != nil && c.Status == corev1.ConditionFalse {
276return fmt.Sprintf(c.Message)
277}
278}
279return ""
280}
281
282func getCondition(conditions []corev1.PodCondition, condition corev1.PodConditionType) *corev1.PodCondition {
283for _, cond := range conditions {
284if cond.Type == condition {
285return &cond
286}
287}
288return nil
289}
290
291func daemonsetsReady(daemonsets []*appsv1.DaemonSet) (bool, []string) {
292var notReady []string
293for _, ds := range daemonsets {
294// Check if the wanting generation is same as the observed generation
295// Only when the observed generation is the same as the generation,
296// other checks will make sense. If not the same, daemon set is not
297// ready
298if ds.Status.ObservedGeneration != ds.Generation {
299scope.Infof("DaemonSet is not ready: %s/%s. Observed generation: %d expected generation: %d",
300ds.Namespace, ds.Name, ds.Status.ObservedGeneration, ds.Generation)
301notReady = append(notReady, "DaemonSet/"+ds.Namespace+"/"+ds.Name)
302} else {
303// Make sure all the updated pods have been scheduled
304if ds.Spec.UpdateStrategy.Type == appsv1.OnDeleteDaemonSetStrategyType &&
305ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled {
306scope.Infof("DaemonSet is not ready: %s/%s. %d out of %d expected pods have been scheduled",
307ds.Namespace, ds.Name, ds.Status.UpdatedNumberScheduled, ds.Status.DesiredNumberScheduled)
308notReady = append(notReady, "DaemonSet/"+ds.Namespace+"/"+ds.Name)
309}
310if ds.Spec.UpdateStrategy.Type == appsv1.RollingUpdateDaemonSetStrategyType {
311if ds.Status.DesiredNumberScheduled <= 0 {
312// If DesiredNumberScheduled less then or equal 0, there some cases:
313// 1) daemonset is just created
314// 2) daemonset desired no pod
315// 3) somebody changed it manually
316// All the case is not a ready signal
317scope.Infof("DaemonSet is not ready: %s/%s. Initializing, no pods is running",
318ds.Namespace, ds.Name)
319notReady = append(notReady, "DaemonSet/"+ds.Namespace+"/"+ds.Name)
320} else if ds.Status.NumberReady < ds.Status.DesiredNumberScheduled {
321// Make sure every node has a ready pod
322scope.Infof("DaemonSet is not ready: %s/%s. %d out of %d expected pods are ready",
323ds.Namespace, ds.Name, ds.Status.NumberReady, ds.Status.UpdatedNumberScheduled)
324notReady = append(notReady, "DaemonSet/"+ds.Namespace+"/"+ds.Name)
325} else if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled {
326// Make sure all the updated pods have been scheduled
327scope.Infof("DaemonSet is not ready: %s/%s. %d out of %d expected pods have been scheduled",
328ds.Namespace, ds.Name, ds.Status.UpdatedNumberScheduled, ds.Status.DesiredNumberScheduled)
329notReady = append(notReady, "DaemonSet/"+ds.Namespace+"/"+ds.Name)
330}
331}
332}
333}
334return len(notReady) == 0, notReady
335}
336
337func statefulsetsReady(statefulsets []*appsv1.StatefulSet) (bool, []string) {
338var notReady []string
339for _, sts := range statefulsets {
340// Make sure all the updated pods have been scheduled
341if sts.Spec.UpdateStrategy.Type == appsv1.OnDeleteStatefulSetStrategyType &&
342sts.Status.UpdatedReplicas != sts.Status.Replicas {
343scope.Infof("StatefulSet is not ready: %s/%s. %d out of %d expected pods have been scheduled",
344sts.Namespace, sts.Name, sts.Status.UpdatedReplicas, sts.Status.Replicas)
345notReady = append(notReady, "StatefulSet/"+sts.Namespace+"/"+sts.Name)
346}
347if sts.Spec.UpdateStrategy.Type == appsv1.RollingUpdateStatefulSetStrategyType {
348// Dereference all the pointers because StatefulSets like them
349var partition int
350// default replicas for sts is 1
351replicas := 1
352// the rollingUpdate field can be nil even if the update strategy is a rolling update.
353if sts.Spec.UpdateStrategy.RollingUpdate != nil &&
354sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
355partition = int(*sts.Spec.UpdateStrategy.RollingUpdate.Partition)
356}
357if sts.Spec.Replicas != nil {
358replicas = int(*sts.Spec.Replicas)
359}
360expectedReplicas := replicas - partition
361// Make sure all the updated pods have been scheduled
362if int(sts.Status.UpdatedReplicas) != expectedReplicas {
363scope.Infof("StatefulSet is not ready: %s/%s. %d out of %d expected pods have been scheduled",
364sts.Namespace, sts.Name, sts.Status.UpdatedReplicas, expectedReplicas)
365notReady = append(notReady, "StatefulSet/"+sts.Namespace+"/"+sts.Name)
366continue
367}
368}
369}
370return len(notReady) == 0, notReady
371}
372