istio
190 строк · 6.4 Кб
1// Copyright Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package helmreconciler
16
17import (
18"context"
19"fmt"
20"strings"
21
22"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
23"sigs.k8s.io/controller-runtime/pkg/client"
24
25"istio.io/istio/operator/pkg/cache"
26"istio.io/istio/operator/pkg/metrics"
27"istio.io/istio/operator/pkg/name"
28"istio.io/istio/operator/pkg/object"
29"istio.io/istio/operator/pkg/util"
30"istio.io/istio/operator/pkg/util/progress"
31)
32
// fieldOwnerOperator is the field owner name passed to the API server with the
// server-side apply patches issued by this package.
const fieldOwnerOperator = "istio-operator"
34
// AppliedResult is the result of applying a Manifest.
type AppliedResult struct {
	// processedObjects is the list of objects that were processed in this apply operation.
	processedObjects object.K8sObjects
	// deployed is the number of objects that were found in the cache and are
	// unchanged compared to their cached copy.
	deployed int
}
43
44// Succeed returns true if the apply operation succeeded.
45func (r AppliedResult) Succeed() bool {
46return len(r.processedObjects) > 0 || r.deployed > 0
47}
48
49// ApplyManifest applies the manifest to create or update resources. It returns the processed (created or updated)
50// objects and the number of objects in the manifests.
51func (h *HelmReconciler) ApplyManifest(manifest name.Manifest) (result AppliedResult, _ error) {
52cname := string(manifest.Name)
53crHash, err := h.getCRHash(cname)
54if err != nil {
55return result, err
56}
57
58scope.Infof("Processing resources from manifest: %s for CR %s", cname, crHash)
59allObjects, err := object.ParseK8sObjectsFromYAMLManifest(manifest.Content)
60if err != nil {
61return result, err
62}
63
64objectCache := cache.GetCache(crHash)
65
66// Ensure that for a given CR crHash only one control loop uses the per-crHash cache at any time.
67objectCache.Mu.Lock()
68defer objectCache.Mu.Unlock()
69
70// No further locking required beyond this point, since we have a ptr to a cache corresponding to a CR crHash and no
71// other controller is allowed to work on at the same time.
72var changedObjects object.K8sObjects
73var changedObjectKeys []string
74allObjectsMap := make(map[string]bool)
75
76// Check which objects in the manifest have changed from those in the cache.
77for _, obj := range allObjects {
78oh := obj.Hash()
79allObjectsMap[oh] = true
80if co, ok := objectCache.Cache[oh]; ok && obj.Equal(co) {
81// Object is in the cache and unchanged.
82metrics.AddResource(obj.FullName(), obj.GroupVersionKind().GroupKind())
83result.deployed++
84continue
85}
86changedObjects = append(changedObjects, obj)
87changedObjectKeys = append(changedObjectKeys, oh)
88}
89
90var plog *progress.ManifestLog
91if len(changedObjectKeys) > 0 {
92plog = h.opts.ProgressLog.NewComponent(cname)
93scope.Infof("The following objects differ between generated manifest and cache: \n - %s", strings.Join(changedObjectKeys, "\n - "))
94} else {
95scope.Infof("Generated manifest objects are the same as cached for component %s.", cname)
96}
97
98// Objects are applied in groups: namespaces, CRDs, everything else, with wait for ready in between.
99nsObjs := object.KindObjects(changedObjects, name.NamespaceStr)
100crdObjs := object.KindObjects(changedObjects, name.CRDStr)
101otherObjs := object.ObjectsNotInLists(changedObjects, nsObjs, crdObjs)
102for _, objList := range []object.K8sObjects{nsObjs, crdObjs, otherObjs} {
103// For a given group of objects, apply in sorted order of priority with no wait in between.
104objList.Sort(object.DefaultObjectOrder())
105for _, obj := range objList {
106obju := obj.UnstructuredObject()
107if err := h.applyLabelsAndAnnotations(obju, cname); err != nil {
108return result, err
109}
110if err := h.ApplyObject(obj.UnstructuredObject()); err != nil {
111plog.ReportError(err.Error())
112return result, err
113}
114plog.ReportProgress()
115metrics.AddResource(obj.FullName(), obj.GroupVersionKind().GroupKind())
116result.processedObjects = append(result.processedObjects, obj)
117// Update the cache with the latest object.
118objectCache.Cache[obj.Hash()] = obj
119}
120}
121
122// Prune anything not in the manifest out of the cache.
123var removeKeys []string
124for k := range objectCache.Cache {
125if !allObjectsMap[k] {
126removeKeys = append(removeKeys, k)
127}
128}
129for _, k := range removeKeys {
130scope.Infof("Pruning object %s from cache.", k)
131delete(objectCache.Cache, k)
132}
133
134if len(changedObjectKeys) > 0 {
135err := WaitForResources(result.processedObjects, h.kubeClient,
136h.opts.WaitTimeout, h.opts.DryRun, plog)
137if err != nil {
138werr := fmt.Errorf("failed to wait for resource: %v", err)
139plog.ReportError(werr.Error())
140return result, werr
141}
142plog.ReportFinished()
143
144}
145return result, nil
146}
147
148// ApplyObject creates or updates an object in the API server depending on whether it already exists.
149// It mutates obj.
150func (h *HelmReconciler) ApplyObject(obj *unstructured.Unstructured) error {
151if obj.GetKind() == "List" {
152var errs util.Errors
153list, err := obj.ToList()
154if err != nil {
155scope.Errorf("error converting List object: %s", err)
156return err
157}
158for _, item := range list.Items {
159err = h.ApplyObject(&item)
160if err != nil {
161errs = util.AppendErr(errs, err)
162}
163}
164return errs.ToError()
165}
166
167objectStr := fmt.Sprintf("%s/%s/%s", obj.GetKind(), obj.GetNamespace(), obj.GetName())
168
169if scope.DebugEnabled() {
170scope.Debugf("Processing object:\n%s\n\n", util.ToYAML(obj))
171}
172
173if h.opts.DryRun {
174scope.Infof("Not applying object %s because of dry run.", objectStr)
175return nil
176}
177
178return h.serverSideApply(obj)
179}
180
181// use server-side apply, require kubernetes 1.16+
182func (h *HelmReconciler) serverSideApply(obj *unstructured.Unstructured) error {
183objectStr := fmt.Sprintf("%s/%s/%s", obj.GetKind(), obj.GetNamespace(), obj.GetName())
184scope.Infof("using server side apply to update obj: %v", objectStr)
185opts := []client.PatchOption{client.ForceOwnership, client.FieldOwner(fieldOwnerOperator)}
186if err := h.client.Patch(context.TODO(), obj, client.Apply, opts...); err != nil {
187return fmt.Errorf("failed to update resource with server-side apply for obj %v: %v", objectStr, err)
188}
189return nil
190}
191