istio
954 строки · 30.9 Кб
1// Copyright Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package status
16
17import (
18"context"
19"crypto/tls"
20"encoding/json"
21"errors"
22"fmt"
23"io"
24"mime"
25"net"
26"net/http"
27"net/http/pprof"
28"os"
29"regexp"
30"strconv"
31"strings"
32"sync"
33"syscall"
34"time"
35
36"github.com/prometheus/client_golang/prometheus"
37"github.com/prometheus/client_golang/prometheus/collectors"
38"github.com/prometheus/common/expfmt"
39"golang.org/x/net/http2"
40"google.golang.org/grpc"
41"google.golang.org/grpc/codes"
42"google.golang.org/grpc/credentials/insecure"
43grpcHealth "google.golang.org/grpc/health/grpc_health_v1"
44grpcStatus "google.golang.org/grpc/status"
45"k8s.io/apimachinery/pkg/util/intstr"
46k8sUtilIo "k8s.io/utils/io"
47
48"istio.io/istio/pilot/cmd/pilot-agent/metrics"
49"istio.io/istio/pilot/cmd/pilot-agent/status/grpcready"
50"istio.io/istio/pilot/cmd/pilot-agent/status/ready"
51"istio.io/istio/pilot/pkg/model"
52"istio.io/istio/pkg/config"
53dnsProto "istio.io/istio/pkg/dns/proto"
54"istio.io/istio/pkg/env"
55"istio.io/istio/pkg/kube/apimirror"
56"istio.io/istio/pkg/log"
57"istio.io/istio/pkg/monitoring"
58"istio.io/istio/pkg/network"
59"istio.io/istio/pkg/slices"
60)
61
const (
	// readyPath is the endpoint reporting the pilot agent's own readiness.
	readyPath = "/healthz/ready"
	// quitPath asks the pilot agent to exit.
	quitPath = "/quitquitquit"
	// drainPath asks the pilot agent to start draining.
	drainPath = "/drain"

	// KubeAppProberEnvName is the environment variable through which the injector
	// (istioctl or the webhook) hands the pilot agent its JSON-encoded app probe
	// configuration. For example,
	//   ISTIO_KUBE_APP_PROBERS='{"/app-health/httpbin/livez":{"httpGet":{"path": "/hello", "port": 8080}}}'
	// says the httpbin container's liveness probe targets port 8080 at path /hello.
	// It is filled in by injection and should never be set by hand.
	KubeAppProberEnvName = "ISTIO_KUBE_APP_PROBERS"

	localHostIPv4 = "127.0.0.1"
	localHostIPv6 = "::1"

	// maxRespBodyLength caps how many bytes of an app probe's response body are
	// relayed back to the caller (10 KiB).
	maxRespBodyLength = 10 << 10
)
79
// Source addresses that app probe connections are dialed from, one per IP family.
var (
	// UpstreamLocalAddressIPv4 is the IPv4 probe source address, 127.0.0.6.
	UpstreamLocalAddressIPv4 = &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6)}
	// UpstreamLocalAddressIPv6 is the IPv6 probe source address, ::6.
	UpstreamLocalAddressIPv6 = &net.TCPAddr{IP: net.ParseIP("::6")}
)
84
// PrometheusScrapingConfig carries the JSON-encoded PrometheusScrapeConfiguration
// describing the application's metrics endpoint. NewServer only consults it for
// sidecar proxies.
var PrometheusScrapingConfig = env.Register("ISTIO_PROMETHEUS_ANNOTATIONS", "", "")

var (
	// appProberPattern is the form every app prober path must take:
	// /app-health/<container>/(livez|readyz|startupz).
	appProberPattern = regexp.MustCompile(`^/app-health/[^/]+/(livez|readyz|startupz)$`)

	// EnableHTTP2Probing, when true, makes setTransportDefaults configure HTTP/2 on
	// the app probe transports. The value is read once, at package initialization.
	EnableHTTP2Probing = env.Register("ISTIO_ENABLE_HTTP2_PROBING", true,
		"If enabled, HTTP2 probes will be enabled for HTTPS probes, following Kubernetes").Get()

	// LegacyLocalhostProbeDestination, when true, makes NewServer send app probes to
	// "localhost" instead of the pod IP.
	LegacyLocalhostProbeDestination = env.Register("REWRITE_PROBE_LEGACY_LOCALHOST_DESTINATION", false,
		"If enabled, readiness probes will be sent to 'localhost'. Otherwise, they will be sent to the Pod's IP, matching Kubernetes' behavior.")

	// ProbeKeepaliveConnections, when true, leaves HTTP keep-alives enabled on the app
	// probe transports; otherwise they are disabled, as kubelet does. The value is read
	// once, at package initialization.
	ProbeKeepaliveConnections = env.Register("ENABLE_PROBE_KEEPALIVE_CONNECTIONS", false,
		"If enabled, readiness probes will keep the connection from pilot-agent to the application alive. "+
			"This mirrors older Istio versions' behaviors, but not kubelet's.").Get()
)
100
// KubeAppProbers holds the information about a Kubernetes pod prober.
// It's a map from the prober URL path to the Kubernetes Prober config.
// For example, "/app-health/hello-world/livez" entry contains liveness prober config for
// container "hello-world".
// NewServer decodes it from Options.KubeAppProbers and requires every key to match
// appProberPattern.
type KubeAppProbers map[string]*Prober

// Prober represents a single container prober, as decoded from JSON.
// validateAppKubeProber requires exactly one of HTTPGet, TCPSocket or GRPC to be set.
type Prober struct {
	// HTTPGet describes an HTTP(S) GET probe; its port must be numeric.
	HTTPGet *apimirror.HTTPGetAction `json:"httpGet,omitempty"`
	// TCPSocket describes a TCP connect probe; its port must be numeric.
	TCPSocket *apimirror.TCPSocketAction `json:"tcpSocket,omitempty"`
	// GRPC describes a probe using the gRPC health checking protocol (grpc.health.v1.Health).
	GRPC *apimirror.GRPCAction `json:"grpc,omitempty"`
	// TimeoutSeconds bounds a probe attempt, in seconds. It is omitted from the JSON when
	// zero; a zero value leaves the HTTP probe client and the TCP probe dialer without a timeout.
	TimeoutSeconds int32 `json:"timeoutSeconds,omitempty"`
}
114
// Options for the status server.
type Options struct {
	// Ip of the pod. Note: this is only applicable for Kubernetes pods and should only be used for
	// the prober. When IPv6 is false, NewServer also uses it to choose an IPv4 or IPv6 loopback.
	PodIP string
	// KubeAppProbers is a json with Kubernetes application prober config encoded
	// (see KubeAppProberEnvName). Empty means no app probes are configured.
	KubeAppProbers string
	// NodeType of the proxy; application metrics are only merged for sidecars.
	NodeType model.NodeType
	// StatusPort is the port the status server listens on; 0 lets the OS pick one (used in tests).
	StatusPort uint16
	// AdminPort is Envoy's admin port, passed to the Envoy readiness probe.
	AdminPort uint16
	// IPv6 forces the IPv6 loopback and IPv6 probe source address.
	IPv6 bool
	// Probes are extra readiness checks, run after the built-in ones.
	Probes []ready.Prober
	// EnvoyPrometheusPort is the local port whose /stats/prometheus output is merged into the metrics endpoints.
	EnvoyPrometheusPort int
	// Context is handed to the Envoy readiness probe.
	Context context.Context
	// FetchDNS returns the DNS name table served at /debug/ndsz.
	FetchDNS func() *dnsProto.NameTable
	// NoEnvoy disables the Envoy readiness probe and Envoy metrics scraping.
	NoEnvoy bool
	// GRPCBootstrap, when non-empty, adds a grpcready probe built from this value
	// (presumably the gRPC bootstrap file path — confirm against grpcready.NewProbe).
	GRPCBootstrap string
	// EnableProfiling registers the localhost-only /debug/pprof handlers.
	EnableProfiling bool
	// PrometheusRegistry to use. Just for testing.
	// When nil, NewServer creates a registry for the agent's own metrics.
	PrometheusRegistry prometheus.Gatherer
	// Shutdown is called when a loopback client POSTs to /quitquitquit.
	Shutdown context.CancelFunc
	// TriggerDrain is called when a loopback client POSTs to /drain.
	TriggerDrain func()
}
138
// Server provides an endpoint for handling status probes.
type Server struct {
	// ready holds the readiness probers checked, in order, by the readiness endpoint.
	ready []ready.Prober
	// prometheus is the application scrape target merged into the metrics output; nil disables it.
	prometheus *PrometheusScrapeConfiguration
	// mutex guards lastProbeSuccessful, and statusPort when Run records an OS-assigned port.
	// NOTE(review): other reads of statusPort in this file do not take the lock — confirm that
	// this only matters for the port-0 test setup.
	mutex sync.RWMutex
	// appProbersDestination is the host app probes are sent to: the pod IP, or "localhost" in legacy mode.
	appProbersDestination string
	// appKubeProbers maps /app-health/... paths to their decoded probe definitions.
	appKubeProbers KubeAppProbers
	// appProbeClient holds one pre-built HTTP client per HTTPGet app probe path.
	appProbeClient map[string]*http.Client
	// statusPort is the port Run listens on; when it starts at 0, Run stores the port actually bound.
	statusPort uint16
	// lastProbeSuccessful remembers the previous readiness result so that "Envoy proxy is ready"
	// is logged only when readiness is (re)gained.
	lastProbeSuccessful bool
	// envoyStatsPort is the local port whose /stats/prometheus output is merged into the metrics output.
	envoyStatsPort int
	// fetchDNS supplies the name table served at /debug/ndsz.
	fetchDNS func() *dnsProto.NameTable
	// upstreamLocalAddress is the source address app probe connections are dialed from.
	upstreamLocalAddress *net.TCPAddr
	// config is the Options the server was created with.
	config Options
	// http is the client used to scrape Envoy and application metrics.
	http *http.Client
	// enableProfiling registers the localhost-only /debug/pprof handlers in Run.
	enableProfiling bool
	// registry gathers the agent's own metrics.
	registry prometheus.Gatherer
	// shutdown is invoked by the quit endpoint.
	shutdown context.CancelFunc
	// drain is invoked by the drain endpoint.
	drain func()
}
159
160func initializeMonitoring() (prometheus.Gatherer, error) {
161registry := prometheus.NewRegistry()
162wrapped := prometheus.WrapRegistererWithPrefix("istio_agent_", registry)
163wrapped.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
164wrapped.MustRegister(collectors.NewGoCollector())
165
166_, err := monitoring.RegisterPrometheusExporter(wrapped, registry)
167if err != nil {
168return nil, fmt.Errorf("could not setup exporter: %v", err)
169}
170return registry, nil
171}
172
173// NewServer creates a new status server.
174func NewServer(config Options) (*Server, error) {
175localhost := localHostIPv4
176upstreamLocalAddress := UpstreamLocalAddressIPv4
177if config.IPv6 {
178localhost = localHostIPv6
179upstreamLocalAddress = UpstreamLocalAddressIPv6
180} else {
181// if not ipv6-only, it can be ipv4-only or dual-stack
182// let InstanceIP decide the localhost
183netIP := net.ParseIP(config.PodIP)
184if netIP.To4() == nil && netIP.To16() != nil && !netIP.IsLinkLocalUnicast() {
185localhost = localHostIPv6
186upstreamLocalAddress = UpstreamLocalAddressIPv6
187}
188}
189probes := make([]ready.Prober, 0)
190if !config.NoEnvoy {
191probes = append(probes, &ready.Probe{
192LocalHostAddr: localhost,
193AdminPort: config.AdminPort,
194Context: config.Context,
195NoEnvoy: config.NoEnvoy,
196})
197}
198
199if config.GRPCBootstrap != "" {
200probes = append(probes, grpcready.NewProbe(config.GRPCBootstrap))
201}
202
203probes = append(probes, config.Probes...)
204registry := config.PrometheusRegistry
205if registry == nil {
206var err error
207registry, err = initializeMonitoring()
208if err != nil {
209return nil, err
210}
211}
212s := &Server{
213statusPort: config.StatusPort,
214ready: probes,
215http: &http.Client{},
216appProbersDestination: config.PodIP,
217envoyStatsPort: config.EnvoyPrometheusPort,
218fetchDNS: config.FetchDNS,
219upstreamLocalAddress: upstreamLocalAddress,
220config: config,
221enableProfiling: config.EnableProfiling,
222registry: registry,
223shutdown: func() {
224config.Shutdown()
225},
226drain: config.TriggerDrain,
227}
228if LegacyLocalhostProbeDestination.Get() {
229s.appProbersDestination = "localhost"
230}
231
232// Enable prometheus server if its configured and a sidecar
233// Because port 15020 is exposed in the gateway Services, we cannot safely serve this endpoint
234// If we need to do this in the future, we should use envoy to do routing or have another port to make this internal
235// only. For now, its not needed for gateway, as we can just get Envoy stats directly, but if we
236// want to expose istio-agent metrics we may want to revisit this.
237if cfg, f := PrometheusScrapingConfig.Lookup(); config.NodeType == model.SidecarProxy && f {
238var prom PrometheusScrapeConfiguration
239if err := json.Unmarshal([]byte(cfg), &prom); err != nil {
240return nil, fmt.Errorf("failed to unmarshal %s: %v", PrometheusScrapingConfig.Name, err)
241}
242log.Infof("Prometheus scraping configuration: %v", prom)
243if prom.Scrape != "false" {
244s.prometheus = &prom
245if s.prometheus.Path == "" {
246s.prometheus.Path = "/metrics"
247}
248if s.prometheus.Port == "" {
249s.prometheus.Port = "80"
250}
251if s.prometheus.Port == strconv.Itoa(int(config.StatusPort)) {
252return nil, fmt.Errorf("invalid prometheus scrape configuration: "+
253"application port is the same as agent port, which may lead to a recursive loop. "+
254"Ensure pod does not have prometheus.io/port=%d label, or that injection is not happening multiple times", config.StatusPort)
255}
256}
257}
258
259if config.KubeAppProbers == "" {
260return s, nil
261}
262if err := json.Unmarshal([]byte(config.KubeAppProbers), &s.appKubeProbers); err != nil {
263return nil, fmt.Errorf("failed to decode app prober err = %v, json string = %v", err, config.KubeAppProbers)
264}
265
266s.appProbeClient = make(map[string]*http.Client, len(s.appKubeProbers))
267// Validate the map key matching the regex pattern.
268for path, prober := range s.appKubeProbers {
269err := validateAppKubeProber(path, prober)
270if err != nil {
271return nil, err
272}
273if prober.HTTPGet != nil {
274d := ProbeDialer()
275d.LocalAddr = s.upstreamLocalAddress
276// nolint: gosec
277// This is matching Kubernetes. It is a reasonable usage of this, as it is just a health check over localhost.
278transport, err := setTransportDefaults(&http.Transport{
279TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
280DialContext: d.DialContext,
281// https://github.com/kubernetes/kubernetes/blob/0153febd9f0098d4b8d0d484927710eaf899ef40/pkg/probe/http/http.go#L55
282// Match Kubernetes logic. This also ensures idle timeouts do not trigger probe failures
283DisableKeepAlives: !ProbeKeepaliveConnections,
284})
285if err != nil {
286return nil, err
287}
288// Construct a http client and cache it in order to reuse the connection.
289s.appProbeClient[path] = &http.Client{
290Timeout: time.Duration(prober.TimeoutSeconds) * time.Second,
291// We skip the verification since kubelet skips the verification for HTTPS prober as well
292// https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/#configure-probes
293Transport: transport,
294CheckRedirect: redirectChecker(),
295}
296}
297}
298
299return s, nil
300}
301
// isRedirect reports whether code is a 3xx status, i.e. in [300, 400).
// Same rule as kubelet: https://github.com/kubernetes/kubernetes/blob/b152001f459/pkg/probe/http/http.go#L129-L130
func isRedirect(code int) bool {
	return http.StatusMultipleChoices <= code && code < http.StatusBadRequest
}
306
// redirectChecker returns an http.Client CheckRedirect func with kubelet's
// redirect rules (https://github.com/kubernetes/kubernetes/blob/b152001f459/pkg/probe/http/http.go#L141):
//   - a redirect to a different hostname is not followed; the client returns the
//     3xx response itself, which the probe handler reports as success
//     (https://github.com/kubernetes/kubernetes/blob/b152001f459/pkg/probe/http/http.go#L130);
//   - a redirect to the same hostname is followed;
//   - after 10 redirects the request fails.
func redirectChecker() func(*http.Request, []*http.Request) error {
	const maxRedirects = 10
	return func(next *http.Request, previous []*http.Request) error {
		switch {
		case next.URL.Hostname() != previous[0].URL.Hostname():
			return http.ErrUseLastResponse
		case len(previous) >= maxRedirects:
			return errors.New("stopped after 10 redirects")
		default:
			return nil
		}
	}
}
324
325func validateAppKubeProber(path string, prober *Prober) error {
326if !appProberPattern.MatchString(path) {
327return fmt.Errorf(`invalid path, must be in form of regex pattern %v`, appProberPattern)
328}
329count := 0
330if prober.HTTPGet != nil {
331count++
332}
333if prober.TCPSocket != nil {
334count++
335}
336if prober.GRPC != nil {
337count++
338}
339if count != 1 {
340return fmt.Errorf(`invalid prober type, must be one of type httpGet, tcpSocket or gRPC`)
341}
342if prober.HTTPGet != nil && prober.HTTPGet.Port.Type != intstr.Int {
343return fmt.Errorf("invalid prober config for %v, the port must be int type", path)
344}
345if prober.TCPSocket != nil && prober.TCPSocket.Port.Type != intstr.Int {
346return fmt.Errorf("invalid prober config for %v, the port must be int type", path)
347}
348return nil
349}
350
// FormatProberURL returns the readiness, liveness and startup URL paths, in that
// order, that the pilot agent serves for container's rewritten Kubernetes probes.
func FormatProberURL(container string) (string, string, string) {
	probePath := func(kind string) string {
		return fmt.Sprintf("/app-health/%v/%s", container, kind)
	}
	return probePath("readyz"), probePath("livez"), probePath("startupz")
}
358
359// Run opens a the status port and begins accepting probes.
360func (s *Server) Run(ctx context.Context) {
361log.Infof("Opening status port %d", s.statusPort)
362
363mux := http.NewServeMux()
364
365// Add the handler for ready probes.
366mux.HandleFunc(readyPath, s.handleReadyProbe)
367// Default path for prom
368mux.HandleFunc(`/metrics`, s.handleStats)
369// Envoy uses something else - and original agent used the same.
370// Keep for backward compat with configs.
371mux.HandleFunc(`/stats/prometheus`, s.handleStats)
372mux.HandleFunc(quitPath, s.handleQuit)
373mux.HandleFunc(drainPath, s.handleDrain)
374mux.HandleFunc("/app-health/", s.handleAppProbe)
375
376if s.enableProfiling {
377// Add the handler for pprof.
378mux.HandleFunc("/debug/pprof/", s.handlePprofIndex)
379mux.HandleFunc("/debug/pprof/cmdline", s.handlePprofCmdline)
380mux.HandleFunc("/debug/pprof/profile", s.handlePprofProfile)
381mux.HandleFunc("/debug/pprof/symbol", s.handlePprofSymbol)
382mux.HandleFunc("/debug/pprof/trace", s.handlePprofTrace)
383}
384mux.HandleFunc("/debug/ndsz", s.handleNdsz)
385
386l, err := net.Listen("tcp", fmt.Sprintf(":%d", s.statusPort))
387if err != nil {
388log.Errorf("Error listening on status port: %v", err.Error())
389return
390}
391// for testing.
392if s.statusPort == 0 {
393_, hostPort, _ := net.SplitHostPort(l.Addr().String())
394allocatedPort, _ := strconv.Atoi(hostPort)
395s.mutex.Lock()
396s.statusPort = uint16(allocatedPort)
397s.mutex.Unlock()
398}
399defer l.Close()
400
401go func() {
402if err := http.Serve(l, mux); err != nil {
403if network.IsUnexpectedListenerError(err) {
404log.Error(err)
405}
406select {
407case <-ctx.Done():
408// We are shutting down already, don't trigger SIGTERM
409return
410default:
411// If the server errors then pilot-agent can never pass readiness or liveness probes
412// Therefore, trigger graceful termination by sending SIGTERM to the binary pid
413notifyExit()
414}
415}
416}()
417
418// Wait for the agent to be shut down.
419<-ctx.Done()
420log.Info("Status server has successfully terminated")
421}
422
423func (s *Server) handlePprofIndex(w http.ResponseWriter, r *http.Request) {
424if !isRequestFromLocalhost(r) {
425http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden)
426return
427}
428
429pprof.Index(w, r)
430}
431
432func (s *Server) handlePprofCmdline(w http.ResponseWriter, r *http.Request) {
433if !isRequestFromLocalhost(r) {
434http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden)
435return
436}
437
438pprof.Cmdline(w, r)
439}
440
441func (s *Server) handlePprofSymbol(w http.ResponseWriter, r *http.Request) {
442if !isRequestFromLocalhost(r) {
443http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden)
444return
445}
446
447pprof.Symbol(w, r)
448}
449
450func (s *Server) handlePprofProfile(w http.ResponseWriter, r *http.Request) {
451if !isRequestFromLocalhost(r) {
452http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden)
453return
454}
455
456pprof.Profile(w, r)
457}
458
459func (s *Server) handlePprofTrace(w http.ResponseWriter, r *http.Request) {
460if !isRequestFromLocalhost(r) {
461http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden)
462return
463}
464
465pprof.Trace(w, r)
466}
467
468func (s *Server) handleReadyProbe(w http.ResponseWriter, _ *http.Request) {
469err := s.isReady()
470s.mutex.Lock()
471if err != nil {
472w.WriteHeader(http.StatusServiceUnavailable)
473
474log.Warnf("Envoy proxy is NOT ready: %s", err.Error())
475s.lastProbeSuccessful = false
476} else {
477w.WriteHeader(http.StatusOK)
478
479if !s.lastProbeSuccessful {
480log.Info("Envoy proxy is ready")
481}
482s.lastProbeSuccessful = true
483}
484s.mutex.Unlock()
485}
486
487func (s *Server) isReady() error {
488for _, p := range s.ready {
489if err := p.Check(); err != nil {
490return err
491}
492}
493return nil
494}
495
// isRequestFromLocalhost reports whether r came from a loopback IP address.
// A RemoteAddr that is not in host:port form, or whose host is not an IP
// literal, counts as non-local.
func isRequestFromLocalhost(r *http.Request) bool {
	host, _, err := net.SplitHostPort(r.RemoteAddr)
	return err == nil && net.ParseIP(host).IsLoopback()
}
505
// PrometheusScrapeConfiguration is the JSON payload of PrometheusScrapingConfig.
// It describes where the application's metrics are scraped from (on localhost)
// before being merged into the agent's metrics output.
type PrometheusScrapeConfiguration struct {
	// Scrape disables application scraping when set to "false".
	Scrape string `json:"scrape"`
	// Path is the application's metrics path; NewServer defaults an empty value to "/metrics".
	Path string `json:"path"`
	// Port is the application's metrics port; NewServer defaults an empty value to "80".
	Port string `json:"port"`
}
511
512// handleStats handles prometheus stats scraping. This will scrape envoy metrics, and, if configured,
513// the application metrics and merge them together.
514// The merge here is a simple string concatenation. This works for almost all cases, assuming the application
515// is not exposing the same metrics as Envoy.
516// This merging works for both FmtText and FmtOpenMetrics and will use the format of the application metrics
517// Note that we do not return any errors here. If we do, we will drop metrics. For example, the app may be having issues,
518// but we still want Envoy metrics. Instead, errors are tracked in the failed scrape metrics/logs.
519func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
520metrics.ScrapeTotals.Increment()
521var err error
522var envoy, application io.ReadCloser
523var envoyCancel, appCancel context.CancelFunc
524defer func() {
525if envoy != nil {
526err = envoy.Close()
527if err != nil {
528log.Infof("envoy connection is not closed: %v", err)
529}
530}
531if application != nil {
532err = application.Close()
533if err != nil {
534log.Infof("app connection is not closed: %v", err)
535}
536}
537if envoyCancel != nil {
538envoyCancel()
539}
540if appCancel != nil {
541appCancel()
542}
543}()
544
545// Gather all the metrics we will merge
546if !s.config.NoEnvoy {
547if envoy, envoyCancel, _, err = s.scrape(fmt.Sprintf("http://localhost:%d/stats/prometheus", s.envoyStatsPort), r.Header); err != nil {
548log.Errorf("failed scraping envoy metrics: %v", err)
549metrics.EnvoyScrapeErrors.Increment()
550}
551}
552
553// Scrape app metrics if defined and capture their format
554var format expfmt.Format
555if s.prometheus != nil {
556var contentType string
557url := fmt.Sprintf("http://localhost:%s%s", s.prometheus.Port, s.prometheus.Path)
558if application, appCancel, contentType, err = s.scrape(url, r.Header); err != nil {
559log.Errorf("failed scraping application metrics: %v", err)
560metrics.AppScrapeErrors.Increment()
561}
562format = negotiateMetricsFormat(contentType)
563} else {
564// Without app metrics format use a default
565format = FmtText
566}
567
568w.Header().Set("Content-Type", string(format))
569
570// Write out the metrics
571if err = scrapeAndWriteAgentMetrics(s.registry, io.Writer(w)); err != nil {
572log.Errorf("failed scraping and writing agent metrics: %v", err)
573metrics.AgentScrapeErrors.Increment()
574}
575
576if envoy != nil {
577_, err = io.Copy(w, envoy)
578if err != nil {
579log.Errorf("failed to scraping and writing envoy metrics: %v", err)
580metrics.EnvoyScrapeErrors.Increment()
581}
582}
583
584// App metrics must go last because if they are FmtOpenMetrics,
585// they will have a trailing "# EOF" which terminates the full exposition
586if application != nil {
587_, err = io.Copy(w, application)
588if err != nil {
589log.Errorf("failed to scraping and writing application metrics: %v", err)
590metrics.AppScrapeErrors.Increment()
591}
592}
593}
594
// Content types for the merged metrics response; negotiateMetricsFormat maps an
// application's Content-Type onto one of these.
const (
	// FmtOpenMetrics_0_0_1 is the OpenMetrics 0.0.1 content type.
	// nolint: revive, stylecheck
	FmtOpenMetrics_0_0_1 = expfmt.OpenMetricsType + `; version=` + expfmt.OpenMetricsVersion_0_0_1 + `; charset=utf-8`
	// FmtOpenMetrics_1_0_0 is the OpenMetrics 1.0.0 content type.
	// nolint: revive, stylecheck
	FmtOpenMetrics_1_0_0 = expfmt.OpenMetricsType + `; version=` + expfmt.OpenMetricsVersion_1_0_0 + `; charset=utf-8`
	// FmtText is the Prometheus text exposition content type. It is also used for the
	// agent's own metrics and as the fallback format.
	FmtText = `text/plain; version=` + expfmt.TextVersion + `; charset=utf-8`
)
602
603func negotiateMetricsFormat(contentType string) expfmt.Format {
604mediaType, params, err := mime.ParseMediaType(contentType)
605if err == nil && mediaType == expfmt.OpenMetricsType {
606switch params["version"] {
607case expfmt.OpenMetricsVersion_1_0_0:
608return FmtOpenMetrics_1_0_0
609case expfmt.OpenMetricsVersion_0_0_1, "":
610return FmtOpenMetrics_0_0_1
611}
612}
613return FmtText
614}
615
616func scrapeAndWriteAgentMetrics(registry prometheus.Gatherer, w io.Writer) error {
617mfs, err := registry.Gather()
618enc := expfmt.NewEncoder(w, FmtText)
619if err != nil {
620return err
621}
622for _, mf := range mfs {
623if err := enc.Encode(mf); err != nil {
624return err
625}
626}
627return nil
628}
629
// applyHeaders copies the first value of each named header from `from` into
// `into`, replacing whatever `into` held. Headers that are absent or empty in
// `from` leave `into` untouched.
func applyHeaders(into http.Header, from http.Header, keys ...string) {
	for i := range keys {
		if v := from.Get(keys[i]); v != "" {
			into.Set(keys[i], v)
		}
	}
}
638
// getHeaderTimeout parses a timeout given as a decimal number of seconds
// (for example "1.234", as sent in X-Prometheus-Scrape-Timeout-Seconds).
//
// Values that cannot be used as a deadline are rejected with an error. These
// are NaN, ±Inf, anything below one nanosecond (including zero and negatives),
// and anything too large for a time.Duration. Converting such values would be
// implementation-defined or would produce an already-expired deadline, making
// every scrape fail.
func getHeaderTimeout(timeout string) (time.Duration, error) {
	seconds, err := strconv.ParseFloat(timeout, 64)
	if err != nil {
		return 0, err
	}
	nanos := seconds * float64(time.Second)
	// Test for the valid range rather than the invalid one: NaN fails every
	// ordered comparison, so it is rejected here too.
	if !(nanos >= 1 && nanos < 1<<63) {
		return 0, fmt.Errorf("timeout %q must be a positive number of seconds within the time.Duration range", timeout)
	}
	return time.Duration(nanos), nil
}
648
649// scrape will send a request to the provided url to scrape metrics from
650// This will attempt to mimic some of Prometheus functionality by passing some of the headers through
651// such as accept, timeout, and user agent
652// Returns the scraped metrics reader as well as the response's "Content-Type" header to determine the metrics format
653func (s *Server) scrape(url string, header http.Header) (io.ReadCloser, context.CancelFunc, string, error) {
654var cancel context.CancelFunc
655ctx := context.Background()
656if timeoutString := header.Get("X-Prometheus-Scrape-Timeout-Seconds"); timeoutString != "" {
657timeout, err := getHeaderTimeout(timeoutString)
658if err != nil {
659log.Warnf("Failed to parse timeout header %v: %v", timeoutString, err)
660} else {
661ctx, cancel = context.WithTimeout(ctx, timeout)
662}
663}
664req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
665if err != nil {
666return nil, cancel, "", err
667}
668applyHeaders(req.Header, header, "Accept",
669"User-Agent",
670"X-Prometheus-Scrape-Timeout-Seconds",
671)
672
673resp, err := s.http.Do(req)
674if err != nil {
675return nil, cancel, "", fmt.Errorf("error scraping %s: %v", url, err)
676}
677if resp.StatusCode != http.StatusOK {
678resp.Body.Close()
679return nil, cancel, "", fmt.Errorf("error scraping %s, status code: %v", url, resp.StatusCode)
680}
681format := resp.Header.Get("Content-Type")
682return resp.Body, cancel, format, nil
683}
684
685func (s *Server) handleQuit(w http.ResponseWriter, r *http.Request) {
686if !isRequestFromLocalhost(r) {
687http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden)
688return
689}
690if r.Method != http.MethodPost {
691http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
692return
693}
694w.WriteHeader(http.StatusOK)
695_, _ = w.Write([]byte("OK"))
696log.Infof("handling %s, notifying pilot-agent to exit", quitPath)
697s.shutdown()
698}
699
700func (s *Server) handleDrain(w http.ResponseWriter, r *http.Request) {
701if !isRequestFromLocalhost(r) {
702http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden)
703return
704}
705if r.Method != http.MethodPost {
706http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
707return
708}
709w.WriteHeader(http.StatusOK)
710_, _ = w.Write([]byte("OK"))
711log.Infof("handling %s, starting drain", drainPath)
712s.drain()
713}
714
715func (s *Server) handleAppProbe(w http.ResponseWriter, req *http.Request) {
716// Validate the request first.
717path := req.URL.Path
718if !strings.HasPrefix(path, "/") {
719path = "/" + req.URL.Path
720}
721prober, exists := s.appKubeProbers[path]
722if !exists {
723log.Errorf("Prober does not exists url %v", path)
724w.WriteHeader(http.StatusBadRequest)
725_, _ = w.Write([]byte(fmt.Sprintf("app prober config does not exists for %v", path)))
726return
727}
728
729switch {
730case prober.HTTPGet != nil:
731s.handleAppProbeHTTPGet(w, req, prober, path)
732case prober.TCPSocket != nil:
733s.handleAppProbeTCPSocket(w, prober)
734case prober.GRPC != nil:
735s.handleAppProbeGRPC(w, req, prober)
736}
737}
738
739func (s *Server) handleAppProbeHTTPGet(w http.ResponseWriter, req *http.Request, prober *Prober, path string) {
740proberPath := prober.HTTPGet.Path
741if !strings.HasPrefix(proberPath, "/") {
742proberPath = "/" + proberPath
743}
744var url string
745
746hostPort := net.JoinHostPort(s.appProbersDestination, strconv.Itoa(prober.HTTPGet.Port.IntValue()))
747if prober.HTTPGet.Scheme == apimirror.URISchemeHTTPS {
748url = fmt.Sprintf("https://%s%s", hostPort, proberPath)
749} else {
750url = fmt.Sprintf("http://%s%s", hostPort, proberPath)
751}
752appReq, err := http.NewRequest(http.MethodGet, url, nil)
753if err != nil {
754log.Errorf("Failed to create request to probe app %v, original url %v", err, path)
755w.WriteHeader(http.StatusInternalServerError)
756return
757}
758
759appReq.Host = req.Host
760if host, port, err := net.SplitHostPort(req.Host); err == nil {
761port, _ := strconv.Atoi(port)
762// the port is same as the status port, then we need to replace the port in the host with the real one
763if port == int(s.statusPort) {
764realPort := strconv.Itoa(prober.HTTPGet.Port.IntValue())
765appReq.Host = net.JoinHostPort(host, realPort)
766}
767}
768// Forward incoming headers to the application.
769for name, values := range req.Header {
770appReq.Header[name] = slices.Clone(values)
771if len(values) > 0 && (name == "Host") {
772// Probe has specific host header override; honor it
773appReq.Host = values[0]
774}
775}
776
777// get the http client must exist because
778httpClient := s.appProbeClient[path]
779
780// Send the request.
781response, err := httpClient.Do(appReq)
782if err != nil {
783log.Errorf("Request to probe app failed: %v, original URL path = %v\napp URL path = %v", err, path, proberPath)
784w.WriteHeader(http.StatusInternalServerError)
785return
786}
787defer func() {
788// Drain and close the body to let the Transport reuse the connection
789_, _ = io.Copy(io.Discard, response.Body)
790_ = response.Body.Close()
791}()
792
793if isRedirect(response.StatusCode) { // Redirect
794// In other cases, we return the original status code. For redirects, it is illegal to
795// not have Location header, so we need to switch to just 200.
796w.WriteHeader(http.StatusOK)
797return
798}
799// We only write the status code to the response.
800w.WriteHeader(response.StatusCode)
801// Return the body from probe as well
802b, _ := k8sUtilIo.ReadAtMost(response.Body, maxRespBodyLength)
803_, _ = w.Write(b)
804}
805
806func (s *Server) handleAppProbeTCPSocket(w http.ResponseWriter, prober *Prober) {
807timeout := time.Duration(prober.TimeoutSeconds) * time.Second
808
809d := ProbeDialer()
810d.LocalAddr = s.upstreamLocalAddress
811d.Timeout = timeout
812
813conn, err := d.Dial("tcp", net.JoinHostPort(s.appProbersDestination, strconv.Itoa(prober.TCPSocket.Port.IntValue())))
814if err != nil {
815w.WriteHeader(http.StatusInternalServerError)
816} else {
817w.WriteHeader(http.StatusOK)
818err = conn.Close()
819if err != nil {
820log.Infof("tcp connection is not closed: %v", err)
821}
822}
823}
824
825func (s *Server) handleAppProbeGRPC(w http.ResponseWriter, req *http.Request, prober *Prober) {
826timeout := time.Duration(prober.TimeoutSeconds) * time.Second
827// the DialOptions are referenced from https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/probe/grpc/grpc.go#L55-L59
828opts := []grpc.DialOption{
829grpc.WithBlock(),
830grpc.WithTransportCredentials(insecure.NewCredentials()), // credentials are currently not supported
831grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
832d := ProbeDialer()
833d.LocalAddr = s.upstreamLocalAddress
834d.Timeout = timeout
835return d.DialContext(ctx, "tcp", addr)
836}),
837}
838if userAgent := req.Header["User-Agent"]; len(userAgent) > 0 {
839// simulate kubelet
840// please refer to:
841// https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/probe/grpc/grpc.go#L56
842// https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/probe/http/http.go#L103
843opts = append(opts, grpc.WithUserAgent(userAgent[0]))
844}
845
846ctx, cancel := context.WithTimeout(context.Background(), timeout)
847defer cancel()
848
849addr := net.JoinHostPort(s.appProbersDestination, strconv.Itoa(int(prober.GRPC.Port)))
850conn, err := grpc.DialContext(ctx, addr, opts...)
851if err != nil {
852log.Errorf("Failed to create grpc connection to probe app: %v", err)
853w.WriteHeader(http.StatusInternalServerError)
854return
855}
856defer conn.Close()
857
858var svc string
859if prober.GRPC.Service != nil {
860svc = *prober.GRPC.Service
861}
862grpcClient := grpcHealth.NewHealthClient(conn)
863resp, err := grpcClient.Check(ctx, &grpcHealth.HealthCheckRequest{
864Service: svc,
865})
866// the error handling is referenced from https://github.com/kubernetes/kubernetes/blob/v1.23.1/pkg/probe/grpc/grpc.go#L88-L106
867if err != nil {
868status, ok := grpcStatus.FromError(err)
869if ok {
870switch status.Code() {
871case codes.Unimplemented:
872log.Errorf("server does not implement the grpc health protocol (grpc.health.v1.Health): %v", err)
873case codes.DeadlineExceeded:
874log.Errorf("grpc request not finished within timeout: %v", err)
875default:
876log.Errorf("grpc probe failed: %v", err)
877}
878} else {
879log.Errorf("grpc probe failed: %v", err)
880}
881w.WriteHeader(http.StatusInternalServerError)
882return
883}
884
885if resp.GetStatus() == grpcHealth.HealthCheckResponse_SERVING {
886w.WriteHeader(http.StatusOK)
887return
888}
889w.WriteHeader(http.StatusInternalServerError)
890}
891
892func (s *Server) handleNdsz(w http.ResponseWriter, r *http.Request) {
893if !isRequestFromLocalhost(r) {
894http.Error(w, "Only requests from localhost are allowed", http.StatusForbidden)
895return
896}
897nametable := s.fetchDNS()
898if nametable == nil {
899// See https://golang.org/doc/faq#nil_error for why writeJSONProto cannot handle this
900w.WriteHeader(http.StatusNotFound)
901_, _ = w.Write([]byte(`{}`))
902return
903}
904writeJSONProto(w, nametable)
905}
906
907// writeJSONProto writes a protobuf to a json payload, handling content type, marshaling, and errors
908func writeJSONProto(w http.ResponseWriter, obj any) {
909w.Header().Set("Content-Type", "application/json")
910b, err := config.ToJSON(obj)
911if err != nil {
912w.WriteHeader(http.StatusInternalServerError)
913_, _ = w.Write([]byte(err.Error()))
914return
915}
916_, err = w.Write(b)
917if err != nil {
918w.WriteHeader(http.StatusInternalServerError)
919}
920}
921
922// notifyExit sends SIGTERM to itself
923func notifyExit() {
924p, err := os.FindProcess(os.Getpid())
925if err != nil {
926log.Error(err)
927}
928if err := p.Signal(syscall.SIGTERM); err != nil {
929log.Errorf("failed to send SIGTERM to self: %v", err)
930}
931}
932
933var defaultTransport = http.DefaultTransport.(*http.Transport)
934
935// SetTransportDefaults mirrors Kubernetes probe settings
936// https://github.com/kubernetes/kubernetes/blob/0153febd9f0098d4b8d0d484927710eaf899ef40/pkg/probe/http/http.go#L52
937func setTransportDefaults(t *http.Transport) (*http.Transport, error) {
938if !EnableHTTP2Probing {
939return t, nil
940}
941if t.TLSHandshakeTimeout == 0 {
942t.TLSHandshakeTimeout = defaultTransport.TLSHandshakeTimeout
943}
944if t.IdleConnTimeout == 0 {
945t.IdleConnTimeout = defaultTransport.IdleConnTimeout
946}
947t2, err := http2.ConfigureTransports(t)
948if err != nil {
949return nil, err
950}
951t2.ReadIdleTimeout = time.Duration(30) * time.Second
952t2.PingTimeout = time.Duration(15) * time.Second
953return t, nil
954}
955