kuma

Форк
0
/
callbacks.go 
165 строк · 5.4 Кб
1
package authn
2

3
import (
4
	"context"
5
	"sync"
6
	"time"
7

8
	envoy_service_health "github.com/envoyproxy/go-control-plane/envoy/service/health/v3"
9
	"github.com/pkg/errors"
10
	"github.com/sethvargo/go-retry"
11
	"google.golang.org/grpc/metadata"
12

13
	core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
14
	core_manager "github.com/kumahq/kuma/pkg/core/resources/manager"
15
	core_store "github.com/kumahq/kuma/pkg/core/resources/store"
16
	"github.com/kumahq/kuma/pkg/core/user"
17
	core_xds "github.com/kumahq/kuma/pkg/core/xds"
18
	hds_callbacks "github.com/kumahq/kuma/pkg/hds/callbacks"
19
	xds_auth "github.com/kumahq/kuma/pkg/xds/auth"
20
)
21

22
// authorization is the key of the incoming gRPC metadata entry from which
// extractCredential reads the credential.
const authorization = "authorization"
23

24
// Inspired by pkg/xds/auth/callbacks.go
25

26
// DPNotFoundRetry configures how the Dataplane lookup is retried during
// authentication while the Dataplane resource is reported as not found.
type DPNotFoundRetry struct {
	// Backoff is the constant delay handed to the retry backoff between two
	// consecutive lookups.
	Backoff time.Duration

	// MaxTimes is the value passed to the retry policy as the maximum number
	// of retries.
	MaxTimes uint
}
30

31
func NewCallbacks(resManager core_manager.ResourceManager, authenticator xds_auth.Authenticator, dpNotFoundRetry DPNotFoundRetry) hds_callbacks.Callbacks {
32
	if dpNotFoundRetry.Backoff == 0 { // backoff cannot be 0
33
		dpNotFoundRetry.Backoff = 1 * time.Millisecond
34
	}
35
	return &authn{
36
		resManager:      resManager,
37
		authenticator:   authenticator,
38
		contexts:        map[core_xds.StreamID]context.Context{},
39
		authenticated:   map[core_xds.StreamID]string{},
40
		dpNotFoundRetry: dpNotFoundRetry,
41
	}
42
}
43

44
type authn struct {
45
	resManager      core_manager.ResourceManager
46
	authenticator   xds_auth.Authenticator
47
	dpNotFoundRetry DPNotFoundRetry
48

49
	sync.RWMutex // protects contexts and authenticated
50
	// contexts stores context for every stream, since Context from which we can extract auth data is only available in OnStreamOpen
51
	contexts map[core_xds.StreamID]context.Context
52
	// authenticated stores authenticated ProxyID for stream. We don't want to authenticate every because since on K8S we execute ReviewToken which is expensive
53
	// as long as client won't change ProxyID it's safe to authenticate only once.
54
	authenticated map[core_xds.StreamID]string
55
}
56

57
// Compile-time check that *authn satisfies hds_callbacks.Callbacks.
var _ hds_callbacks.Callbacks = (*authn)(nil)
58

59
// OnStreamOpen records the context of a newly opened stream under its stream
// ID so that the credential can be extracted from it on a later request.
// It always returns nil.
func (a *authn) OnStreamOpen(ctx context.Context, streamID int64) error {
	a.Lock()
	defer a.Unlock()
	a.contexts[streamID] = ctx
	return nil
}
66

67
// OnHealthCheckRequest authenticates the stream on its first request and, on
// later requests, only verifies that the node ID has not changed.
//
// It returns an error when an already authenticated stream sends a different
// non-empty node ID, when the credential cannot be obtained for the stream, or
// when authentication fails.
func (a *authn) OnHealthCheckRequest(streamID int64, req *envoy_service_health.HealthCheckRequest) error {
	nodeID := req.GetNode().GetId()

	if authenticatedID, ok := a.authNodeId(streamID); ok {
		// An empty node ID is accepted on an authenticated stream; only a
		// different non-empty ID is rejected.
		if nodeID == "" || nodeID == authenticatedID {
			return nil
		}
		return errors.Errorf("stream was authenticated for ID %s. Received request is for node with ID %s. Node ID cannot be changed after stream is initialized", authenticatedID, nodeID)
	}

	cred, err := a.credential(streamID)
	if err != nil {
		return err
	}
	if err := a.authenticate(cred, nodeID); err != nil {
		return err
	}

	// Remember the node ID so that follow-up requests skip authentication.
	a.Lock()
	a.authenticated[streamID] = nodeID
	a.Unlock()
	return nil
}
88

89
// OnEndpointHealthResponse rejects health responses that arrive on a stream
// which has not been authenticated yet.
//
// It returns nil when the stream has an authenticated node ID recorded and an
// error naming the stream otherwise.
func (a *authn) OnEndpointHealthResponse(streamID int64, _ *envoy_service_health.EndpointHealthResponse) error {
	// On the failure path there is no authenticated node ID to report (the
	// lookup yields an empty string), so the error identifies the stream.
	if _, alreadyAuthenticated := a.authNodeId(streamID); !alreadyAuthenticated {
		return errors.Errorf("stream %d was not authenticated", streamID)
	}
	return nil
}
95

96
// OnStreamClosed forgets the stored context and the authenticated node ID of
// the closed stream.
func (a *authn) OnStreamClosed(streamID int64) {
	a.Lock()
	defer a.Unlock()

	delete(a.authenticated, streamID)
	delete(a.contexts, streamID)
}
102

103
// authNodeId returns the node ID the stream was authenticated for and whether
// such an entry exists. The lookup is done under the read lock.
func (a *authn) authNodeId(streamID core_xds.StreamID) (string, bool) {
	a.RLock()
	nodeID, found := a.authenticated[streamID]
	a.RUnlock()
	return nodeID, found
}
109

110
// credential extracts the credential from the context stored for the stream.
//
// It returns an error when no context is stored for streamID or when the
// credential cannot be extracted from that context.
func (a *authn) credential(streamID core_xds.StreamID) (xds_auth.Credential, error) {
	a.RLock()
	defer a.RUnlock()

	streamCtx, ok := a.contexts[streamID]
	if !ok {
		return "", errors.Errorf("there is no context for stream ID %d", streamID)
	}

	cred, err := extractCredential(streamCtx)
	if err != nil {
		return "", errors.Wrap(err, "could not extract credential from DiscoveryRequest")
	}
	return cred, nil
}
124

125
// extractCredential reads the credential from the incoming gRPC metadata of ctx.
//
// It returns an empty credential and no error when the authorization entry is
// absent, and an error when the context carries no metadata or when the
// authorization entry does not hold exactly one value.
func extractCredential(ctx context.Context) (xds_auth.Credential, error) {
	md, found := metadata.FromIncomingContext(ctx)
	if !found {
		return "", errors.Errorf("request has no metadata")
	}

	headers, present := md[authorization]
	if !present {
		return "", nil
	}
	if len(headers) != 1 {
		return "", errors.Errorf("request must have exactly 1 %q header, got %d", authorization, len(headers))
	}
	return headers[0], nil
}
138

139
// authenticate loads the Dataplane identified by nodeID and validates the
// credential against it.
//
// It returns an error when nodeID is not a valid proxy ID, when the Dataplane
// cannot be fetched (including still being absent after the configured
// retries), or when the authenticator rejects the credential.
func (a *authn) authenticate(credential xds_auth.Credential, nodeID string) error {
	ctx := user.Ctx(context.TODO(), user.ControlPlane)
	dataplane := core_mesh.NewDataplaneResource()

	proxyID, err := core_xds.ParseProxyIdFromString(nodeID)
	if err != nil {
		return errors.Wrap(err, "HDS request must have a valid Proxy Id")
	}

	// HDS is initiated in parallel with XDS, so the Dataplane is very likely
	// not created yet. Closing the stream with an error would make Envoy
	// retry, but retrying here gives better UX (no confusing logs).
	backoff := retry.WithMaxRetries(
		uint64(a.dpNotFoundRetry.MaxTimes),
		retry.NewConstant(a.dpNotFoundRetry.Backoff),
	)
	fetchDataplane := func(ctx context.Context) error {
		getErr := a.resManager.Get(ctx, dataplane, core_store.GetBy(proxyID.ToResourceKey()))
		if core_store.IsResourceNotFound(getErr) {
			// Only "not found" is retried; any other result ends the loop.
			return retry.RetryableError(errors.New("dataplane not found. Create Dataplane in Kuma CP first or pass it as an argument to kuma-dp"))
		}
		return getErr
	}
	if err := retry.Do(ctx, backoff, fetchDataplane); err != nil {
		return err
	}

	if err := a.authenticator.Authenticate(ctx, dataplane, credential); err != nil {
		return errors.Wrap(err, "authentication failed")
	}
	return nil
}
166

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.