oceanbase
314 строк · 10.2 Кб
1/**
2* Copyright (c) 2021 OceanBase
3* OceanBase CE is licensed under Mulan PubL v2.
4* You can use this software according to the terms and conditions of the Mulan PubL v2.
5* You may obtain a copy of Mulan PubL v2 at:
6* http://license.coscl.org.cn/MulanPubL-2.0
7* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10* See the Mulan PubL v2 for more details.
11*/
12
13package server
14
15import (
16"context"
17"encoding/json"
18"fmt"
19"strconv"
20"sync"
21
22"github.com/gin-gonic/gin"
23"github.com/pkg/errors"
24log "github.com/sirupsen/logrus"
25
26"github.com/oceanbase/configserver/ent"
27"github.com/oceanbase/configserver/ent/obcluster"
28"github.com/oceanbase/configserver/model"
29)
30
31var obRootServiceGetOnce sync.Once
32var obRootServiceGetFunc func(*gin.Context)
33var obRootServicePostOnce sync.Once
34var obRootServicePostFunc func(*gin.Context)
35var obRootServiceDeleteOnce sync.Once
36var obRootServiceDeleteFunc func(*gin.Context)
37var obIdcRegionInfoOnce sync.Once
38var obIdcRegionInfoFunc func(*gin.Context)
39
40func getObIdcRegionInfoFunc() func(c *gin.Context) {
41obIdcRegionInfoOnce.Do(func() {
42obIdcRegionInfoFunc = handlerFunctionWrapper(getObIdcRegionInfo)
43
44})
45return obIdcRegionInfoFunc
46}
47
48func getObRootServiceGetFunc() func(*gin.Context) {
49obRootServiceGetOnce.Do(func() {
50obRootServiceGetFunc = handlerFunctionWrapper(getObRootServiceInfo)
51})
52return obRootServiceGetFunc
53}
54
55func getObRootServicePostFunc() func(*gin.Context) {
56obRootServicePostOnce.Do(func() {
57obRootServicePostFunc = handlerFunctionWrapper(createOrUpdateObRootServiceInfo)
58})
59return obRootServicePostFunc
60}
61
62func getObRootServiceDeleteFunc() func(*gin.Context) {
63obRootServiceDeleteOnce.Do(func() {
64obRootServiceDeleteFunc = handlerFunctionWrapper(deleteObRootServiceInfo)
65})
66return obRootServiceDeleteFunc
67}
68
69type RootServiceInfoParam struct {
70ObCluster string
71ObClusterId int64
72Version int
73}
74
75func getCommonParam(c *gin.Context) (*RootServiceInfoParam, error) {
76var err error
77name := ""
78obCluster, obClusterOk := c.GetQuery("ObCluster")
79obRegion, obRegionOk := c.GetQuery("ObRegion")
80if obClusterOk {
81name = obCluster
82}
83if obRegionOk {
84name = obRegion
85}
86
87if len(name) == 0 {
88return nil, errors.New("no obcluster or obregion")
89}
90
91obClusterId, obClusterIdOk := c.GetQuery("ObClusterId")
92obRegionId, obRegionIdOk := c.GetQuery("ObRegionId")
93
94var clusterId int64
95var clusterIdStr string
96if obClusterIdOk {
97clusterIdStr = obClusterId
98}
99if obRegionIdOk {
100clusterIdStr = obRegionId
101}
102if clusterIdStr != "" {
103clusterId, err = strconv.ParseInt(clusterIdStr, 10, 64)
104if err != nil {
105return nil, errors.Wrap(err, "parse ob cluster id")
106}
107}
108
109version := 0
110versionStr, versionOk := c.GetQuery("version")
111if versionOk {
112version, err = strconv.Atoi(versionStr)
113if err != nil {
114return nil, errors.Wrap(err, "parse version")
115}
116}
117return &RootServiceInfoParam{
118ObCluster: name,
119ObClusterId: clusterId,
120Version: version,
121}, nil
122}
123
124func selectPrimaryCluster(clusters []*model.ObRootServiceInfo) *model.ObRootServiceInfo {
125var primaryCluster *model.ObRootServiceInfo
126for _, cluster := range clusters {
127if primaryCluster == nil {
128primaryCluster = cluster
129} else {
130if primaryCluster.Type != "PRIMARY" {
131if cluster.Type == "PRIMARY" || cluster.TimeStamp > primaryCluster.TimeStamp {
132primaryCluster = cluster
133}
134} else {
135if cluster.Type == "PRIMARY" && cluster.TimeStamp > primaryCluster.TimeStamp {
136primaryCluster = cluster
137}
138}
139}
140}
141return primaryCluster
142}
143
144func getObIdcRegionInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
145// return empty idc list
146param, err := getCommonParam(c)
147if err != nil {
148return NewErrorResponse(errors.Wrap(err, "parse ob idc region info query parameter"))
149}
150
151rootServiceInfoList, err := getRootServiceInfoList(ctxlog, param.ObCluster, param.ObClusterId)
152if err != nil {
153if rootServiceInfoList != nil && len(rootServiceInfoList) == 0 {
154return NewNotFoundResponse(errors.New(fmt.Sprintf("no obcluster found with query param %v", param)))
155} else {
156return NewErrorResponse(errors.Wrap(err, fmt.Sprintf("get all rootservice info for cluster %s:%d", param.ObCluster, param.ObClusterId)))
157}
158}
159
160idcList := make([]*model.IdcRegionInfo, 0, 0)
161if param.Version < 2 || param.ObClusterId > 0 {
162primaryCluster := selectPrimaryCluster(rootServiceInfoList)
163obClusterIdcRegionInfo := &model.ObClusterIdcRegionInfo{
164Cluster: primaryCluster.ObCluster,
165ClusterId: primaryCluster.ObClusterId,
166IdcList: idcList,
167ReadonlyRsList: "",
168}
169return NewSuccessResponse(obClusterIdcRegionInfo)
170} else {
171obClusterIdcRegionInfoList := make([]*model.ObClusterIdcRegionInfo, 0, 4)
172for _, cluster := range rootServiceInfoList {
173obClusterIdcRegionInfo := &model.ObClusterIdcRegionInfo{
174Cluster: cluster.ObCluster,
175ClusterId: cluster.ObClusterId,
176IdcList: idcList,
177ReadonlyRsList: "",
178}
179obClusterIdcRegionInfoList = append(obClusterIdcRegionInfoList, obClusterIdcRegionInfo)
180}
181return NewSuccessResponse(obClusterIdcRegionInfoList)
182}
183}
184
185func getRootServiceInfoList(ctxlog context.Context, obCluster string, obClusterId int64) ([]*model.ObRootServiceInfo, error) {
186var clusters []*ent.ObCluster
187var err error
188rootServiceInfoList := make([]*model.ObRootServiceInfo, 0, 4)
189client := GetConfigServer().Client
190
191if obClusterId != 0 {
192log.WithContext(ctxlog).Infof("query ob clusters with obcluster %s and obcluster_id %d", obCluster, obClusterId)
193clusters, err = client.ObCluster.Query().Where(obcluster.Name(obCluster), obcluster.ObClusterID(obClusterId)).All(context.Background())
194} else {
195log.WithContext(ctxlog).Infof("query ob clusters with obcluster %s", obCluster)
196clusters, err = client.ObCluster.Query().Where(obcluster.Name(obCluster)).All(context.Background())
197}
198if err != nil {
199return nil, errors.Wrap(err, "query ob clusters from db")
200}
201if len(clusters) == 0 {
202return rootServiceInfoList, errors.New(fmt.Sprintf("no root service info found with obcluster %s, obcluster id %d", obCluster, obClusterId))
203}
204for _, cluster := range clusters {
205var rootServiceInfo model.ObRootServiceInfo
206err = json.Unmarshal([]byte(cluster.RootserviceJSON), &rootServiceInfo)
207if err != nil {
208return nil, errors.Wrap(err, "deserialize root service info")
209}
210rootServiceInfo.Fill()
211rootServiceInfoList = append(rootServiceInfoList, &rootServiceInfo)
212}
213return rootServiceInfoList, nil
214}
215
216func getObRootServiceInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
217var response *ApiResponse
218param, err := getCommonParam(c)
219if err != nil {
220return NewErrorResponse(errors.Wrap(err, "parse rootservice query parameter"))
221}
222rootServiceInfoList, err := getRootServiceInfoList(ctxlog, param.ObCluster, param.ObClusterId)
223if err != nil {
224if rootServiceInfoList != nil && len(rootServiceInfoList) == 0 {
225return NewNotFoundResponse(errors.New(fmt.Sprintf("no obcluster found with query param %v", param)))
226} else {
227return NewErrorResponse(errors.Wrap(err, fmt.Sprintf("get all rootservice info for cluster %s:%d", param.ObCluster, param.ObClusterId)))
228}
229}
230
231if param.Version < 2 || param.ObClusterId > 0 {
232log.WithContext(ctxlog).Infof("return primary ob cluster")
233response = NewSuccessResponse(selectPrimaryCluster(rootServiceInfoList))
234} else {
235log.WithContext(ctxlog).Infof("return all ob clusters")
236response = NewSuccessResponse(rootServiceInfoList)
237}
238return response
239}
240
241func createOrUpdateObRootServiceInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
242var response *ApiResponse
243client := GetConfigServer().Client
244obRootServiceInfo := new(model.ObRootServiceInfo)
245err := c.ShouldBindJSON(obRootServiceInfo)
246if err != nil {
247return NewErrorResponse(errors.Wrap(err, "bind rootservice query parameter"))
248}
249obRootServiceInfo.Fill()
250param, err := getCommonParam(c)
251if err != nil {
252return NewErrorResponse(errors.Wrap(err, "parse rootservice query parameter"))
253}
254if len(obRootServiceInfo.ObCluster) == 0 {
255return NewIllegalArgumentResponse(errors.New("ob cluster name is required"))
256}
257if param.Version > 1 {
258if len(obRootServiceInfo.Type) == 0 {
259return NewIllegalArgumentResponse(errors.New("ob cluster type is required when version > 1"))
260}
261}
262
263rsBytes, err := json.Marshal(obRootServiceInfo)
264if err != nil {
265response = NewErrorResponse(errors.Wrap(err, "serialize ob rootservice info"))
266} else {
267rootServiceInfoJson := string(rsBytes)
268log.WithContext(ctxlog).Infof("store rootservice info %s", rootServiceInfoJson)
269
270err := client.ObCluster.
271Create().
272SetName(obRootServiceInfo.ObCluster).
273SetObClusterID(obRootServiceInfo.ObClusterId).
274SetType(obRootServiceInfo.Type).
275SetRootserviceJSON(rootServiceInfoJson).
276OnConflict().
277SetRootserviceJSON(rootServiceInfoJson).
278Exec(context.Background())
279if err != nil {
280response = NewErrorResponse(errors.Wrap(err, "save ob rootservice info"))
281} else {
282response = NewSuccessResponse("successful")
283}
284}
285return response
286}
287
288func deleteObRootServiceInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
289var response *ApiResponse
290client := GetConfigServer().Client
291
292param, err := getCommonParam(c)
293if err != nil {
294return NewErrorResponse(errors.Wrap(err, "parse rootservice query parameter"))
295}
296if param.Version < 2 {
297response = NewIllegalArgumentResponse(errors.New("delete obcluster rs info is only supported when version >= 2"))
298} else if param.ObClusterId == 0 {
299response = NewIllegalArgumentResponse(errors.New("delete obcluster rs info is only supported with obcluster id"))
300} else {
301affected, err := client.ObCluster.
302Delete().
303Where(obcluster.Name(param.ObCluster), obcluster.ObClusterID(param.ObClusterId)).
304Exec(context.Background())
305if err != nil {
306response = NewErrorResponse(errors.Wrap(err, fmt.Sprintf("delete obcluster %s with ob cluster id %d in db", param.ObCluster, param.ObClusterId)))
307} else {
308log.WithContext(ctxlog).Infof("delete obcluster %s with ob cluster id %d in db, affected rows %d", param.ObCluster, param.ObClusterId, affected)
309response = NewSuccessResponse("success")
310}
311}
312return response
313
314}
315