kuma

Форк
0
/
client.go 
214 строк · 6.1 Кб
1
package admin
2

3
import (
4
	"context"
5
	"crypto/tls"
6
	"crypto/x509"
7
	"fmt"
8
	"io"
9
	"net"
10
	"net/http"
11
	"net/url"
12
	"time"
13

14
	envoy_admin_v3 "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
15
	"github.com/pkg/errors"
16

17
	"github.com/kumahq/kuma/pkg/core/ca"
18
	core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
19
	"github.com/kumahq/kuma/pkg/core/resources/manager"
20
	core_model "github.com/kumahq/kuma/pkg/core/resources/model"
21
	envoy_admin_tls "github.com/kumahq/kuma/pkg/envoy/admin/tls"
22
	util_proto "github.com/kumahq/kuma/pkg/util/proto"
23
)
24

25
type EnvoyAdminClient interface {
26
	PostQuit(ctx context.Context, dataplane *core_mesh.DataplaneResource) error
27

28
	Stats(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error)
29
	Clusters(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error)
30
	ConfigDump(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error)
31
}
32

33
type envoyAdminClient struct {
34
	rm               manager.ResourceManager
35
	caManagers       ca.Managers
36
	defaultAdminPort uint32
37

38
	caCertPool *x509.CertPool
39
	clientCert *tls.Certificate
40
}
41

42
func NewEnvoyAdminClient(rm manager.ResourceManager, caManagers ca.Managers, adminPort uint32) EnvoyAdminClient {
43
	client := &envoyAdminClient{
44
		rm:               rm,
45
		caManagers:       caManagers,
46
		defaultAdminPort: adminPort,
47
	}
48
	return client
49
}
50

51
func (a *envoyAdminClient) buildHTTPClient(ctx context.Context) (*http.Client, error) {
52
	caCertPool, clientCert, err := a.mtlsCerts(ctx)
53
	if err != nil {
54
		return nil, err
55
	}
56

57
	c := &http.Client{
58
		Transport: &http.Transport{
59
			DialContext: (&net.Dialer{
60
				Timeout: 3 * time.Second,
61
			}).DialContext,
62
			TLSHandshakeTimeout: 3 * time.Second,
63
			TLSClientConfig: &tls.Config{
64
				MinVersion:   tls.VersionTLS12,
65
				RootCAs:      &caCertPool,
66
				Certificates: []tls.Certificate{clientCert},
67
			},
68
		},
69
		Timeout: 5 * time.Second,
70
	}
71
	return c, err
72
}
73

74
func (a *envoyAdminClient) mtlsCerts(ctx context.Context) (x509.CertPool, tls.Certificate, error) {
75
	if a.caCertPool == nil {
76
		ca, err := envoy_admin_tls.LoadCA(ctx, a.rm)
77
		if err != nil {
78
			return x509.CertPool{}, tls.Certificate{}, errors.Wrap(err, "could not load the CA")
79
		}
80
		caCertPool := x509.NewCertPool()
81
		caCert, err := x509.ParseCertificate(ca.Certificate[0])
82
		if err != nil {
83
			return x509.CertPool{}, tls.Certificate{}, errors.Wrap(err, "could not parse CA")
84
		}
85
		caCertPool.AddCert(caCert)
86

87
		pair, err := envoy_admin_tls.GenerateClientCert(ca)
88
		if err != nil {
89
			return x509.CertPool{}, tls.Certificate{}, errors.Wrap(err, "could not generate a client certificate")
90
		}
91
		clientCert, err := tls.X509KeyPair(pair.CertPEM, pair.KeyPEM)
92
		if err != nil {
93
			return x509.CertPool{}, tls.Certificate{}, errors.Wrap(err, "could not parse the client certificate")
94
		}
95

96
		// cache the certs, so we don't have to load the CA and generate them on every single request.
97
		// This means that if we want to change Envoy Admin CA, we need to restart all CP instances.
98
		a.caCertPool = caCertPool
99
		a.clientCert = &clientCert
100
	}
101
	return *a.caCertPool, *a.clientCert, nil
102
}
103

104
const (
105
	quitquitquit = "quitquitquit"
106
)
107

108
func (a *envoyAdminClient) PostQuit(ctx context.Context, dataplane *core_mesh.DataplaneResource) error {
109
	httpClient, err := a.buildHTTPClient(ctx)
110
	if err != nil {
111
		return err
112
	}
113

114
	url := fmt.Sprintf("https://%s/%s", dataplane.AdminAddress(a.defaultAdminPort), quitquitquit)
115
	request, err := http.NewRequestWithContext(ctx, "POST", url, nil)
116
	if err != nil {
117
		return err
118
	}
119

120
	// Envoy will not send back any response, so do we not check the response
121
	response, err := httpClient.Do(request)
122
	if errors.Is(err, io.EOF) {
123
		return nil // Envoy may not respond correctly for this request because it already started the shut-down process.
124
	}
125
	if err != nil {
126
		return errors.Wrapf(err, "unable to send POST to %s", quitquitquit)
127
	}
128
	defer response.Body.Close()
129

130
	if response.StatusCode != http.StatusOK {
131
		return errors.Errorf("envoy response [%d %s] [%s]", response.StatusCode, response.Status, response.Body)
132
	}
133

134
	return nil
135
}
136

137
func (a *envoyAdminClient) Stats(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
138
	return a.executeRequest(ctx, proxy, "stats")
139
}
140

141
func (a *envoyAdminClient) Clusters(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
142
	return a.executeRequest(ctx, proxy, "clusters")
143
}
144

145
func (a *envoyAdminClient) ConfigDump(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
146
	configDump, err := a.executeRequest(ctx, proxy, "config_dump")
147
	if err != nil {
148
		return nil, err
149
	}
150

151
	cd := &envoy_admin_v3.ConfigDump{}
152
	if err := util_proto.FromJSON(configDump, cd); err != nil {
153
		return nil, err
154
	}
155

156
	if err := Sanitize(cd); err != nil {
157
		return nil, err
158
	}
159

160
	return util_proto.ToJSONIndent(cd, " ")
161
}
162

163
func (a *envoyAdminClient) executeRequest(ctx context.Context, proxy core_model.ResourceWithAddress, path string) ([]byte, error) {
164
	var httpClient *http.Client
165
	var err error
166
	u := &url.URL{}
167

168
	switch proxy.(type) {
169
	case *core_mesh.DataplaneResource:
170
		httpClient, err = a.buildHTTPClient(ctx)
171
		if err != nil {
172
			return nil, err
173
		}
174
		u.Scheme = "https"
175
	case *core_mesh.ZoneIngressResource, *core_mesh.ZoneEgressResource:
176
		httpClient, err = a.buildHTTPClient(ctx)
177
		if err != nil {
178
			return nil, err
179
		}
180
		u.Scheme = "https"
181
	default:
182
		return nil, errors.New("unsupported proxy type")
183
	}
184

185
	if host, _, err := net.SplitHostPort(proxy.AdminAddress(a.defaultAdminPort)); err == nil && host == "127.0.0.1" {
186
		httpClient = &http.Client{
187
			Timeout: 5 * time.Second,
188
		}
189
		u.Scheme = "http"
190
	}
191

192
	u.Host = proxy.AdminAddress(a.defaultAdminPort)
193
	u.Path = path
194
	request, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
195
	if err != nil {
196
		return nil, err
197
	}
198

199
	response, err := httpClient.Do(request)
200
	if err != nil {
201
		return nil, errors.Wrapf(err, "unable to send GET to %s", "config_dump")
202
	}
203
	defer response.Body.Close()
204

205
	if response.StatusCode != http.StatusOK {
206
		return nil, errors.Errorf("envoy response [%d %s] [%s]", response.StatusCode, response.Status, response.Body)
207
	}
208

209
	resp, err := io.ReadAll(response.Body)
210
	if err != nil {
211
		return nil, err
212
	}
213
	return resp, nil
214
}
215

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.