cubefs

Fork
0
1198 lines · 32.1 KB
1
package sarama
2

3
import (
4
	"errors"
5
	"fmt"
6
	"math/rand"
7
	"strconv"
8
	"sync"
9
	"time"
10
)
11

12
// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
13
// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
14
// Methods with stricter requirements will specify the minimum broker version required.
15
// You MUST call Close() on a client to avoid leaks
16
type ClusterAdmin interface {
17
	// Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
18
	// It may take several seconds after CreateTopic returns success for all the brokers
19
	// to become aware that the topic has been created. During this time, listTopics
20
	// may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
21
	CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
22

23
	// List the topics available in the cluster with the default options.
24
	ListTopics() (map[string]TopicDetail, error)
25

26
	// Describe some topics in the cluster.
27
	DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
28

29
	// Delete a topic. It may take several seconds after the DeleteTopic to returns success
30
	// and for all the brokers to become aware that the topics are gone.
31
	// During this time, listTopics  may continue to return information about the deleted topic.
32
	// If delete.topic.enable is false on the brokers, deleteTopic will mark
33
	// the topic for deletion, but not actually delete them.
34
	// This operation is supported by brokers with version 0.10.1.0 or higher.
35
	DeleteTopic(topic string) error
36

37
	// Increase the number of partitions of the topics  according to the corresponding values.
38
	// If partitions are increased for a topic that has a key, the partition logic or ordering of
39
	// the messages will be affected. It may take several seconds after this method returns
40
	// success for all the brokers to become aware that the partitions have been created.
41
	// During this time, ClusterAdmin#describeTopics may not return information about the
42
	// new partitions. This operation is supported by brokers with version 1.0.0 or higher.
43
	CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
44

45
	// Alter the replica assignment for partitions.
46
	// This operation is supported by brokers with version 2.4.0.0 or higher.
47
	AlterPartitionReassignments(topic string, assignment [][]int32) error
48

49
	// Provides info on ongoing partitions replica reassignments.
50
	// This operation is supported by brokers with version 2.4.0.0 or higher.
51
	ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)
52

53
	// Delete records whose offset is smaller than the given offset of the corresponding partition.
54
	// This operation is supported by brokers with version 0.11.0.0 or higher.
55
	DeleteRecords(topic string, partitionOffsets map[int32]int64) error
56

57
	// Get the configuration for the specified resources.
58
	// The returned configuration includes default values and the Default is true
59
	// can be used to distinguish them from user supplied values.
60
	// Config entries where ReadOnly is true cannot be updated.
61
	// The value of config entries where Sensitive is true is always nil so
62
	// sensitive information is not disclosed.
63
	// This operation is supported by brokers with version 0.11.0.0 or higher.
64
	DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
65

66
	// Update the configuration for the specified resources with the default options.
67
	// This operation is supported by brokers with version 0.11.0.0 or higher.
68
	// The resources with their configs (topic is the only resource type with configs
69
	// that can be updated currently Updates are not transactional so they may succeed
70
	// for some resources while fail for others. The configs for a particular resource are updated automatically.
71
	AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
72

73
	// IncrementalAlterConfig Incrementally Update the configuration for the specified resources with the default options.
74
	// This operation is supported by brokers with version 2.3.0.0 or higher.
75
	// Updates are not transactional so they may succeed for some resources while fail for others.
76
	// The configs for a particular resource are updated automatically.
77
	IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error
78

79
	// Creates an access control list (ACL) which is bound to a specific resource.
80
	// This operation is not transactional so it may succeed or fail.
81
	// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
82
	// no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
83
	// Deprecated: Use CreateACLs instead.
84
	CreateACL(resource Resource, acl Acl) error
85

86
	// Creates access control lists (ACLs) which are bound to specific resources.
87
	// This operation is not transactional so it may succeed for some ACLs while fail for others.
88
	// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
89
	// no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
90
	CreateACLs([]*ResourceAcls) error
91

92
	// Lists access control lists (ACLs) according to the supplied filter.
93
	// it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
94
	// This operation is supported by brokers with version 0.11.0.0 or higher.
95
	ListAcls(filter AclFilter) ([]ResourceAcls, error)
96

97
	// Deletes access control lists (ACLs) according to the supplied filters.
98
	// This operation is not transactional so it may succeed for some ACLs while fail for others.
99
	// This operation is supported by brokers with version 0.11.0.0 or higher.
100
	DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
101

102
	// List the consumer groups available in the cluster.
103
	ListConsumerGroups() (map[string]string, error)
104

105
	// Describe the given consumer groups.
106
	DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
107

108
	// List the consumer group offsets available in the cluster.
109
	ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
110

111
	// Deletes a consumer group offset
112
	DeleteConsumerGroupOffset(group string, topic string, partition int32) error
113

114
	// Delete a consumer group.
115
	DeleteConsumerGroup(group string) error
116

117
	// Get information about the nodes in the cluster
118
	DescribeCluster() (brokers []*Broker, controllerID int32, err error)
119

120
	// Get information about all log directories on the given set of brokers
121
	DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)
122

123
	// Get information about SCRAM users
124
	DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error)
125

126
	// Delete SCRAM users
127
	DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error)
128

129
	// Upsert SCRAM users
130
	UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error)
131

132
	// Get client quota configurations corresponding to the specified filter.
133
	// This operation is supported by brokers with version 2.6.0.0 or higher.
134
	DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error)
135

136
	// Alters client quota configurations with the specified alterations.
137
	// This operation is supported by brokers with version 2.6.0.0 or higher.
138
	AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error
139

140
	// Controller returns the cluster controller broker. It will return a
141
	// locally cached value if it's available.
142
	Controller() (*Broker, error)
143

144
	// Close shuts down the admin and closes underlying client.
145
	Close() error
146
}
147

148
type clusterAdmin struct {
149
	client Client
150
	conf   *Config
151
}
152

153
// NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
154
func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
155
	client, err := NewClient(addrs, conf)
156
	if err != nil {
157
		return nil, err
158
	}
159
	admin, err := NewClusterAdminFromClient(client)
160
	if err != nil {
161
		client.Close()
162
	}
163
	return admin, err
164
}
165

166
// NewClusterAdminFromClient creates a new ClusterAdmin using the given client.
167
// Note that underlying client will also be closed on admin's Close() call.
168
func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) {
169
	// make sure we can retrieve the controller
170
	_, err := client.Controller()
171
	if err != nil {
172
		return nil, err
173
	}
174

175
	ca := &clusterAdmin{
176
		client: client,
177
		conf:   client.Config(),
178
	}
179
	return ca, nil
180
}
181

182
// Close shuts down the admin by closing the underlying client.
func (ca *clusterAdmin) Close() error {
	return ca.client.Close()
}
185

186
// Controller returns the cluster controller broker (possibly a locally
// cached value) from the underlying client.
func (ca *clusterAdmin) Controller() (*Broker, error) {
	return ca.client.Controller()
}
189

190
// refreshController forces the underlying client to re-resolve the cluster
// controller, bypassing any cached value.
func (ca *clusterAdmin) refreshController() (*Broker, error) {
	return ca.client.RefreshController()
}
193

194
// isErrNoController returns `true` if the given error type unwraps to an
195
// `ErrNotController` response from Kafka
196
func isErrNoController(err error) bool {
197
	return errors.Is(err, ErrNotController)
198
}
199

200
// retryOnError will repeatedly call the given (error-returning) func in the
201
// case that its response is non-nil and retryable (as determined by the
202
// provided retryable func) up to the maximum number of tries permitted by
203
// the admin client configuration
204
func (ca *clusterAdmin) retryOnError(retryable func(error) bool, fn func() error) error {
205
	var err error
206
	for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ {
207
		err = fn()
208
		if err == nil || !retryable(err) {
209
			return err
210
		}
211
		Logger.Printf(
212
			"admin/request retrying after %dms... (%d attempts remaining)\n",
213
			ca.conf.Admin.Retry.Backoff/time.Millisecond, ca.conf.Admin.Retry.Max-attempt)
214
		time.Sleep(ca.conf.Admin.Retry.Backoff)
215
		continue
216
	}
217
	return err
218
}
219

220
func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
221
	if topic == "" {
222
		return ErrInvalidTopic
223
	}
224

225
	if detail == nil {
226
		return errors.New("you must specify topic details")
227
	}
228

229
	topicDetails := make(map[string]*TopicDetail)
230
	topicDetails[topic] = detail
231

232
	request := &CreateTopicsRequest{
233
		TopicDetails: topicDetails,
234
		ValidateOnly: validateOnly,
235
		Timeout:      ca.conf.Admin.Timeout,
236
	}
237

238
	if ca.conf.Version.IsAtLeast(V0_11_0_0) {
239
		request.Version = 1
240
	}
241
	if ca.conf.Version.IsAtLeast(V1_0_0_0) {
242
		request.Version = 2
243
	}
244

245
	return ca.retryOnError(isErrNoController, func() error {
246
		b, err := ca.Controller()
247
		if err != nil {
248
			return err
249
		}
250

251
		rsp, err := b.CreateTopics(request)
252
		if err != nil {
253
			return err
254
		}
255

256
		topicErr, ok := rsp.TopicErrors[topic]
257
		if !ok {
258
			return ErrIncompleteResponse
259
		}
260

261
		if !errors.Is(topicErr.Err, ErrNoError) {
262
			if errors.Is(topicErr.Err, ErrNotController) {
263
				_, _ = ca.refreshController()
264
			}
265
			return topicErr
266
		}
267

268
		return nil
269
	})
270
}
271

272
func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
273
	controller, err := ca.Controller()
274
	if err != nil {
275
		return nil, err
276
	}
277

278
	request := &MetadataRequest{
279
		Topics:                 topics,
280
		AllowAutoTopicCreation: false,
281
	}
282

283
	if ca.conf.Version.IsAtLeast(V1_0_0_0) {
284
		request.Version = 5
285
	} else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
286
		request.Version = 4
287
	}
288

289
	response, err := controller.GetMetadata(request)
290
	if err != nil {
291
		return nil, err
292
	}
293
	return response.Topics, nil
294
}
295

296
func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
297
	controller, err := ca.Controller()
298
	if err != nil {
299
		return nil, int32(0), err
300
	}
301

302
	request := &MetadataRequest{
303
		Topics: []string{},
304
	}
305

306
	if ca.conf.Version.IsAtLeast(V0_10_0_0) {
307
		request.Version = 1
308
	}
309

310
	response, err := controller.GetMetadata(request)
311
	if err != nil {
312
		return nil, int32(0), err
313
	}
314

315
	return response.Brokers, response.ControllerID, nil
316
}
317

318
// findBroker looks up the known broker with the given ID.
func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
	for _, broker := range ca.client.Brokers() {
		if broker.ID() == id {
			return broker, nil
		}
	}
	return nil, fmt.Errorf("could not find broker id %d", id)
}
327

328
// findAnyBroker picks a random broker from the client's current broker list.
func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
	candidates := ca.client.Brokers()
	if len(candidates) == 0 {
		return nil, errors.New("no available broker")
	}
	return candidates[rand.Intn(len(candidates))], nil
}
336

337
func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
338
	// In order to build TopicDetails we need to first get the list of all
339
	// topics using a MetadataRequest and then get their configs using a
340
	// DescribeConfigsRequest request. To avoid sending many requests to the
341
	// broker, we use a single DescribeConfigsRequest.
342

343
	// Send the all-topic MetadataRequest
344
	b, err := ca.findAnyBroker()
345
	if err != nil {
346
		return nil, err
347
	}
348
	_ = b.Open(ca.client.Config())
349

350
	metadataReq := &MetadataRequest{}
351
	metadataResp, err := b.GetMetadata(metadataReq)
352
	if err != nil {
353
		return nil, err
354
	}
355

356
	topicsDetailsMap := make(map[string]TopicDetail)
357

358
	var describeConfigsResources []*ConfigResource
359

360
	for _, topic := range metadataResp.Topics {
361
		topicDetails := TopicDetail{
362
			NumPartitions: int32(len(topic.Partitions)),
363
		}
364
		if len(topic.Partitions) > 0 {
365
			topicDetails.ReplicaAssignment = map[int32][]int32{}
366
			for _, partition := range topic.Partitions {
367
				topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
368
			}
369
			topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
370
		}
371
		topicsDetailsMap[topic.Name] = topicDetails
372

373
		// we populate the resources we want to describe from the MetadataResponse
374
		topicResource := ConfigResource{
375
			Type: TopicResource,
376
			Name: topic.Name,
377
		}
378
		describeConfigsResources = append(describeConfigsResources, &topicResource)
379
	}
380

381
	// Send the DescribeConfigsRequest
382
	describeConfigsReq := &DescribeConfigsRequest{
383
		Resources: describeConfigsResources,
384
	}
385

386
	if ca.conf.Version.IsAtLeast(V1_1_0_0) {
387
		describeConfigsReq.Version = 1
388
	}
389

390
	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
391
		describeConfigsReq.Version = 2
392
	}
393

394
	describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
395
	if err != nil {
396
		return nil, err
397
	}
398

399
	for _, resource := range describeConfigsResp.Resources {
400
		topicDetails := topicsDetailsMap[resource.Name]
401
		topicDetails.ConfigEntries = make(map[string]*string)
402

403
		for _, entry := range resource.Configs {
404
			// only include non-default non-sensitive config
405
			// (don't actually think topic config will ever be sensitive)
406
			if entry.Default || entry.Sensitive {
407
				continue
408
			}
409
			topicDetails.ConfigEntries[entry.Name] = &entry.Value
410
		}
411

412
		topicsDetailsMap[resource.Name] = topicDetails
413
	}
414

415
	return topicsDetailsMap, nil
416
}
417

418
func (ca *clusterAdmin) DeleteTopic(topic string) error {
419
	if topic == "" {
420
		return ErrInvalidTopic
421
	}
422

423
	request := &DeleteTopicsRequest{
424
		Topics:  []string{topic},
425
		Timeout: ca.conf.Admin.Timeout,
426
	}
427

428
	if ca.conf.Version.IsAtLeast(V0_11_0_0) {
429
		request.Version = 1
430
	}
431

432
	return ca.retryOnError(isErrNoController, func() error {
433
		b, err := ca.Controller()
434
		if err != nil {
435
			return err
436
		}
437

438
		rsp, err := b.DeleteTopics(request)
439
		if err != nil {
440
			return err
441
		}
442

443
		topicErr, ok := rsp.TopicErrorCodes[topic]
444
		if !ok {
445
			return ErrIncompleteResponse
446
		}
447

448
		if !errors.Is(topicErr, ErrNoError) {
449
			if errors.Is(topicErr, ErrNotController) {
450
				_, _ = ca.refreshController()
451
			}
452
			return topicErr
453
		}
454

455
		return nil
456
	})
457
}
458

459
func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
460
	if topic == "" {
461
		return ErrInvalidTopic
462
	}
463

464
	topicPartitions := make(map[string]*TopicPartition)
465
	topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
466

467
	request := &CreatePartitionsRequest{
468
		TopicPartitions: topicPartitions,
469
		Timeout:         ca.conf.Admin.Timeout,
470
		ValidateOnly:    validateOnly,
471
	}
472

473
	return ca.retryOnError(isErrNoController, func() error {
474
		b, err := ca.Controller()
475
		if err != nil {
476
			return err
477
		}
478

479
		rsp, err := b.CreatePartitions(request)
480
		if err != nil {
481
			return err
482
		}
483

484
		topicErr, ok := rsp.TopicPartitionErrors[topic]
485
		if !ok {
486
			return ErrIncompleteResponse
487
		}
488

489
		if !errors.Is(topicErr.Err, ErrNoError) {
490
			if errors.Is(topicErr.Err, ErrNotController) {
491
				_, _ = ca.refreshController()
492
			}
493
			return topicErr
494
		}
495

496
		return nil
497
	})
498
}
499

500
func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
501
	if topic == "" {
502
		return ErrInvalidTopic
503
	}
504

505
	request := &AlterPartitionReassignmentsRequest{
506
		TimeoutMs: int32(60000),
507
		Version:   int16(0),
508
	}
509

510
	for i := 0; i < len(assignment); i++ {
511
		request.AddBlock(topic, int32(i), assignment[i])
512
	}
513

514
	return ca.retryOnError(isErrNoController, func() error {
515
		b, err := ca.Controller()
516
		if err != nil {
517
			return err
518
		}
519

520
		errs := make([]error, 0)
521

522
		rsp, err := b.AlterPartitionReassignments(request)
523

524
		if err != nil {
525
			errs = append(errs, err)
526
		} else {
527
			if rsp.ErrorCode > 0 {
528
				errs = append(errs, rsp.ErrorCode)
529
			}
530

531
			for topic, topicErrors := range rsp.Errors {
532
				for partition, partitionError := range topicErrors {
533
					if !errors.Is(partitionError.errorCode, ErrNoError) {
534
						errs = append(errs, fmt.Errorf("[%s-%d]: %w", topic, partition, partitionError.errorCode))
535
					}
536
				}
537
			}
538
		}
539

540
		if len(errs) > 0 {
541
			return Wrap(ErrReassignPartitions, errs...)
542
		}
543

544
		return nil
545
	})
546
}
547

548
func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
549
	if topic == "" {
550
		return nil, ErrInvalidTopic
551
	}
552

553
	request := &ListPartitionReassignmentsRequest{
554
		TimeoutMs: int32(60000),
555
		Version:   int16(0),
556
	}
557

558
	request.AddBlock(topic, partitions)
559

560
	b, err := ca.Controller()
561
	if err != nil {
562
		return nil, err
563
	}
564
	_ = b.Open(ca.client.Config())
565

566
	rsp, err := b.ListPartitionReassignments(request)
567

568
	if err == nil && rsp != nil {
569
		return rsp.TopicStatus, nil
570
	} else {
571
		return nil, err
572
	}
573
}
574

575
func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
576
	if topic == "" {
577
		return ErrInvalidTopic
578
	}
579
	errs := make([]error, 0)
580
	partitionPerBroker := make(map[*Broker][]int32)
581
	for partition := range partitionOffsets {
582
		broker, err := ca.client.Leader(topic, partition)
583
		if err != nil {
584
			errs = append(errs, err)
585
			continue
586
		}
587
		partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
588
	}
589
	for broker, partitions := range partitionPerBroker {
590
		topics := make(map[string]*DeleteRecordsRequestTopic)
591
		recordsToDelete := make(map[int32]int64)
592
		for _, p := range partitions {
593
			recordsToDelete[p] = partitionOffsets[p]
594
		}
595
		topics[topic] = &DeleteRecordsRequestTopic{
596
			PartitionOffsets: recordsToDelete,
597
		}
598
		request := &DeleteRecordsRequest{
599
			Topics:  topics,
600
			Timeout: ca.conf.Admin.Timeout,
601
		}
602
		rsp, err := broker.DeleteRecords(request)
603
		if err != nil {
604
			errs = append(errs, err)
605
			continue
606
		}
607

608
		deleteRecordsResponseTopic, ok := rsp.Topics[topic]
609
		if !ok {
610
			errs = append(errs, ErrIncompleteResponse)
611
			continue
612
		}
613

614
		for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
615
			if !errors.Is(deleteRecordsResponsePartition.Err, ErrNoError) {
616
				errs = append(errs, deleteRecordsResponsePartition.Err)
617
				continue
618
			}
619
		}
620
	}
621
	if len(errs) > 0 {
622
		return Wrap(ErrDeleteRecords, errs...)
623
	}
624
	// todo since we are dealing with couple of partitions it would be good if we return slice of errors
625
	// for each partition instead of one error
626
	return nil
627
}
628

629
// Returns a bool indicating whether the resource request needs to go to a
630
// specific broker
631
func dependsOnSpecificNode(resource ConfigResource) bool {
632
	return (resource.Type == BrokerResource && resource.Name != "") ||
633
		resource.Type == BrokerLoggerResource
634
}
635

636
func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
637
	var entries []ConfigEntry
638
	var resources []*ConfigResource
639
	resources = append(resources, &resource)
640

641
	request := &DescribeConfigsRequest{
642
		Resources: resources,
643
	}
644

645
	if ca.conf.Version.IsAtLeast(V1_1_0_0) {
646
		request.Version = 1
647
	}
648

649
	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
650
		request.Version = 2
651
	}
652

653
	var (
654
		b   *Broker
655
		err error
656
	)
657

658
	// DescribeConfig of broker/broker logger must be sent to the broker in question
659
	if dependsOnSpecificNode(resource) {
660
		var id int64
661
		id, err = strconv.ParseInt(resource.Name, 10, 32)
662
		if err != nil {
663
			return nil, err
664
		}
665
		b, err = ca.findBroker(int32(id))
666
	} else {
667
		b, err = ca.findAnyBroker()
668
	}
669
	if err != nil {
670
		return nil, err
671
	}
672

673
	_ = b.Open(ca.client.Config())
674
	rsp, err := b.DescribeConfigs(request)
675
	if err != nil {
676
		return nil, err
677
	}
678

679
	for _, rspResource := range rsp.Resources {
680
		if rspResource.Name == resource.Name {
681
			if rspResource.ErrorMsg != "" {
682
				return nil, errors.New(rspResource.ErrorMsg)
683
			}
684
			if rspResource.ErrorCode != 0 {
685
				return nil, KError(rspResource.ErrorCode)
686
			}
687
			for _, cfgEntry := range rspResource.Configs {
688
				entries = append(entries, *cfgEntry)
689
			}
690
		}
691
	}
692
	return entries, nil
693
}
694

695
func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
696
	var resources []*AlterConfigsResource
697
	resources = append(resources, &AlterConfigsResource{
698
		Type:          resourceType,
699
		Name:          name,
700
		ConfigEntries: entries,
701
	})
702

703
	request := &AlterConfigsRequest{
704
		Resources:    resources,
705
		ValidateOnly: validateOnly,
706
	}
707

708
	var (
709
		b   *Broker
710
		err error
711
	)
712

713
	// AlterConfig of broker/broker logger must be sent to the broker in question
714
	if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
715
		var id int64
716
		id, err = strconv.ParseInt(name, 10, 32)
717
		if err != nil {
718
			return err
719
		}
720
		b, err = ca.findBroker(int32(id))
721
	} else {
722
		b, err = ca.findAnyBroker()
723
	}
724
	if err != nil {
725
		return err
726
	}
727

728
	_ = b.Open(ca.client.Config())
729
	rsp, err := b.AlterConfigs(request)
730
	if err != nil {
731
		return err
732
	}
733

734
	for _, rspResource := range rsp.Resources {
735
		if rspResource.Name == name {
736
			if rspResource.ErrorMsg != "" {
737
				return errors.New(rspResource.ErrorMsg)
738
			}
739
			if rspResource.ErrorCode != 0 {
740
				return KError(rspResource.ErrorCode)
741
			}
742
		}
743
	}
744
	return nil
745
}
746

747
func (ca *clusterAdmin) IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error {
748
	var resources []*IncrementalAlterConfigsResource
749
	resources = append(resources, &IncrementalAlterConfigsResource{
750
		Type:          resourceType,
751
		Name:          name,
752
		ConfigEntries: entries,
753
	})
754

755
	request := &IncrementalAlterConfigsRequest{
756
		Resources:    resources,
757
		ValidateOnly: validateOnly,
758
	}
759

760
	var (
761
		b   *Broker
762
		err error
763
	)
764

765
	// AlterConfig of broker/broker logger must be sent to the broker in question
766
	if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
767
		var id int64
768
		id, err = strconv.ParseInt(name, 10, 32)
769
		if err != nil {
770
			return err
771
		}
772
		b, err = ca.findBroker(int32(id))
773
	} else {
774
		b, err = ca.findAnyBroker()
775
	}
776
	if err != nil {
777
		return err
778
	}
779

780
	_ = b.Open(ca.client.Config())
781
	rsp, err := b.IncrementalAlterConfigs(request)
782
	if err != nil {
783
		return err
784
	}
785

786
	for _, rspResource := range rsp.Resources {
787
		if rspResource.Name == name {
788
			if rspResource.ErrorMsg != "" {
789
				return errors.New(rspResource.ErrorMsg)
790
			}
791
			if rspResource.ErrorCode != 0 {
792
				return KError(rspResource.ErrorCode)
793
			}
794
		}
795
	}
796
	return nil
797
}
798

799
// CreateACL creates a single ACL bound to the given resource via the
// controller.
//
// Deprecated: Use CreateACLs instead.
func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
	request := &CreateAclsRequest{
		AclCreations: []*AclCreation{{resource, acl}},
	}
	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
		request.Version = 1
	}

	controller, err := ca.Controller()
	if err != nil {
		return err
	}

	_, err = controller.CreateAcls(request)
	return err
}
816

817
// CreateACLs flattens the given resource/ACL bindings into one
// CreateAclsRequest and sends it to the controller.
func (ca *clusterAdmin) CreateACLs(resourceACLs []*ResourceAcls) error {
	var creations []*AclCreation
	for _, resourceACL := range resourceACLs {
		for _, acl := range resourceACL.Acls {
			creations = append(creations, &AclCreation{resourceACL.Resource, *acl})
		}
	}

	request := &CreateAclsRequest{AclCreations: creations}
	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
		request.Version = 1
	}

	controller, err := ca.Controller()
	if err != nil {
		return err
	}

	_, err = controller.CreateAcls(request)
	return err
}
838

839
// ListAcls returns the ACLs matching the supplied filter, queried from the
// controller.
func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
	request := &DescribeAclsRequest{AclFilter: filter}
	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
		request.Version = 1
	}

	controller, err := ca.Controller()
	if err != nil {
		return nil, err
	}

	rsp, err := controller.DescribeAcls(request)
	if err != nil {
		return nil, err
	}

	// Dereference into a value slice; stays nil when no ACLs matched.
	var results []ResourceAcls
	for _, resourceAcl := range rsp.ResourceAcls {
		results = append(results, *resourceAcl)
	}
	return results, nil
}
862

863
// DeleteACL removes the ACLs matching the given filter via the controller and
// returns the matches the broker reports.
// NOTE(review): validateOnly is currently not forwarded in the request —
// confirm against the protocol whether DeleteAclsRequest supports it.
func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
	request := &DeleteAclsRequest{Filters: []*AclFilter{&filter}}
	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
		request.Version = 1
	}

	controller, err := ca.Controller()
	if err != nil {
		return nil, err
	}

	rsp, err := controller.DeleteAcls(request)
	if err != nil {
		return nil, err
	}

	var matches []MatchingAcl
	for _, filterResponse := range rsp.FilterResponses {
		for _, matching := range filterResponse.MatchingAcls {
			matches = append(matches, *matching)
		}
	}
	return matches, nil
}
890

891
func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
892
	groupsPerBroker := make(map[*Broker][]string)
893

894
	for _, group := range groups {
895
		controller, err := ca.client.Coordinator(group)
896
		if err != nil {
897
			return nil, err
898
		}
899
		groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
900
	}
901

902
	for broker, brokerGroups := range groupsPerBroker {
903
		response, err := broker.DescribeGroups(&DescribeGroupsRequest{
904
			Groups: brokerGroups,
905
		})
906
		if err != nil {
907
			return nil, err
908
		}
909

910
		result = append(result, response.Groups...)
911
	}
912
	return result, nil
913
}
914

915
// ListConsumerGroups queries every known broker in parallel (groups can live
// on any broker) and returns a map from group name to protocol type. Only the
// first broker error, if any, is returned.
func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
	allGroups = make(map[string]string)

	brokers := ca.client.Brokers()
	// Buffered so every goroutine can report without blocking.
	results := make(chan map[string]string, len(brokers))
	errChan := make(chan error, len(brokers))
	var wg sync.WaitGroup

	for _, broker := range brokers {
		wg.Add(1)
		go func(b *Broker, conf *Config) {
			defer wg.Done()
			_ = b.Open(conf) // Ensure that broker is opened

			response, err := b.ListGroups(&ListGroupsRequest{})
			if err != nil {
				errChan <- err
				return
			}

			found := make(map[string]string, len(response.Groups))
			for group, protocolType := range response.Groups {
				found[group] = protocolType
			}
			results <- found
		}(broker, ca.conf)
	}

	wg.Wait()
	close(results)
	close(errChan)

	for groups := range results {
		for group, protocolType := range groups {
			allGroups[group] = protocolType
		}
	}

	// Intentionally return only the first error for simplicity.
	err = <-errChan
	return
}
959

960
func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
961
	coordinator, err := ca.client.Coordinator(group)
962
	if err != nil {
963
		return nil, err
964
	}
965

966
	request := &OffsetFetchRequest{
967
		ConsumerGroup: group,
968
		partitions:    topicPartitions,
969
	}
970

971
	if ca.conf.Version.IsAtLeast(V0_10_2_0) {
972
		request.Version = 2
973
	} else if ca.conf.Version.IsAtLeast(V0_8_2_2) {
974
		request.Version = 1
975
	}
976

977
	return coordinator.FetchOffset(request)
978
}
979

980
func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, partition int32) error {
981
	coordinator, err := ca.client.Coordinator(group)
982
	if err != nil {
983
		return err
984
	}
985

986
	request := &DeleteOffsetsRequest{
987
		Group: group,
988
		partitions: map[string][]int32{
989
			topic: {partition},
990
		},
991
	}
992

993
	resp, err := coordinator.DeleteOffsets(request)
994
	if err != nil {
995
		return err
996
	}
997

998
	if !errors.Is(resp.ErrorCode, ErrNoError) {
999
		return resp.ErrorCode
1000
	}
1001

1002
	if !errors.Is(resp.Errors[topic][partition], ErrNoError) {
1003
		return resp.Errors[topic][partition]
1004
	}
1005
	return nil
1006
}
1007

1008
func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
1009
	coordinator, err := ca.client.Coordinator(group)
1010
	if err != nil {
1011
		return err
1012
	}
1013

1014
	request := &DeleteGroupsRequest{
1015
		Groups: []string{group},
1016
	}
1017

1018
	resp, err := coordinator.DeleteGroups(request)
1019
	if err != nil {
1020
		return err
1021
	}
1022

1023
	groupErr, ok := resp.GroupErrorCodes[group]
1024
	if !ok {
1025
		return ErrIncompleteResponse
1026
	}
1027

1028
	if !errors.Is(groupErr, ErrNoError) {
1029
		return groupErr
1030
	}
1031

1032
	return nil
1033
}
1034

1035
// DescribeLogDirs queries the given brokers in parallel for their log
// directory metadata and returns the results keyed by broker ID. Brokers that
// cannot be found are logged and skipped; only the first request error (if
// any) is returned.
func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) {
	allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata)

	// Query brokers in parallel, since we may have to query multiple brokers
	logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
	errChan := make(chan error, len(brokerIds))
	wg := sync.WaitGroup{}

	for _, b := range brokerIds {
		broker, err := ca.findBroker(b)
		if err != nil {
			Logger.Printf("Unable to find broker with ID = %v\n", b)
			continue
		}
		// BUGFIX: wg.Add(1) must happen only after the broker lookup
		// succeeds. Previously it preceded findBroker, and the continue
		// path above skipped wg.Done(), leaving wg.Wait() blocked forever.
		wg.Add(1)
		go func(b *Broker, conf *Config) {
			defer wg.Done()
			_ = b.Open(conf) // Ensure that broker is opened

			response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{})
			if err != nil {
				errChan <- err
				return
			}
			logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
			logDirs[b.ID()] = response.LogDirs
			logDirsMaps <- logDirs
		}(broker, ca.conf)
	}

	wg.Wait()
	close(logDirsMaps)
	close(errChan)

	for logDirsMap := range logDirsMaps {
		for id, logDirs := range logDirsMap {
			allLogDirs[id] = logDirs
		}
	}

	// Intentionally return only the first error for simplicity
	err = <-errChan
	return
}
1079

1080
func (ca *clusterAdmin) DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error) {
1081
	req := &DescribeUserScramCredentialsRequest{}
1082
	for _, u := range users {
1083
		req.DescribeUsers = append(req.DescribeUsers, DescribeUserScramCredentialsRequestUser{
1084
			Name: u,
1085
		})
1086
	}
1087

1088
	b, err := ca.Controller()
1089
	if err != nil {
1090
		return nil, err
1091
	}
1092

1093
	rsp, err := b.DescribeUserScramCredentials(req)
1094
	if err != nil {
1095
		return nil, err
1096
	}
1097

1098
	return rsp.Results, nil
1099
}
1100

1101
// UpsertUserScramCredentials creates or updates SCRAM credentials for the
// given users. It is a thin wrapper around AlterUserScramCredentials with no
// deletions; the redundant err-check-then-return has been collapsed into a
// direct return of the delegate's results.
func (ca *clusterAdmin) UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error) {
	return ca.AlterUserScramCredentials(upsert, nil)
}
1109

1110
// DeleteUserScramCredentials removes SCRAM credentials for the given users.
// It is a thin wrapper around AlterUserScramCredentials with no upsertions;
// the redundant err-check-then-return has been collapsed into a direct return
// of the delegate's results.
func (ca *clusterAdmin) DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
	return ca.AlterUserScramCredentials(nil, delete)
}
1118

1119
func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsUpsert, d []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
1120
	req := &AlterUserScramCredentialsRequest{
1121
		Deletions:  d,
1122
		Upsertions: u,
1123
	}
1124

1125
	b, err := ca.Controller()
1126
	if err != nil {
1127
		return nil, err
1128
	}
1129

1130
	rsp, err := b.AlterUserScramCredentials(req)
1131
	if err != nil {
1132
		return nil, err
1133
	}
1134

1135
	return rsp.Results, nil
1136
}
1137

1138
// Describe All : use an empty/nil components slice + strict = false
1139
// Contains components: strict = false
1140
// Contains only components: strict = true
1141
func (ca *clusterAdmin) DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error) {
1142
	request := &DescribeClientQuotasRequest{
1143
		Components: components,
1144
		Strict:     strict,
1145
	}
1146

1147
	b, err := ca.Controller()
1148
	if err != nil {
1149
		return nil, err
1150
	}
1151

1152
	rsp, err := b.DescribeClientQuotas(request)
1153
	if err != nil {
1154
		return nil, err
1155
	}
1156

1157
	if rsp.ErrorMsg != nil && len(*rsp.ErrorMsg) > 0 {
1158
		return nil, errors.New(*rsp.ErrorMsg)
1159
	}
1160
	if !errors.Is(rsp.ErrorCode, ErrNoError) {
1161
		return nil, rsp.ErrorCode
1162
	}
1163

1164
	return rsp.Entries, nil
1165
}
1166

1167
func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error {
1168
	entry := AlterClientQuotasEntry{
1169
		Entity: entity,
1170
		Ops:    []ClientQuotasOp{op},
1171
	}
1172

1173
	request := &AlterClientQuotasRequest{
1174
		Entries:      []AlterClientQuotasEntry{entry},
1175
		ValidateOnly: validateOnly,
1176
	}
1177

1178
	b, err := ca.Controller()
1179
	if err != nil {
1180
		return err
1181
	}
1182

1183
	rsp, err := b.AlterClientQuotas(request)
1184
	if err != nil {
1185
		return err
1186
	}
1187

1188
	for _, entry := range rsp.Entries {
1189
		if entry.ErrorMsg != nil && len(*entry.ErrorMsg) > 0 {
1190
			return errors.New(*entry.ErrorMsg)
1191
		}
1192
		if !errors.Is(entry.ErrorCode, ErrNoError) {
1193
			return entry.ErrorCode
1194
		}
1195
	}
1196

1197
	return nil
1198
}
1199

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.