kubelatte-ce
Форк от sbertech/kubelatte-ce
196 строк · 5.2 Кб
1package kubeapi
2
3import (
4"context"
5"fmt"
6"gitverse.ru/synapse/kubelatte/pkg/api/v1alpha1"
7"gitverse.ru/synapse/kubelatte/pkg/observability/logger"
8"gitverse.ru/synapse/kubelatte/pkg/operator/controllers/clientset"
9v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10"k8s.io/apimachinery/pkg/runtime"
11utilruntime "k8s.io/apimachinery/pkg/util/runtime"
12"k8s.io/client-go/kubernetes"
13clientgoscheme "k8s.io/client-go/kubernetes/scheme"
14"k8s.io/client-go/rest"
15ctrl "sigs.k8s.io/controller-runtime"
16"strings"
17"sync"
18)
19
// ResourceMeta describes how a Kubernetes resource kind is addressed through
// the REST API. Values are produced by CheckResourceMeta from API discovery
// data and consumed by the request helpers in this file.
type ResourceMeta struct {
	// Name is the resource kind as reported by discovery (APIResource.Kind).
	Name string

	// Plural is the resource name used as the resource segment of the request
	// (APIResource.Name).
	Plural string

	// Path is the API root used as the first prefix segment: "api" for the
	// "v1" group/version and "apis" for everything else.
	Path string

	// Namespaced tells the request helpers whether a namespace must be set on
	// the request.
	Namespaced bool

	// GroupVersion is the group/version string of the discovery list the
	// resource was found in; it is used as the second prefix segment.
	GroupVersion string
}
27
28type syncClient struct {
29client *clientset.V1Alpha1Client
30mutex sync.Mutex
31}
32
33var (
34scheme = runtime.NewScheme()
35client = &syncClient{mutex: sync.Mutex{}}
36)
37
38func getResourceFromKubeApi(ctx context.Context, client rest.Interface, resourceMeta *ResourceMeta, resourceName string, namespaceName string) ([]byte, error) {
39request := client.Get().
40Resource(resourceMeta.Plural).
41Name(resourceName).
42Prefix(resourceMeta.Path, resourceMeta.GroupVersion)
43
44if resourceMeta.Namespaced {
45request.Namespace(namespaceName)
46}
47
48res, err := request.Do(ctx).Raw()
49
50return res, err
51}
52
53var GetResource = func(ctx context.Context, resourceMeta *ResourceMeta, resourceName string, namespace string) ([]byte, error) {
54log := logger.FromContext(ctx)
55cliKBLT, err := GetKBLTClient()
56if err != nil {
57log.Errorf("Failed getting KBLT kube client")
58return nil, err
59}
60
61res, err := getResourceFromKubeApi(ctx, cliKBLT.RestClient, resourceMeta, resourceName, namespace)
62if err != nil {
63cliKUBE := GetClient()
64res, err = getResourceFromKubeApi(ctx, cliKUBE.RESTClient(), resourceMeta, resourceName, namespace)
65}
66
67return res, err
68}
69
70var GetKBLTClient = func() (*clientset.V1Alpha1Client, error) {
71client.mutex.Lock()
72defer client.mutex.Unlock()
73
74log := logger.FromContext(context.Background())
75if client.client == nil {
76var err error
77utilruntime.Must(clientgoscheme.AddToScheme(scheme))
78utilruntime.Must(v1alpha1.AddToScheme(scheme))
79//+kubebuilder:scaffold:scheme
80
81configConfs := ctrl.GetConfigOrDie()
82err = v1alpha1.AddToScheme(clientgoscheme.Scheme)
83if err != nil {
84log.Errorf("v1alpha1.AddToScheme error: %s", err.Error())
85return nil, err
86}
87client.client, err = clientset.NewForConfig(configConfs)
88if err != nil {
89return nil, err
90}
91}
92
93return client.client, nil
94}
95
96var GetClient = func() *kubernetes.Clientset {
97log := logger.FromContext(context.Background())
98config := ctrl.GetConfigOrDie()
99config.QPS = 100
100config.Burst = 50
101clt, err := kubernetes.NewForConfig(config)
102if err != nil {
103log.Errorf("GetCoreClient: failed to init kubernetes client: %s", err.Error())
104panic(err.Error())
105}
106
107return clt
108}
109
110var _resourceMetaCache = map[string]ResourceMeta{}
111var _resourcesCache []*v1.APIResourceList
112
113var InvalidateCacheAndTryGetResourceMeta = func(client *kubernetes.Clientset, kind string, version string) *ResourceMeta {
114_resourcesCache = []*v1.APIResourceList{}
115return GetResourceMeta(client, kind, version)
116}
117
118var GetResourceMeta = func(client *kubernetes.Clientset, kind string, version string) *ResourceMeta {
119meta := CheckResourceMeta(client, kind, version)
120return meta
121}
122
123var CheckResourceMeta = func(client *kubernetes.Clientset, kind string, version string) *ResourceMeta {
124log := logger.FromContext(context.Background())
125name := fmt.Sprintf("%s-%s", kind, version)
126result, ok := _resourceMetaCache[name]
127if ok {
128return &result
129}
130
131if len(_resourcesCache) == 0 {
132var err error
133_, _resourcesCache, err = client.ServerGroupsAndResources()
134if err != nil {
135log.Errorf("FactoryController: GetResourceMeta failed: %s", err.Error())
136return nil
137}
138}
139
140path := ""
141for _, resource := range _resourcesCache {
142if strings.ToLower(resource.GroupVersion) == strings.ToLower(version) {
143for _, apiResource := range resource.APIResources {
144if strings.ToLower(apiResource.Kind) == strings.ToLower(kind) {
145if strings.ToLower(resource.GroupVersion) == "v1" {
146path = "api"
147} else {
148path = "apis"
149}
150
151meta := ResourceMeta{
152Name: apiResource.Kind,
153Plural: apiResource.Name,
154Path: path,
155Namespaced: apiResource.Namespaced,
156GroupVersion: resource.GroupVersion,
157}
158_resourceMetaCache[name] = meta
159
160return &meta
161}
162}
163}
164}
165
166return nil
167}
168
169var GetRawResourceFromKubeApi = func(ctx context.Context, client rest.Interface, resourceMeta *ResourceMeta, resourceName string, namespaceName string) ([]byte, error) {
170request := client.Get().
171Resource(resourceMeta.Plural).
172Name(resourceName).
173Prefix(resourceMeta.Path, resourceMeta.GroupVersion)
174
175if resourceMeta.Namespaced {
176request.Namespace(namespaceName)
177}
178
179return request.DoRaw(ctx)
180}
181
182var CreateResourceInKubeApi = func(ctx context.Context, client rest.Interface, resourceMeta *ResourceMeta, jsBody []byte, namespaceName string) ([]byte, error, error) {
183request := client.Post().
184Resource(resourceMeta.Plural).
185Prefix(resourceMeta.Path, resourceMeta.GroupVersion).
186Body(jsBody)
187
188if resourceMeta.Namespaced {
189request.Namespace(namespaceName)
190}
191
192result := request.Do(ctx)
193e := result.Error()
194raw, err := result.Raw()
195return raw, err, e
196}
197