cubefs

Форк
0
/
consul_register.go 
201 строка · 4.8 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package exporter
16

17
import (
18
	"bytes"
19
	"encoding/json"
20
	"fmt"
21
	"io"
22
	"net"
23
	"net/http"
24
	"regexp"
25
	"strings"
26
	"time"
27

28
	"github.com/cubefs/cubefs/proto"
29
	"github.com/cubefs/cubefs/util/log"
30
)
31

32
// Settings used when registering the exporter with consul.
const (
	// RegisterPeriod is the interval at which the registration request is
	// re-sent to consul.
	RegisterPeriod = 10 * time.Minute

	// RegisterPath is the path appended to the consul address to form the
	// service registration URL.
	RegisterPath = "/v1/agent/service/register"
)
36

37
// ConsulRegisterInfo is the JSON body of a consul service registration made
// on behalf of the prometheus exporter. Registering with consul is optional
// for users who enable the prometheus exporter.
type ConsulRegisterInfo struct {
	// Name is the service name.
	Name string `json:"Name"`

	// ID is the unique service ID.
	ID string `json:"ID"`

	// Address is the host the service is reachable at.
	Address string `json:"Address"`

	// Port is the port the service listens on.
	Port int64 `json:"Port"`

	// Tags are free-form labels attached to the service.
	Tags []string `json:"Tags"`

	// Meta holds optional key/value metadata; it is left out of the JSON
	// when empty.
	Meta map[string]string `json:"Meta,omitempty"`
}
49

50
// GetConsulId builds the consul service ID by joining app, role, host and
// port with underscores, e.g. "cfs_master_10.0.0.1_9500".
func GetConsulId(app string, role string, host string, port int64) string {
	parts := []string{app, role, host, fmt.Sprint(port)}
	return strings.Join(parts, "_")
}
54

55
// do consul register process
56
func DoConsulRegisterProc(addr, app, role, cluster, meta, host string, port int64) {
57
	if len(addr) <= 0 {
58
		return
59
	}
60
	log.LogInfof("metrics consul register %v %v %v", addr, cluster, port)
61
	ticker := time.NewTicker(RegisterPeriod)
62
	defer func() {
63
		if err := recover(); err != nil {
64
			log.LogErrorf("RegisterConsul panic,err[%v]", err)
65
		}
66
		ticker.Stop()
67
	}()
68

69
	client := &http.Client{}
70
	req := makeRegisterReq(host, addr, app, role, cluster, meta, port)
71
	if req == nil {
72
		log.LogErrorf("make register req error")
73
		return
74
	}
75

76
	if resp, _ := client.Do(req); resp != nil {
77
		io.ReadAll(resp.Body)
78
		resp.Body.Close()
79
	}
80

81
	for range ticker.C {
82
		req := makeRegisterReq(host, addr, app, role, cluster, meta, port)
83
		if req == nil {
84
			log.LogErrorf("make register req error")
85
			return
86
		}
87
		if resp, _ := client.Do(req); resp != nil {
88
			io.ReadAll(resp.Body)
89
			resp.Body.Close()
90
		}
91
	}
92
}
93

94
// GetLocalIpAddr returns the local IP address.
95
func GetLocalIpAddr(filter string) (ipaddr string, err error) {
96
	addrs, err := net.InterfaceAddrs()
97
	if err != nil {
98
		log.LogError("consul register get local ip failed, ", err)
99
		return
100
	}
101
	for _, addr := range addrs {
102
		if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
103
			if ipnet.IP.To4() != nil {
104
				ip := ipnet.IP.String()
105

106
				if filter != "" {
107
					match, err := doFilter(filter, ip)
108
					if err != nil {
109
						return "", fmt.Errorf("regex match err, err %s", err.Error())
110
					}
111

112
					if !match {
113
						continue
114
					}
115
				}
116

117
				return ip, nil
118
			}
119
		}
120
	}
121
	return "", fmt.Errorf("cannot get local ip")
122
}
123

124
// doFilter reports whether ip passes filter, a regular expression.
// A leading "!" marks a negative filter: that one character is stripped and
// the match result is inverted. The error from regexp.MatchString is returned
// as-is; for a negative filter the boolean is still inverted in that case.
func doFilter(filter, ip string) (ok bool, err error) {
	negate := strings.HasPrefix(filter, "!")
	pattern := filter
	if negate {
		pattern = filter[1:]
	}

	matched, err := regexp.MatchString(pattern, ip)
	if negate {
		return !matched, err
	}
	return matched, err
}
136

137
// make a consul rest request
138
func makeRegisterReq(host, addr, app, role, cluster, meta string, port int64) (req *http.Request) {
139
	id := GetConsulId(app, role, host, port)
140
	url := addr + RegisterPath
141
	cInfo := &ConsulRegisterInfo{
142
		Name:    app,
143
		ID:      id,
144
		Address: host,
145
		Port:    port,
146
		Tags: []string{
147
			"app=" + app,
148
			"role=" + role,
149
			"cluster=" + cluster,
150
		},
151
	}
152

153
	ok, metas := parseMetaStr(meta)
154
	if ok {
155
		cInfo.Meta = metas
156
		cInfo.Meta["cluster"] = cluster
157
		cInfo.Meta["commit"] = proto.CommitID
158
		if len(cInfo.Meta["metric_path"]) == 0 {
159
			cInfo.Meta["metric_path"] = "/metrics"
160
			log.LogInfo("metric_path is empty, use default /metrics")
161
		}
162
	}
163

164
	cInfoBytes, err := json.Marshal(cInfo)
165
	if err != nil {
166
		log.LogErrorf("marshal error, %v", err.Error())
167
		return nil
168
	}
169
	req, err = http.NewRequest(http.MethodPut, url, bytes.NewBuffer(cInfoBytes))
170
	if err != nil {
171
		log.LogErrorf("new request error, %v", err.Error())
172
		return nil
173
	}
174
	req.Header.Set("Content-Type", "application/json; charset=utf-8")
175
	req.Close = true
176

177
	return
178
}
179

180
// parse k1=v1;k2=v2 as a map
181
func parseMetaStr(meta string) (bool, map[string]string) {
182
	if len(meta) == 0 {
183
		log.LogInfo("meta is empty, use default")
184
		meta = "dataset=custom;category=custom;app=cfs;role=fuseclient;metric_path=/metrics"
185
	}
186

187
	m := map[string]string{}
188

189
	kvs := strings.Split(meta, ";")
190
	for _, kv := range kvs {
191
		arr := strings.Split(kv, "=")
192
		if len(arr) != 2 {
193
			log.LogInfof("meta is invalid, can't use %s", meta)
194
			return false, m
195
		}
196

197
		m[arr[0]] = arr[1]
198
	}
199

200
	return true, m
201
}
202

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.