package sarama

import (
	"errors"
	"math/rand"
	"sort"
	"sync"
	"time"
)

// Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
// You MUST call Close() on a client to avoid leaks, it will not be garbage-collected
// automatically when it passes out of scope. It is safe to share a client amongst many
// users, however Kafka will process requests from a single client strictly in serial,
// so it is generally more efficient to use the default one client per producer/consumer.
type Client interface {
	// Config returns the Config struct of the client. This struct should not be
	// altered after it has been created.
	Config() *Config

	// Controller returns the cluster controller broker. It will return a
	// locally cached value if it's available. You can call RefreshController
	// to update the cached value. Requires Kafka 0.10 or higher.
	Controller() (*Broker, error)

	// RefreshController retrieves the cluster controller from fresh metadata
	// and stores it in the local cache. Requires Kafka 0.10 or higher.
	RefreshController() (*Broker, error)

	// Brokers returns the current set of active brokers as retrieved from cluster metadata.
	Brokers() []*Broker

	// Broker returns the active Broker if available for the broker ID.
	Broker(brokerID int32) (*Broker, error)

	// Topics returns the set of available topics as retrieved from cluster metadata.
	Topics() ([]string, error)

	// Partitions returns the sorted list of all partition IDs for the given topic.
	Partitions(topic string) ([]int32, error)

	// WritablePartitions returns the sorted list of all writable partition IDs for
	// the given topic, where "writable" means "having a valid leader accepting
	// writes".
	WritablePartitions(topic string) ([]int32, error)

	// Leader returns the broker object that is the leader of the current
	// topic/partition, as determined by querying the cluster metadata.
	Leader(topic string, partitionID int32) (*Broker, error)

	// Replicas returns the set of all replica IDs for the given partition.
	Replicas(topic string, partitionID int32) ([]int32, error)

	// InSyncReplicas returns the set of all in-sync replica IDs for the given
	// partition. In-sync replicas are replicas which are fully caught up with
	// the partition leader.
	InSyncReplicas(topic string, partitionID int32) ([]int32, error)

	// OfflineReplicas returns the set of all offline replica IDs for the given
	// partition. Offline replicas are replicas which are offline
	OfflineReplicas(topic string, partitionID int32) ([]int32, error)

	// RefreshBrokers takes a list of addresses to be used as seed brokers.
	// Existing broker connections are closed and the updated list of seed brokers
	// will be used for the next metadata fetch.
	RefreshBrokers(addrs []string) error

	// RefreshMetadata takes a list of topics and queries the cluster to refresh the
	// available metadata for those topics. If no topics are provided, it will refresh
	// metadata for all topics.
	RefreshMetadata(topics ...string) error

	// GetOffset queries the cluster to get the most recent available offset at the
	// given time (in milliseconds) on the topic/partition combination.
	// Time should be OffsetOldest for the earliest available offset,
	// OffsetNewest for the offset of the message that will be produced next, or a time.
	GetOffset(topic string, partitionID int32, time int64) (int64, error)

	// Coordinator returns the coordinating broker for a consumer group. It will
	// return a locally cached value if it's available. You can call
	// RefreshCoordinator to update the cached value. This function only works on
	// Kafka 0.8.2 and higher.
	Coordinator(consumerGroup string) (*Broker, error)

	// RefreshCoordinator retrieves the coordinator for a consumer group and stores it
	// in local cache. This function only works on Kafka 0.8.2 and higher.
	RefreshCoordinator(consumerGroup string) error

	// InitProducerID retrieves information required for Idempotent Producer
	InitProducerID() (*InitProducerIDResponse, error)

	// Close shuts down all broker connections managed by this client. It is required
	// to call this function before a client object passes out of scope, as it will
	// otherwise leak memory. You must close any Producers or Consumers using a client
	// before you close the client.
	Close() error

	// Closed returns true if the client has already had Close called on it
	Closed() bool
}
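
// A minimal usage sketch of the interface above (illustrative only; the broker
// address "localhost:9092" and the topic name "my-topic" are placeholders, not
// values defined by this package):
//
//	client, err := NewClient([]string{"localhost:9092"}, NewConfig())
//	if err != nil {
//		panic(err)
//	}
//	defer client.Close()
//
//	partitions, err := client.Partitions("my-topic")
//	if err != nil {
//		panic(err)
//	}
//	for _, p := range partitions {
//		fmt.Println("partition", p)
//	}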

const (
	// OffsetNewest stands for the log head offset, i.e. the offset that will be
	// assigned to the next message that will be produced to the partition. You
	// can send this to a client's GetOffset method to get this offset, or when
	// calling ConsumePartition to start consuming new messages.
	OffsetNewest int64 = -1
	// OffsetOldest stands for the oldest offset available on the broker for a
	// partition. You can send this to a client's GetOffset method to get this
	// offset, or when calling ConsumePartition to start consuming from the
	// oldest offset that is still available on the broker.
	OffsetOldest int64 = -2
)
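
// For illustration, assuming an existing Client c and a topic/partition that
// exists on the cluster (both are placeholders), these constants are passed as
// the time argument of GetOffset:
//
//	newest, _ := c.GetOffset("my-topic", 0, OffsetNewest) // offset of the next message to be produced
//	oldest, _ := c.GetOffset("my-topic", 0, OffsetOldest) // oldest offset still retained by the broker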

type client struct {
	conf           *Config
	closer, closed chan none // for shutting down background metadata updater

	// the broker addresses given to us through the constructor are not guaranteed to be returned in
	// the cluster metadata (I *think* it only returns brokers who are currently leading partitions?)
	// so we store them separately
	seedBrokers []*Broker
	deadSeeds   []*Broker

	controllerID   int32                                   // cluster controller broker id
	brokers        map[int32]*Broker                       // maps broker ids to brokers
	metadata       map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
	metadataTopics map[string]none                         // topics that need to collect metadata
	coordinators   map[string]int32                        // Maps consumer group names to coordinating broker IDs

	// If the number of partitions is large, we can get some churn calling cachedPartitions,
	// so the result is cached.  It is important to update this value whenever metadata is changed
	cachedPartitionsResults map[string][maxPartitionIndex][]int32

	lock sync.RWMutex // protects access to the maps that hold cluster state.
}

// NewClient creates a new Client. It connects to one of the given broker addresses
// and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot
// be retrieved from any of the given broker addresses, the client is not created.
func NewClient(addrs []string, conf *Config) (Client, error) {
	DebugLogger.Println("Initializing new client")

	if conf == nil {
		conf = NewConfig()
	}

	if err := conf.Validate(); err != nil {
		return nil, err
	}

	if len(addrs) < 1 {
		return nil, ConfigurationError("You must provide at least one broker address")
	}

	client := &client{
		conf:                    conf,
		closer:                  make(chan none),
		closed:                  make(chan none),
		brokers:                 make(map[int32]*Broker),
		metadata:                make(map[string]map[int32]*PartitionMetadata),
		metadataTopics:          make(map[string]none),
		cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
		coordinators:            make(map[string]int32),
	}

	client.randomizeSeedBrokers(addrs)

	if conf.Metadata.Full {
		// do an initial fetch of all cluster metadata by specifying an empty list of topics
		if err := client.RefreshMetadata(); err != nil {
			if errors.Is(err, ErrLeaderNotAvailable) || errors.Is(err, ErrReplicaNotAvailable) || errors.Is(err, ErrTopicAuthorizationFailed) || errors.Is(err, ErrClusterAuthorizationFailed) {
				// indicates that maybe part of the cluster is down, but is not fatal to creating the client
				Logger.Println(err)
			} else {
				close(client.closed) // we haven't started the background updater yet, so we have to do this manually
				_ = client.Close()
				return nil, err
			}
		}
	}
	go withRecover(client.backgroundMetadataUpdater)

	DebugLogger.Println("Successfully initialized new client")

	return client, nil
}

func (client *client) Config() *Config {
	return client.conf
}

func (client *client) Brokers() []*Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()
	brokers := make([]*Broker, 0, len(client.brokers))
	for _, broker := range client.brokers {
		brokers = append(brokers, broker)
	}
	return brokers
}

func (client *client) Broker(brokerID int32) (*Broker, error) {
	client.lock.RLock()
	defer client.lock.RUnlock()
	broker, ok := client.brokers[brokerID]
	if !ok {
		return nil, ErrBrokerNotFound
	}
	_ = broker.Open(client.conf)
	return broker, nil
}

func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
	brokerErrors := make([]error, 0)
	for broker := client.any(); broker != nil; broker = client.any() {
		var response *InitProducerIDResponse
		req := &InitProducerIDRequest{}

		response, err := broker.InitProducerID(req)
		if err == nil {
			return response, nil
		} else {
			// some error, remove that broker and try again
			Logger.Printf("Client got error from broker %d when issuing InitProducerID : %v\n", broker.ID(), err)
			_ = broker.Close()
			brokerErrors = append(brokerErrors, err)
			client.deregisterBroker(broker)
		}
	}

	return nil, Wrap(ErrOutOfBrokers, brokerErrors...)
}

func (client *client) Close() error {
	if client.Closed() {
		// Chances are this is being called from a defer() and the error will go unobserved
		// so we go ahead and log the event in this case.
		Logger.Printf("Close() called on already closed client")
		return ErrClosedClient
	}

	// shutdown and wait for the background thread before we take the lock, to avoid races
	close(client.closer)
	<-client.closed

	client.lock.Lock()
	defer client.lock.Unlock()
	DebugLogger.Println("Closing Client")

	for _, broker := range client.brokers {
		safeAsyncClose(broker)
	}

	for _, broker := range client.seedBrokers {
		safeAsyncClose(broker)
	}

	client.brokers = nil
	client.metadata = nil
	client.metadataTopics = nil

	return nil
}

func (client *client) Closed() bool {
	client.lock.RLock()
	defer client.lock.RUnlock()

	return client.brokers == nil
}

func (client *client) Topics() ([]string, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	client.lock.RLock()
	defer client.lock.RUnlock()

	ret := make([]string, 0, len(client.metadata))
	for topic := range client.metadata {
		ret = append(ret, topic)
	}

	return ret, nil
}

func (client *client) MetadataTopics() ([]string, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	client.lock.RLock()
	defer client.lock.RUnlock()

	ret := make([]string, 0, len(client.metadataTopics))
	for topic := range client.metadataTopics {
		ret = append(ret, topic)
	}

	return ret, nil
}

func (client *client) Partitions(topic string) ([]int32, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	partitions := client.cachedPartitions(topic, allPartitions)

	if len(partitions) == 0 {
		err := client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		partitions = client.cachedPartitions(topic, allPartitions)
	}

	// no partitions found after refresh metadata
	if len(partitions) == 0 {
		return nil, ErrUnknownTopicOrPartition
	}

	return partitions, nil
}

func (client *client) WritablePartitions(topic string) ([]int32, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	partitions := client.cachedPartitions(topic, writablePartitions)

	// len==0 catches when it's nil (no such topic) and the odd case when every single
	// partition is undergoing leader election simultaneously. Callers have to be able to handle
	// this function returning an empty slice (which is a valid return value) but catching it
	// here the first time (note we *don't* catch it below where we return ErrUnknownTopicOrPartition) triggers
	// a metadata refresh as a nicety so callers can just try again and don't have to manually
	// trigger a refresh (otherwise they'd just keep getting a stale cached copy).
	if len(partitions) == 0 {
		err := client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		partitions = client.cachedPartitions(topic, writablePartitions)
	}

	if partitions == nil {
		return nil, ErrUnknownTopicOrPartition
	}

	return partitions, nil
}

func (client *client) Replicas(topic string, partitionID int32) ([]int32, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	metadata := client.cachedMetadata(topic, partitionID)

	if metadata == nil {
		err := client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		metadata = client.cachedMetadata(topic, partitionID)
	}

	if metadata == nil {
		return nil, ErrUnknownTopicOrPartition
	}

	if errors.Is(metadata.Err, ErrReplicaNotAvailable) {
		return dupInt32Slice(metadata.Replicas), metadata.Err
	}
	return dupInt32Slice(metadata.Replicas), nil
}

func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	metadata := client.cachedMetadata(topic, partitionID)

	if metadata == nil {
		err := client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		metadata = client.cachedMetadata(topic, partitionID)
	}

	if metadata == nil {
		return nil, ErrUnknownTopicOrPartition
	}

	if errors.Is(metadata.Err, ErrReplicaNotAvailable) {
		return dupInt32Slice(metadata.Isr), metadata.Err
	}
	return dupInt32Slice(metadata.Isr), nil
}

func (client *client) OfflineReplicas(topic string, partitionID int32) ([]int32, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	metadata := client.cachedMetadata(topic, partitionID)

	if metadata == nil {
		err := client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		metadata = client.cachedMetadata(topic, partitionID)
	}

	if metadata == nil {
		return nil, ErrUnknownTopicOrPartition
	}

	if errors.Is(metadata.Err, ErrReplicaNotAvailable) {
		return dupInt32Slice(metadata.OfflineReplicas), metadata.Err
	}
	return dupInt32Slice(metadata.OfflineReplicas), nil
}

func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	leader, err := client.cachedLeader(topic, partitionID)

	if leader == nil {
		err = client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		leader, err = client.cachedLeader(topic, partitionID)
	}

	return leader, err
}

func (client *client) RefreshBrokers(addrs []string) error {
	if client.Closed() {
		return ErrClosedClient
	}

	client.lock.Lock()
	defer client.lock.Unlock()

	for _, broker := range client.brokers {
		_ = broker.Close()
		delete(client.brokers, broker.ID())
	}

	for _, broker := range client.seedBrokers {
		_ = broker.Close()
	}

	for _, broker := range client.deadSeeds {
		_ = broker.Close()
	}

	client.seedBrokers = nil
	client.deadSeeds = nil

	client.randomizeSeedBrokers(addrs)

	return nil
}

func (client *client) RefreshMetadata(topics ...string) error {
	if client.Closed() {
		return ErrClosedClient
	}

	// Prior to 0.8.2, Kafka will throw exceptions on an empty topic and not return a proper
	// error. This handles the case by returning an error instead of sending it
	// off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310
	for _, topic := range topics {
		if topic == "" {
			return ErrInvalidTopic // this is the error that 0.8.2 and later correctly return
		}
	}

	deadline := time.Time{}
	if client.conf.Metadata.Timeout > 0 {
		deadline = time.Now().Add(client.conf.Metadata.Timeout)
	}
	return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
}

func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
	if client.Closed() {
		return -1, ErrClosedClient
	}

	offset, err := client.getOffset(topic, partitionID, time)
	if err != nil {
		if err := client.RefreshMetadata(topic); err != nil {
			return -1, err
		}
		return client.getOffset(topic, partitionID, time)
	}

	return offset, err
}

func (client *client) Controller() (*Broker, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	if !client.conf.Version.IsAtLeast(V0_10_0_0) {
		return nil, ErrUnsupportedVersion
	}

	controller := client.cachedController()
	if controller == nil {
		if err := client.refreshMetadata(); err != nil {
			return nil, err
		}
		controller = client.cachedController()
	}

	if controller == nil {
		return nil, ErrControllerNotAvailable
	}

	_ = controller.Open(client.conf)
	return controller, nil
}

// deregisterController removes the cached controllerID
func (client *client) deregisterController() {
	client.lock.Lock()
	defer client.lock.Unlock()
	if controller, ok := client.brokers[client.controllerID]; ok {
		_ = controller.Close()
		delete(client.brokers, client.controllerID)
	}
}

// RefreshController retrieves the cluster controller from fresh metadata
// and stores it in the local cache. Requires Kafka 0.10 or higher.
func (client *client) RefreshController() (*Broker, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	client.deregisterController()

	if err := client.refreshMetadata(); err != nil {
		return nil, err
	}

	controller := client.cachedController()
	if controller == nil {
		return nil, ErrControllerNotAvailable
	}

	_ = controller.Open(client.conf)
	return controller, nil
}

func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	coordinator := client.cachedCoordinator(consumerGroup)

	if coordinator == nil {
		if err := client.RefreshCoordinator(consumerGroup); err != nil {
			return nil, err
		}
		coordinator = client.cachedCoordinator(consumerGroup)
	}

	if coordinator == nil {
		return nil, ErrConsumerCoordinatorNotAvailable
	}

	_ = coordinator.Open(client.conf)
	return coordinator, nil
}

func (client *client) RefreshCoordinator(consumerGroup string) error {
	if client.Closed() {
		return ErrClosedClient
	}

	response, err := client.getConsumerMetadata(consumerGroup, client.conf.Metadata.Retry.Max)
	if err != nil {
		return err
	}

	client.lock.Lock()
	defer client.lock.Unlock()
	client.registerBroker(response.Coordinator)
	client.coordinators[consumerGroup] = response.Coordinator.ID()
	return nil
}

// private broker management helpers

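// randomizeSeedBrokers shuffles the given addresses before registering them as
// seed brokers, so that different client instances do not all try the same seed
// broker first.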
func (client *client) randomizeSeedBrokers(addrs []string) {
	random := rand.New(rand.NewSource(time.Now().UnixNano()))
	for _, index := range random.Perm(len(addrs)) {
		client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
	}
}

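// updateBroker reconciles the client's broker map with the brokers returned in a
// metadata response: new IDs are registered, brokers whose address changed are
// replaced, and brokers no longer listed in the metadata are closed and removed.
// It is called with the client's write lock held (see updateMetadata).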
func (client *client) updateBroker(brokers []*Broker) {
	currentBroker := make(map[int32]*Broker, len(brokers))

	for _, broker := range brokers {
		currentBroker[broker.ID()] = broker
		if client.brokers[broker.ID()] == nil { // add new broker
			client.brokers[broker.ID()] = broker
			DebugLogger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
		} else if broker.Addr() != client.brokers[broker.ID()].Addr() { // replace broker with new address
			safeAsyncClose(client.brokers[broker.ID()])
			client.brokers[broker.ID()] = broker
			Logger.Printf("client/brokers replaced registered broker #%d with %s", broker.ID(), broker.Addr())
		}
	}

	for id, broker := range client.brokers {
		if _, exist := currentBroker[id]; !exist { // remove old broker
			safeAsyncClose(broker)
			delete(client.brokers, id)
			Logger.Printf("client/broker remove invalid broker #%d with %s", broker.ID(), broker.Addr())
		}
	}
}

// registerBroker makes sure a broker received by a Metadata or Coordinator request is registered
// in the brokers map. You must hold the write lock before calling this function.
func (client *client) registerBroker(broker *Broker) {
	if client.brokers == nil {
		Logger.Printf("cannot register broker #%d at %s, client already closed", broker.ID(), broker.Addr())
		return
	}

	if client.brokers[broker.ID()] == nil {
		client.brokers[broker.ID()] = broker
		DebugLogger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
	} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
		safeAsyncClose(client.brokers[broker.ID()])
		client.brokers[broker.ID()] = broker
		Logger.Printf("client/brokers replaced registered broker #%d with %s", broker.ID(), broker.Addr())
	}
}

// deregisterBroker moves the broker to the deadSeeds list if it is the current
// seed broker; otherwise it removes the broker from the brokers map completely.
func (client *client) deregisterBroker(broker *Broker) {
	client.lock.Lock()
	defer client.lock.Unlock()

	if len(client.seedBrokers) > 0 && broker == client.seedBrokers[0] {
		client.deadSeeds = append(client.deadSeeds, broker)
		client.seedBrokers = client.seedBrokers[1:]
	} else {
		// we do this so that our loop in `tryRefreshMetadata` doesn't go on forever,
		// but we really shouldn't have to; once that loop is made better this case can be
		// removed, and the function generally can be renamed from `deregisterBroker` to
		// `nextSeedBroker` or something
		DebugLogger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr())
		delete(client.brokers, broker.ID())
	}
}

func (client *client) resurrectDeadBrokers() {
	client.lock.Lock()
	defer client.lock.Unlock()

	Logger.Printf("client/brokers resurrecting %d dead seed brokers", len(client.deadSeeds))
	client.seedBrokers = append(client.seedBrokers, client.deadSeeds...)
	client.deadSeeds = nil
}

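// any returns a broker to talk to: the first remaining seed broker if there is
// one, otherwise an arbitrary broker from the metadata map (map iteration order,
// so neither random nor deterministic). The returned broker has Open called on it.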
func (client *client) any() *Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()

	if len(client.seedBrokers) > 0 {
		_ = client.seedBrokers[0].Open(client.conf)
		return client.seedBrokers[0]
	}

	// not guaranteed to be random *or* deterministic
	for _, broker := range client.brokers {
		_ = broker.Open(client.conf)
		return broker
	}

	return nil
}

// private caching/lazy metadata helpers

type partitionType int

const (
	allPartitions partitionType = iota
	writablePartitions
	// If you add any more types, update the partition cache in update()

	// Ensure this is the last partition type value
	maxPartitionIndex
)

func (client *client) cachedMetadata(topic string, partitionID int32) *PartitionMetadata {
	client.lock.RLock()
	defer client.lock.RUnlock()

	partitions := client.metadata[topic]
	if partitions != nil {
		return partitions[partitionID]
	}

	return nil
}

func (client *client) cachedPartitions(topic string, partitionSet partitionType) []int32 {
	client.lock.RLock()
	defer client.lock.RUnlock()

	partitions, exists := client.cachedPartitionsResults[topic]

	if !exists {
		return nil
	}
	return partitions[partitionSet]
}

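// setPartitionCache rebuilds the sorted partition-ID list for the given topic and
// partition set from the current metadata; for writablePartitions, partitions whose
// leader is not available are skipped. It reads client.metadata directly, so the
// caller must hold the client lock.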
func (client *client) setPartitionCache(topic string, partitionSet partitionType) []int32 {
	partitions := client.metadata[topic]

	if partitions == nil {
		return nil
	}

	ret := make([]int32, 0, len(partitions))
	for _, partition := range partitions {
		if partitionSet == writablePartitions && errors.Is(partition.Err, ErrLeaderNotAvailable) {
			continue
		}
		ret = append(ret, partition.ID)
	}

	sort.Sort(int32Slice(ret))
	return ret
}

func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, error) {
	client.lock.RLock()
	defer client.lock.RUnlock()

	partitions := client.metadata[topic]
	if partitions != nil {
		metadata, ok := partitions[partitionID]
		if ok {
			if errors.Is(metadata.Err, ErrLeaderNotAvailable) {
				return nil, ErrLeaderNotAvailable
			}
			b := client.brokers[metadata.Leader]
			if b == nil {
				return nil, ErrLeaderNotAvailable
			}
			_ = b.Open(client.conf)
			return b, nil
		}
	}

	return nil, ErrUnknownTopicOrPartition
}

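// getOffset sends a single OffsetRequest to the partition leader and returns the
// one offset contained in the response. The exported GetOffset wraps it with a
// metadata refresh and a second attempt on failure.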
func (client *client) getOffset(topic string, partitionID int32, time int64) (int64, error) {
	broker, err := client.Leader(topic, partitionID)
	if err != nil {
		return -1, err
	}

	request := &OffsetRequest{}
	if client.conf.Version.IsAtLeast(V0_10_1_0) {
		request.Version = 1
	}
	request.AddBlock(topic, partitionID, time, 1)

	response, err := broker.GetAvailableOffsets(request)
	if err != nil {
		_ = broker.Close()
		return -1, err
	}

	block := response.GetBlock(topic, partitionID)
	if block == nil {
		_ = broker.Close()
		return -1, ErrIncompleteResponse
	}
	if !errors.Is(block.Err, ErrNoError) {
		return -1, block.Err
	}
	if len(block.Offsets) != 1 {
		return -1, ErrOffsetOutOfRange
	}

	return block.Offsets[0], nil
}

// core metadata update logic

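// backgroundMetadataUpdater refreshes cluster metadata every
// Metadata.RefreshFrequency until the client is closed; a frequency of zero
// disables the background refresh entirely.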
func (client *client) backgroundMetadataUpdater() {
	defer close(client.closed)

	if client.conf.Metadata.RefreshFrequency == time.Duration(0) {
		return
	}

	ticker := time.NewTicker(client.conf.Metadata.RefreshFrequency)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := client.refreshMetadata(); err != nil {
				Logger.Println("Client background metadata update:", err)
			}
		case <-client.closer:
			return
		}
	}
}

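// refreshMetadata refreshes either all topics (when Metadata.Full is set) or only
// the topics already tracked in metadataTopics; with nothing to refresh it returns
// ErrNoTopicsToUpdateMetadata.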
func (client *client) refreshMetadata() error {
	var topics []string

	if !client.conf.Metadata.Full {
		if specificTopics, err := client.MetadataTopics(); err != nil {
			return err
		} else if len(specificTopics) == 0 {
			return ErrNoTopicsToUpdateMetadata
		} else {
			topics = specificTopics
		}
	}

	if err := client.RefreshMetadata(topics...); err != nil {
		return err
	}

	return nil
}

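// tryRefreshMetadata walks the available brokers, asking each in turn for metadata
// until one returns a usable response, retrying with backoff while attempts remain
// and the optional deadline has not been reached.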
func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
	pastDeadline := func(backoff time.Duration) bool {
		if !deadline.IsZero() && time.Now().Add(backoff).After(deadline) {
			// we are past the deadline
			return true
		}
		return false
	}
	retry := func(err error) error {
		if attemptsRemaining > 0 {
			backoff := client.computeBackoff(attemptsRemaining)
			if pastDeadline(backoff) {
				Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout")
				return err
			}
			Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
			if backoff > 0 {
				time.Sleep(backoff)
			}
			return client.tryRefreshMetadata(topics, attemptsRemaining-1, deadline)
		}
		return err
	}

	broker := client.any()
	brokerErrors := make([]error, 0)
	for ; broker != nil && !pastDeadline(0); broker = client.any() {
		allowAutoTopicCreation := client.conf.Metadata.AllowAutoTopicCreation
		if len(topics) > 0 {
			DebugLogger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
		} else {
			allowAutoTopicCreation = false
			DebugLogger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr)
		}

		req := &MetadataRequest{Topics: topics, AllowAutoTopicCreation: allowAutoTopicCreation}
		if client.conf.Version.IsAtLeast(V1_0_0_0) {
			req.Version = 5
		} else if client.conf.Version.IsAtLeast(V0_10_0_0) {
			req.Version = 1
		}
		response, err := broker.GetMetadata(req)
		var kerror KError
		var packetEncodingError PacketEncodingError
		if err == nil {
			allKnownMetaData := len(topics) == 0
			// valid response, use it
			shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
			if shouldRetry {
				Logger.Println("client/metadata found some partitions to be leaderless")
				return retry(err) // note: err can be nil
			}
			return err
		} else if errors.As(err, &packetEncodingError) {
			// didn't even send, return the error
			return err
		} else if errors.As(err, &kerror) {
			// if SASL auth error return as this _should_ be a non retryable err for all brokers
			if errors.Is(err, ErrSASLAuthenticationFailed) {
				Logger.Println("client/metadata failed SASL authentication")
				return err
			}

			if errors.Is(err, ErrTopicAuthorizationFailed) {
				Logger.Println("client is not authorized to access this topic. The topics were: ", topics)
				return err
			}
			// else remove that broker and try again
			Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
			_ = broker.Close()
			client.deregisterBroker(broker)
		} else {
			// some other error, remove that broker and try again
			Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
			brokerErrors = append(brokerErrors, err)
			_ = broker.Close()
			client.deregisterBroker(broker)
		}
	}

	err := Wrap(ErrOutOfBrokers, brokerErrors...)
	if broker != nil {
		Logger.Printf("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
		return retry(err)
	}

	Logger.Println("client/metadata no available broker to send metadata request to")
	client.resurrectDeadBrokers()
	return retry(err)
}

// updateMetadata caches the given metadata response. If there was no fatal error,
// the retry return value reports whether some topics or partitions are leaderless
// (ErrLeaderNotAvailable) and need retrying.
func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
	if client.Closed() {
		return
	}

	client.lock.Lock()
	defer client.lock.Unlock()

	// For all the brokers we received:
	// - if it is a new ID, save it
	// - if it is an existing ID, but the address we have is stale, discard the old one and save it
	// - if a previously known broker is no longer listed, remove it
	// - otherwise ignore it, replacing our existing one would just bounce the connection
	client.updateBroker(data.Brokers)

	client.controllerID = data.ControllerID

	if allKnownMetaData {
		client.metadata = make(map[string]map[int32]*PartitionMetadata)
		client.metadataTopics = make(map[string]none)
		client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32)
	}
	for _, topic := range data.Topics {
		// topics must be added to `metadataTopics` first, to guarantee that all
		// requested topics are recorded and remain trackable for the periodic
		// metadata refresh.
		if _, exists := client.metadataTopics[topic.Name]; !exists {
			client.metadataTopics[topic.Name] = none{}
		}
		delete(client.metadata, topic.Name)
		delete(client.cachedPartitionsResults, topic.Name)

		switch topic.Err {
		case ErrNoError:
			// no-op
		case ErrInvalidTopic, ErrTopicAuthorizationFailed: // don't retry, don't store partial results
			err = topic.Err
			continue
		case ErrUnknownTopicOrPartition: // retry, do not store partial partition results
			err = topic.Err
			retry = true
			continue
		case ErrLeaderNotAvailable: // retry, but store partial partition results
			retry = true
		default: // don't retry, don't store partial results
			Logger.Printf("Unexpected topic-level metadata error: %s", topic.Err)
			err = topic.Err
			continue
		}

		client.metadata[topic.Name] = make(map[int32]*PartitionMetadata, len(topic.Partitions))
		for _, partition := range topic.Partitions {
			client.metadata[topic.Name][partition.ID] = partition
			if errors.Is(partition.Err, ErrLeaderNotAvailable) {
				retry = true
			}
		}

		var partitionCache [maxPartitionIndex][]int32
		partitionCache[allPartitions] = client.setPartitionCache(topic.Name, allPartitions)
		partitionCache[writablePartitions] = client.setPartitionCache(topic.Name, writablePartitions)
		client.cachedPartitionsResults[topic.Name] = partitionCache
	}

	return
}

func (client *client) cachedCoordinator(consumerGroup string) *Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()
	if coordinatorID, ok := client.coordinators[consumerGroup]; ok {
		return client.brokers[coordinatorID]
	}
	return nil
}

func (client *client) cachedController() *Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()

	return client.brokers[client.controllerID]
}

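// computeBackoff returns the delay before the next retry, either from the
// user-supplied Metadata.Retry.BackoffFunc (given the number of retries already
// performed and the maximum) or the static Metadata.Retry.Backoff.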
func (client *client) computeBackoff(attemptsRemaining int) time.Duration {
	if client.conf.Metadata.Retry.BackoffFunc != nil {
		maxRetries := client.conf.Metadata.Retry.Max
		retries := maxRetries - attemptsRemaining
		return client.conf.Metadata.Retry.BackoffFunc(retries, maxRetries)
	}
	return client.conf.Metadata.Retry.Backoff
}

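// getConsumerMetadata issues FindCoordinator requests for the consumer group
// against the available brokers, retrying with backoff when the coordinator is
// not yet available or the group is not yet authorized, until a coordinator is
// found or the brokers are exhausted.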
func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) {
	retry := func(err error) (*FindCoordinatorResponse, error) {
		if attemptsRemaining > 0 {
			backoff := client.computeBackoff(attemptsRemaining)
			Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
			time.Sleep(backoff)
			return client.getConsumerMetadata(consumerGroup, attemptsRemaining-1)
		}
		return nil, err
	}

	brokerErrors := make([]error, 0)
	for broker := client.any(); broker != nil; broker = client.any() {
		DebugLogger.Printf("client/coordinator requesting coordinator for consumergroup %s from %s\n", consumerGroup, broker.Addr())

		request := new(FindCoordinatorRequest)
		request.CoordinatorKey = consumerGroup
		request.CoordinatorType = CoordinatorGroup

		response, err := broker.FindCoordinator(request)
		if err != nil {
			Logger.Printf("client/coordinator request to broker %s failed: %s\n", broker.Addr(), err)

			var packetEncodingError PacketEncodingError
			if errors.As(err, &packetEncodingError) {
				return nil, err
			} else {
				_ = broker.Close()
				brokerErrors = append(brokerErrors, err)
				client.deregisterBroker(broker)
				continue
			}
		}

		if errors.Is(response.Err, ErrNoError) {
			DebugLogger.Printf("client/coordinator coordinator for consumergroup %s is #%d (%s)\n", consumerGroup, response.Coordinator.ID(), response.Coordinator.Addr())
			return response, nil
		} else if errors.Is(response.Err, ErrConsumerCoordinatorNotAvailable) {
			Logger.Printf("client/coordinator coordinator for consumer group %s is not available\n", consumerGroup)

			// This is very ugly, but this scenario will only happen once per cluster.
			// The __consumer_offsets topic only has to be created one time.
			// The number of partitions is not configurable, but partition 0 should always exist.
			if _, err := client.Leader("__consumer_offsets", 0); err != nil {
				Logger.Printf("client/coordinator the __consumer_offsets topic is not initialized completely yet. Waiting 2 seconds...\n")
				time.Sleep(2 * time.Second)
			}

			return retry(ErrConsumerCoordinatorNotAvailable)
		} else if errors.Is(response.Err, ErrGroupAuthorizationFailed) {
			Logger.Printf("client was not authorized to access group %s while attempting to find coordinator", consumerGroup)
			return retry(ErrGroupAuthorizationFailed)
		} else {
			return nil, response.Err
		}
	}

	Logger.Println("client/coordinator no available broker to send consumer metadata request to")
	client.resurrectDeadBrokers()
	return retry(Wrap(ErrOutOfBrokers, brokerErrors...))
}

// nopCloserClient embeds an existing Client, but disables
// the Close method (yet all other methods pass
// through unchanged). This is for use in larger structs
// where it is undesirable to close the client that was
// passed in by the caller.
type nopCloserClient struct {
	Client
}

// Close intercepts and purposely does not call the underlying
// client's Close() method.
func (ncc *nopCloserClient) Close() error {
	return nil
}