8
envoy_service_health "github.com/envoyproxy/go-control-plane/envoy/service/health/v3"
9
"github.com/pkg/errors"
10
"github.com/sethvargo/go-retry"
11
"google.golang.org/grpc/metadata"
13
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
14
core_manager "github.com/kumahq/kuma/pkg/core/resources/manager"
15
core_store "github.com/kumahq/kuma/pkg/core/resources/store"
16
"github.com/kumahq/kuma/pkg/core/user"
17
core_xds "github.com/kumahq/kuma/pkg/core/xds"
18
hds_callbacks "github.com/kumahq/kuma/pkg/hds/callbacks"
19
xds_auth "github.com/kumahq/kuma/pkg/xds/auth"
22
// authorization is the incoming-metadata key under which extractCredential looks up the credential.
const authorization = "authorization"
24
// Inspired by pkg/xds/auth/callbacks.go
26
// DPNotFoundRetry configures how authenticate retries fetching the Dataplane
// while it is not found: its Backoff is used as a constant delay and its
// MaxTimes as the retry limit.
// NOTE(review): the field declarations are not visible in this excerpt — confirm their types.
type DPNotFoundRetry struct {
31
// NewCallbacks builds the HDS callbacks that authenticate streams, wiring in
// the resource manager, the authenticator and the Dataplane-not-found retry
// settings, and starting with empty contexts/authenticated maps.
//
// A zero dpNotFoundRetry.Backoff is replaced with 1ms because the backoff
// cannot be 0.
// NOTE(review): the header of the literal these fields populate is not visible
// in this excerpt; presumably it is an authn value — confirm.
func NewCallbacks(resManager core_manager.ResourceManager, authenticator xds_auth.Authenticator, dpNotFoundRetry DPNotFoundRetry) hds_callbacks.Callbacks {
32
if dpNotFoundRetry.Backoff == 0 { // backoff cannot be 0
33
dpNotFoundRetry.Backoff = 1 * time.Millisecond
36
resManager: resManager,
37
authenticator: authenticator,
38
contexts: map[core_xds.StreamID]context.Context{},
39
authenticated: map[core_xds.StreamID]string{},
40
dpNotFoundRetry: dpNotFoundRetry,
45
resManager core_manager.ResourceManager
46
authenticator xds_auth.Authenticator
47
dpNotFoundRetry DPNotFoundRetry
49
sync.RWMutex // protects contexts and authenticated
50
// contexts stores context for every stream, since Context from which we can extract auth data is only available in OnStreamOpen
51
contexts map[core_xds.StreamID]context.Context
52
// authenticated stores the authenticated ProxyID for each stream. We don't want to authenticate every request, since on K8S we execute ReviewToken, which is expensive;
53
// as long as client won't change ProxyID it's safe to authenticate only once.
54
authenticated map[core_xds.StreamID]string
57
// Compile-time check that *authn satisfies hds_callbacks.Callbacks.
var _ hds_callbacks.Callbacks = &authn{}
59
// OnStreamOpen stores ctx in a.contexts under streamID so that the credential
// can be extracted from it later (see credential).
// NOTE(review): the lines between the signature and the map write are not
// visible in this excerpt; presumably the embedded RWMutex is taken here — confirm.
func (a *authn) OnStreamOpen(ctx context.Context, streamID int64) error {
63
a.contexts[streamID] = ctx
67
// OnHealthCheckRequest authenticates the stream the first time a request is
// seen on it.
//
// If the stream is already authenticated, a request carrying a non-empty node
// ID that differs from the authenticated one is rejected with an error.
// Otherwise the stream's credential is looked up, authenticated against the
// request's node ID, and that node ID is recorded in a.authenticated.
// NOTE(review): the error checks and return statements between the visible
// lines are not shown in this excerpt — confirm the exact early-return paths.
func (a *authn) OnHealthCheckRequest(streamID int64, req *envoy_service_health.HealthCheckRequest) error {
68
if id, alreadyAuthenticated := a.authNodeId(streamID); alreadyAuthenticated {
69
// An empty node ID is tolerated; only a different non-empty ID is an error.
if req.GetNode().GetId() != "" && req.GetNode().GetId() != id {
70
return errors.Errorf("stream was authenticated for ID %s. Received request is for node with ID %s. Node ID cannot be changed after stream is initialized", id, req.GetNode().GetId())
75
credential, err := a.credential(streamID)
79
err = a.authenticate(credential, req.GetNode().GetId())
84
a.authenticated[streamID] = req.GetNode().GetId()
89
// OnEndpointHealthResponse returns an error when authNodeId reports that the
// stream has not been authenticated. The response payload itself is ignored.
// NOTE(review): on this path id comes from an unsuccessful lookup, so the %s in
// the message is presumably always empty — confirm against authNodeId.
func (a *authn) OnEndpointHealthResponse(streamID int64, _ *envoy_service_health.EndpointHealthResponse) error {
90
if id, alreadyAuthenticated := a.authNodeId(streamID); !alreadyAuthenticated {
91
return errors.Errorf("stream was not authenticated for ID %s", id)
96
// OnStreamClosed removes the stream's entries from both a.contexts and
// a.authenticated.
// NOTE(review): any locking around the deletes is not visible in this excerpt — confirm.
func (a *authn) OnStreamClosed(streamID int64) {
98
delete(a.contexts, streamID)
99
delete(a.authenticated, streamID)
103
// authNodeId looks up streamID in a.authenticated.
// NOTE(review): the return statement is not visible in this excerpt; presumably
// it returns the stored node ID and whether an entry was present — confirm.
func (a *authn) authNodeId(streamID core_xds.StreamID) (string, bool) {
106
id, ok := a.authenticated[streamID]
110
// credential fetches the context stored for streamID in a.contexts and extracts
// the credential from it via extractCredential.
//
// It returns an error when no context is stored for the stream or when the
// extraction fails.
// NOTE(review): the wrap message mentions "DiscoveryRequest" although this file
// handles HDS HealthCheckRequest — likely a leftover from
// pkg/xds/auth/callbacks.go; confirm before changing the string.
func (a *authn) credential(streamID core_xds.StreamID) (xds_auth.Credential, error) {
114
ctx, exists := a.contexts[streamID]
116
return "", errors.Errorf("there is no context for stream ID %d", streamID)
118
credential, err := extractCredential(ctx)
120
return "", errors.Wrap(err, "could not extract credential from DiscoveryRequest")
122
return credential, err
125
// extractCredential reads the incoming gRPC metadata from ctx and returns the
// single value stored under the authorization key.
//
// It returns an error when ctx carries no metadata or when the key is present
// with a number of values other than exactly 1.
// NOTE(review): the result for a request without the key is not visible in
// this excerpt — confirm.
func extractCredential(ctx context.Context) (xds_auth.Credential, error) {
126
// The local variable shadows the metadata package for the rest of the function.
metadata, ok := metadata.FromIncomingContext(ctx)
128
return "", errors.Errorf("request has no metadata")
130
if values, ok := metadata[authorization]; ok {
131
if len(values) != 1 {
132
return "", errors.Errorf("request must have exactly 1 %q header, got %d", authorization, len(values))
134
return values[0], nil
139
// authenticate parses nodeID into a proxy ID, loads the matching Dataplane
// through the resource manager, and checks credential against it with the
// configured authenticator.
//
// The Dataplane lookup is retried while the resource is not found, up to
// a.dpNotFoundRetry.MaxTimes retries with a constant a.dpNotFoundRetry.Backoff
// between attempts. An error is returned when nodeID is not a valid proxy ID
// or when authentication fails.
// NOTE(review): handling of other lookup errors and of the retry result is not
// visible in this excerpt — confirm.
func (a *authn) authenticate(credential xds_auth.Credential, nodeID string) error {
140
// NOTE(review): context.TODO() means the lookup and retries are not tied to the
// stream's context; presumably they keep running if the stream closes — confirm.
ctx := user.Ctx(context.TODO(), user.ControlPlane)
141
dataplane := core_mesh.NewDataplaneResource()
143
proxyId, err := core_xds.ParseProxyIdFromString(nodeID)
145
return errors.Wrap(err, "HDS request must have a valid Proxy Id")
147
// Retry on DP not found because HDS is initiated in parallel with XDS.
148
// It is very likely that the Dataplane is not yet created.
149
// We could just close the stream with an error and Envoy would retry, but to have better UX (not printing confusing logs) it's better to retry
150
err = retry.Do(ctx, retry.WithMaxRetries(uint64(a.dpNotFoundRetry.MaxTimes), retry.NewConstant(a.dpNotFoundRetry.Backoff)), func(ctx context.Context) error {
151
err := a.resManager.Get(ctx, dataplane, core_store.GetBy(proxyId.ToResourceKey()))
152
if core_store.IsResourceNotFound(err) {
153
// Only a not-found result is wrapped as retryable here.
return retry.RetryableError(errors.New("dataplane not found. Create Dataplane in Kuma CP first or pass it as an argument to kuma-dp"))
161
if err := a.authenticator.Authenticate(ctx, dataplane, credential); err != nil {
162
return errors.Wrap(err, "authentication failed")