cubefs
253 строки · 7.0 Кб
1package sarama
2
3import (
4"encoding/binary"
5"errors"
6"fmt"
7"io"
8"math"
9"strings"
10"time"
11
12"github.com/jcmturner/gofork/encoding/asn1"
13"github.com/jcmturner/gokrb5/v8/asn1tools"
14"github.com/jcmturner/gokrb5/v8/gssapi"
15"github.com/jcmturner/gokrb5/v8/iana/chksumtype"
16"github.com/jcmturner/gokrb5/v8/iana/keyusage"
17"github.com/jcmturner/gokrb5/v8/messages"
18"github.com/jcmturner/gokrb5/v8/types"
19)
20
21const (
22TOK_ID_KRB_AP_REQ = 256
23GSS_API_GENERIC_TAG = 0x60
24KRB5_USER_AUTH = 1
25KRB5_KEYTAB_AUTH = 2
26GSS_API_INITIAL = 1
27GSS_API_VERIFY = 2
28GSS_API_FINISH = 3
29)
30
31type GSSAPIConfig struct {
32AuthType int
33KeyTabPath string
34KerberosConfigPath string
35ServiceName string
36Username string
37Password string
38Realm string
39DisablePAFXFAST bool
40}
41
42type GSSAPIKerberosAuth struct {
43Config *GSSAPIConfig
44ticket messages.Ticket
45encKey types.EncryptionKey
46NewKerberosClientFunc func(config *GSSAPIConfig) (KerberosClient, error)
47step int
48}
49
50type KerberosClient interface {
51Login() error
52GetServiceTicket(spn string) (messages.Ticket, types.EncryptionKey, error)
53Domain() string
54CName() types.PrincipalName
55Destroy()
56}
57
58// writePackage appends length in big endian before the payload, and sends it to kafka
59func (krbAuth *GSSAPIKerberosAuth) writePackage(broker *Broker, payload []byte) (int, error) {
60length := uint64(len(payload))
61size := length + 4 // 4 byte length header + payload
62if size > math.MaxInt32 {
63return 0, errors.New("payload too large, will overflow int32")
64}
65finalPackage := make([]byte, size)
66copy(finalPackage[4:], payload)
67binary.BigEndian.PutUint32(finalPackage, uint32(length))
68bytes, err := broker.conn.Write(finalPackage)
69if err != nil {
70return bytes, err
71}
72return bytes, nil
73}
74
75// readPackage reads payload length (4 bytes) and then reads the payload into []byte
76func (krbAuth *GSSAPIKerberosAuth) readPackage(broker *Broker) ([]byte, int, error) {
77bytesRead := 0
78lengthInBytes := make([]byte, 4)
79bytes, err := io.ReadFull(broker.conn, lengthInBytes)
80if err != nil {
81return nil, bytesRead, err
82}
83bytesRead += bytes
84payloadLength := binary.BigEndian.Uint32(lengthInBytes)
85payloadBytes := make([]byte, payloadLength) // buffer for read..
86bytes, err = io.ReadFull(broker.conn, payloadBytes) // read bytes
87if err != nil {
88return payloadBytes, bytesRead, err
89}
90bytesRead += bytes
91return payloadBytes, bytesRead, nil
92}
93
94func (krbAuth *GSSAPIKerberosAuth) newAuthenticatorChecksum() []byte {
95a := make([]byte, 24)
96flags := []int{gssapi.ContextFlagInteg, gssapi.ContextFlagConf}
97binary.LittleEndian.PutUint32(a[:4], 16)
98for _, i := range flags {
99f := binary.LittleEndian.Uint32(a[20:24])
100f |= uint32(i)
101binary.LittleEndian.PutUint32(a[20:24], f)
102}
103return a
104}
105
106/*
107*
108* Construct Kerberos AP_REQ package, conforming to RFC-4120
109* https://tools.ietf.org/html/rfc4120#page-84
110*
111*/
112func (krbAuth *GSSAPIKerberosAuth) createKrb5Token(
113domain string, cname types.PrincipalName,
114ticket messages.Ticket,
115sessionKey types.EncryptionKey) ([]byte, error) {
116auth, err := types.NewAuthenticator(domain, cname)
117if err != nil {
118return nil, err
119}
120auth.Cksum = types.Checksum{
121CksumType: chksumtype.GSSAPI,
122Checksum: krbAuth.newAuthenticatorChecksum(),
123}
124APReq, err := messages.NewAPReq(
125ticket,
126sessionKey,
127auth,
128)
129if err != nil {
130return nil, err
131}
132aprBytes := make([]byte, 2)
133binary.BigEndian.PutUint16(aprBytes, TOK_ID_KRB_AP_REQ)
134tb, err := APReq.Marshal()
135if err != nil {
136return nil, err
137}
138aprBytes = append(aprBytes, tb...)
139return aprBytes, nil
140}
141
142/*
143*
144* Append the GSS-API header to the payload, conforming to RFC-2743
145* Section 3.1, Mechanism-Independent Token Format
146*
147* https://tools.ietf.org/html/rfc2743#page-81
148*
149* GSSAPIHeader + <specific mechanism payload>
150*
151*/
152func (krbAuth *GSSAPIKerberosAuth) appendGSSAPIHeader(payload []byte) ([]byte, error) {
153oidBytes, err := asn1.Marshal(gssapi.OIDKRB5.OID())
154if err != nil {
155return nil, err
156}
157tkoLengthBytes := asn1tools.MarshalLengthBytes(len(oidBytes) + len(payload))
158GSSHeader := append([]byte{GSS_API_GENERIC_TAG}, tkoLengthBytes...)
159GSSHeader = append(GSSHeader, oidBytes...)
160GSSPackage := append(GSSHeader, payload...)
161return GSSPackage, nil
162}
163
164func (krbAuth *GSSAPIKerberosAuth) initSecContext(bytes []byte, kerberosClient KerberosClient) ([]byte, error) {
165switch krbAuth.step {
166case GSS_API_INITIAL:
167aprBytes, err := krbAuth.createKrb5Token(
168kerberosClient.Domain(),
169kerberosClient.CName(),
170krbAuth.ticket,
171krbAuth.encKey)
172if err != nil {
173return nil, err
174}
175krbAuth.step = GSS_API_VERIFY
176return krbAuth.appendGSSAPIHeader(aprBytes)
177case GSS_API_VERIFY:
178wrapTokenReq := gssapi.WrapToken{}
179if err := wrapTokenReq.Unmarshal(bytes, true); err != nil {
180return nil, err
181}
182// Validate response.
183isValid, err := wrapTokenReq.Verify(krbAuth.encKey, keyusage.GSSAPI_ACCEPTOR_SEAL)
184if !isValid {
185return nil, err
186}
187
188wrapTokenResponse, err := gssapi.NewInitiatorWrapToken(wrapTokenReq.Payload, krbAuth.encKey)
189if err != nil {
190return nil, err
191}
192krbAuth.step = GSS_API_FINISH
193return wrapTokenResponse.Marshal()
194}
195return nil, nil
196}
197
198/* This does the handshake for authorization */
199func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error {
200kerberosClient, err := krbAuth.NewKerberosClientFunc(krbAuth.Config)
201if err != nil {
202Logger.Printf("Kerberos client error: %s", err)
203return err
204}
205
206err = kerberosClient.Login()
207if err != nil {
208Logger.Printf("Kerberos client error: %s", err)
209return err
210}
211// Construct SPN using serviceName and host
212// SPN format: <SERVICE>/<FQDN>
213
214host := strings.SplitN(broker.addr, ":", 2)[0] // Strip port part
215spn := fmt.Sprintf("%s/%s", broker.conf.Net.SASL.GSSAPI.ServiceName, host)
216
217ticket, encKey, err := kerberosClient.GetServiceTicket(spn)
218if err != nil {
219Logger.Printf("Error getting Kerberos service ticket : %s", err)
220return err
221}
222krbAuth.ticket = ticket
223krbAuth.encKey = encKey
224krbAuth.step = GSS_API_INITIAL
225var receivedBytes []byte = nil
226defer kerberosClient.Destroy()
227for {
228packBytes, err := krbAuth.initSecContext(receivedBytes, kerberosClient)
229if err != nil {
230Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
231return err
232}
233requestTime := time.Now()
234bytesWritten, err := krbAuth.writePackage(broker, packBytes)
235if err != nil {
236Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
237return err
238}
239broker.updateOutgoingCommunicationMetrics(bytesWritten)
240if krbAuth.step == GSS_API_VERIFY {
241bytesRead := 0
242receivedBytes, bytesRead, err = krbAuth.readPackage(broker)
243requestLatency := time.Since(requestTime)
244broker.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
245if err != nil {
246Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
247return err
248}
249} else if krbAuth.step == GSS_API_FINISH {
250return nil
251}
252}
253}
254