istio

Форк
0
190 строк · 6.4 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package helmreconciler
16

17
import (
18
	"context"
19
	"fmt"
20
	"strings"
21

22
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
23
	"sigs.k8s.io/controller-runtime/pkg/client"
24

25
	"istio.io/istio/operator/pkg/cache"
26
	"istio.io/istio/operator/pkg/metrics"
27
	"istio.io/istio/operator/pkg/name"
28
	"istio.io/istio/operator/pkg/object"
29
	"istio.io/istio/operator/pkg/util"
30
	"istio.io/istio/operator/pkg/util/progress"
31
)
32

33
const fieldOwnerOperator = "istio-operator"
34

35
// AppliedResult is the result of applying a Manifest.
36
type AppliedResult struct {
37
	// processedObjects is the list of objects that were processed in this apply operation.
38
	processedObjects object.K8sObjects
39
	// deployed is the number of objects have been deployed which means
40
	// it's in the cache and it's not changed from the cache.
41
	deployed int
42
}
43

44
// Succeed returns true if the apply operation succeeded.
45
func (r AppliedResult) Succeed() bool {
46
	return len(r.processedObjects) > 0 || r.deployed > 0
47
}
48

49
// ApplyManifest applies the manifest to create or update resources. It returns the processed (created or updated)
50
// objects and the number of objects in the manifests.
51
func (h *HelmReconciler) ApplyManifest(manifest name.Manifest) (result AppliedResult, _ error) {
52
	cname := string(manifest.Name)
53
	crHash, err := h.getCRHash(cname)
54
	if err != nil {
55
		return result, err
56
	}
57

58
	scope.Infof("Processing resources from manifest: %s for CR %s", cname, crHash)
59
	allObjects, err := object.ParseK8sObjectsFromYAMLManifest(manifest.Content)
60
	if err != nil {
61
		return result, err
62
	}
63

64
	objectCache := cache.GetCache(crHash)
65

66
	// Ensure that for a given CR crHash only one control loop uses the per-crHash cache at any time.
67
	objectCache.Mu.Lock()
68
	defer objectCache.Mu.Unlock()
69

70
	// No further locking required beyond this point, since we have a ptr to a cache corresponding to a CR crHash and no
71
	// other controller is allowed to work on at the same time.
72
	var changedObjects object.K8sObjects
73
	var changedObjectKeys []string
74
	allObjectsMap := make(map[string]bool)
75

76
	// Check which objects in the manifest have changed from those in the cache.
77
	for _, obj := range allObjects {
78
		oh := obj.Hash()
79
		allObjectsMap[oh] = true
80
		if co, ok := objectCache.Cache[oh]; ok && obj.Equal(co) {
81
			// Object is in the cache and unchanged.
82
			metrics.AddResource(obj.FullName(), obj.GroupVersionKind().GroupKind())
83
			result.deployed++
84
			continue
85
		}
86
		changedObjects = append(changedObjects, obj)
87
		changedObjectKeys = append(changedObjectKeys, oh)
88
	}
89

90
	var plog *progress.ManifestLog
91
	if len(changedObjectKeys) > 0 {
92
		plog = h.opts.ProgressLog.NewComponent(cname)
93
		scope.Infof("The following objects differ between generated manifest and cache: \n - %s", strings.Join(changedObjectKeys, "\n - "))
94
	} else {
95
		scope.Infof("Generated manifest objects are the same as cached for component %s.", cname)
96
	}
97

98
	// Objects are applied in groups: namespaces, CRDs, everything else, with wait for ready in between.
99
	nsObjs := object.KindObjects(changedObjects, name.NamespaceStr)
100
	crdObjs := object.KindObjects(changedObjects, name.CRDStr)
101
	otherObjs := object.ObjectsNotInLists(changedObjects, nsObjs, crdObjs)
102
	for _, objList := range []object.K8sObjects{nsObjs, crdObjs, otherObjs} {
103
		// For a given group of objects, apply in sorted order of priority with no wait in between.
104
		objList.Sort(object.DefaultObjectOrder())
105
		for _, obj := range objList {
106
			obju := obj.UnstructuredObject()
107
			if err := h.applyLabelsAndAnnotations(obju, cname); err != nil {
108
				return result, err
109
			}
110
			if err := h.ApplyObject(obj.UnstructuredObject()); err != nil {
111
				plog.ReportError(err.Error())
112
				return result, err
113
			}
114
			plog.ReportProgress()
115
			metrics.AddResource(obj.FullName(), obj.GroupVersionKind().GroupKind())
116
			result.processedObjects = append(result.processedObjects, obj)
117
			// Update the cache with the latest object.
118
			objectCache.Cache[obj.Hash()] = obj
119
		}
120
	}
121

122
	// Prune anything not in the manifest out of the cache.
123
	var removeKeys []string
124
	for k := range objectCache.Cache {
125
		if !allObjectsMap[k] {
126
			removeKeys = append(removeKeys, k)
127
		}
128
	}
129
	for _, k := range removeKeys {
130
		scope.Infof("Pruning object %s from cache.", k)
131
		delete(objectCache.Cache, k)
132
	}
133

134
	if len(changedObjectKeys) > 0 {
135
		err := WaitForResources(result.processedObjects, h.kubeClient,
136
			h.opts.WaitTimeout, h.opts.DryRun, plog)
137
		if err != nil {
138
			werr := fmt.Errorf("failed to wait for resource: %v", err)
139
			plog.ReportError(werr.Error())
140
			return result, werr
141
		}
142
		plog.ReportFinished()
143

144
	}
145
	return result, nil
146
}
147

148
// ApplyObject creates or updates an object in the API server depending on whether it already exists.
149
// It mutates obj.
150
func (h *HelmReconciler) ApplyObject(obj *unstructured.Unstructured) error {
151
	if obj.GetKind() == "List" {
152
		var errs util.Errors
153
		list, err := obj.ToList()
154
		if err != nil {
155
			scope.Errorf("error converting List object: %s", err)
156
			return err
157
		}
158
		for _, item := range list.Items {
159
			err = h.ApplyObject(&item)
160
			if err != nil {
161
				errs = util.AppendErr(errs, err)
162
			}
163
		}
164
		return errs.ToError()
165
	}
166

167
	objectStr := fmt.Sprintf("%s/%s/%s", obj.GetKind(), obj.GetNamespace(), obj.GetName())
168

169
	if scope.DebugEnabled() {
170
		scope.Debugf("Processing object:\n%s\n\n", util.ToYAML(obj))
171
	}
172

173
	if h.opts.DryRun {
174
		scope.Infof("Not applying object %s because of dry run.", objectStr)
175
		return nil
176
	}
177

178
	return h.serverSideApply(obj)
179
}
180

181
// use server-side apply, require kubernetes 1.16+
182
func (h *HelmReconciler) serverSideApply(obj *unstructured.Unstructured) error {
183
	objectStr := fmt.Sprintf("%s/%s/%s", obj.GetKind(), obj.GetNamespace(), obj.GetName())
184
	scope.Infof("using server side apply to update obj: %v", objectStr)
185
	opts := []client.PatchOption{client.ForceOwnership, client.FieldOwner(fieldOwnerOperator)}
186
	if err := h.client.Patch(context.TODO(), obj, client.Apply, opts...); err != nil {
187
		return fmt.Errorf("failed to update resource with server-side apply for obj %v: %v", objectStr, err)
188
	}
189
	return nil
190
}
191

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.