oceanbase

Форк
0
/
observer_handler.go 
314 строк · 10.2 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
package server
14

15
import (
16
	"context"
17
	"encoding/json"
18
	"fmt"
19
	"strconv"
20
	"sync"
21

22
	"github.com/gin-gonic/gin"
23
	"github.com/pkg/errors"
24
	log "github.com/sirupsen/logrus"
25

26
	"github.com/oceanbase/configserver/ent"
27
	"github.com/oceanbase/configserver/ent/obcluster"
28
	"github.com/oceanbase/configserver/model"
29
)
30

31
var obRootServiceGetOnce sync.Once
32
var obRootServiceGetFunc func(*gin.Context)
33
var obRootServicePostOnce sync.Once
34
var obRootServicePostFunc func(*gin.Context)
35
var obRootServiceDeleteOnce sync.Once
36
var obRootServiceDeleteFunc func(*gin.Context)
37
var obIdcRegionInfoOnce sync.Once
38
var obIdcRegionInfoFunc func(*gin.Context)
39

40
func getObIdcRegionInfoFunc() func(c *gin.Context) {
41
	obIdcRegionInfoOnce.Do(func() {
42
		obIdcRegionInfoFunc = handlerFunctionWrapper(getObIdcRegionInfo)
43

44
	})
45
	return obIdcRegionInfoFunc
46
}
47

48
func getObRootServiceGetFunc() func(*gin.Context) {
49
	obRootServiceGetOnce.Do(func() {
50
		obRootServiceGetFunc = handlerFunctionWrapper(getObRootServiceInfo)
51
	})
52
	return obRootServiceGetFunc
53
}
54

55
func getObRootServicePostFunc() func(*gin.Context) {
56
	obRootServicePostOnce.Do(func() {
57
		obRootServicePostFunc = handlerFunctionWrapper(createOrUpdateObRootServiceInfo)
58
	})
59
	return obRootServicePostFunc
60
}
61

62
func getObRootServiceDeleteFunc() func(*gin.Context) {
63
	obRootServiceDeleteOnce.Do(func() {
64
		obRootServiceDeleteFunc = handlerFunctionWrapper(deleteObRootServiceInfo)
65
	})
66
	return obRootServiceDeleteFunc
67
}
68

69
type RootServiceInfoParam struct {
70
	ObCluster   string
71
	ObClusterId int64
72
	Version     int
73
}
74

75
func getCommonParam(c *gin.Context) (*RootServiceInfoParam, error) {
76
	var err error
77
	name := ""
78
	obCluster, obClusterOk := c.GetQuery("ObCluster")
79
	obRegion, obRegionOk := c.GetQuery("ObRegion")
80
	if obClusterOk {
81
		name = obCluster
82
	}
83
	if obRegionOk {
84
		name = obRegion
85
	}
86

87
	if len(name) == 0 {
88
		return nil, errors.New("no obcluster or obregion")
89
	}
90

91
	obClusterId, obClusterIdOk := c.GetQuery("ObClusterId")
92
	obRegionId, obRegionIdOk := c.GetQuery("ObRegionId")
93

94
	var clusterId int64
95
	var clusterIdStr string
96
	if obClusterIdOk {
97
		clusterIdStr = obClusterId
98
	}
99
	if obRegionIdOk {
100
		clusterIdStr = obRegionId
101
	}
102
	if clusterIdStr != "" {
103
		clusterId, err = strconv.ParseInt(clusterIdStr, 10, 64)
104
		if err != nil {
105
			return nil, errors.Wrap(err, "parse ob cluster id")
106
		}
107
	}
108

109
	version := 0
110
	versionStr, versionOk := c.GetQuery("version")
111
	if versionOk {
112
		version, err = strconv.Atoi(versionStr)
113
		if err != nil {
114
			return nil, errors.Wrap(err, "parse version")
115
		}
116
	}
117
	return &RootServiceInfoParam{
118
		ObCluster:   name,
119
		ObClusterId: clusterId,
120
		Version:     version,
121
	}, nil
122
}
123

124
func selectPrimaryCluster(clusters []*model.ObRootServiceInfo) *model.ObRootServiceInfo {
125
	var primaryCluster *model.ObRootServiceInfo
126
	for _, cluster := range clusters {
127
		if primaryCluster == nil {
128
			primaryCluster = cluster
129
		} else {
130
			if primaryCluster.Type != "PRIMARY" {
131
				if cluster.Type == "PRIMARY" || cluster.TimeStamp > primaryCluster.TimeStamp {
132
					primaryCluster = cluster
133
				}
134
			} else {
135
				if cluster.Type == "PRIMARY" && cluster.TimeStamp > primaryCluster.TimeStamp {
136
					primaryCluster = cluster
137
				}
138
			}
139
		}
140
	}
141
	return primaryCluster
142
}
143

144
func getObIdcRegionInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
145
	// return empty idc list
146
	param, err := getCommonParam(c)
147
	if err != nil {
148
		return NewErrorResponse(errors.Wrap(err, "parse ob idc region info query parameter"))
149
	}
150

151
	rootServiceInfoList, err := getRootServiceInfoList(ctxlog, param.ObCluster, param.ObClusterId)
152
	if err != nil {
153
		if rootServiceInfoList != nil && len(rootServiceInfoList) == 0 {
154
			return NewNotFoundResponse(errors.New(fmt.Sprintf("no obcluster found with query param %v", param)))
155
		} else {
156
			return NewErrorResponse(errors.Wrap(err, fmt.Sprintf("get all rootservice info for cluster %s:%d", param.ObCluster, param.ObClusterId)))
157
		}
158
	}
159

160
	idcList := make([]*model.IdcRegionInfo, 0, 0)
161
	if param.Version < 2 || param.ObClusterId > 0 {
162
		primaryCluster := selectPrimaryCluster(rootServiceInfoList)
163
		obClusterIdcRegionInfo := &model.ObClusterIdcRegionInfo{
164
			Cluster:        primaryCluster.ObCluster,
165
			ClusterId:      primaryCluster.ObClusterId,
166
			IdcList:        idcList,
167
			ReadonlyRsList: "",
168
		}
169
		return NewSuccessResponse(obClusterIdcRegionInfo)
170
	} else {
171
		obClusterIdcRegionInfoList := make([]*model.ObClusterIdcRegionInfo, 0, 4)
172
		for _, cluster := range rootServiceInfoList {
173
			obClusterIdcRegionInfo := &model.ObClusterIdcRegionInfo{
174
				Cluster:        cluster.ObCluster,
175
				ClusterId:      cluster.ObClusterId,
176
				IdcList:        idcList,
177
				ReadonlyRsList: "",
178
			}
179
			obClusterIdcRegionInfoList = append(obClusterIdcRegionInfoList, obClusterIdcRegionInfo)
180
		}
181
		return NewSuccessResponse(obClusterIdcRegionInfoList)
182
	}
183
}
184

185
func getRootServiceInfoList(ctxlog context.Context, obCluster string, obClusterId int64) ([]*model.ObRootServiceInfo, error) {
186
	var clusters []*ent.ObCluster
187
	var err error
188
	rootServiceInfoList := make([]*model.ObRootServiceInfo, 0, 4)
189
	client := GetConfigServer().Client
190

191
	if obClusterId != 0 {
192
		log.WithContext(ctxlog).Infof("query ob clusters with obcluster %s and obcluster_id %d", obCluster, obClusterId)
193
		clusters, err = client.ObCluster.Query().Where(obcluster.Name(obCluster), obcluster.ObClusterID(obClusterId)).All(context.Background())
194
	} else {
195
		log.WithContext(ctxlog).Infof("query ob clusters with obcluster %s", obCluster)
196
		clusters, err = client.ObCluster.Query().Where(obcluster.Name(obCluster)).All(context.Background())
197
	}
198
	if err != nil {
199
		return nil, errors.Wrap(err, "query ob clusters from db")
200
	}
201
	if len(clusters) == 0 {
202
		return rootServiceInfoList, errors.New(fmt.Sprintf("no root service info found with obcluster %s, obcluster id %d", obCluster, obClusterId))
203
	}
204
	for _, cluster := range clusters {
205
		var rootServiceInfo model.ObRootServiceInfo
206
		err = json.Unmarshal([]byte(cluster.RootserviceJSON), &rootServiceInfo)
207
		if err != nil {
208
			return nil, errors.Wrap(err, "deserialize root service info")
209
		}
210
		rootServiceInfo.Fill()
211
		rootServiceInfoList = append(rootServiceInfoList, &rootServiceInfo)
212
	}
213
	return rootServiceInfoList, nil
214
}
215

216
func getObRootServiceInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
217
	var response *ApiResponse
218
	param, err := getCommonParam(c)
219
	if err != nil {
220
		return NewErrorResponse(errors.Wrap(err, "parse rootservice query parameter"))
221
	}
222
	rootServiceInfoList, err := getRootServiceInfoList(ctxlog, param.ObCluster, param.ObClusterId)
223
	if err != nil {
224
		if rootServiceInfoList != nil && len(rootServiceInfoList) == 0 {
225
			return NewNotFoundResponse(errors.New(fmt.Sprintf("no obcluster found with query param %v", param)))
226
		} else {
227
			return NewErrorResponse(errors.Wrap(err, fmt.Sprintf("get all rootservice info for cluster %s:%d", param.ObCluster, param.ObClusterId)))
228
		}
229
	}
230

231
	if param.Version < 2 || param.ObClusterId > 0 {
232
		log.WithContext(ctxlog).Infof("return primary ob cluster")
233
		response = NewSuccessResponse(selectPrimaryCluster(rootServiceInfoList))
234
	} else {
235
		log.WithContext(ctxlog).Infof("return all ob clusters")
236
		response = NewSuccessResponse(rootServiceInfoList)
237
	}
238
	return response
239
}
240

241
func createOrUpdateObRootServiceInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
242
	var response *ApiResponse
243
	client := GetConfigServer().Client
244
	obRootServiceInfo := new(model.ObRootServiceInfo)
245
	err := c.ShouldBindJSON(obRootServiceInfo)
246
	if err != nil {
247
		return NewErrorResponse(errors.Wrap(err, "bind rootservice query parameter"))
248
	}
249
	obRootServiceInfo.Fill()
250
	param, err := getCommonParam(c)
251
	if err != nil {
252
		return NewErrorResponse(errors.Wrap(err, "parse rootservice query parameter"))
253
	}
254
	if len(obRootServiceInfo.ObCluster) == 0 {
255
		return NewIllegalArgumentResponse(errors.New("ob cluster name is required"))
256
	}
257
	if param.Version > 1 {
258
		if len(obRootServiceInfo.Type) == 0 {
259
			return NewIllegalArgumentResponse(errors.New("ob cluster type is required when version > 1"))
260
		}
261
	}
262

263
	rsBytes, err := json.Marshal(obRootServiceInfo)
264
	if err != nil {
265
		response = NewErrorResponse(errors.Wrap(err, "serialize ob rootservice info"))
266
	} else {
267
		rootServiceInfoJson := string(rsBytes)
268
		log.WithContext(ctxlog).Infof("store rootservice info %s", rootServiceInfoJson)
269

270
		err := client.ObCluster.
271
			Create().
272
			SetName(obRootServiceInfo.ObCluster).
273
			SetObClusterID(obRootServiceInfo.ObClusterId).
274
			SetType(obRootServiceInfo.Type).
275
			SetRootserviceJSON(rootServiceInfoJson).
276
			OnConflict().
277
			SetRootserviceJSON(rootServiceInfoJson).
278
			Exec(context.Background())
279
		if err != nil {
280
			response = NewErrorResponse(errors.Wrap(err, "save ob rootservice info"))
281
		} else {
282
			response = NewSuccessResponse("successful")
283
		}
284
	}
285
	return response
286
}
287

288
func deleteObRootServiceInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
289
	var response *ApiResponse
290
	client := GetConfigServer().Client
291

292
	param, err := getCommonParam(c)
293
	if err != nil {
294
		return NewErrorResponse(errors.Wrap(err, "parse rootservice query parameter"))
295
	}
296
	if param.Version < 2 {
297
		response = NewIllegalArgumentResponse(errors.New("delete obcluster rs info is only supported when version >= 2"))
298
	} else if param.ObClusterId == 0 {
299
		response = NewIllegalArgumentResponse(errors.New("delete obcluster rs info is only supported with obcluster id"))
300
	} else {
301
		affected, err := client.ObCluster.
302
			Delete().
303
			Where(obcluster.Name(param.ObCluster), obcluster.ObClusterID(param.ObClusterId)).
304
			Exec(context.Background())
305
		if err != nil {
306
			response = NewErrorResponse(errors.Wrap(err, fmt.Sprintf("delete obcluster %s with ob cluster id %d in db", param.ObCluster, param.ObClusterId)))
307
		} else {
308
			log.WithContext(ctxlog).Infof("delete obcluster %s with ob cluster id %d in db, affected rows %d", param.ObCluster, param.ObClusterId, affected)
309
			response = NewSuccessResponse("success")
310
		}
311
	}
312
	return response
313

314
}
315

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.