kuma

Форк
0
/
components.go 
286 строк · 10.2 Кб
1
package global
2

3
import (
4
	"context"
5
	"fmt"
6
	"strings"
7
	"time"
8

9
	"github.com/go-logr/logr"
10
	"github.com/pkg/errors"
11

12
	mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
13
	system_proto "github.com/kumahq/kuma/api/system/v1alpha1"
14
	config_core "github.com/kumahq/kuma/pkg/config/core"
15
	store_config "github.com/kumahq/kuma/pkg/config/core/resources/store"
16
	"github.com/kumahq/kuma/pkg/core"
17
	core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
18
	"github.com/kumahq/kuma/pkg/core/resources/apis/system"
19
	core_manager "github.com/kumahq/kuma/pkg/core/resources/manager"
20
	"github.com/kumahq/kuma/pkg/core/resources/model"
21
	"github.com/kumahq/kuma/pkg/core/resources/registry"
22
	"github.com/kumahq/kuma/pkg/core/resources/store"
23
	"github.com/kumahq/kuma/pkg/core/runtime"
24
	"github.com/kumahq/kuma/pkg/core/runtime/component"
25
	"github.com/kumahq/kuma/pkg/core/user"
26
	"github.com/kumahq/kuma/pkg/kds/client"
27
	"github.com/kumahq/kuma/pkg/kds/mux"
28
	kds_server "github.com/kumahq/kuma/pkg/kds/server"
29
	"github.com/kumahq/kuma/pkg/kds/service"
30
	sync_store "github.com/kumahq/kuma/pkg/kds/store"
31
	"github.com/kumahq/kuma/pkg/kds/util"
32
	kds_client_v2 "github.com/kumahq/kuma/pkg/kds/v2/client"
33
	kds_server_v2 "github.com/kumahq/kuma/pkg/kds/v2/server"
34
	kds_sync_store_v2 "github.com/kumahq/kuma/pkg/kds/v2/store"
35
	kuma_log "github.com/kumahq/kuma/pkg/log"
36
	resources_k8s "github.com/kumahq/kuma/pkg/plugins/resources/k8s"
37
	k8s_model "github.com/kumahq/kuma/pkg/plugins/resources/k8s/native/pkg/model"
38
	util_proto "github.com/kumahq/kuma/pkg/util/proto"
39
)
40

41
var (
42
	kdsGlobalLog      = core.Log.WithName("kds-global")
43
	kdsDeltaGlobalLog = core.Log.WithName("kds-delta-global")
44
)
45

46
func Setup(rt runtime.Runtime) error {
47
	if rt.Config().Mode != config_core.Global {
48
		// Only run on global
49
		return nil
50
	}
51
	reg := registry.Global()
52
	kdsServer, err := kds_server.New(
53
		kdsGlobalLog,
54
		rt,
55
		reg.ObjectTypes(model.HasKDSFlag(model.GlobalToZoneSelector)),
56
		"global",
57
		rt.Config().Multizone.Global.KDS.RefreshInterval.Duration,
58
		rt.KDSContext().GlobalProvidedFilter,
59
		rt.KDSContext().GlobalResourceMapper,
60
		rt.Config().Multizone.Global.KDS.NackBackoff.Duration,
61
	)
62
	if err != nil {
63
		return err
64
	}
65

66
	kdsServerV2, err := kds_server_v2.New(
67
		kdsDeltaGlobalLog,
68
		rt,
69
		reg.ObjectTypes(model.HasKDSFlag(model.GlobalToZoneSelector)),
70
		"global",
71
		rt.Config().Multizone.Global.KDS.RefreshInterval.Duration,
72
		rt.KDSContext().GlobalProvidedFilter,
73
		rt.KDSContext().GlobalResourceMapper,
74
		rt.Config().Multizone.Global.KDS.NackBackoff.Duration,
75
	)
76
	if err != nil {
77
		return err
78
	}
79

80
	resourceSyncer := sync_store.NewResourceSyncer(kdsGlobalLog, rt.ResourceStore())
81
	resourceSyncerV2, err := kds_sync_store_v2.NewResourceSyncer(kdsDeltaGlobalLog, rt.ResourceStore(), rt.Transactions(), rt.Metrics(), rt.Extensions())
82
	if err != nil {
83
		return err
84
	}
85
	kubeFactory := resources_k8s.NewSimpleKubeFactory()
86
	onSessionStarted := mux.OnSessionStartedFunc(func(session mux.Session) error {
87
		log := kdsGlobalLog.WithValues("peer-id", session.PeerID())
88
		log = kuma_log.AddFieldsFromCtx(log, session.ClientStream().Context(), rt.Extensions())
89
		log.Info("new session created")
90
		go func() {
91
			if err := kdsServer.StreamKumaResources(session.ServerStream()); err != nil {
92
				log.Error(err, "StreamKumaResources finished with an error")
93
				session.SetError(err)
94
			} else {
95
				log.V(1).Info("StreamKumaResources finished gracefully")
96
			}
97
		}()
98
		kdsStream := client.NewKDSStream(session.ClientStream(), session.PeerID(), "") // we only care about Zone CP config. Zone CP should not receive Global CP config.
99
		if err := createZoneIfAbsent(session.ClientStream().Context(), log, session.PeerID(), rt.ResourceManager()); err != nil {
100
			log.Error(err, "Global CP could not create a zone")
101
			return errors.New("Global CP could not create a zone") // send back message without details. Zone CP will retry
102
		}
103
		sink := client.NewKDSSink(log, reg.ObjectTypes(model.HasKDSFlag(model.ZoneToGlobalFlag)), kdsStream, Callbacks(resourceSyncer, rt.Config().Store.Type == store_config.KubernetesStore, kubeFactory))
104
		go func() {
105
			if err := sink.Receive(); err != nil {
106
				log.Error(err, "KDSSink finished with an error")
107
				session.SetError(err)
108
			} else {
109
				log.V(1).Info("KDSSink finished gracefully")
110
			}
111
		}()
112
		return nil
113
	})
114

115
	onGlobalToZoneSyncConnect := mux.OnGlobalToZoneSyncConnectFunc(func(stream mesh_proto.KDSSyncService_GlobalToZoneSyncServer, errChan chan error) {
116
		zoneID, err := util.ClientIDFromIncomingCtx(stream.Context())
117
		if err != nil {
118
			errChan <- err
119
		}
120
		log := kdsDeltaGlobalLog.WithValues("peer-id", zoneID)
121
		log = kuma_log.AddFieldsFromCtx(log, stream.Context(), rt.Extensions())
122
		log.Info("Global To Zone new session created")
123
		if err := createZoneIfAbsent(stream.Context(), log, zoneID, rt.ResourceManager()); err != nil {
124
			errChan <- errors.Wrap(err, "Global CP could not create a zone")
125
		}
126
		if err := kdsServerV2.GlobalToZoneSync(stream); err != nil {
127
			errChan <- err
128
		} else {
129
			log.V(1).Info("GlobalToZoneSync finished gracefully")
130
		}
131
	})
132

133
	onZoneToGlobalSyncConnect := mux.OnZoneToGlobalSyncConnectFunc(func(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncServer, errChan chan error) {
134
		zoneID, err := util.ClientIDFromIncomingCtx(stream.Context())
135
		if err != nil {
136
			errChan <- err
137
		}
138
		log := kdsDeltaGlobalLog.WithValues("peer-id", zoneID)
139
		log = kuma_log.AddFieldsFromCtx(log, stream.Context(), rt.Extensions())
140
		kdsStream := kds_client_v2.NewDeltaKDSStream(stream, zoneID, rt, "")
141
		sink := kds_client_v2.NewKDSSyncClient(
142
			log,
143
			reg.ObjectTypes(model.HasKDSFlag(model.ZoneToGlobalFlag)),
144
			kdsStream,
145
			kds_sync_store_v2.GlobalSyncCallback(stream.Context(), resourceSyncerV2, rt.Config().Store.Type == store_config.KubernetesStore, kubeFactory, rt.Config().Store.Kubernetes.SystemNamespace),
146
			rt.Config().Multizone.Global.KDS.ResponseBackoff.Duration,
147
		)
148
		go func() {
149
			if err := sink.Receive(); err != nil {
150
				errChan <- errors.Wrap(err, "KDSSyncClient finished with an error")
151
			} else {
152
				log.V(1).Info("KDSSyncClient finished gracefully")
153
			}
154
		}()
155
	})
156
	var streamInterceptors []service.StreamInterceptor
157
	for _, filter := range rt.KDSContext().GlobalServerFiltersV2 {
158
		streamInterceptors = append(streamInterceptors, filter)
159
	}
160

161
	if rt.Config().Multizone.Global.KDS.ZoneHealthCheck.Timeout.Duration > time.Duration(0) {
162
		zwLog := kdsGlobalLog.WithName("zone-watch")
163
		zw, err := mux.NewZoneWatch(
164
			zwLog,
165
			rt.Config().Multizone.Global.KDS.ZoneHealthCheck,
166
			rt.Metrics(),
167
			rt.EventBus(),
168
			rt.ReadOnlyResourceManager(),
169
			rt.Extensions(),
170
		)
171
		if err != nil {
172
			return errors.Wrap(err, "couldn't create ZoneWatch")
173
		}
174
		if err := rt.Add(component.NewResilientComponent(zwLog, zw)); err != nil {
175
			return err
176
		}
177
	}
178
	return rt.Add(component.NewResilientComponent(kdsGlobalLog.WithName("kds-mux-client"), mux.NewServer(
179
		onSessionStarted,
180
		rt.KDSContext().GlobalServerFilters,
181
		rt.KDSContext().ServerStreamInterceptors,
182
		rt.KDSContext().ServerUnaryInterceptor,
183
		*rt.Config().Multizone.Global.KDS,
184
		rt.Metrics(),
185
		service.NewGlobalKDSServiceServer(
186
			rt.AppContext(),
187
			rt.KDSContext().EnvoyAdminRPCs,
188
			rt.ResourceManager(),
189
			rt.GetInstanceId(),
190
			streamInterceptors,
191
			rt.Extensions(),
192
			rt.Config().Store.Upsert,
193
			rt.EventBus(),
194
			rt.Config().Multizone.Global.KDS.ZoneHealthCheck.PollInterval.Duration,
195
		),
196
		mux.NewKDSSyncServiceServer(
197
			rt.AppContext(),
198
			onGlobalToZoneSyncConnect,
199
			onZoneToGlobalSyncConnect,
200
			rt.KDSContext().GlobalServerFiltersV2,
201
			rt.Extensions(),
202
			rt.EventBus(),
203
		),
204
	)))
205
}
206

207
func createZoneIfAbsent(ctx context.Context, log logr.Logger, name string, resManager core_manager.ResourceManager) error {
208
	ctx = user.Ctx(ctx, user.ControlPlane)
209
	if err := resManager.Get(ctx, system.NewZoneResource(), store.GetByKey(name, model.NoMesh)); err != nil {
210
		if !store.IsResourceNotFound(err) {
211
			return err
212
		}
213
		log.Info("creating Zone", "name", name)
214
		zone := &system.ZoneResource{
215
			Spec: &system_proto.Zone{
216
				Enabled: util_proto.Bool(true),
217
			},
218
		}
219
		if err := resManager.Create(ctx, zone, store.CreateByKey(name, model.NoMesh)); err != nil {
220
			return err
221
		}
222
	}
223
	return nil
224
}
225

226
func Callbacks(s sync_store.ResourceSyncer, k8sStore bool, kubeFactory resources_k8s.KubeFactory) *client.Callbacks {
227
	return &client.Callbacks{
228
		OnResourcesReceived: func(clusterName string, rs model.ResourceList) error {
229
			supportsHashSuffixes := !isOldZone(rs)
230

231
			if !supportsHashSuffixes {
232
				// todo: remove in 2 releases after 2.6.x
233
				util.AddPrefixToNames(rs.GetItems(), clusterName)
234
			}
235

236
			for _, r := range rs.GetItems() {
237
				r.SetMeta(util.CloneResourceMeta(r.GetMeta(),
238
					util.WithLabel(mesh_proto.ZoneTag, clusterName),
239
					util.WithLabel(mesh_proto.ResourceOriginLabel, string(mesh_proto.ZoneResourceOrigin)),
240
				))
241
			}
242

243
			if k8sStore {
244
				// if type of Store is Kubernetes then we want to store upstream resources in dedicated Namespace.
245
				// KubernetesStore parses Name and considers substring after the last dot as a Namespace's Name.
246
				kubeObject, err := kubeFactory.NewObject(rs.NewItem())
247
				if err != nil {
248
					return errors.Wrap(err, "could not convert object")
249
				}
250
				if kubeObject.Scope() == k8s_model.ScopeNamespace {
251
					util.AddSuffixToNames(rs.GetItems(), "default")
252
				}
253
			}
254

255
			if rs.GetItemType() == core_mesh.ZoneIngressType {
256
				for _, zi := range rs.(*core_mesh.ZoneIngressResourceList).Items {
257
					zi.Spec.Zone = clusterName
258
				}
259
			} else if rs.GetItemType() == core_mesh.ZoneEgressType {
260
				for _, ze := range rs.(*core_mesh.ZoneEgressResourceList).Items {
261
					ze.Spec.Zone = clusterName
262
				}
263
			}
264

265
			return s.Sync(rs, sync_store.PrefilterBy(func(r model.Resource) bool {
266
				if !supportsHashSuffixes {
267
					// todo: remove in 2 releases after 2.6.x
268
					return strings.HasPrefix(r.GetMeta().GetName(), fmt.Sprintf("%s.", clusterName))
269
				}
270
				return r.GetMeta().GetLabels()[mesh_proto.ZoneTag] == clusterName
271
			}), sync_store.Zone(clusterName))
272
		},
273
	}
274
}
275

276
// isOldZone checks if zone is running not the latest version of Kuma CP and doesn't support hash suffixes
277
func isOldZone(rs model.ResourceList) bool {
278
	if len(rs.GetItems()) == 0 {
279
		// if there are no resources it doesn't matter if it's old or new Zone
280
		return false
281
	}
282

283
	r := rs.GetItems()[0]
284
	_, exist := r.GetMeta().GetLabels()[mesh_proto.ZoneTag]
285
	return !exist
286
}
287

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.