cubefs

Форк
0
/
offset_manager.go 
593 строки · 15.2 Кб
1
package sarama
2

3
import (
4
	"sync"
5
	"time"
6
)
7

8
// Offset Manager
9

10
// OffsetManager uses Kafka to store and fetch consumed partition offsets.
11
type OffsetManager interface {
12
	// ManagePartition creates a PartitionOffsetManager on the given topic/partition.
13
	// It will return an error if this OffsetManager is already managing the given
14
	// topic/partition.
15
	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)
16

17
	// Close stops the OffsetManager from managing offsets. It is required to call
18
	// this function before an OffsetManager object passes out of scope, as it
19
	// will otherwise leak memory. You must call this after all the
20
	// PartitionOffsetManagers are closed.
21
	Close() error
22

23
	// Commit commits the offsets. This method can be used if AutoCommit.Enable is
24
	// set to false.
25
	Commit()
26
}
27

28
type offsetManager struct {
29
	client Client
30
	conf   *Config
31
	group  string
32
	ticker *time.Ticker
33

34
	memberID   string
35
	generation int32
36

37
	broker     *Broker
38
	brokerLock sync.RWMutex
39

40
	poms     map[string]map[int32]*partitionOffsetManager
41
	pomsLock sync.RWMutex
42

43
	closeOnce sync.Once
44
	closing   chan none
45
	closed    chan none
46
}
47

48
// NewOffsetManagerFromClient creates a new OffsetManager from the given client.
49
// It is still necessary to call Close() on the underlying client when finished with the partition manager.
50
func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) {
51
	return newOffsetManagerFromClient(group, "", GroupGenerationUndefined, client)
52
}
53

54
func newOffsetManagerFromClient(group, memberID string, generation int32, client Client) (*offsetManager, error) {
55
	// Check that we are not dealing with a closed Client before processing any other arguments
56
	if client.Closed() {
57
		return nil, ErrClosedClient
58
	}
59

60
	conf := client.Config()
61
	om := &offsetManager{
62
		client: client,
63
		conf:   conf,
64
		group:  group,
65
		poms:   make(map[string]map[int32]*partitionOffsetManager),
66

67
		memberID:   memberID,
68
		generation: generation,
69

70
		closing: make(chan none),
71
		closed:  make(chan none),
72
	}
73
	if conf.Consumer.Offsets.AutoCommit.Enable {
74
		om.ticker = time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval)
75
		go withRecover(om.mainLoop)
76
	}
77

78
	return om, nil
79
}
80

81
func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) {
82
	pom, err := om.newPartitionOffsetManager(topic, partition)
83
	if err != nil {
84
		return nil, err
85
	}
86

87
	om.pomsLock.Lock()
88
	defer om.pomsLock.Unlock()
89

90
	topicManagers := om.poms[topic]
91
	if topicManagers == nil {
92
		topicManagers = make(map[int32]*partitionOffsetManager)
93
		om.poms[topic] = topicManagers
94
	}
95

96
	if topicManagers[partition] != nil {
97
		return nil, ConfigurationError("That topic/partition is already being managed")
98
	}
99

100
	topicManagers[partition] = pom
101
	return pom, nil
102
}
103

104
func (om *offsetManager) Close() error {
105
	om.closeOnce.Do(func() {
106
		// exit the mainLoop
107
		close(om.closing)
108
		if om.conf.Consumer.Offsets.AutoCommit.Enable {
109
			<-om.closed
110
		}
111

112
		// mark all POMs as closed
113
		om.asyncClosePOMs()
114

115
		// flush one last time
116
		if om.conf.Consumer.Offsets.AutoCommit.Enable {
117
			for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ {
118
				om.flushToBroker()
119
				if om.releasePOMs(false) == 0 {
120
					break
121
				}
122
			}
123
		}
124

125
		om.releasePOMs(true)
126
		om.brokerLock.Lock()
127
		om.broker = nil
128
		om.brokerLock.Unlock()
129
	})
130
	return nil
131
}
132

133
func (om *offsetManager) computeBackoff(retries int) time.Duration {
134
	if om.conf.Metadata.Retry.BackoffFunc != nil {
135
		return om.conf.Metadata.Retry.BackoffFunc(retries, om.conf.Metadata.Retry.Max)
136
	} else {
137
		return om.conf.Metadata.Retry.Backoff
138
	}
139
}
140

141
func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retries int) (int64, string, error) {
142
	broker, err := om.coordinator()
143
	if err != nil {
144
		if retries <= 0 {
145
			return 0, "", err
146
		}
147
		return om.fetchInitialOffset(topic, partition, retries-1)
148
	}
149

150
	req := new(OffsetFetchRequest)
151
	req.Version = 1
152
	req.ConsumerGroup = om.group
153
	req.AddPartition(topic, partition)
154

155
	resp, err := broker.FetchOffset(req)
156
	if err != nil {
157
		if retries <= 0 {
158
			return 0, "", err
159
		}
160
		om.releaseCoordinator(broker)
161
		return om.fetchInitialOffset(topic, partition, retries-1)
162
	}
163

164
	block := resp.GetBlock(topic, partition)
165
	if block == nil {
166
		return 0, "", ErrIncompleteResponse
167
	}
168

169
	switch block.Err {
170
	case ErrNoError:
171
		return block.Offset, block.Metadata, nil
172
	case ErrNotCoordinatorForConsumer:
173
		if retries <= 0 {
174
			return 0, "", block.Err
175
		}
176
		om.releaseCoordinator(broker)
177
		return om.fetchInitialOffset(topic, partition, retries-1)
178
	case ErrOffsetsLoadInProgress:
179
		if retries <= 0 {
180
			return 0, "", block.Err
181
		}
182
		backoff := om.computeBackoff(retries)
183
		select {
184
		case <-om.closing:
185
			return 0, "", block.Err
186
		case <-time.After(backoff):
187
		}
188
		return om.fetchInitialOffset(topic, partition, retries-1)
189
	default:
190
		return 0, "", block.Err
191
	}
192
}
193

194
func (om *offsetManager) coordinator() (*Broker, error) {
195
	om.brokerLock.RLock()
196
	broker := om.broker
197
	om.brokerLock.RUnlock()
198

199
	if broker != nil {
200
		return broker, nil
201
	}
202

203
	om.brokerLock.Lock()
204
	defer om.brokerLock.Unlock()
205

206
	if broker := om.broker; broker != nil {
207
		return broker, nil
208
	}
209

210
	if err := om.client.RefreshCoordinator(om.group); err != nil {
211
		return nil, err
212
	}
213

214
	broker, err := om.client.Coordinator(om.group)
215
	if err != nil {
216
		return nil, err
217
	}
218

219
	om.broker = broker
220
	return broker, nil
221
}
222

223
func (om *offsetManager) releaseCoordinator(b *Broker) {
224
	om.brokerLock.Lock()
225
	if om.broker == b {
226
		om.broker = nil
227
	}
228
	om.brokerLock.Unlock()
229
}
230

231
func (om *offsetManager) mainLoop() {
232
	defer om.ticker.Stop()
233
	defer close(om.closed)
234

235
	for {
236
		select {
237
		case <-om.ticker.C:
238
			om.Commit()
239
		case <-om.closing:
240
			return
241
		}
242
	}
243
}
244

245
func (om *offsetManager) Commit() {
246
	om.flushToBroker()
247
	om.releasePOMs(false)
248
}
249

250
func (om *offsetManager) flushToBroker() {
251
	req := om.constructRequest()
252
	if req == nil {
253
		return
254
	}
255

256
	broker, err := om.coordinator()
257
	if err != nil {
258
		om.handleError(err)
259
		return
260
	}
261

262
	resp, err := broker.CommitOffset(req)
263
	if err != nil {
264
		om.handleError(err)
265
		om.releaseCoordinator(broker)
266
		_ = broker.Close()
267
		return
268
	}
269

270
	om.handleResponse(broker, req, resp)
271
}
272

273
func (om *offsetManager) constructRequest() *OffsetCommitRequest {
274
	var r *OffsetCommitRequest
275
	var perPartitionTimestamp int64
276
	if om.conf.Consumer.Offsets.Retention == 0 {
277
		perPartitionTimestamp = ReceiveTime
278
		r = &OffsetCommitRequest{
279
			Version:                 1,
280
			ConsumerGroup:           om.group,
281
			ConsumerID:              om.memberID,
282
			ConsumerGroupGeneration: om.generation,
283
		}
284
	} else {
285
		r = &OffsetCommitRequest{
286
			Version:                 2,
287
			RetentionTime:           int64(om.conf.Consumer.Offsets.Retention / time.Millisecond),
288
			ConsumerGroup:           om.group,
289
			ConsumerID:              om.memberID,
290
			ConsumerGroupGeneration: om.generation,
291
		}
292
	}
293

294
	om.pomsLock.RLock()
295
	defer om.pomsLock.RUnlock()
296

297
	for _, topicManagers := range om.poms {
298
		for _, pom := range topicManagers {
299
			pom.lock.Lock()
300
			if pom.dirty {
301
				r.AddBlock(pom.topic, pom.partition, pom.offset, perPartitionTimestamp, pom.metadata)
302
			}
303
			pom.lock.Unlock()
304
		}
305
	}
306

307
	if len(r.blocks) > 0 {
308
		return r
309
	}
310

311
	return nil
312
}
313

314
func (om *offsetManager) handleResponse(broker *Broker, req *OffsetCommitRequest, resp *OffsetCommitResponse) {
315
	om.pomsLock.RLock()
316
	defer om.pomsLock.RUnlock()
317

318
	for _, topicManagers := range om.poms {
319
		for _, pom := range topicManagers {
320
			if req.blocks[pom.topic] == nil || req.blocks[pom.topic][pom.partition] == nil {
321
				continue
322
			}
323

324
			var err KError
325
			var ok bool
326

327
			if resp.Errors[pom.topic] == nil {
328
				pom.handleError(ErrIncompleteResponse)
329
				continue
330
			}
331
			if err, ok = resp.Errors[pom.topic][pom.partition]; !ok {
332
				pom.handleError(ErrIncompleteResponse)
333
				continue
334
			}
335

336
			switch err {
337
			case ErrNoError:
338
				block := req.blocks[pom.topic][pom.partition]
339
				pom.updateCommitted(block.offset, block.metadata)
340
			case ErrNotLeaderForPartition, ErrLeaderNotAvailable,
341
				ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
342
				// not a critical error, we just need to redispatch
343
				om.releaseCoordinator(broker)
344
			case ErrOffsetMetadataTooLarge, ErrInvalidCommitOffsetSize:
345
				// nothing we can do about this, just tell the user and carry on
346
				pom.handleError(err)
347
			case ErrOffsetsLoadInProgress:
348
				// nothing wrong but we didn't commit, we'll get it next time round
349
			case ErrUnknownTopicOrPartition:
350
				// let the user know *and* try redispatching - if topic-auto-create is
351
				// enabled, redispatching should trigger a metadata req and create the
352
				// topic; if not then re-dispatching won't help, but we've let the user
353
				// know and it shouldn't hurt either (see https://github.com/Shopify/sarama/issues/706)
354
				fallthrough
355
			default:
356
				// dunno, tell the user and try redispatching
357
				pom.handleError(err)
358
				om.releaseCoordinator(broker)
359
			}
360
		}
361
	}
362
}
363

364
func (om *offsetManager) handleError(err error) {
365
	om.pomsLock.RLock()
366
	defer om.pomsLock.RUnlock()
367

368
	for _, topicManagers := range om.poms {
369
		for _, pom := range topicManagers {
370
			pom.handleError(err)
371
		}
372
	}
373
}
374

375
func (om *offsetManager) asyncClosePOMs() {
376
	om.pomsLock.RLock()
377
	defer om.pomsLock.RUnlock()
378

379
	for _, topicManagers := range om.poms {
380
		for _, pom := range topicManagers {
381
			pom.AsyncClose()
382
		}
383
	}
384
}
385

386
// Releases/removes closed POMs once they are clean (or when forced)
387
func (om *offsetManager) releasePOMs(force bool) (remaining int) {
388
	om.pomsLock.Lock()
389
	defer om.pomsLock.Unlock()
390

391
	for topic, topicManagers := range om.poms {
392
		for partition, pom := range topicManagers {
393
			pom.lock.Lock()
394
			releaseDue := pom.done && (force || !pom.dirty)
395
			pom.lock.Unlock()
396

397
			if releaseDue {
398
				pom.release()
399

400
				delete(om.poms[topic], partition)
401
				if len(om.poms[topic]) == 0 {
402
					delete(om.poms, topic)
403
				}
404
			}
405
		}
406
		remaining += len(om.poms[topic])
407
	}
408
	return
409
}
410

411
func (om *offsetManager) findPOM(topic string, partition int32) *partitionOffsetManager {
412
	om.pomsLock.RLock()
413
	defer om.pomsLock.RUnlock()
414

415
	if partitions, ok := om.poms[topic]; ok {
416
		if pom, ok := partitions[partition]; ok {
417
			return pom
418
		}
419
	}
420
	return nil
421
}
422

423
// Partition Offset Manager
424

425
// PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
426
// on a partition offset manager to avoid leaks, it will not be garbage-collected automatically when it passes
427
// out of scope.
428
type PartitionOffsetManager interface {
429
	// NextOffset returns the next offset that should be consumed for the managed
430
	// partition, accompanied by metadata which can be used to reconstruct the state
431
	// of the partition consumer when it resumes. NextOffset() will return
432
	// `config.Consumer.Offsets.Initial` and an empty metadata string if no offset
433
	// was committed for this partition yet.
434
	NextOffset() (int64, string)
435

436
	// MarkOffset marks the provided offset, alongside a metadata string
437
	// that represents the state of the partition consumer at that point in time. The
438
	// metadata string can be used by another consumer to restore that state, so it
439
	// can resume consumption.
440
	//
441
	// To follow upstream conventions, you are expected to mark the offset of the
442
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
443
	// you should typically add one to the offset of the last consumed message.
444
	//
445
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
446
	// store immediately for efficiency reasons, and it may never be committed if
447
	// your application crashes. This means that you may end up processing the same
448
	// message twice, and your processing should ideally be idempotent.
449
	MarkOffset(offset int64, metadata string)
450

451
	// ResetOffset resets to the provided offset, alongside a metadata string that
452
	// represents the state of the partition consumer at that point in time. Reset
453
	// acts as a counterpart to MarkOffset, the difference being that it allows to
454
	// reset an offset to an earlier or smaller value, where MarkOffset only
455
	// allows incrementing the offset. cf MarkOffset for more details.
456
	ResetOffset(offset int64, metadata string)
457

458
	// Errors returns a read channel of errors that occur during offset management, if
459
	// enabled. By default, errors are logged and not returned over this channel. If
460
	// you want to implement any custom error handling, set your config's
461
	// Consumer.Return.Errors setting to true, and read from this channel.
462
	Errors() <-chan *ConsumerError
463

464
	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will
465
	// return immediately, after which you should wait until the 'errors' channel has
466
	// been drained and closed. It is required to call this function, or Close before
467
	// a consumer object passes out of scope, as it will otherwise leak memory. You
468
	// must call this before calling Close on the underlying client.
469
	AsyncClose()
470

471
	// Close stops the PartitionOffsetManager from managing offsets. It is required to
472
	// call this function (or AsyncClose) before a PartitionOffsetManager object
473
	// passes out of scope, as it will otherwise leak memory. You must call this
474
	// before calling Close on the underlying client.
475
	Close() error
476
}
477

478
type partitionOffsetManager struct {
479
	parent    *offsetManager
480
	topic     string
481
	partition int32
482

483
	lock     sync.Mutex
484
	offset   int64
485
	metadata string
486
	dirty    bool
487
	done     bool
488

489
	releaseOnce sync.Once
490
	errors      chan *ConsumerError
491
}
492

493
func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) {
494
	offset, metadata, err := om.fetchInitialOffset(topic, partition, om.conf.Metadata.Retry.Max)
495
	if err != nil {
496
		return nil, err
497
	}
498

499
	return &partitionOffsetManager{
500
		parent:    om,
501
		topic:     topic,
502
		partition: partition,
503
		errors:    make(chan *ConsumerError, om.conf.ChannelBufferSize),
504
		offset:    offset,
505
		metadata:  metadata,
506
	}, nil
507
}
508

509
func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
510
	return pom.errors
511
}
512

513
func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {
514
	pom.lock.Lock()
515
	defer pom.lock.Unlock()
516

517
	if offset > pom.offset {
518
		pom.offset = offset
519
		pom.metadata = metadata
520
		pom.dirty = true
521
	}
522
}
523

524
func (pom *partitionOffsetManager) ResetOffset(offset int64, metadata string) {
525
	pom.lock.Lock()
526
	defer pom.lock.Unlock()
527

528
	if offset <= pom.offset {
529
		pom.offset = offset
530
		pom.metadata = metadata
531
		pom.dirty = true
532
	}
533
}
534

535
func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {
536
	pom.lock.Lock()
537
	defer pom.lock.Unlock()
538

539
	if pom.offset == offset && pom.metadata == metadata {
540
		pom.dirty = false
541
	}
542
}
543

544
func (pom *partitionOffsetManager) NextOffset() (int64, string) {
545
	pom.lock.Lock()
546
	defer pom.lock.Unlock()
547

548
	if pom.offset >= 0 {
549
		return pom.offset, pom.metadata
550
	}
551

552
	return pom.parent.conf.Consumer.Offsets.Initial, ""
553
}
554

555
func (pom *partitionOffsetManager) AsyncClose() {
556
	pom.lock.Lock()
557
	pom.done = true
558
	pom.lock.Unlock()
559
}
560

561
func (pom *partitionOffsetManager) Close() error {
562
	pom.AsyncClose()
563

564
	var errors ConsumerErrors
565
	for err := range pom.errors {
566
		errors = append(errors, err)
567
	}
568

569
	if len(errors) > 0 {
570
		return errors
571
	}
572
	return nil
573
}
574

575
func (pom *partitionOffsetManager) handleError(err error) {
576
	cErr := &ConsumerError{
577
		Topic:     pom.topic,
578
		Partition: pom.partition,
579
		Err:       err,
580
	}
581

582
	if pom.parent.conf.Consumer.Return.Errors {
583
		pom.errors <- cErr
584
	} else {
585
		Logger.Println(cErr)
586
	}
587
}
588

589
func (pom *partitionOffsetManager) release() {
590
	pom.releaseOnce.Do(func() {
591
		close(pom.errors)
592
	})
593
}
594

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.