istio

Форк
0
954 строки · 30.9 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package status
16

17
import (
18
	"context"
19
	"crypto/tls"
20
	"encoding/json"
21
	"errors"
22
	"fmt"
23
	"io"
24
	"mime"
25
	"net"
26
	"net/http"
27
	"net/http/pprof"
28
	"os"
29
	"regexp"
30
	"strconv"
31
	"strings"
32
	"sync"
33
	"syscall"
34
	"time"
35

36
	"github.com/prometheus/client_golang/prometheus"
37
	"github.com/prometheus/client_golang/prometheus/collectors"
38
	"github.com/prometheus/common/expfmt"
39
	"golang.org/x/net/http2"
40
	"google.golang.org/grpc"
41
	"google.golang.org/grpc/codes"
42
	"google.golang.org/grpc/credentials/insecure"
43
	grpcHealth "google.golang.org/grpc/health/grpc_health_v1"
44
	grpcStatus "google.golang.org/grpc/status"
45
	"k8s.io/apimachinery/pkg/util/intstr"
46
	k8sUtilIo "k8s.io/utils/io"
47

48
	"istio.io/istio/pilot/cmd/pilot-agent/metrics"
49
	"istio.io/istio/pilot/cmd/pilot-agent/status/grpcready"
50
	"istio.io/istio/pilot/cmd/pilot-agent/status/ready"
51
	"istio.io/istio/pilot/pkg/model"
52
	"istio.io/istio/pkg/config"
53
	dnsProto "istio.io/istio/pkg/dns/proto"
54
	"istio.io/istio/pkg/env"
55
	"istio.io/istio/pkg/kube/apimirror"
56
	"istio.io/istio/pkg/log"
57
	"istio.io/istio/pkg/monitoring"
58
	"istio.io/istio/pkg/network"
59
	"istio.io/istio/pkg/slices"
60
)
61

62
// HTTP paths served by the status server.
const (
	// readyPath reports the readiness of the pilot agent itself.
	readyPath = "/healthz/ready"
	// quitPath asks the pilot agent to exit.
	quitPath = "/quitquitquit"
	// drainPath asks the pilot agent to start draining.
	drainPath = "/drain"
)

// KubeAppProberEnvName is the name of the environment variable through which the injector
// (istioctl or webhook) passes the JSON-encoded application probe configuration to pilot agent.
// For example, ISTIO_KUBE_APP_PROBERS='{"/app-health/httpbin/livez":{"httpGet":{"path": "/hello", "port": 8080}}}'
// indicates that the httpbin container liveness prober port is 8080 and the probing path is /hello.
// This environment variable should never be set manually.
const KubeAppProberEnvName = "ISTIO_KUBE_APP_PROBERS"

const (
	// localHostIPv4 is the IPv4 loopback address.
	localHostIPv4 = "127.0.0.1"
	// localHostIPv6 is the IPv6 loopback address.
	localHostIPv6 = "::1"
	// maxRespBodyLength is the largest number of bytes (10 KiB) of an application's probe
	// response body that is copied back to the caller.
	maxRespBodyLength = 10 << 10
)
79

80
// UpstreamLocalAddressIPv4 is the IPv4 local (source) address that the status server binds to
// when it dials the application for probes.
var UpstreamLocalAddressIPv4 = &net.TCPAddr{
	IP: net.ParseIP("127.0.0.6"),
}

// UpstreamLocalAddressIPv6 is the IPv6 counterpart of UpstreamLocalAddressIPv4.
var UpstreamLocalAddressIPv6 = &net.TCPAddr{
	IP: net.ParseIP("::6"),
}
84

85
// PrometheusScrapingConfig is the ISTIO_PROMETHEUS_ANNOTATIONS environment variable. NewServer
// decodes its value as JSON into a PrometheusScrapeConfiguration to decide whether and where
// application metrics are scraped.
var PrometheusScrapingConfig = env.Register("ISTIO_PROMETHEUS_ANNOTATIONS", "", "")
86

87
var (
88
	appProberPattern = regexp.MustCompile(`^/app-health/[^/]+/(livez|readyz|startupz)$`)
89

90
	EnableHTTP2Probing = env.Register("ISTIO_ENABLE_HTTP2_PROBING", true,
91
		"If enabled, HTTP2 probes will be enabled for HTTPS probes, following Kubernetes").Get()
92

93
	LegacyLocalhostProbeDestination = env.Register("REWRITE_PROBE_LEGACY_LOCALHOST_DESTINATION", false,
94
		"If enabled, readiness probes will be sent to 'localhost'. Otherwise, they will be sent to the Pod's IP, matching Kubernetes' behavior.")
95

96
	ProbeKeepaliveConnections = env.Register("ENABLE_PROBE_KEEPALIVE_CONNECTIONS", false,
97
		"If enabled, readiness probes will keep the connection from pilot-agent to the application alive. "+
98
			"This mirrors older Istio versions' behaviors, but not kubelet's.").Get()
99
)
100

101
// KubeAppProbers holds the information about a Kubernetes pod prober.
102
// It's a map from the prober URL path to the Kubernetes Prober config.
103
// For example, "/app-health/hello-world/livez" entry contains liveness prober config for
104
// container "hello-world".
105
type KubeAppProbers map[string]*Prober
106

107
// Prober represents a single container prober
108
type Prober struct {
109
	HTTPGet        *apimirror.HTTPGetAction   `json:"httpGet,omitempty"`
110
	TCPSocket      *apimirror.TCPSocketAction `json:"tcpSocket,omitempty"`
111
	GRPC           *apimirror.GRPCAction      `json:"grpc,omitempty"`
112
	TimeoutSeconds int32                      `json:"timeoutSeconds,omitempty"`
113
}
114

115
// Options for the status server.
116
type Options struct {
117
	// Ip of the pod. Note: this is only applicable for Kubernetes pods and should only be used for
118
	// the prober.
119
	PodIP string
120
	// KubeAppProbers is a json with Kubernetes application prober config encoded.
121
	KubeAppProbers      string
122
	NodeType            model.NodeType
123
	StatusPort          uint16
124
	AdminPort           uint16
125
	IPv6                bool
126
	Probes              []ready.Prober
127
	EnvoyPrometheusPort int
128
	Context             context.Context
129
	FetchDNS            func() *dnsProto.NameTable
130
	NoEnvoy             bool
131
	GRPCBootstrap       string
132
	EnableProfiling     bool
133
	// PrometheusRegistry to use. Just for testing.
134
	PrometheusRegistry prometheus.Gatherer
135
	Shutdown           context.CancelFunc
136
	TriggerDrain       func()
137
}
138

139
// Server provides an endpoint for handling status probes.
140
type Server struct {
141
	ready                 []ready.Prober
142
	prometheus            *PrometheusScrapeConfiguration
143
	mutex                 sync.RWMutex
144
	appProbersDestination string
145
	appKubeProbers        KubeAppProbers
146
	appProbeClient        map[string]*http.Client
147
	statusPort            uint16
148
	lastProbeSuccessful   bool
149
	envoyStatsPort        int
150
	fetchDNS              func() *dnsProto.NameTable
151
	upstreamLocalAddress  *net.TCPAddr
152
	config                Options
153
	http                  *http.Client
154
	enableProfiling       bool
155
	registry              prometheus.Gatherer
156
	shutdown              context.CancelFunc
157
	drain                 func()
158
}
159

160
// initializeMonitoring creates a fresh Prometheus registry for the agent. Process and Go runtime
// collectors are registered under the "istio_agent_" prefix, and the monitoring exporter is
// attached to the same registry. It returns the registry as a Gatherer, or an error when the
// exporter cannot be set up.
func initializeMonitoring() (prometheus.Gatherer, error) {
	reg := prometheus.NewRegistry()
	prefixed := prometheus.WrapRegistererWithPrefix("istio_agent_", reg)
	prefixed.MustRegister(
		collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
	)
	prefixed.MustRegister(collectors.NewGoCollector())

	if _, err := monitoring.RegisterPrometheusExporter(prefixed, reg); err != nil {
		return nil, fmt.Errorf("could not setup exporter: %v", err)
	}
	return reg, nil
}
172

173
// NewServer creates a new status server.
174
func NewServer(config Options) (*Server, error) {
175
	localhost := localHostIPv4
176
	upstreamLocalAddress := UpstreamLocalAddressIPv4
177
	if config.IPv6 {
178
		localhost = localHostIPv6
179
		upstreamLocalAddress = UpstreamLocalAddressIPv6
180
	} else {
181
		// if not ipv6-only, it can be ipv4-only or dual-stack
182
		// let InstanceIP decide the localhost
183
		netIP := net.ParseIP(config.PodIP)
184
		if netIP.To4() == nil && netIP.To16() != nil && !netIP.IsLinkLocalUnicast() {
185
			localhost = localHostIPv6
186
			upstreamLocalAddress = UpstreamLocalAddressIPv6
187
		}
188
	}
189
	probes := make([]ready.Prober, 0)
190
	if !config.NoEnvoy {
191
		probes = append(probes, &ready.Probe{
192
			LocalHostAddr: localhost,
193
			AdminPort:     config.AdminPort,
194
			Context:       config.Context,
195
			NoEnvoy:       config.NoEnvoy,
196
		})
197
	}
198

199
	if config.GRPCBootstrap != "" {
200
		probes = append(probes, grpcready.NewProbe(config.GRPCBootstrap))
201
	}
202

203
	probes = append(probes, config.Probes...)
204
	registry := config.PrometheusRegistry
205
	if registry == nil {
206
		var err error
207
		registry, err = initializeMonitoring()
208
		if err != nil {
209
			return nil, err
210
		}
211
	}
212
	s := &Server{
213
		statusPort:            config.StatusPort,
214
		ready:                 probes,
215
		http:                  &http.Client{},
216
		appProbersDestination: config.PodIP,
217
		envoyStatsPort:        config.EnvoyPrometheusPort,
218
		fetchDNS:              config.FetchDNS,
219
		upstreamLocalAddress:  upstreamLocalAddress,
220
		config:                config,
221
		enableProfiling:       config.EnableProfiling,
222
		registry:              registry,
223
		shutdown: func() {
224
			config.Shutdown()
225
		},
226
		drain: config.TriggerDrain,
227
	}
228
	if LegacyLocalhostProbeDestination.Get() {
229
		s.appProbersDestination = "localhost"
230
	}
231

232
	// Enable prometheus server if its configured and a sidecar
233
	// Because port 15020 is exposed in the gateway Services, we cannot safely serve this endpoint
234
	// If we need to do this in the future, we should use envoy to do routing or have another port to make this internal
235
	// only. For now, its not needed for gateway, as we can just get Envoy stats directly, but if we
236
	// want to expose istio-agent metrics we may want to revisit this.
237
	if cfg, f := PrometheusScrapingConfig.Lookup(); config.NodeType == model.SidecarProxy && f {
238
		var prom PrometheusScrapeConfiguration
239
		if err := json.Unmarshal([]byte(cfg), &prom); err != nil {
240
			return nil, fmt.Errorf("failed to unmarshal %s: %v", PrometheusScrapingConfig.Name, err)
241
		}
242
		log.Infof("Prometheus scraping configuration: %v", prom)
243
		if prom.Scrape != "false" {
244
			s.prometheus = &prom
245
			if s.prometheus.Path == "" {
246
				s.prometheus.Path = "/metrics"
247
			}
248
			if s.prometheus.Port == "" {
249
				s.prometheus.Port = "80"
250
			}
251
			if s.prometheus.Port == strconv.Itoa(int(config.StatusPort)) {
252
				return nil, fmt.Errorf("invalid prometheus scrape configuration: "+
253
					"application port is the same as agent port, which may lead to a recursive loop. "+
254
					"Ensure pod does not have prometheus.io/port=%d label, or that injection is not happening multiple times", config.StatusPort)
255
			}
256
		}
257
	}
258

259
	if config.KubeAppProbers == "" {
260
		return s, nil
261
	}
262
	if err := json.Unmarshal([]byte(config.KubeAppProbers), &s.appKubeProbers); err != nil {
263
		return nil, fmt.Errorf("failed to decode app prober err = %v, json string = %v", err, config.KubeAppProbers)
264
	}
265

266
	s.appProbeClient = make(map[string]*http.Client, len(s.appKubeProbers))
267
	// Validate the map key matching the regex pattern.
268
	for path, prober := range s.appKubeProbers {
269
		err := validateAppKubeProber(path, prober)
270
		if err != nil {
271
			return nil, err
272
		}
273
		if prober.HTTPGet != nil {
274
			d := ProbeDialer()
275
			d.LocalAddr = s.upstreamLocalAddress
276
			// nolint: gosec
277
			// This is matching Kubernetes. It is a reasonable usage of this, as it is just a health check over localhost.
278
			transport, err := setTransportDefaults(&http.Transport{
279
				TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
280
				DialContext:     d.DialContext,
281
				// https://github.com/kubernetes/kubernetes/blob/0153febd9f0098d4b8d0d484927710eaf899ef40/pkg/probe/http/http.go#L55
282
				// Match Kubernetes logic. This also ensures idle timeouts do not trigger probe failures
283
				DisableKeepAlives: !ProbeKeepaliveConnections,
284
			})
285
			if err != nil {
286
				return nil, err
287
			}
288
			// Construct a http client and cache it in order to reuse the connection.
289
			s.appProbeClient[path] = &http.Client{
290
				Timeout: time.Duration(prober.TimeoutSeconds) * time.Second,
291
				// We skip the verification since kubelet skips the verification for HTTPS prober as well
292
				// https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/#configure-probes
293
				Transport:     transport,
294
				CheckRedirect: redirectChecker(),
295
			}
296
		}
297
	}
298

299
	return s, nil
300
}
301

302
// isRedirect reports whether code is an HTTP 3xx status, i.e. in the range [300, 400).
// The logic is copied from
// https://github.com/kubernetes/kubernetes/blob/b152001f459/pkg/probe/http/http.go#L129-L130
func isRedirect(code int) bool {
	if code < http.StatusMultipleChoices {
		return false
	}
	return code < http.StatusBadRequest
}
306

307
// redirectChecker returns the CheckRedirect policy used by probe HTTP clients. It applies the
// same redirect logic as kubelet: https://github.com/kubernetes/kubernetes/blob/b152001f459/pkg/probe/http/http.go#L141
// This means that:
// * If the redirect points at a different hostname than the first request, the redirect is not
//   followed and the last response is used, so the probe succeeds
//   (https://github.com/kubernetes/kubernetes/blob/b152001f459/pkg/probe/http/http.go#L130)
// * If 10 redirects have already been followed, the probe fails
// * Otherwise the redirect is followed
func redirectChecker() func(*http.Request, []*http.Request) error {
	const maxRedirects = 10
	policy := func(next *http.Request, previous []*http.Request) error {
		switch {
		case next.URL.Hostname() != previous[0].URL.Hostname():
			return http.ErrUseLastResponse
		case len(previous) >= maxRedirects:
			// Default behavior: stop after 10 redirects.
			return errors.New("stopped after 10 redirects")
		default:
			return nil
		}
	}
	return policy
}
324

325
// validateAppKubeProber checks a single entry of the application prober configuration.
//
// path must match appProberPattern, and prober must configure exactly one of httpGet,
// tcpSocket or gRPC. For httpGet and tcpSocket probes the port must be an integer rather than
// a named port. A nil prober is rejected as having no prober type. The returned error
// describes the first violation found; nil means the entry is valid.
func validateAppKubeProber(path string, prober *Prober) error {
	if !appProberPattern.MatchString(path) {
		return fmt.Errorf(`invalid path, must be in form of regex pattern %v`, appProberPattern)
	}
	// A JSON null value decodes to a nil *Prober; report it as an entry without any prober type
	// instead of dereferencing it.
	if prober == nil {
		return fmt.Errorf(`invalid prober type, must be one of type httpGet, tcpSocket or gRPC`)
	}

	configured := 0
	for _, isSet := range []bool{prober.HTTPGet != nil, prober.TCPSocket != nil, prober.GRPC != nil} {
		if isSet {
			configured++
		}
	}
	if configured != 1 {
		return fmt.Errorf(`invalid prober type, must be one of type httpGet, tcpSocket or gRPC`)
	}

	if prober.HTTPGet != nil && prober.HTTPGet.Port.Type != intstr.Int {
		return fmt.Errorf("invalid prober config for %v, the port must be int type", path)
	}
	if prober.TCPSocket != nil && prober.TCPSocket.Port.Type != intstr.Int {
		return fmt.Errorf("invalid prober config for %v, the port must be int type", path)
	}
	return nil
}
350

351
// FormatProberURL returns the URL paths under which pilot agent serves the Kubernetes probes it
// takes over for the given container, in the order readiness, liveness, startup.
func FormatProberURL(container string) (string, string, string) {
	prefix := "/app-health/" + container
	readyz := prefix + "/readyz"
	livez := prefix + "/livez"
	startupz := prefix + "/startupz"
	return readyz, livez, startupz
}
358

359
// Run opens a the status port and begins accepting probes.
360
func (s *Server) Run(ctx context.Context) {
361
	log.Infof("Opening status port %d", s.statusPort)
362

363
	mux := http.NewServeMux()
364

365
	// Add the handler for ready probes.
366
	mux.HandleFunc(readyPath, s.handleReadyProbe)
367
	// Default path for prom
368
	mux.HandleFunc(`/metrics`, s.handleStats)
369
	// Envoy uses something else - and original agent used the same.
370
	// Keep for backward compat with configs.
371
	mux.HandleFunc(`/stats/prometheus`, s.handleStats)
372
	mux.HandleFunc(quitPath, s.handleQuit)
373
	mux.HandleFunc(drainPath, s.handleDrain)
374
	mux.HandleFunc("/app-health/", s.handleAppProbe)
375

376
	if s.enableProfiling {
377
		// Add the handler for pprof.
378
		mux.HandleFunc("/debug/pprof/", s.handlePprofIndex)
379
		mux.HandleFunc("/debug/pprof/cmdline", s.handlePprofCmdline)
380
		mux.HandleFunc("/debug/pprof/profile", s.handlePprofProfile)
381
		mux.HandleFunc("/debug/pprof/symbol", s.handlePprofSymbol)
382
		mux.HandleFunc("/debug/pprof/trace", s.handlePprofTrace)
383
	}
384
	mux.HandleFunc("/debug/ndsz", s.handleNdsz)
385

386
	l, err := net.Listen("tcp", fmt.Sprintf(":%d", s.statusPort))
387
	if err != nil {
388
		log.Errorf("Error listening on status port: %v", err.Error())
389
		return
390
	}
391
	// for testing.
392
	if s.statusPort == 0 {
393
		_, hostPort, _ := net.SplitHostPort(l.Addr().String())
394
		allocatedPort, _ := strconv.Atoi(hostPort)
395
		s.mutex.Lock()
396
		s.statusPort = uint16(allocatedPort)
397
		s.mutex.Unlock()
398
	}
399
	defer l.Close()
400

401
	go func() {
402
		if err := http.Serve(l, mux); err != nil {
403
			if network.IsUnexpectedListenerError(err) {
404
				log.Error(err)
405
			}
406
			select {
407
			case <-ctx.Done():
408
				// We are shutting down already, don't trigger SIGTERM
409
				return
410
			default:
411
				// If the server errors then pilot-agent can never pass readiness or liveness probes
412
				// Therefore, trigger graceful termination by sending SIGTERM to the binary pid
413
				notifyExit()
414
			}
415
		}
416
	}()
417

418
	// Wait for the agent to be shut down.
419
	<-ctx.Done()
420
	log.Info("Status server has successfully terminated")
421
}
422

423
func (s *Server) handlePprofIndex(w http.ResponseWriter, r *http.Request) {
424
	if !isRequestFromLocalhost(r) {
425
		http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden)
426
		return
427
	}
428

429
	pprof.Index(w, r)
430
}
431

432
func (s *Server) handlePprofCmdline(w http.ResponseWriter, r *http.Request) {
433
	if !isRequestFromLocalhost(r) {
434
		http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden)
435
		return
436
	}
437

438
	pprof.Cmdline(w, r)
439
}
440

441
func (s *Server) handlePprofSymbol(w http.ResponseWriter, r *http.Request) {
442
	if !isRequestFromLocalhost(r) {
443
		http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden)
444
		return
445
	}
446

447
	pprof.Symbol(w, r)
448
}
449

450
func (s *Server) handlePprofProfile(w http.ResponseWriter, r *http.Request) {
451
	if !isRequestFromLocalhost(r) {
452
		http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden)
453
		return
454
	}
455

456
	pprof.Profile(w, r)
457
}
458

459
func (s *Server) handlePprofTrace(w http.ResponseWriter, r *http.Request) {
460
	if !isRequestFromLocalhost(r) {
461
		http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden)
462
		return
463
	}
464

465
	pprof.Trace(w, r)
466
}
467

468
// handleReadyProbe runs all readiness probes and answers 200 when every probe passes or 503
// when one fails. A failure is logged on every request; success is logged only when the
// previous probe had not succeeded.
func (s *Server) handleReadyProbe(w http.ResponseWriter, _ *http.Request) {
	probeErr := s.isReady()

	// lastProbeSuccessful is shared between concurrent probe requests.
	s.mutex.Lock()
	defer s.mutex.Unlock()

	if probeErr != nil {
		w.WriteHeader(http.StatusServiceUnavailable)

		log.Warnf("Envoy proxy is NOT ready: %s", probeErr.Error())
		s.lastProbeSuccessful = false
		return
	}

	w.WriteHeader(http.StatusOK)

	if !s.lastProbeSuccessful {
		log.Info("Envoy proxy is ready")
	}
	s.lastProbeSuccessful = true
}
486

487
// isReady evaluates the readiness probes in order and returns the error of the first probe
// that fails, or nil when all of them pass.
func (s *Server) isReady() error {
	for i := range s.ready {
		if err := s.ready[i].Check(); err != nil {
			return err
		}
	}
	return nil
}
495

496
// isRequestFromLocalhost reports whether the request's remote address is a loopback IP.
// It returns false when RemoteAddr is not a valid host:port pair or the host is not an IP.
func isRequestFromLocalhost(r *http.Request) bool {
	host, _, err := net.SplitHostPort(r.RemoteAddr)
	// ParseIP yields nil for a non-IP host, and a nil IP is never a loopback address.
	return err == nil && net.ParseIP(host).IsLoopback()
}
505

506
// PrometheusScrapeConfiguration describes how the application's own metrics are scraped.
// All fields are strings as decoded from JSON.
type PrometheusScrapeConfiguration struct {
	// Scrape disables application scraping when it equals "false".
	Scrape string `json:"scrape"`
	// Path is the URL path of the application's metrics endpoint.
	Path string `json:"path"`
	// Port is the localhost port of the application's metrics endpoint.
	Port string `json:"port"`
}
511

512
// handleStats handles prometheus stats scraping. This will scrape envoy metrics, and, if configured,
513
// the application metrics and merge them together.
514
// The merge here is a simple string concatenation. This works for almost all cases, assuming the application
515
// is not exposing the same metrics as Envoy.
516
// This merging works for both FmtText and FmtOpenMetrics and will use the format of the application metrics
517
// Note that we do not return any errors here. If we do, we will drop metrics. For example, the app may be having issues,
518
// but we still want Envoy metrics. Instead, errors are tracked in the failed scrape metrics/logs.
519
func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
520
	metrics.ScrapeTotals.Increment()
521
	var err error
522
	var envoy, application io.ReadCloser
523
	var envoyCancel, appCancel context.CancelFunc
524
	defer func() {
525
		if envoy != nil {
526
			err = envoy.Close()
527
			if err != nil {
528
				log.Infof("envoy connection is not closed: %v", err)
529
			}
530
		}
531
		if application != nil {
532
			err = application.Close()
533
			if err != nil {
534
				log.Infof("app connection is not closed: %v", err)
535
			}
536
		}
537
		if envoyCancel != nil {
538
			envoyCancel()
539
		}
540
		if appCancel != nil {
541
			appCancel()
542
		}
543
	}()
544

545
	// Gather all the metrics we will merge
546
	if !s.config.NoEnvoy {
547
		if envoy, envoyCancel, _, err = s.scrape(fmt.Sprintf("http://localhost:%d/stats/prometheus", s.envoyStatsPort), r.Header); err != nil {
548
			log.Errorf("failed scraping envoy metrics: %v", err)
549
			metrics.EnvoyScrapeErrors.Increment()
550
		}
551
	}
552

553
	// Scrape app metrics if defined and capture their format
554
	var format expfmt.Format
555
	if s.prometheus != nil {
556
		var contentType string
557
		url := fmt.Sprintf("http://localhost:%s%s", s.prometheus.Port, s.prometheus.Path)
558
		if application, appCancel, contentType, err = s.scrape(url, r.Header); err != nil {
559
			log.Errorf("failed scraping application metrics: %v", err)
560
			metrics.AppScrapeErrors.Increment()
561
		}
562
		format = negotiateMetricsFormat(contentType)
563
	} else {
564
		// Without app metrics format use a default
565
		format = FmtText
566
	}
567

568
	w.Header().Set("Content-Type", string(format))
569

570
	// Write out the metrics
571
	if err = scrapeAndWriteAgentMetrics(s.registry, io.Writer(w)); err != nil {
572
		log.Errorf("failed scraping and writing agent metrics: %v", err)
573
		metrics.AgentScrapeErrors.Increment()
574
	}
575

576
	if envoy != nil {
577
		_, err = io.Copy(w, envoy)
578
		if err != nil {
579
			log.Errorf("failed to scraping and writing envoy metrics: %v", err)
580
			metrics.EnvoyScrapeErrors.Increment()
581
		}
582
	}
583

584
	// App metrics must go last because if they are FmtOpenMetrics,
585
	// they will have a trailing "# EOF" which terminates the full exposition
586
	if application != nil {
587
		_, err = io.Copy(w, application)
588
		if err != nil {
589
			log.Errorf("failed to scraping and writing application metrics: %v", err)
590
			metrics.AppScrapeErrors.Increment()
591
		}
592
	}
593
}
594

595
const (
596
	// nolint: revive, stylecheck
597
	FmtOpenMetrics_0_0_1 = expfmt.OpenMetricsType + `; version=` + expfmt.OpenMetricsVersion_0_0_1 + `; charset=utf-8`
598
	// nolint: revive, stylecheck
599
	FmtOpenMetrics_1_0_0 = expfmt.OpenMetricsType + `; version=` + expfmt.OpenMetricsVersion_1_0_0 + `; charset=utf-8`
600
	FmtText              = `text/plain; version=` + expfmt.TextVersion + `; charset=utf-8`
601
)
602

603
func negotiateMetricsFormat(contentType string) expfmt.Format {
604
	mediaType, params, err := mime.ParseMediaType(contentType)
605
	if err == nil && mediaType == expfmt.OpenMetricsType {
606
		switch params["version"] {
607
		case expfmt.OpenMetricsVersion_1_0_0:
608
			return FmtOpenMetrics_1_0_0
609
		case expfmt.OpenMetricsVersion_0_0_1, "":
610
			return FmtOpenMetrics_0_0_1
611
		}
612
	}
613
	return FmtText
614
}
615

616
// scrapeAndWriteAgentMetrics gathers the metric families from registry and writes them to w in
// the text format. It returns the gather error, or the first encoding error, if any.
func scrapeAndWriteAgentMetrics(registry prometheus.Gatherer, w io.Writer) error {
	families, err := registry.Gather()
	if err != nil {
		return err
	}
	encoder := expfmt.NewEncoder(w, FmtText)
	for i := range families {
		if encodeErr := encoder.Encode(families[i]); encodeErr != nil {
			return encodeErr
		}
	}
	return nil
}
629

630
// applyHeaders copies the first value of each listed header from "from" into "into". Headers
// that are absent or empty in "from" are skipped, leaving any existing value in "into" as is.
func applyHeaders(into http.Header, from http.Header, keys ...string) {
	for i := range keys {
		value := from.Get(keys[i])
		if value == "" {
			continue
		}
		into.Set(keys[i], value)
	}
}
638

639
// getHeaderTimeout parses a string such as "1.234" holding a (possibly fractional) number of
// seconds and returns it as a time.Duration.
//
// An error is returned when the string is not a valid floating point number, or when the
// number is NaN, infinite, or too large in magnitude to be represented as a time.Duration.
func getHeaderTimeout(timeout string) (time.Duration, error) {
	timeoutSeconds, err := strconv.ParseFloat(timeout, 64)
	if err != nil {
		return 0, err
	}

	// ParseFloat accepts "NaN", "Inf" and very large finite values. Converting such a float to
	// an integer duration is implementation-defined, so reject them. The check is written as a
	// negated range test because every comparison involving NaN is false.
	const maxSeconds = float64(1<<63-1) / float64(time.Second)
	if !(timeoutSeconds > -maxSeconds && timeoutSeconds < maxSeconds) {
		return 0, fmt.Errorf("timeout %q is not representable as a duration", timeout)
	}

	return time.Duration(timeoutSeconds * float64(time.Second)), nil
}
648

649
// scrape will send a request to the provided url to scrape metrics from
650
// This will attempt to mimic some of Prometheus functionality by passing some of the headers through
651
// such as accept, timeout, and user agent
652
// Returns the scraped metrics reader as well as the response's "Content-Type" header to determine the metrics format
653
func (s *Server) scrape(url string, header http.Header) (io.ReadCloser, context.CancelFunc, string, error) {
654
	var cancel context.CancelFunc
655
	ctx := context.Background()
656
	if timeoutString := header.Get("X-Prometheus-Scrape-Timeout-Seconds"); timeoutString != "" {
657
		timeout, err := getHeaderTimeout(timeoutString)
658
		if err != nil {
659
			log.Warnf("Failed to parse timeout header %v: %v", timeoutString, err)
660
		} else {
661
			ctx, cancel = context.WithTimeout(ctx, timeout)
662
		}
663
	}
664
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
665
	if err != nil {
666
		return nil, cancel, "", err
667
	}
668
	applyHeaders(req.Header, header, "Accept",
669
		"User-Agent",
670
		"X-Prometheus-Scrape-Timeout-Seconds",
671
	)
672

673
	resp, err := s.http.Do(req)
674
	if err != nil {
675
		return nil, cancel, "", fmt.Errorf("error scraping %s: %v", url, err)
676
	}
677
	if resp.StatusCode != http.StatusOK {
678
		resp.Body.Close()
679
		return nil, cancel, "", fmt.Errorf("error scraping %s, status code: %v", url, resp.StatusCode)
680
	}
681
	format := resp.Header.Get("Content-Type")
682
	return resp.Body, cancel, format, nil
683
}
684

685
// handleQuit asks pilot agent to exit. Only POST requests from localhost are accepted; others
// get 403 (not from localhost) or 405 (wrong method). The response is written before the
// shutdown function is called.
func (s *Server) handleQuit(w http.ResponseWriter, r *http.Request) {
	switch {
	case !isRequestFromLocalhost(r):
		http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden)
		return
	case r.Method != http.MethodPost:
		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
		return
	}

	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte("OK"))
	log.Infof("handling %s, notifying pilot-agent to exit", quitPath)
	s.shutdown()
}
699

700
// handleDrain starts a drain. Only POST requests from localhost are accepted; others get 403
// (not from localhost) or 405 (wrong method). The response is written before the drain
// function is called.
func (s *Server) handleDrain(w http.ResponseWriter, r *http.Request) {
	switch {
	case !isRequestFromLocalhost(r):
		http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden)
		return
	case r.Method != http.MethodPost:
		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
		return
	}

	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte("OK"))
	log.Infof("handling %s, starting drain", drainPath)
	s.drain()
}
714

715
func (s *Server) handleAppProbe(w http.ResponseWriter, req *http.Request) {
716
	// Validate the request first.
717
	path := req.URL.Path
718
	if !strings.HasPrefix(path, "/") {
719
		path = "/" + req.URL.Path
720
	}
721
	prober, exists := s.appKubeProbers[path]
722
	if !exists {
723
		log.Errorf("Prober does not exists url %v", path)
724
		w.WriteHeader(http.StatusBadRequest)
725
		_, _ = w.Write([]byte(fmt.Sprintf("app prober config does not exists for %v", path)))
726
		return
727
	}
728

729
	switch {
730
	case prober.HTTPGet != nil:
731
		s.handleAppProbeHTTPGet(w, req, prober, path)
732
	case prober.TCPSocket != nil:
733
		s.handleAppProbeTCPSocket(w, prober)
734
	case prober.GRPC != nil:
735
		s.handleAppProbeGRPC(w, req, prober)
736
	}
737
}
738

739
// handleAppProbeHTTPGet forwards an HTTP probe to the application and relays the outcome.
//
// The request goes to the configured probe destination, port and path, carrying the incoming
// headers. A redirect response from the application is reported as 200; any other status code
// is passed through together with at most maxRespBodyLength bytes of the response body. A
// failure to build or send the request is reported as 500.
func (s *Server) handleAppProbeHTTPGet(w http.ResponseWriter, req *http.Request, prober *Prober, path string) {
	httpGet := prober.HTTPGet

	proberPath := httpGet.Path
	if !strings.HasPrefix(proberPath, "/") {
		proberPath = "/" + proberPath
	}
	appPort := strconv.Itoa(httpGet.Port.IntValue())

	scheme := "http"
	if httpGet.Scheme == apimirror.URISchemeHTTPS {
		scheme = "https"
	}
	target := scheme + "://" + net.JoinHostPort(s.appProbersDestination, appPort) + proberPath

	appReq, err := http.NewRequest(http.MethodGet, target, nil)
	if err != nil {
		log.Errorf("Failed to create request to probe app %v, original url %v", err, path)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	appReq.Host = req.Host
	if host, port, err := net.SplitHostPort(req.Host); err == nil {
		// the port is same as the status port, then we need to replace the port in the host with the real one
		if requested, _ := strconv.Atoi(port); requested == int(s.statusPort) {
			appReq.Host = net.JoinHostPort(host, appPort)
		}
	}

	// Forward incoming headers to the application.
	for name, values := range req.Header {
		appReq.Header[name] = slices.Clone(values)
		if name == "Host" && len(values) > 0 {
			// Probe has specific host header override; honor it
			appReq.Host = values[0]
		}
	}

	// Look up the client that NewServer cached for this probe path.
	client := s.appProbeClient[path]

	// Send the request.
	response, err := client.Do(appReq)
	if err != nil {
		log.Errorf("Request to probe app failed: %v, original URL path = %v\napp URL path = %v", err, path, proberPath)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	defer func() {
		// Drain and close the body to let the Transport reuse the connection
		_, _ = io.Copy(io.Discard, response.Body)
		_ = response.Body.Close()
	}()

	if isRedirect(response.StatusCode) {
		// In other cases, we return the original status code. For redirects, it is illegal to
		// not have Location header, so we need to switch to just 200.
		w.WriteHeader(http.StatusOK)
		return
	}

	// Relay the status code, followed by the (truncated) body from the probe.
	w.WriteHeader(response.StatusCode)
	body, _ := k8sUtilIo.ReadAtMost(response.Body, maxRespBodyLength)
	_, _ = w.Write(body)
}
805

806
// handleAppProbeTCPSocket performs a TCP probe: it tries to open a connection to the probe
// destination within the prober's timeout and answers 200 on success or 500 on failure.
func (s *Server) handleAppProbeTCPSocket(w http.ResponseWriter, prober *Prober) {
	dialer := ProbeDialer()
	dialer.LocalAddr = s.upstreamLocalAddress
	dialer.Timeout = time.Duration(prober.TimeoutSeconds) * time.Second

	target := net.JoinHostPort(s.appProbersDestination, strconv.Itoa(prober.TCPSocket.Port.IntValue()))
	conn, err := dialer.Dial("tcp", target)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusOK)
	if closeErr := conn.Close(); closeErr != nil {
		log.Infof("tcp connection is not closed: %v", closeErr)
	}
}
824

825
func (s *Server) handleAppProbeGRPC(w http.ResponseWriter, req *http.Request, prober *Prober) {
826
	timeout := time.Duration(prober.TimeoutSeconds) * time.Second
827
	// the DialOptions are referenced from https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/probe/grpc/grpc.go#L55-L59
828
	opts := []grpc.DialOption{
829
		grpc.WithBlock(),
830
		grpc.WithTransportCredentials(insecure.NewCredentials()), // credentials are currently not supported
831
		grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
832
			d := ProbeDialer()
833
			d.LocalAddr = s.upstreamLocalAddress
834
			d.Timeout = timeout
835
			return d.DialContext(ctx, "tcp", addr)
836
		}),
837
	}
838
	if userAgent := req.Header["User-Agent"]; len(userAgent) > 0 {
839
		// simulate kubelet
840
		// please refer to:
841
		// https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/probe/grpc/grpc.go#L56
842
		// https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/probe/http/http.go#L103
843
		opts = append(opts, grpc.WithUserAgent(userAgent[0]))
844
	}
845

846
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
847
	defer cancel()
848

849
	addr := net.JoinHostPort(s.appProbersDestination, strconv.Itoa(int(prober.GRPC.Port)))
850
	conn, err := grpc.DialContext(ctx, addr, opts...)
851
	if err != nil {
852
		log.Errorf("Failed to create grpc connection to probe app: %v", err)
853
		w.WriteHeader(http.StatusInternalServerError)
854
		return
855
	}
856
	defer conn.Close()
857

858
	var svc string
859
	if prober.GRPC.Service != nil {
860
		svc = *prober.GRPC.Service
861
	}
862
	grpcClient := grpcHealth.NewHealthClient(conn)
863
	resp, err := grpcClient.Check(ctx, &grpcHealth.HealthCheckRequest{
864
		Service: svc,
865
	})
866
	// the error handling is referenced from https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/probe/grpc/grpc.go#L88-L106
867
	if err != nil {
868
		status, ok := grpcStatus.FromError(err)
869
		if ok {
870
			switch status.Code() {
871
			case codes.Unimplemented:
872
				log.Errorf("server does not implement the grpc health protocol (grpc.health.v1.Health): %v", err)
873
			case codes.DeadlineExceeded:
874
				log.Errorf("grpc request not finished within timeout: %v", err)
875
			default:
876
				log.Errorf("grpc probe failed: %v", err)
877
			}
878
		} else {
879
			log.Errorf("grpc probe failed: %v", err)
880
		}
881
		w.WriteHeader(http.StatusInternalServerError)
882
		return
883
	}
884

885
	if resp.GetStatus() == grpcHealth.HealthCheckResponse_SERVING {
886
		w.WriteHeader(http.StatusOK)
887
		return
888
	}
889
	w.WriteHeader(http.StatusInternalServerError)
890
}
891

892
// handleNdsz serves the DNS name table as JSON to requests coming from localhost. Requests
// from elsewhere get 403; when no name table is available the answer is 404 with an empty
// JSON object.
func (s *Server) handleNdsz(w http.ResponseWriter, r *http.Request) {
	if !isRequestFromLocalhost(r) {
		http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden)
		return
	}

	table := s.fetchDNS()
	if table != nil {
		writeJSONProto(w, table)
		return
	}

	// See https://golang.org/doc/faq#nil_error for why writeJSONProto cannot handle this
	w.WriteHeader(http.StatusNotFound)
	_, _ = w.Write([]byte(`{}`))
}
906

907
// writeJSONProto writes a protobuf to a json payload, handling content type, marshaling, and errors
908
func writeJSONProto(w http.ResponseWriter, obj any) {
909
	w.Header().Set("Content-Type", "application/json")
910
	b, err := config.ToJSON(obj)
911
	if err != nil {
912
		w.WriteHeader(http.StatusInternalServerError)
913
		_, _ = w.Write([]byte(err.Error()))
914
		return
915
	}
916
	_, err = w.Write(b)
917
	if err != nil {
918
		w.WriteHeader(http.StatusInternalServerError)
919
	}
920
}
921

922
// notifyExit sends SIGTERM to itself
923
func notifyExit() {
924
	p, err := os.FindProcess(os.Getpid())
925
	if err != nil {
926
		log.Error(err)
927
	}
928
	if err := p.Signal(syscall.SIGTERM); err != nil {
929
		log.Errorf("failed to send SIGTERM to self: %v", err)
930
	}
931
}
932

933
// defaultTransport is http.DefaultTransport as a *http.Transport; setTransportDefaults copies
// its TLS handshake and idle connection timeouts onto transports that leave them unset.
var defaultTransport = http.DefaultTransport.(*http.Transport)
934

935
// SetTransportDefaults mirrors Kubernetes probe settings
936
// https://github.com/kubernetes/kubernetes/blob/0153febd9f0098d4b8d0d484927710eaf899ef40/pkg/probe/http/http.go#L52
937
func setTransportDefaults(t *http.Transport) (*http.Transport, error) {
938
	if !EnableHTTP2Probing {
939
		return t, nil
940
	}
941
	if t.TLSHandshakeTimeout == 0 {
942
		t.TLSHandshakeTimeout = defaultTransport.TLSHandshakeTimeout
943
	}
944
	if t.IdleConnTimeout == 0 {
945
		t.IdleConnTimeout = defaultTransport.IdleConnTimeout
946
	}
947
	t2, err := http2.ConfigureTransports(t)
948
	if err != nil {
949
		return nil, err
950
	}
951
	t2.ReadIdleTimeout = time.Duration(30) * time.Second
952
	t2.PingTimeout = time.Duration(15) * time.Second
953
	return t, nil
954
}
955

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.