cubefs

Форк
0
422 строки · 10.3 Кб
1
/*
2
 *
3
 * Copyright 2018 gRPC authors.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 */
18

19
package binarylog
20

21
import (
22
	"net"
23
	"strings"
24
	"sync/atomic"
25
	"time"
26

27
	"github.com/golang/protobuf/proto"
28
	"github.com/golang/protobuf/ptypes"
29
	pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
30
	"google.golang.org/grpc/metadata"
31
	"google.golang.org/grpc/status"
32
)
33

34
// callIDGenerator hands out monotonically increasing IDs. The zero value is
// ready to use, and the first ID it returns is 1.
type callIDGenerator struct {
	// id is the most recently issued ID. It is only accessed through the
	// sync/atomic functions.
	id uint64
}

// next atomically increments the counter and returns the new value.
func (g *callIDGenerator) next() uint64 {
	return atomic.AddUint64(&g.id, 1)
}

// reset rewinds the counter so that the following call to next returns 1.
//
// It is for testing only. The store is atomic so that id is never accessed
// with a mix of plain and atomic operations.
func (g *callIDGenerator) reset() {
	atomic.StoreUint64(&g.id, 0)
}

// idGen is the package-level generator that newMethodLogger draws call IDs
// from.
var idGen callIDGenerator
49

50
// MethodLogger is the sub-logger for each method.
51
type MethodLogger struct {
52
	headerMaxLen, messageMaxLen uint64
53

54
	callID          uint64
55
	idWithinCallGen *callIDGenerator
56

57
	sink Sink // TODO(blog): make this plugable.
58
}
59

60
func newMethodLogger(h, m uint64) *MethodLogger {
61
	return &MethodLogger{
62
		headerMaxLen:  h,
63
		messageMaxLen: m,
64

65
		callID:          idGen.next(),
66
		idWithinCallGen: &callIDGenerator{},
67

68
		sink: DefaultSink, // TODO(blog): make it plugable.
69
	}
70
}
71

72
// Log creates a proto binary log entry, and logs it to the sink.
73
func (ml *MethodLogger) Log(c LogEntryConfig) {
74
	m := c.toProto()
75
	timestamp, _ := ptypes.TimestampProto(time.Now())
76
	m.Timestamp = timestamp
77
	m.CallId = ml.callID
78
	m.SequenceIdWithinCall = ml.idWithinCallGen.next()
79

80
	switch pay := m.Payload.(type) {
81
	case *pb.GrpcLogEntry_ClientHeader:
82
		m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata())
83
	case *pb.GrpcLogEntry_ServerHeader:
84
		m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata())
85
	case *pb.GrpcLogEntry_Message:
86
		m.PayloadTruncated = ml.truncateMessage(pay.Message)
87
	}
88

89
	ml.sink.Write(m)
90
}
91

92
func (ml *MethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
93
	if ml.headerMaxLen == maxUInt {
94
		return false
95
	}
96
	var (
97
		bytesLimit = ml.headerMaxLen
98
		index      int
99
	)
100
	// At the end of the loop, index will be the first entry where the total
101
	// size is greater than the limit:
102
	//
103
	// len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr.
104
	for ; index < len(mdPb.Entry); index++ {
105
		entry := mdPb.Entry[index]
106
		if entry.Key == "grpc-trace-bin" {
107
			// "grpc-trace-bin" is a special key. It's kept in the log entry,
108
			// but not counted towards the size limit.
109
			continue
110
		}
111
		currentEntryLen := uint64(len(entry.Value))
112
		if currentEntryLen > bytesLimit {
113
			break
114
		}
115
		bytesLimit -= currentEntryLen
116
	}
117
	truncated = index < len(mdPb.Entry)
118
	mdPb.Entry = mdPb.Entry[:index]
119
	return truncated
120
}
121

122
// truncateMessage cuts msgPb.Data down to ml.messageMaxLen bytes and reports
// whether anything was removed. Only Data is modified. A limit of maxUInt
// disables truncation.
func (ml *MethodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
	limit := ml.messageMaxLen
	if limit == maxUInt || uint64(len(msgPb.Data)) <= limit {
		return false
	}
	msgPb.Data = msgPb.Data[:limit]
	return true
}
132

133
// LogEntryConfig represents the configuration for binary log entry.
134
type LogEntryConfig interface {
135
	toProto() *pb.GrpcLogEntry
136
}
137

138
// ClientHeader configs the binary log entry to be a ClientHeader entry.
139
type ClientHeader struct {
140
	OnClientSide bool
141
	Header       metadata.MD
142
	MethodName   string
143
	Authority    string
144
	Timeout      time.Duration
145
	// PeerAddr is required only when it's on server side.
146
	PeerAddr net.Addr
147
}
148

149
func (c *ClientHeader) toProto() *pb.GrpcLogEntry {
150
	// This function doesn't need to set all the fields (e.g. seq ID). The Log
151
	// function will set the fields when necessary.
152
	clientHeader := &pb.ClientHeader{
153
		Metadata:   mdToMetadataProto(c.Header),
154
		MethodName: c.MethodName,
155
		Authority:  c.Authority,
156
	}
157
	if c.Timeout > 0 {
158
		clientHeader.Timeout = ptypes.DurationProto(c.Timeout)
159
	}
160
	ret := &pb.GrpcLogEntry{
161
		Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
162
		Payload: &pb.GrpcLogEntry_ClientHeader{
163
			ClientHeader: clientHeader,
164
		},
165
	}
166
	if c.OnClientSide {
167
		ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
168
	} else {
169
		ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
170
	}
171
	if c.PeerAddr != nil {
172
		ret.Peer = addrToProto(c.PeerAddr)
173
	}
174
	return ret
175
}
176

177
// ServerHeader configs the binary log entry to be a ServerHeader entry.
178
type ServerHeader struct {
179
	OnClientSide bool
180
	Header       metadata.MD
181
	// PeerAddr is required only when it's on client side.
182
	PeerAddr net.Addr
183
}
184

185
func (c *ServerHeader) toProto() *pb.GrpcLogEntry {
186
	ret := &pb.GrpcLogEntry{
187
		Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
188
		Payload: &pb.GrpcLogEntry_ServerHeader{
189
			ServerHeader: &pb.ServerHeader{
190
				Metadata: mdToMetadataProto(c.Header),
191
			},
192
		},
193
	}
194
	if c.OnClientSide {
195
		ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
196
	} else {
197
		ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
198
	}
199
	if c.PeerAddr != nil {
200
		ret.Peer = addrToProto(c.PeerAddr)
201
	}
202
	return ret
203
}
204

205
// ClientMessage configs the binary log entry to be a ClientMessage entry.
206
type ClientMessage struct {
207
	OnClientSide bool
208
	// Message can be a proto.Message or []byte. Other messages formats are not
209
	// supported.
210
	Message interface{}
211
}
212

213
func (c *ClientMessage) toProto() *pb.GrpcLogEntry {
214
	var (
215
		data []byte
216
		err  error
217
	)
218
	if m, ok := c.Message.(proto.Message); ok {
219
		data, err = proto.Marshal(m)
220
		if err != nil {
221
			grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
222
		}
223
	} else if b, ok := c.Message.([]byte); ok {
224
		data = b
225
	} else {
226
		grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
227
	}
228
	ret := &pb.GrpcLogEntry{
229
		Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
230
		Payload: &pb.GrpcLogEntry_Message{
231
			Message: &pb.Message{
232
				Length: uint32(len(data)),
233
				Data:   data,
234
			},
235
		},
236
	}
237
	if c.OnClientSide {
238
		ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
239
	} else {
240
		ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
241
	}
242
	return ret
243
}
244

245
// ServerMessage configs the binary log entry to be a ServerMessage entry.
246
type ServerMessage struct {
247
	OnClientSide bool
248
	// Message can be a proto.Message or []byte. Other messages formats are not
249
	// supported.
250
	Message interface{}
251
}
252

253
func (c *ServerMessage) toProto() *pb.GrpcLogEntry {
254
	var (
255
		data []byte
256
		err  error
257
	)
258
	if m, ok := c.Message.(proto.Message); ok {
259
		data, err = proto.Marshal(m)
260
		if err != nil {
261
			grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
262
		}
263
	} else if b, ok := c.Message.([]byte); ok {
264
		data = b
265
	} else {
266
		grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
267
	}
268
	ret := &pb.GrpcLogEntry{
269
		Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
270
		Payload: &pb.GrpcLogEntry_Message{
271
			Message: &pb.Message{
272
				Length: uint32(len(data)),
273
				Data:   data,
274
			},
275
		},
276
	}
277
	if c.OnClientSide {
278
		ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
279
	} else {
280
		ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
281
	}
282
	return ret
283
}
284

285
// ClientHalfClose configs the binary log entry to be a ClientHalfClose entry.
286
type ClientHalfClose struct {
287
	OnClientSide bool
288
}
289

290
func (c *ClientHalfClose) toProto() *pb.GrpcLogEntry {
291
	ret := &pb.GrpcLogEntry{
292
		Type:    pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
293
		Payload: nil, // No payload here.
294
	}
295
	if c.OnClientSide {
296
		ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
297
	} else {
298
		ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
299
	}
300
	return ret
301
}
302

303
// ServerTrailer configs the binary log entry to be a ServerTrailer entry.
304
type ServerTrailer struct {
305
	OnClientSide bool
306
	Trailer      metadata.MD
307
	// Err is the status error.
308
	Err error
309
	// PeerAddr is required only when it's on client side and the RPC is trailer
310
	// only.
311
	PeerAddr net.Addr
312
}
313

314
func (c *ServerTrailer) toProto() *pb.GrpcLogEntry {
315
	st, ok := status.FromError(c.Err)
316
	if !ok {
317
		grpclogLogger.Info("binarylogging: error in trailer is not a status error")
318
	}
319
	var (
320
		detailsBytes []byte
321
		err          error
322
	)
323
	stProto := st.Proto()
324
	if stProto != nil && len(stProto.Details) != 0 {
325
		detailsBytes, err = proto.Marshal(stProto)
326
		if err != nil {
327
			grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err)
328
		}
329
	}
330
	ret := &pb.GrpcLogEntry{
331
		Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
332
		Payload: &pb.GrpcLogEntry_Trailer{
333
			Trailer: &pb.Trailer{
334
				Metadata:      mdToMetadataProto(c.Trailer),
335
				StatusCode:    uint32(st.Code()),
336
				StatusMessage: st.Message(),
337
				StatusDetails: detailsBytes,
338
			},
339
		},
340
	}
341
	if c.OnClientSide {
342
		ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
343
	} else {
344
		ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
345
	}
346
	if c.PeerAddr != nil {
347
		ret.Peer = addrToProto(c.PeerAddr)
348
	}
349
	return ret
350
}
351

352
// Cancel configs the binary log entry to be a Cancel entry.
353
type Cancel struct {
354
	OnClientSide bool
355
}
356

357
func (c *Cancel) toProto() *pb.GrpcLogEntry {
358
	ret := &pb.GrpcLogEntry{
359
		Type:    pb.GrpcLogEntry_EVENT_TYPE_CANCEL,
360
		Payload: nil,
361
	}
362
	if c.OnClientSide {
363
		ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
364
	} else {
365
		ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
366
	}
367
	return ret
368
}
369

370
// metadataKeyOmit returns whether the metadata entry with this key should be
// omitted.
//
// A fixed list of keys and every key starting with "grpc-" is omitted, with
// the single exception of "grpc-trace-bin".
func metadataKeyOmit(key string) bool {
	// grpc-trace-bin is special because it's visible to users, so it is kept
	// even though it carries the "grpc-" prefix.
	if key == "grpc-trace-bin" {
		return false
	}
	switch key {
	case "lb-token",
		":path",
		":authority",
		"content-encoding",
		"content-type",
		"user-agent",
		"te":
		return true
	default:
		return strings.HasPrefix(key, "grpc-")
	}
}
381

382
func mdToMetadataProto(md metadata.MD) *pb.Metadata {
383
	ret := &pb.Metadata{}
384
	for k, vv := range md {
385
		if metadataKeyOmit(k) {
386
			continue
387
		}
388
		for _, v := range vv {
389
			ret.Entry = append(ret.Entry,
390
				&pb.MetadataEntry{
391
					Key:   k,
392
					Value: []byte(v),
393
				},
394
			)
395
		}
396
	}
397
	return ret
398
}
399

400
func addrToProto(addr net.Addr) *pb.Address {
401
	ret := &pb.Address{}
402
	switch a := addr.(type) {
403
	case *net.TCPAddr:
404
		if a.IP.To4() != nil {
405
			ret.Type = pb.Address_TYPE_IPV4
406
		} else if a.IP.To16() != nil {
407
			ret.Type = pb.Address_TYPE_IPV6
408
		} else {
409
			ret.Type = pb.Address_TYPE_UNKNOWN
410
			// Do not set address and port fields.
411
			break
412
		}
413
		ret.Address = a.IP.String()
414
		ret.IpPort = uint32(a.Port)
415
	case *net.UnixAddr:
416
		ret.Type = pb.Address_TYPE_UNIX
417
		ret.Address = a.String()
418
	default:
419
		ret.Type = pb.Address_TYPE_UNKNOWN
420
	}
421
	return ret
422
}
423

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.