prometheus
331 строка · 9.9 Кб
1// Copyright 2015 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package zookeeper
15
16import (
17"context"
18"encoding/json"
19"errors"
20"fmt"
21"net"
22"strconv"
23"strings"
24"time"
25
26"github.com/go-kit/log"
27"github.com/go-zookeeper/zk"
28"github.com/prometheus/client_golang/prometheus"
29"github.com/prometheus/common/model"
30
31"github.com/prometheus/prometheus/discovery"
32"github.com/prometheus/prometheus/discovery/targetgroup"
33"github.com/prometheus/prometheus/util/strutil"
34"github.com/prometheus/prometheus/util/treecache"
35)
36
37var (
38// DefaultServersetSDConfig is the default Serverset SD configuration.
39DefaultServersetSDConfig = ServersetSDConfig{
40Timeout: model.Duration(10 * time.Second),
41}
42// DefaultNerveSDConfig is the default Nerve SD configuration.
43DefaultNerveSDConfig = NerveSDConfig{
44Timeout: model.Duration(10 * time.Second),
45}
46)
47
48func init() {
49discovery.RegisterConfig(&ServersetSDConfig{})
50discovery.RegisterConfig(&NerveSDConfig{})
51}
52
53// ServersetSDConfig is the configuration for Twitter serversets in Zookeeper based discovery.
54type ServersetSDConfig struct {
55Servers []string `yaml:"servers"`
56Paths []string `yaml:"paths"`
57Timeout model.Duration `yaml:"timeout,omitempty"`
58}
59
60// NewDiscovererMetrics implements discovery.Config.
61func (*ServersetSDConfig) NewDiscovererMetrics(reg prometheus.Registerer, rmi discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics {
62return &discovery.NoopDiscovererMetrics{}
63}
64
65// Name returns the name of the Config.
66func (*ServersetSDConfig) Name() string { return "serverset" }
67
68// NewDiscoverer returns a Discoverer for the Config.
69func (c *ServersetSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) {
70return NewServersetDiscovery(c, opts.Logger)
71}
72
73// UnmarshalYAML implements the yaml.Unmarshaler interface.
74func (c *ServersetSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
75*c = DefaultServersetSDConfig
76type plain ServersetSDConfig
77err := unmarshal((*plain)(c))
78if err != nil {
79return err
80}
81if len(c.Servers) == 0 {
82return errors.New("serverset SD config must contain at least one Zookeeper server")
83}
84if len(c.Paths) == 0 {
85return errors.New("serverset SD config must contain at least one path")
86}
87for _, path := range c.Paths {
88if !strings.HasPrefix(path, "/") {
89return fmt.Errorf("serverset SD config paths must begin with '/': %s", path)
90}
91}
92return nil
93}
94
95// NerveSDConfig is the configuration for AirBnB's Nerve in Zookeeper based discovery.
96type NerveSDConfig struct {
97Servers []string `yaml:"servers"`
98Paths []string `yaml:"paths"`
99Timeout model.Duration `yaml:"timeout,omitempty"`
100}
101
102// NewDiscovererMetrics implements discovery.Config.
103func (*NerveSDConfig) NewDiscovererMetrics(reg prometheus.Registerer, rmi discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics {
104return &discovery.NoopDiscovererMetrics{}
105}
106
107// Name returns the name of the Config.
108func (*NerveSDConfig) Name() string { return "nerve" }
109
110// NewDiscoverer returns a Discoverer for the Config.
111func (c *NerveSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) {
112return NewNerveDiscovery(c, opts.Logger)
113}
114
115// UnmarshalYAML implements the yaml.Unmarshaler interface.
116func (c *NerveSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
117*c = DefaultNerveSDConfig
118type plain NerveSDConfig
119err := unmarshal((*plain)(c))
120if err != nil {
121return err
122}
123if len(c.Servers) == 0 {
124return errors.New("nerve SD config must contain at least one Zookeeper server")
125}
126if len(c.Paths) == 0 {
127return errors.New("nerve SD config must contain at least one path")
128}
129for _, path := range c.Paths {
130if !strings.HasPrefix(path, "/") {
131return fmt.Errorf("nerve SD config paths must begin with '/': %s", path)
132}
133}
134return nil
135}
136
137// Discovery implements the Discoverer interface for discovering
138// targets from Zookeeper.
139type Discovery struct {
140conn *zk.Conn
141
142sources map[string]*targetgroup.Group
143
144updates chan treecache.ZookeeperTreeCacheEvent
145pathUpdates []chan treecache.ZookeeperTreeCacheEvent
146treeCaches []*treecache.ZookeeperTreeCache
147
148parse func(data []byte, path string) (model.LabelSet, error)
149logger log.Logger
150}
151
152// NewNerveDiscovery returns a new Discovery for the given Nerve config.
153func NewNerveDiscovery(conf *NerveSDConfig, logger log.Logger) (*Discovery, error) {
154return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, logger, parseNerveMember)
155}
156
157// NewServersetDiscovery returns a new Discovery for the given serverset config.
158func NewServersetDiscovery(conf *ServersetSDConfig, logger log.Logger) (*Discovery, error) {
159return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, logger, parseServersetMember)
160}
161
162// NewDiscovery returns a new discovery along Zookeeper parses with
163// the given parse function.
164func NewDiscovery(
165srvs []string,
166timeout time.Duration,
167paths []string,
168logger log.Logger,
169pf func(data []byte, path string) (model.LabelSet, error),
170) (*Discovery, error) {
171if logger == nil {
172logger = log.NewNopLogger()
173}
174
175conn, _, err := zk.Connect(
176srvs, timeout,
177func(c *zk.Conn) {
178c.SetLogger(treecache.NewZookeeperLogger(logger))
179})
180if err != nil {
181return nil, err
182}
183updates := make(chan treecache.ZookeeperTreeCacheEvent)
184sd := &Discovery{
185conn: conn,
186updates: updates,
187sources: map[string]*targetgroup.Group{},
188parse: pf,
189logger: logger,
190}
191for _, path := range paths {
192pathUpdate := make(chan treecache.ZookeeperTreeCacheEvent)
193sd.pathUpdates = append(sd.pathUpdates, pathUpdate)
194sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, pathUpdate, logger))
195}
196return sd, nil
197}
198
199// Run implements the Discoverer interface.
200func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
201defer func() {
202for _, tc := range d.treeCaches {
203tc.Stop()
204}
205for _, pathUpdate := range d.pathUpdates {
206// Drain event channel in case the treecache leaks goroutines otherwise.
207for range pathUpdate {
208}
209}
210d.conn.Close()
211}()
212
213for _, pathUpdate := range d.pathUpdates {
214go func(update chan treecache.ZookeeperTreeCacheEvent) {
215for event := range update {
216select {
217case d.updates <- event:
218case <-ctx.Done():
219return
220}
221}
222}(pathUpdate)
223}
224
225for {
226select {
227case <-ctx.Done():
228return
229case event := <-d.updates:
230tg := &targetgroup.Group{
231Source: event.Path,
232}
233if event.Data != nil {
234labelSet, err := d.parse(*event.Data, event.Path)
235if err == nil {
236tg.Targets = []model.LabelSet{labelSet}
237d.sources[event.Path] = tg
238} else {
239delete(d.sources, event.Path)
240}
241} else {
242delete(d.sources, event.Path)
243}
244select {
245case <-ctx.Done():
246return
247case ch <- []*targetgroup.Group{tg}:
248}
249}
250}
251}
252
253const (
254serversetLabelPrefix = model.MetaLabelPrefix + "serverset_"
255serversetStatusLabel = serversetLabelPrefix + "status"
256serversetPathLabel = serversetLabelPrefix + "path"
257serversetEndpointLabelPrefix = serversetLabelPrefix + "endpoint"
258serversetShardLabel = serversetLabelPrefix + "shard"
259)
260
261type serversetMember struct {
262ServiceEndpoint serversetEndpoint
263AdditionalEndpoints map[string]serversetEndpoint
264Status string `json:"status"`
265Shard int `json:"shard"`
266}
267
268type serversetEndpoint struct {
269Host string
270Port int
271}
272
273func parseServersetMember(data []byte, path string) (model.LabelSet, error) {
274member := serversetMember{}
275
276if err := json.Unmarshal(data, &member); err != nil {
277return nil, fmt.Errorf("error unmarshaling serverset member %q: %w", path, err)
278}
279
280labels := model.LabelSet{}
281labels[serversetPathLabel] = model.LabelValue(path)
282labels[model.AddressLabel] = model.LabelValue(
283net.JoinHostPort(member.ServiceEndpoint.Host, strconv.Itoa(member.ServiceEndpoint.Port)))
284
285labels[serversetEndpointLabelPrefix+"_host"] = model.LabelValue(member.ServiceEndpoint.Host)
286labels[serversetEndpointLabelPrefix+"_port"] = model.LabelValue(strconv.Itoa(member.ServiceEndpoint.Port))
287
288for name, endpoint := range member.AdditionalEndpoints {
289cleanName := model.LabelName(strutil.SanitizeLabelName(name))
290labels[serversetEndpointLabelPrefix+"_host_"+cleanName] = model.LabelValue(
291endpoint.Host)
292labels[serversetEndpointLabelPrefix+"_port_"+cleanName] = model.LabelValue(
293strconv.Itoa(endpoint.Port))
294}
295
296labels[serversetStatusLabel] = model.LabelValue(member.Status)
297labels[serversetShardLabel] = model.LabelValue(strconv.Itoa(member.Shard))
298
299return labels, nil
300}
301
302const (
303nerveLabelPrefix = model.MetaLabelPrefix + "nerve_"
304nervePathLabel = nerveLabelPrefix + "path"
305nerveEndpointLabelPrefix = nerveLabelPrefix + "endpoint"
306)
307
308type nerveMember struct {
309Host string `json:"host"`
310Port int `json:"port"`
311Name string `json:"name"`
312}
313
314func parseNerveMember(data []byte, path string) (model.LabelSet, error) {
315member := nerveMember{}
316err := json.Unmarshal(data, &member)
317if err != nil {
318return nil, fmt.Errorf("error unmarshaling nerve member %q: %w", path, err)
319}
320
321labels := model.LabelSet{}
322labels[nervePathLabel] = model.LabelValue(path)
323labels[model.AddressLabel] = model.LabelValue(
324net.JoinHostPort(member.Host, strconv.Itoa(member.Port)))
325
326labels[nerveEndpointLabelPrefix+"_host"] = model.LabelValue(member.Host)
327labels[nerveEndpointLabelPrefix+"_port"] = model.LabelValue(strconv.Itoa(member.Port))
328labels[nerveEndpointLabelPrefix+"_name"] = model.LabelValue(member.Name)
329
330return labels, nil
331}
332