package sarama

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"sync"
	"time"

	"github.com/rcrowley/go-metrics"
)

// ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.
var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")

// ConsumerGroup is responsible for dividing up processing of topics and partitions
// over a collection of processes (the members of the consumer group).
type ConsumerGroup interface {
	// Consume joins a cluster of consumers for a given list of topics and
	// starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
	//
	// The life-cycle of a session is represented by the following steps:
	//
	// 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
	//    and are assigned their "fair share" of partitions, aka 'claims'.
	// 2. Before processing starts, the handler's Setup() hook is called to notify the user
	//    of the claims and allow any necessary preparation or alteration of state.
	// 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
	//    in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
	//    from concurrent reads/writes.
	// 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
	//    parent context is canceled or when a server-side rebalance cycle is initiated.
	// 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
	//    to allow the user to perform any final tasks before a rebalance.
	// 6. Finally, marked offsets are committed one last time before claims are released.
	//
	// Please note that once a rebalance is triggered, sessions must be completed within
	// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
	// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
	// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
	// commit failures.
	//
	// This method should be called inside an infinite loop; when a server-side rebalance
	// happens, the consumer session will need to be recreated to get the new claims
	// (see the usage sketch after this interface).
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

	// Errors returns a read channel of errors that occurred during the consumer life-cycle.
	// By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan error

	// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
	// this function before the object passes out of scope, as it will otherwise leak memory.
	Close() error

	// Pause suspends fetching from the requested partitions. Future calls to the broker will not return any
	// records from these partitions until they have been resumed using Resume()/ResumeAll().
	// Note that this method does not affect partition subscription.
	// In particular, it does not cause a group rebalance when automatic assignment is used.
	Pause(partitions map[string][]int32)

	// Resume resumes specified partitions which have been paused with Pause()/PauseAll().
	// New calls to the broker will return records from these partitions if there are any to be fetched.
	Resume(partitions map[string][]int32)

	// PauseAll suspends fetching from all partitions. Future calls to the broker will not return any
	// records from these partitions until they have been resumed using Resume()/ResumeAll().
	// Note that this method does not affect partition subscription.
	// In particular, it does not cause a group rebalance when automatic assignment is used.
	PauseAll()

	// ResumeAll resumes all partitions which have been paused with Pause()/PauseAll().
	// New calls to the broker will return records from these partitions if there are any to be fetched.
	ResumeAll()
}
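
// The session life-cycle described above is easiest to see in code. The sketch below
// is purely illustrative and not part of this package's API: exampleGroupHandler,
// runExampleGroup, the topic and the group ID are hypothetical placeholders. It shows
// the expected pattern of implementing ConsumerGroupHandler and calling Consume in a
// loop, so that a fresh session (and fresh claims) is obtained after every rebalance.
type exampleGroupHandler struct{}

func (exampleGroupHandler) Setup(ConsumerGroupSession) error   { return nil }
func (exampleGroupHandler) Cleanup(ConsumerGroupSession) error { return nil }

func (exampleGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
	// Messages() is closed when a rebalance is due; returning promptly leaves
	// time for Cleanup() and the final offset commit.
	for msg := range claim.Messages() {
		sess.MarkMessage(msg, "")
	}
	return nil
}

func runExampleGroup(ctx context.Context, addrs []string) error {
	config := NewConfig()
	config.Version = V0_10_2_0 // consumer groups require at least this version

	group, err := NewConsumerGroup(addrs, "example-group", config)
	if err != nil {
		return err
	}
	defer func() { _ = group.Close() }()

	for {
		// Consume blocks for the lifetime of a single session; it returns after a
		// server-side rebalance and must be called again to pick up the new claims.
		if err := group.Consume(ctx, []string{"example-topic"}, exampleGroupHandler{}); err != nil {
			return err
		}
		if ctx.Err() != nil {
			return ctx.Err()
		}
	}
}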

type consumerGroup struct {
	client Client

	config   *Config
	consumer Consumer
	groupID  string
	memberID string
	errors   chan error

	lock      sync.Mutex
	closed    chan none
	closeOnce sync.Once

	userData []byte
}

// NewConsumerGroup creates a new consumer group for the given broker addresses and configuration.
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}

	c, err := newConsumerGroup(groupID, client)
	if err != nil {
		_ = client.Close()
	}
	return c, err
}

// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
// PLEASE NOTE: consumer groups can only re-use but not share clients.
func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) {
	// For clients passed in by the caller, ensure we don't
	// call Close() on them.
	cli := &nopCloserClient{client}
	return newConsumerGroup(groupID, cli)
}
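
// A minimal sketch of the client-reuse path described above; newGroupFromSharedClient
// is a hypothetical helper and not part of this package. The caller keeps ownership of
// the client and must close it itself, after the group, since NewConsumerGroupFromClient
// never closes it.
func newGroupFromSharedClient(addrs []string, groupID string) (ConsumerGroup, Client, error) {
	config := NewConfig()
	config.Version = V0_10_2_0

	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, nil, err
	}

	group, err := NewConsumerGroupFromClient(groupID, client)
	if err != nil {
		_ = client.Close()
		return nil, nil, err
	}

	// Shut down in this order: first group.Close(), then client.Close().
	return group, client, nil
}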

func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
	config := client.Config()
	if !config.Version.IsAtLeast(V0_10_2_0) {
		return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
	}

	consumer, err := NewConsumerFromClient(client)
	if err != nil {
		return nil, err
	}

	return &consumerGroup{
		client:   client,
		consumer: consumer,
		config:   config,
		groupID:  groupID,
		errors:   make(chan error, config.ChannelBufferSize),
		closed:   make(chan none),
		userData: config.Consumer.Group.Member.UserData,
	}, nil
}

// Errors implements ConsumerGroup.
func (c *consumerGroup) Errors() <-chan error { return c.errors }

// Close implements ConsumerGroup.
func (c *consumerGroup) Close() (err error) {
	c.closeOnce.Do(func() {
		close(c.closed)

		// leave group
		if e := c.leave(); e != nil {
			err = e
		}

		// drain errors
		go func() {
			close(c.errors)
		}()
		for e := range c.errors {
			err = e
		}

		if e := c.client.Close(); e != nil {
			err = e
		}
	})
	return
}

// Consume implements ConsumerGroup.
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
	// Ensure group is not closed
	select {
	case <-c.closed:
		return ErrClosedConsumerGroup
	default:
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	// Quick exit when no topics are provided
	if len(topics) == 0 {
		return fmt.Errorf("no topics provided")
	}

	// Refresh metadata for requested topics
	if err := c.client.RefreshMetadata(topics...); err != nil {
		return err
	}

	// Init session
	sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
	if errors.Is(err, ErrClosedClient) {
		return ErrClosedConsumerGroup
	} else if err != nil {
		return err
	}

	// Periodically check whether the number of partitions of any subscribed topic has
	// changed; any change ends the session, which triggers a rebalance. Starting the
	// check here, once per session, avoids spawning additional loopCheckPartitionNumbers
	// goroutines when Consume is called again.
	go c.loopCheckPartitionNumbers(topics, sess)

	// Wait for session exit signal
	<-sess.ctx.Done()

	// Gracefully release session claims
	return sess.release(true)
}

// Pause implements ConsumerGroup.
func (c *consumerGroup) Pause(partitions map[string][]int32) {
	c.consumer.Pause(partitions)
}

// Resume implements ConsumerGroup.
func (c *consumerGroup) Resume(partitions map[string][]int32) {
	c.consumer.Resume(partitions)
}

// PauseAll implements ConsumerGroup.
func (c *consumerGroup) PauseAll() {
	c.consumer.PauseAll()
}

// ResumeAll implements ConsumerGroup.
func (c *consumerGroup) ResumeAll() {
	c.consumer.ResumeAll()
}

func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
	select {
	case <-c.closed:
		return nil, ErrClosedConsumerGroup
	case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
	}

	if refreshCoordinator {
		err := c.client.RefreshCoordinator(c.groupID)
		if err != nil {
			return c.retryNewSession(ctx, topics, handler, retries, true)
		}
	}

	return c.newSession(ctx, topics, handler, retries-1)
}

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		if retries <= 0 {
			return nil, err
		}

		return c.retryNewSession(ctx, topics, handler, retries, true)
	}

	var (
		metricRegistry          = c.config.MetricRegistry
		consumerGroupJoinTotal  metrics.Counter
		consumerGroupJoinFailed metrics.Counter
		consumerGroupSyncTotal  metrics.Counter
		consumerGroupSyncFailed metrics.Counter
	)

	if metricRegistry != nil {
		consumerGroupJoinTotal = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-join-total-%s", c.groupID), metricRegistry)
		consumerGroupJoinFailed = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-join-failed-%s", c.groupID), metricRegistry)
		consumerGroupSyncTotal = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-sync-total-%s", c.groupID), metricRegistry)
		consumerGroupSyncFailed = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-sync-failed-%s", c.groupID), metricRegistry)
	}

	// Join consumer group
	join, err := c.joinGroupRequest(coordinator, topics)
	if consumerGroupJoinTotal != nil {
		consumerGroupJoinTotal.Inc(1)
	}
	if err != nil {
		_ = coordinator.Close()
		if consumerGroupJoinFailed != nil {
			consumerGroupJoinFailed.Inc(1)
		}
		return nil, err
	}
	if !errors.Is(join.Err, ErrNoError) {
		if consumerGroupJoinFailed != nil {
			consumerGroupJoinFailed.Inc(1)
		}
	}
	switch join.Err {
	case ErrNoError:
		c.memberID = join.MemberId
	case ErrUnknownMemberId, ErrIllegalGeneration:
		// reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer, ErrRebalanceInProgress, ErrOffsetsLoadInProgress:
		// retry after backoff
		if retries <= 0 {
			return nil, join.Err
		}
		return c.retryNewSession(ctx, topics, handler, retries, true)
	default:
		return nil, join.Err
	}

	// Prepare distribution plan if we joined as the leader
	var plan BalanceStrategyPlan
	if join.LeaderId == join.MemberId {
		members, err := join.GetMembers()
		if err != nil {
			return nil, err
		}

		plan, err = c.balance(members)
		if err != nil {
			return nil, err
		}
	}

	// Sync consumer group
	groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
	if consumerGroupSyncTotal != nil {
		consumerGroupSyncTotal.Inc(1)
	}
	if err != nil {
		_ = coordinator.Close()
		if consumerGroupSyncFailed != nil {
			consumerGroupSyncFailed.Inc(1)
		}
		return nil, err
	}
	if !errors.Is(groupRequest.Err, ErrNoError) {
		if consumerGroupSyncFailed != nil {
			consumerGroupSyncFailed.Inc(1)
		}
	}

	switch groupRequest.Err {
	case ErrNoError:
	case ErrUnknownMemberId, ErrIllegalGeneration:
		// reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer, ErrRebalanceInProgress, ErrOffsetsLoadInProgress:
		// retry after backoff
		if retries <= 0 {
			return nil, groupRequest.Err
		}
		return c.retryNewSession(ctx, topics, handler, retries, true)
	default:
		return nil, groupRequest.Err
	}

	// Retrieve and sort claims
	var claims map[string][]int32
	if len(groupRequest.MemberAssignment) > 0 {
		members, err := groupRequest.GetMemberAssignment()
		if err != nil {
			return nil, err
		}
		claims = members.Topics

		// in the case of stateful balance strategies, hold on to the returned
		// assignment metadata, otherwise, reset the statically defined consumer
		// group metadata
		if members.UserData != nil {
			c.userData = members.UserData
		} else {
			c.userData = c.config.Consumer.Group.Member.UserData
		}

		for _, partitions := range claims {
			sort.Sort(int32Slice(partitions))
		}
	}

	return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}

func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (*JoinGroupResponse, error) {
	req := &JoinGroupRequest{
		GroupId:        c.groupID,
		MemberId:       c.memberID,
		SessionTimeout: int32(c.config.Consumer.Group.Session.Timeout / time.Millisecond),
		ProtocolType:   "consumer",
	}
	if c.config.Version.IsAtLeast(V0_10_1_0) {
		req.Version = 1
		req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond)
	}

	meta := &ConsumerGroupMemberMetadata{
		Topics:   topics,
		UserData: c.userData,
	}
	strategy := c.config.Consumer.Group.Rebalance.Strategy
	if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
		return nil, err
	}

	return coordinator.JoinGroup(req)
}

func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) {
	req := &SyncGroupRequest{
		GroupId:      c.groupID,
		MemberId:     c.memberID,
		GenerationId: generationID,
	}
	strategy := c.config.Consumer.Group.Rebalance.Strategy
	for memberID, topics := range plan {
		assignment := &ConsumerGroupMemberAssignment{Topics: topics}
		userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID)
		if err != nil {
			return nil, err
		}
		assignment.UserData = userDataBytes
		if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil {
			return nil, err
		}
	}
	return coordinator.SyncGroup(req)
}

func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, generationID int32) (*HeartbeatResponse, error) {
	req := &HeartbeatRequest{
		GroupId:      c.groupID,
		MemberId:     memberID,
		GenerationId: generationID,
	}

	return coordinator.Heartbeat(req)
}

func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
	topics := make(map[string][]int32)
	for _, meta := range members {
		for _, topic := range meta.Topics {
			topics[topic] = nil
		}
	}

	for topic := range topics {
		partitions, err := c.client.Partitions(topic)
		if err != nil {
			return nil, err
		}
		topics[topic] = partitions
	}

	strategy := c.config.Consumer.Group.Rebalance.Strategy
	return strategy.Plan(members, topics)
}

// Leaves the cluster, called by Close.
func (c *consumerGroup) leave() error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.memberID == "" {
		return nil
	}

	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		return err
	}

	resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{
		GroupId:  c.groupID,
		MemberId: c.memberID,
	})
	if err != nil {
		_ = coordinator.Close()
		return err
	}

	// Unset memberID
	c.memberID = ""

	// Check response
	switch resp.Err {
	case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
		return nil
	default:
		return resp.Err
	}
}

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
	var consumerError *ConsumerError
	if ok := errors.As(err, &consumerError); !ok && topic != "" && partition > -1 {
		err = &ConsumerError{
			Topic:     topic,
			Partition: partition,
			Err:       err,
		}
	}

	if !c.config.Consumer.Return.Errors {
		Logger.Println(err)
		return
	}

	select {
	case <-c.closed:
		// consumer is closed
		return
	default:
	}

	select {
	case c.errors <- err:
	default:
		// no error listener
	}
}

func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) {
	pause := time.NewTicker(c.config.Metadata.RefreshFrequency)
	defer session.cancel()
	defer pause.Stop()
	var oldTopicToPartitionNum map[string]int
	var err error
	if oldTopicToPartitionNum, err = c.topicToPartitionNumbers(topics); err != nil {
		return
	}
	for {
		if newTopicToPartitionNum, err := c.topicToPartitionNumbers(topics); err != nil {
			return
		} else {
			for topic, num := range oldTopicToPartitionNum {
				if newTopicToPartitionNum[topic] != num {
					return // trigger the end of the session on exit
				}
			}
		}
		select {
		case <-pause.C:
		case <-session.ctx.Done():
			Logger.Printf(
				"consumergroup/%s loop check partition number coroutine will exit, topics %s\n",
				c.groupID, topics)
			// the session was closed elsewhere, so exit as well
			return
		case <-c.closed:
			return
		}
	}
}

func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) {
	topicToPartitionNum := make(map[string]int, len(topics))
	for _, topic := range topics {
		if partitionNum, err := c.client.Partitions(topic); err != nil {
			Logger.Printf(
				"consumergroup/%s topic %s get partition number failed due to '%v'\n",
				c.groupID, topic, err)
			return nil, err
		} else {
			topicToPartitionNum[topic] = len(partitionNum)
		}
	}
	return topicToPartitionNum, nil
}

// --------------------------------------------------------------------

// ConsumerGroupSession represents a consumer group member session.
type ConsumerGroupSession interface {
	// Claims returns information about the claimed partitions by topic.
	Claims() map[string][]int32

	// MemberID returns the cluster member ID.
	MemberID() string

	// GenerationID returns the current generation ID.
	GenerationID() int32

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message
	// (see the sketch after this interface).
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
	MarkOffset(topic string, partition int32, offset int64, metadata string)

	// Commit commits the marked offsets to the backend.
	//
	// Note: calling Commit performs a blocking synchronous operation.
	Commit()

	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows
	// resetting an offset to an earlier or smaller value, where MarkOffset only
	// allows incrementing the offset. See MarkOffset for more details.
	ResetOffset(topic string, partition int32, offset int64, metadata string)

	// MarkMessage marks a message as consumed.
	MarkMessage(msg *ConsumerMessage, metadata string)

	// Context returns the session context.
	Context() context.Context
}
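
// A small sketch of the offset-marking convention documented on MarkOffset above;
// markAndMaybeCommit is a hypothetical helper and not part of this package. Marking
// msg.Offset+1 (the next offset to read) is exactly what MarkMessage does; an explicit
// Commit() can be used when waiting for the automatic commit interval is not acceptable.
func markAndMaybeCommit(sess ConsumerGroupSession, msg *ConsumerMessage, commitNow bool) {
	// Equivalent to sess.MarkMessage(msg, ""): mark the offset of the *next*
	// message to read, not the offset of the message just processed.
	sess.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, "")

	if commitNow {
		// Blocking, synchronous commit of everything marked so far.
		sess.Commit()
	}
}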

type consumerGroupSession struct {
	parent       *consumerGroup
	memberID     string
	generationID int32
	handler      ConsumerGroupHandler

	claims  map[string][]int32
	offsets *offsetManager
	ctx     context.Context
	cancel  func()

	waitGroup       sync.WaitGroup
	releaseOnce     sync.Once
	hbDying, hbDead chan none
}

func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
	// init offset manager
	offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
	if err != nil {
		return nil, err
	}

	// init context
	ctx, cancel := context.WithCancel(ctx)

	// init session
	sess := &consumerGroupSession{
		parent:       parent,
		memberID:     memberID,
		generationID: generationID,
		handler:      handler,
		offsets:      offsets,
		claims:       claims,
		ctx:          ctx,
		cancel:       cancel,
		hbDying:      make(chan none),
		hbDead:       make(chan none),
	}

	// start heartbeat loop
	go sess.heartbeatLoop()

	// create a POM for each claim
	for topic, partitions := range claims {
		for _, partition := range partitions {
			pom, err := offsets.ManagePartition(topic, partition)
			if err != nil {
				_ = sess.release(false)
				return nil, err
			}

			// handle POM errors
			go func(topic string, partition int32) {
				for err := range pom.Errors() {
					sess.parent.handleError(err, topic, partition)
				}
			}(topic, partition)
		}
	}

	// perform setup
	if err := handler.Setup(sess); err != nil {
		_ = sess.release(true)
		return nil, err
	}

	// start consuming
	for topic, partitions := range claims {
		for _, partition := range partitions {
			sess.waitGroup.Add(1)

			go func(topic string, partition int32) {
				defer sess.waitGroup.Done()

				// cancel the session as soon as the first
				// goroutine exits
				defer sess.cancel()

				// consume a single topic/partition, blocking
				sess.consume(topic, partition)
			}(topic, partition)
		}
	}
	return sess, nil
}

func (s *consumerGroupSession) Claims() map[string][]int32 { return s.claims }
func (s *consumerGroupSession) MemberID() string           { return s.memberID }
func (s *consumerGroupSession) GenerationID() int32        { return s.generationID }

func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.MarkOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) Commit() {
	s.offsets.Commit()
}

func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.ResetOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) {
	s.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata)
}

func (s *consumerGroupSession) Context() context.Context {
	return s.ctx
}

func (s *consumerGroupSession) consume(topic string, partition int32) {
	// quick exit if rebalance is due
	select {
	case <-s.ctx.Done():
		return
	case <-s.parent.closed:
		return
	default:
	}

	// get next offset
	offset := s.parent.config.Consumer.Offsets.Initial
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		offset, _ = pom.NextOffset()
	}

	// create new claim
	claim, err := newConsumerGroupClaim(s, topic, partition, offset)
	if err != nil {
		s.parent.handleError(err, topic, partition)
		return
	}

	// handle errors
	go func() {
		for err := range claim.Errors() {
			s.parent.handleError(err, topic, partition)
		}
	}()

	// trigger close when session is done
	go func() {
		select {
		case <-s.ctx.Done():
		case <-s.parent.closed:
		}
		claim.AsyncClose()
	}()

	// start processing
	if err := s.handler.ConsumeClaim(s, claim); err != nil {
		s.parent.handleError(err, topic, partition)
	}

	// ensure consumer is closed & drained
	claim.AsyncClose()
	for _, err := range claim.waitClosed() {
		s.parent.handleError(err, topic, partition)
	}
}

func (s *consumerGroupSession) release(withCleanup bool) (err error) {
	// signal release, stop heartbeat
	s.cancel()

	// wait for consumers to exit
	s.waitGroup.Wait()

	// perform release
	s.releaseOnce.Do(func() {
		if withCleanup {
			if e := s.handler.Cleanup(s); e != nil {
				s.parent.handleError(e, "", -1)
				err = e
			}
		}

		if e := s.offsets.Close(); e != nil {
			err = e
		}

		close(s.hbDying)
		<-s.hbDead
	})

	Logger.Printf(
		"consumergroup/session/%s/%d released\n",
		s.MemberID(), s.GenerationID())

	return
}

func (s *consumerGroupSession) heartbeatLoop() {
	defer close(s.hbDead)
	defer s.cancel() // trigger the end of the session on exit
	defer func() {
		Logger.Printf(
			"consumergroup/session/%s/%d heartbeat loop stopped\n",
			s.MemberID(), s.GenerationID())
	}()

	pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
	defer pause.Stop()

	retryBackoff := time.NewTimer(s.parent.config.Metadata.Retry.Backoff)
	defer retryBackoff.Stop()

	retries := s.parent.config.Metadata.Retry.Max
	for {
		coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
		if err != nil {
			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}
			retryBackoff.Reset(s.parent.config.Metadata.Retry.Backoff)
			select {
			case <-s.hbDying:
				return
			case <-retryBackoff.C:
				retries--
			}
			continue
		}

		resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID)
		if err != nil {
			_ = coordinator.Close()

			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}

			retries--
			continue
		}

		switch resp.Err {
		case ErrNoError:
			retries = s.parent.config.Metadata.Retry.Max
		case ErrRebalanceInProgress:
			retries = s.parent.config.Metadata.Retry.Max
			s.cancel()
		case ErrUnknownMemberId, ErrIllegalGeneration:
			return
		default:
			s.parent.handleError(resp.Err, "", -1)
			return
		}

		select {
		case <-pause.C:
		case <-s.hbDying:
			return
		}
	}
}

// --------------------------------------------------------------------

// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
// The interface also provides hooks for your consumer group session life-cycle and allows
// you to trigger logic before or after the consume loop(s).
//
// PLEASE NOTE that handlers are likely to be called from several goroutines concurrently,
// ensure that all state is safely protected against race conditions.
type ConsumerGroupHandler interface {
	// Setup is run at the beginning of a new session, before ConsumeClaim.
	Setup(ConsumerGroupSession) error

	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
	// but before the offsets are committed for the very last time.
	Cleanup(ConsumerGroupSession) error

	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
	// Once the Messages() channel is closed, the Handler must finish its processing
	// loop and exit.
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}

// ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
type ConsumerGroupClaim interface {
	// Topic returns the consumed topic name.
	Topic() string

	// Partition returns the consumed partition.
	Partition() int32

	// InitialOffset returns the initial offset that was used as a starting point for this claim.
	InitialOffset() int64

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64

	// Messages returns the read channel for the messages that are returned by
	// the broker. The messages channel will be closed when a new rebalance cycle
	// is due. You must finish processing and mark offsets within
	// Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
	// re-assigned to another group member.
	Messages() <-chan *ConsumerMessage
}
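
// A rough sketch of the lag calculation hinted at in the HighWaterMarkOffset documentation
// above; claimLag is a hypothetical helper and not part of this package. The high water mark
// is the offset of the next message to be produced, so after processing msg the remaining lag
// is approximately hwm - (msg.Offset + 1). It is approximate because the high water mark is
// only updated as fetch responses arrive.
func claimLag(claim ConsumerGroupClaim, msg *ConsumerMessage) int64 {
	lag := claim.HighWaterMarkOffset() - (msg.Offset + 1)
	if lag < 0 {
		lag = 0
	}
	return lag
}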

type consumerGroupClaim struct {
	topic     string
	partition int32
	offset    int64
	PartitionConsumer
}

func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
	pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
	if errors.Is(err, ErrOffsetOutOfRange) {
		offset = sess.parent.config.Consumer.Offsets.Initial
		pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset)
	}
	if err != nil {
		return nil, err
	}

	go func() {
		for err := range pcm.Errors() {
			sess.parent.handleError(err, topic, partition)
		}
	}()

	return &consumerGroupClaim{
		topic:             topic,
		partition:         partition,
		offset:            offset,
		PartitionConsumer: pcm,
	}, nil
}

func (c *consumerGroupClaim) Topic() string        { return c.topic }
func (c *consumerGroupClaim) Partition() int32     { return c.partition }
func (c *consumerGroupClaim) InitialOffset() int64 { return c.offset }

// Drains messages and errors, ensures the claim is fully closed.
func (c *consumerGroupClaim) waitClosed() (errs ConsumerErrors) {
	go func() {
		for range c.Messages() {
		}
	}()

	for err := range c.Errors() {
		errs = append(errs, err)
	}
	return
}