3
* Copyright 2018 gRPC authors.
5
* Licensed under the Apache License, Version 2.0 (the "License");
6
* you may not use this file except in compliance with the License.
7
* You may obtain a copy of the License at
9
* http://www.apache.org/licenses/LICENSE-2.0
11
* Unless required by applicable law or agreed to in writing, software
12
* distributed under the License is distributed on an "AS IS" BASIS,
13
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
* See the License for the specific language governing permissions and
15
* limitations under the License.
19
// Package channelz defines APIs for enabling channelz service, entry
20
// registration/deletion, and accessing channelz data. It also defines channelz
21
// metric struct formats.
23
// All APIs in this package are experimental.
33
"google.golang.org/grpc/grpclog"
37
defaultMaxTraceEntry int32 = 30
43
// EntryPerPage defines the number of channelz entries to be shown on a web page.
44
EntryPerPage = int64(50)
46
maxTraceEntry = defaultMaxTraceEntry
49
// TurnOn turns on channelz data collection.
53
atomic.StoreInt32(&curState, 1)
57
// IsOn returns whether channelz data collection is on.
59
return atomic.CompareAndSwapInt32(&curState, 1, 1)
62
// SetMaxTraceEntry sets the maximum number of trace entries per entity (i.e. channel/subchannel).
63
// Setting it to 0 will disable channel tracing.
64
func SetMaxTraceEntry(i int32) {
65
atomic.StoreInt32(&maxTraceEntry, i)
68
// ResetMaxTraceEntryToDefault resets the maximum number of trace entries per entity to the default.
69
func ResetMaxTraceEntryToDefault() {
70
atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry)
73
func getMaxTraceEntry() int {
74
i := atomic.LoadInt32(&maxTraceEntry)
78
// dbWrapper wraps around a reference to internal channelz data storage, and
79
// provides synchronized functionality to set and get the reference.
80
type dbWrapper struct {
85
func (d *dbWrapper) set(db *channelMap) {
91
func (d *dbWrapper) get() *channelMap {
97
// NewChannelzStorage initializes channelz data storage and id generator.
99
// This function returns a cleanup function to wait for all channelz state to be reset by the
100
// grpc goroutines when those entities get closed. By using this cleanup function, we make sure tests
101
// don't interfere with each other, i.e. a lingering goroutine from a previous test doing entity removal happens
102
// to remove some entity just registered by the new test, since the id space is the same.
104
// Note: This function is exported for testing purpose only. User should not call
106
func NewChannelzStorage() (cleanup func() error) {
108
topLevelChannels: make(map[int64]struct{}),
109
channels: make(map[int64]*channel),
110
listenSockets: make(map[int64]*listenSocket),
111
normalSockets: make(map[int64]*normalSocket),
112
servers: make(map[int64]*server),
113
subChannels: make(map[int64]*subChannel),
116
return func() error {
122
for i := 0; i < 1000; i++ {
124
if len(cm.topLevelChannels) == 0 && len(cm.servers) == 0 && len(cm.channels) == 0 && len(cm.subChannels) == 0 && len(cm.listenSockets) == 0 && len(cm.normalSockets) == 0 {
126
// all things stored in the channelz map have been cleared.
130
time.Sleep(10 * time.Millisecond)
134
err = fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets))
140
// GetTopChannels returns a slice of top channel's ChannelMetric, along with a
141
// boolean indicating whether there are more top channels to be queried for.
143
// The arg id specifies that only top channel with id at or above it will be included
144
// in the result. The returned slice is up to a length of the arg maxResults or
145
// EntryPerPage if maxResults is zero, and is sorted in ascending id order.
146
func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
147
return db.get().GetTopChannels(id, maxResults)
150
// GetServers returns a slice of server's ServerMetric, along with a
151
// boolean indicating whether there are more servers to be queried for.
153
// The arg id specifies that only server with id at or above it will be included
154
// in the result. The returned slice is up to a length of the arg maxResults or
155
// EntryPerPage if maxResults is zero, and is sorted in ascending id order.
156
func GetServers(id int64, maxResults int64) ([]*ServerMetric, bool) {
157
return db.get().GetServers(id, maxResults)
160
// GetServerSockets returns a slice of server's (identified by id) normal socket's
161
// SocketMetric, along with a boolean indicating whether there are more sockets to
164
// The arg startID specifies that only sockets with id at or above it will be
165
// included in the result. The returned slice is up to a length of the arg maxResults
166
// or EntryPerPage if maxResults is zero, and is sorted in ascending id order.
167
func GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
168
return db.get().GetServerSockets(id, startID, maxResults)
171
// GetChannel returns the ChannelMetric for the channel (identified by id).
172
func GetChannel(id int64) *ChannelMetric {
173
return db.get().GetChannel(id)
176
// GetSubChannel returns the SubChannelMetric for the subchannel (identified by id).
177
func GetSubChannel(id int64) *SubChannelMetric {
178
return db.get().GetSubChannel(id)
181
// GetSocket returns the SocketMetric for the socket (identified by id).
182
func GetSocket(id int64) *SocketMetric {
183
return db.get().GetSocket(id)
186
// GetServer returns the ServerMetric for the server (identified by id).
187
func GetServer(id int64) *ServerMetric {
188
return db.get().GetServer(id)
191
// RegisterChannel registers the given channel c in channelz database with ref
192
// as its reference name, and adds it to the child list of its parent (identified
193
// by pid). pid = 0 means no parent. It returns the unique channelz tracking id
194
// assigned to this channel.
195
func RegisterChannel(c Channel, pid int64, ref string) int64 {
200
subChans: make(map[int64]string),
201
nestedChans: make(map[int64]string),
204
trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
207
db.get().addChannel(id, cn, true, pid, ref)
209
db.get().addChannel(id, cn, false, pid, ref)
214
// RegisterSubChannel registers the given channel c in channelz database with ref
215
// as its reference name, and adds it to the child list of its parent (identified
216
// by pid). It returns the unique channelz tracking id assigned to this subchannel.
217
func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
219
logger.Error("a SubChannel's parent id cannot be 0")
226
sockets: make(map[int64]string),
229
trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
231
db.get().addSubChannel(id, sc, pid, ref)
235
// RegisterServer registers the given server s in channelz database. It returns
236
// the unique channelz tracking id assigned to this server.
237
func RegisterServer(s Server, ref string) int64 {
242
sockets: make(map[int64]string),
243
listenSockets: make(map[int64]string),
246
db.get().addServer(id, svr)
250
// RegisterListenSocket registers the given listen socket s in channelz database
251
// with ref as its reference name, and adds it to the child list of its parent
252
// (identified by pid). It returns the unique channelz tracking id assigned to
253
// this listen socket.
254
func RegisterListenSocket(s Socket, pid int64, ref string) int64 {
256
logger.Error("a ListenSocket's parent id cannot be 0")
260
ls := &listenSocket{refName: ref, s: s, id: id, pid: pid}
261
db.get().addListenSocket(id, ls, pid, ref)
265
// RegisterNormalSocket registers the given normal socket s in channelz database
266
// with ref as its reference name, and adds it to the child list of its parent
267
// (identified by pid). It returns the unique channelz tracking id assigned to
268
// this normal socket.
269
func RegisterNormalSocket(s Socket, pid int64, ref string) int64 {
271
logger.Error("a NormalSocket's parent id cannot be 0")
275
ns := &normalSocket{refName: ref, s: s, id: id, pid: pid}
276
db.get().addNormalSocket(id, ns, pid, ref)
280
// RemoveEntry removes the entry whose unique channelz tracking id is id from
282
func RemoveEntry(id int64) {
283
db.get().removeEntry(id)
286
// TraceEventDesc is what the caller of AddTraceEvent should provide to describe the event to be added
287
// to the channel trace.
288
// The Parent field is optional. It is used for an event that will be recorded in the entity's parent
290
type TraceEventDesc struct {
293
Parent *TraceEventDesc
296
// AddTraceEvent adds trace related to the entity with specified id, using the provided TraceEventDesc.
297
func AddTraceEvent(l grpclog.DepthLoggerV2, id int64, depth int, desc *TraceEventDesc) {
298
for d := desc; d != nil; d = d.Parent {
300
case CtUnknown, CtInfo:
301
l.InfoDepth(depth+1, d.Desc)
303
l.WarningDepth(depth+1, d.Desc)
305
l.ErrorDepth(depth+1, d.Desc)
308
if getMaxTraceEntry() == 0 {
311
db.get().traceEvent(id, desc)
314
// channelMap is the storage data structure for channelz.
315
// Methods of channelMap can be divided into two categories with respect to locking.
316
// 1. Methods that acquire the global lock.
317
// 2. Methods that can only be called when global lock is held.
318
// Methods of the second type must always be called from within a method of the first type.
319
type channelMap struct {
321
topLevelChannels map[int64]struct{}
322
servers map[int64]*server
323
channels map[int64]*channel
324
subChannels map[int64]*subChannel
325
listenSockets map[int64]*listenSocket
326
normalSockets map[int64]*normalSocket
329
func (c *channelMap) addServer(id int64, s *server) {
336
func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) {
342
c.topLevelChannels[id] = struct{}{}
344
c.findEntry(pid).addChild(id, cn)
349
func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) {
353
c.subChannels[id] = sc
354
c.findEntry(pid).addChild(id, sc)
358
func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64, ref string) {
361
c.listenSockets[id] = ls
362
c.findEntry(pid).addChild(id, ls)
366
func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref string) {
369
c.normalSockets[id] = ns
370
c.findEntry(pid).addChild(id, ns)
374
// removeEntry triggers the removal of an entry, which may not actually delete the entry, if it has to
375
// wait on the deletion of its children and until no other entity's channel trace references it.
376
// It may lead to a chain of entry deletion. For example, deleting the last socket of a gracefully
377
// shutting down server will lead to the server being also deleted.
378
func (c *channelMap) removeEntry(id int64) {
380
c.findEntry(id).triggerDelete()
384
// c.mu must be held by the caller
385
func (c *channelMap) decrTraceRefCount(id int64) {
387
if v, ok := e.(tracedChannel); ok {
388
v.decrTraceRefCount()
389
e.deleteSelfIfReady()
393
// c.mu must be held by the caller.
394
func (c *channelMap) findEntry(id int64) entry {
397
if v, ok = c.channels[id]; ok {
400
if v, ok = c.subChannels[id]; ok {
403
if v, ok = c.servers[id]; ok {
406
if v, ok = c.listenSockets[id]; ok {
409
if v, ok = c.normalSockets[id]; ok {
412
return &dummyEntry{idNotFound: id}
415
// c.mu must be held by the caller
416
// deleteEntry simply deletes an entry from the channelMap. Before calling this
417
// method, caller must check this entry is ready to be deleted, i.e. removeEntry()
418
// has been called on it, and no children still exist.
419
// Conditionals are ordered by the expected frequency of deletion of each entity
420
// type, in order to optimize performance.
421
func (c *channelMap) deleteEntry(id int64) {
423
if _, ok = c.normalSockets[id]; ok {
424
delete(c.normalSockets, id)
427
if _, ok = c.subChannels[id]; ok {
428
delete(c.subChannels, id)
431
if _, ok = c.channels[id]; ok {
432
delete(c.channels, id)
433
delete(c.topLevelChannels, id)
436
if _, ok = c.listenSockets[id]; ok {
437
delete(c.listenSockets, id)
440
if _, ok = c.servers[id]; ok {
441
delete(c.servers, id)
446
func (c *channelMap) traceEvent(id int64, desc *TraceEventDesc) {
448
child := c.findEntry(id)
449
childTC, ok := child.(tracedChannel)
454
childTC.getChannelTrace().append(&TraceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
455
if desc.Parent != nil {
456
parent := c.findEntry(child.getParentID())
457
var chanType RefChannelType
458
switch child.(type) {
460
chanType = RefChannel
462
chanType = RefSubChannel
464
if parentTC, ok := parent.(tracedChannel); ok {
465
parentTC.getChannelTrace().append(&TraceEvent{
466
Desc: desc.Parent.Desc,
467
Severity: desc.Parent.Severity,
468
Timestamp: time.Now(),
470
RefName: childTC.getRefName(),
473
childTC.incrTraceRefCount()
479
type int64Slice []int64
481
func (s int64Slice) Len() int { return len(s) }
482
func (s int64Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
483
func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] }
485
func copyMap(m map[int64]string) map[int64]string {
486
n := make(map[int64]string)
487
for k, v := range m {
493
func min(a, b int64) int64 {
500
func (c *channelMap) GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
502
maxResults = EntryPerPage
505
l := int64(len(c.topLevelChannels))
506
ids := make([]int64, 0, l)
507
cns := make([]*channel, 0, min(l, maxResults))
509
for k := range c.topLevelChannels {
512
sort.Sort(int64Slice(ids))
513
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
516
var t []*ChannelMetric
517
for i, v := range ids[idx:] {
518
if count == maxResults {
521
if cn, ok := c.channels[v]; ok {
522
cns = append(cns, cn)
523
t = append(t, &ChannelMetric{
524
NestedChans: copyMap(cn.nestedChans),
525
SubChans: copyMap(cn.subChans),
529
if i == len(ids[idx:])-1 {
539
for i, cn := range cns {
540
t[i].ChannelData = cn.c.ChannelzMetric()
542
t[i].RefName = cn.refName
543
t[i].Trace = cn.trace.dumpData()
548
func (c *channelMap) GetServers(id, maxResults int64) ([]*ServerMetric, bool) {
550
maxResults = EntryPerPage
553
l := int64(len(c.servers))
554
ids := make([]int64, 0, l)
555
ss := make([]*server, 0, min(l, maxResults))
556
for k := range c.servers {
559
sort.Sort(int64Slice(ids))
560
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
563
var s []*ServerMetric
564
for i, v := range ids[idx:] {
565
if count == maxResults {
568
if svr, ok := c.servers[v]; ok {
570
s = append(s, &ServerMetric{
571
ListenSockets: copyMap(svr.listenSockets),
575
if i == len(ids[idx:])-1 {
585
for i, svr := range ss {
586
s[i].ServerData = svr.s.ChannelzMetric()
588
s[i].RefName = svr.refName
593
func (c *channelMap) GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
595
maxResults = EntryPerPage
600
if svr, ok = c.servers[id]; !ok {
601
// server with id doesn't exist.
605
svrskts := svr.sockets
606
l := int64(len(svrskts))
607
ids := make([]int64, 0, l)
608
sks := make([]*normalSocket, 0, min(l, maxResults))
609
for k := range svrskts {
612
sort.Sort(int64Slice(ids))
613
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
616
for i, v := range ids[idx:] {
617
if count == maxResults {
620
if ns, ok := c.normalSockets[v]; ok {
621
sks = append(sks, ns)
624
if i == len(ids[idx:])-1 {
633
var s []*SocketMetric
634
for _, ns := range sks {
635
sm := &SocketMetric{}
636
sm.SocketData = ns.s.ChannelzMetric()
638
sm.RefName = ns.refName
644
func (c *channelMap) GetChannel(id int64) *ChannelMetric {
645
cm := &ChannelMetric{}
649
if cn, ok = c.channels[id]; !ok {
650
// channel with id doesn't exist.
654
cm.NestedChans = copyMap(cn.nestedChans)
655
cm.SubChans = copyMap(cn.subChans)
656
// cn.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of cn.c when
657
// holding the lock to prevent potential data race.
660
cm.ChannelData = chanCopy.ChannelzMetric()
662
cm.RefName = cn.refName
663
cm.Trace = cn.trace.dumpData()
667
func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric {
668
cm := &SubChannelMetric{}
672
if sc, ok = c.subChannels[id]; !ok {
673
// subchannel with id doesn't exist.
677
cm.Sockets = copyMap(sc.sockets)
678
// sc.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of sc.c when
679
// holding the lock to prevent potential data race.
682
cm.ChannelData = chanCopy.ChannelzMetric()
684
cm.RefName = sc.refName
685
cm.Trace = sc.trace.dumpData()
689
func (c *channelMap) GetSocket(id int64) *SocketMetric {
690
sm := &SocketMetric{}
692
if ls, ok := c.listenSockets[id]; ok {
694
sm.SocketData = ls.s.ChannelzMetric()
696
sm.RefName = ls.refName
699
if ns, ok := c.normalSockets[id]; ok {
701
sm.SocketData = ns.s.ChannelzMetric()
703
sm.RefName = ns.refName
710
func (c *channelMap) GetServer(id int64) *ServerMetric {
711
sm := &ServerMetric{}
715
if svr, ok = c.servers[id]; !ok {
719
sm.ListenSockets = copyMap(svr.listenSockets)
722
sm.RefName = svr.refName
723
sm.ServerData = svr.s.ChannelzMetric()
727
type idGenerator struct {
731
func (i *idGenerator) reset() {
732
atomic.StoreInt64(&i.id, 0)
735
func (i *idGenerator) genID() int64 {
736
return atomic.AddInt64(&i.id, 1)