cubefs
1198 lines · 32.1 KB
1package sarama
2
3import (
4"errors"
5"fmt"
6"math/rand"
7"strconv"
8"sync"
9"time"
10)
11
// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
// Methods with stricter requirements will specify the minimum broker version required.
// You MUST call Close() on a client to avoid leaks.
type ClusterAdmin interface {
	// Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
	// It may take several seconds after CreateTopic returns success for all the brokers
	// to become aware that the topic has been created. During this time, ListTopics
	// may not return information about the new topic. The validateOnly option is supported from version 0.10.2.0.
	CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error

	// List the topics available in the cluster with the default options.
	ListTopics() (map[string]TopicDetail, error)

	// Describe some topics in the cluster.
	DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)

	// Delete a topic. It may take several seconds after DeleteTopic returns success
	// for all the brokers to become aware that the topics are gone.
	// During this time, ListTopics may continue to return information about the deleted topic.
	// If delete.topic.enable is false on the brokers, DeleteTopic will mark
	// the topic for deletion, but not actually delete it.
	// This operation is supported by brokers with version 0.10.1.0 or higher.
	DeleteTopic(topic string) error

	// Increase the number of partitions of the topics according to the corresponding values.
	// If partitions are increased for a topic that has a key, the partition logic or ordering of
	// the messages will be affected. It may take several seconds after this method returns
	// success for all the brokers to become aware that the partitions have been created.
	// During this time, ClusterAdmin#DescribeTopics may not return information about the
	// new partitions. This operation is supported by brokers with version 1.0.0 or higher.
	CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error

	// Alter the replica assignment for partitions.
	// This operation is supported by brokers with version 2.4.0.0 or higher.
	AlterPartitionReassignments(topic string, assignment [][]int32) error

	// Provides info on ongoing partitions replica reassignments.
	// This operation is supported by brokers with version 2.4.0.0 or higher.
	ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)

	// Delete records whose offset is smaller than the given offset of the corresponding partition.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	DeleteRecords(topic string, partitionOffsets map[int32]int64) error

	// Get the configuration for the specified resources.
	// The returned configuration includes default values; entries with Default set to true
	// can be distinguished from user-supplied values.
	// Config entries where ReadOnly is true cannot be updated.
	// The value of config entries where Sensitive is true is always nil so
	// sensitive information is not disclosed.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)

	// Update the configuration for the specified resources with the default options.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	// Note that the non-incremental AlterConfigs protocol replaces the full config of the
	// resource. Updates are not transactional, so they may succeed for some resources
	// while failing for others. The configs for a particular resource are updated atomically.
	AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error

	// IncrementalAlterConfig Incrementally Update the configuration for the specified resources with the default options.
	// This operation is supported by brokers with version 2.3.0.0 or higher.
	// Updates are not transactional so they may succeed for some resources while fail for others.
	// The configs for a particular resource are updated atomically.
	IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error

	// Creates an access control list (ACL) which is bound to a specific resource.
	// This operation is not transactional so it may succeed or fail.
	// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
	// no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
	// Deprecated: Use CreateACLs instead.
	CreateACL(resource Resource, acl Acl) error

	// Creates access control lists (ACLs) which are bound to specific resources.
	// This operation is not transactional so it may succeed for some ACLs while fail for others.
	// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
	// no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
	CreateACLs([]*ResourceAcls) error

	// Lists access control lists (ACLs) according to the supplied filter.
	// It may take some time for changes made by CreateACLs or DeleteACL to be reflected in the output of ListAcls.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	ListAcls(filter AclFilter) ([]ResourceAcls, error)

	// Deletes access control lists (ACLs) according to the supplied filters.
	// This operation is not transactional so it may succeed for some ACLs while fail for others.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)

	// List the consumer groups available in the cluster.
	ListConsumerGroups() (map[string]string, error)

	// Describe the given consumer groups.
	DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)

	// List the consumer group offsets available in the cluster.
	ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)

	// Deletes a consumer group offset.
	DeleteConsumerGroupOffset(group string, topic string, partition int32) error

	// Delete a consumer group.
	DeleteConsumerGroup(group string) error

	// Get information about the nodes in the cluster.
	DescribeCluster() (brokers []*Broker, controllerID int32, err error)

	// Get information about all log directories on the given set of brokers.
	DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)

	// Get information about SCRAM users.
	DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error)

	// Delete SCRAM users.
	DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error)

	// Upsert SCRAM users.
	UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error)

	// Get client quota configurations corresponding to the specified filter.
	// This operation is supported by brokers with version 2.6.0.0 or higher.
	DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error)

	// Alters client quota configurations with the specified alterations.
	// This operation is supported by brokers with version 2.6.0.0 or higher.
	AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error

	// Controller returns the cluster controller broker. It will return a
	// locally cached value if it's available.
	Controller() (*Broker, error)

	// Close shuts down the admin and closes underlying client.
	Close() error
}
147
// clusterAdmin is the default ClusterAdmin implementation. It delegates all
// broker discovery and connection management to the wrapped Client.
type clusterAdmin struct {
	client Client  // underlying client; closed by Close()
	conf   *Config // shared configuration, taken from the client
}
152
153// NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
154func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
155client, err := NewClient(addrs, conf)
156if err != nil {
157return nil, err
158}
159admin, err := NewClusterAdminFromClient(client)
160if err != nil {
161client.Close()
162}
163return admin, err
164}
165
166// NewClusterAdminFromClient creates a new ClusterAdmin using the given client.
167// Note that underlying client will also be closed on admin's Close() call.
168func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) {
169// make sure we can retrieve the controller
170_, err := client.Controller()
171if err != nil {
172return nil, err
173}
174
175ca := &clusterAdmin{
176client: client,
177conf: client.Config(),
178}
179return ca, nil
180}
181
182func (ca *clusterAdmin) Close() error {
183return ca.client.Close()
184}
185
186func (ca *clusterAdmin) Controller() (*Broker, error) {
187return ca.client.Controller()
188}
189
190func (ca *clusterAdmin) refreshController() (*Broker, error) {
191return ca.client.RefreshController()
192}
193
194// isErrNoController returns `true` if the given error type unwraps to an
195// `ErrNotController` response from Kafka
196func isErrNoController(err error) bool {
197return errors.Is(err, ErrNotController)
198}
199
200// retryOnError will repeatedly call the given (error-returning) func in the
201// case that its response is non-nil and retryable (as determined by the
202// provided retryable func) up to the maximum number of tries permitted by
203// the admin client configuration
204func (ca *clusterAdmin) retryOnError(retryable func(error) bool, fn func() error) error {
205var err error
206for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ {
207err = fn()
208if err == nil || !retryable(err) {
209return err
210}
211Logger.Printf(
212"admin/request retrying after %dms... (%d attempts remaining)\n",
213ca.conf.Admin.Retry.Backoff/time.Millisecond, ca.conf.Admin.Retry.Max-attempt)
214time.Sleep(ca.conf.Admin.Retry.Backoff)
215continue
216}
217return err
218}
219
// CreateTopic creates a single topic from the given detail. With
// validateOnly set, the broker checks the request without applying it.
// The call is retried through retryOnError whenever the cached controller
// turns out to be stale.
func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
	if topic == "" {
		return ErrInvalidTopic
	}

	if detail == nil {
		return errors.New("you must specify topic details")
	}

	topicDetails := make(map[string]*TopicDetail)
	topicDetails[topic] = detail

	request := &CreateTopicsRequest{
		TopicDetails: topicDetails,
		ValidateOnly: validateOnly,
		Timeout:      ca.conf.Admin.Timeout,
	}

	// Pick the highest protocol version the configured broker version supports.
	if ca.conf.Version.IsAtLeast(V0_11_0_0) {
		request.Version = 1
	}
	if ca.conf.Version.IsAtLeast(V1_0_0_0) {
		request.Version = 2
	}

	return ca.retryOnError(isErrNoController, func() error {
		b, err := ca.Controller()
		if err != nil {
			return err
		}

		rsp, err := b.CreateTopics(request)
		if err != nil {
			return err
		}

		// The response must carry an entry for the requested topic.
		topicErr, ok := rsp.TopicErrors[topic]
		if !ok {
			return ErrIncompleteResponse
		}

		if !errors.Is(topicErr.Err, ErrNoError) {
			// Refresh the cached controller so the next retry targets the new one.
			if errors.Is(topicErr.Err, ErrNotController) {
				_, _ = ca.refreshController()
			}
			return topicErr
		}

		return nil
	})
}
271
272func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
273controller, err := ca.Controller()
274if err != nil {
275return nil, err
276}
277
278request := &MetadataRequest{
279Topics: topics,
280AllowAutoTopicCreation: false,
281}
282
283if ca.conf.Version.IsAtLeast(V1_0_0_0) {
284request.Version = 5
285} else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
286request.Version = 4
287}
288
289response, err := controller.GetMetadata(request)
290if err != nil {
291return nil, err
292}
293return response.Topics, nil
294}
295
296func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
297controller, err := ca.Controller()
298if err != nil {
299return nil, int32(0), err
300}
301
302request := &MetadataRequest{
303Topics: []string{},
304}
305
306if ca.conf.Version.IsAtLeast(V0_10_0_0) {
307request.Version = 1
308}
309
310response, err := controller.GetMetadata(request)
311if err != nil {
312return nil, int32(0), err
313}
314
315return response.Brokers, response.ControllerID, nil
316}
317
318func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
319brokers := ca.client.Brokers()
320for _, b := range brokers {
321if b.ID() == id {
322return b, nil
323}
324}
325return nil, fmt.Errorf("could not find broker id %d", id)
326}
327
328func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
329brokers := ca.client.Brokers()
330if len(brokers) > 0 {
331index := rand.Intn(len(brokers))
332return brokers[index], nil
333}
334return nil, errors.New("no available broker")
335}
336
// ListTopics returns a TopicDetail for every topic in the cluster, including
// partition counts, replica assignments and non-default config entries.
func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
	// In order to build TopicDetails we need to first get the list of all
	// topics using a MetadataRequest and then get their configs using a
	// DescribeConfigsRequest request. To avoid sending many requests to the
	// broker, we use a single DescribeConfigsRequest.

	// Send the all-topic MetadataRequest
	b, err := ca.findAnyBroker()
	if err != nil {
		return nil, err
	}
	// Best-effort open; a no-op if the broker connection already exists.
	_ = b.Open(ca.client.Config())

	metadataReq := &MetadataRequest{}
	metadataResp, err := b.GetMetadata(metadataReq)
	if err != nil {
		return nil, err
	}

	topicsDetailsMap := make(map[string]TopicDetail)

	var describeConfigsResources []*ConfigResource

	for _, topic := range metadataResp.Topics {
		topicDetails := TopicDetail{
			NumPartitions: int32(len(topic.Partitions)),
		}
		if len(topic.Partitions) > 0 {
			topicDetails.ReplicaAssignment = map[int32][]int32{}
			for _, partition := range topic.Partitions {
				topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
			}
			// The replication factor is taken from the first partition's replica list.
			topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
		}
		topicsDetailsMap[topic.Name] = topicDetails

		// we populate the resources we want to describe from the MetadataResponse
		topicResource := ConfigResource{
			Type: TopicResource,
			Name: topic.Name,
		}
		describeConfigsResources = append(describeConfigsResources, &topicResource)
	}

	// Send the DescribeConfigsRequest
	describeConfigsReq := &DescribeConfigsRequest{
		Resources: describeConfigsResources,
	}

	if ca.conf.Version.IsAtLeast(V1_1_0_0) {
		describeConfigsReq.Version = 1
	}

	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
		describeConfigsReq.Version = 2
	}

	describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
	if err != nil {
		return nil, err
	}

	for _, resource := range describeConfigsResp.Resources {
		topicDetails := topicsDetailsMap[resource.Name]
		topicDetails.ConfigEntries = make(map[string]*string)

		for _, entry := range resource.Configs {
			// only include non-default non-sensitive config
			// (don't actually think topic config will ever be sensitive)
			if entry.Default || entry.Sensitive {
				continue
			}
			topicDetails.ConfigEntries[entry.Name] = &entry.Value
		}

		topicsDetailsMap[resource.Name] = topicDetails
	}

	return topicsDetailsMap, nil
}
417
// DeleteTopic deletes a single topic, retrying when the cached controller is
// stale. With delete.topic.enable=false on the brokers the topic is only
// marked for deletion rather than removed.
func (ca *clusterAdmin) DeleteTopic(topic string) error {
	if topic == "" {
		return ErrInvalidTopic
	}

	request := &DeleteTopicsRequest{
		Topics:  []string{topic},
		Timeout: ca.conf.Admin.Timeout,
	}

	if ca.conf.Version.IsAtLeast(V0_11_0_0) {
		request.Version = 1
	}

	return ca.retryOnError(isErrNoController, func() error {
		b, err := ca.Controller()
		if err != nil {
			return err
		}

		rsp, err := b.DeleteTopics(request)
		if err != nil {
			return err
		}

		// The response must carry an entry for the requested topic.
		topicErr, ok := rsp.TopicErrorCodes[topic]
		if !ok {
			return ErrIncompleteResponse
		}

		if !errors.Is(topicErr, ErrNoError) {
			// Refresh the cached controller so the next retry targets the new one.
			if errors.Is(topicErr, ErrNotController) {
				_, _ = ca.refreshController()
			}
			return topicErr
		}

		return nil
	})
}
458
// CreatePartitions grows topic to count partitions, optionally with an
// explicit replica assignment per new partition. With validateOnly set, the
// broker checks the request without applying it. Retries when the cached
// controller is stale.
func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
	if topic == "" {
		return ErrInvalidTopic
	}

	topicPartitions := make(map[string]*TopicPartition)
	topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}

	request := &CreatePartitionsRequest{
		TopicPartitions: topicPartitions,
		Timeout:         ca.conf.Admin.Timeout,
		ValidateOnly:    validateOnly,
	}

	return ca.retryOnError(isErrNoController, func() error {
		b, err := ca.Controller()
		if err != nil {
			return err
		}

		rsp, err := b.CreatePartitions(request)
		if err != nil {
			return err
		}

		// The response must carry an entry for the requested topic.
		topicErr, ok := rsp.TopicPartitionErrors[topic]
		if !ok {
			return ErrIncompleteResponse
		}

		if !errors.Is(topicErr.Err, ErrNoError) {
			// Refresh the cached controller so the next retry targets the new one.
			if errors.Is(topicErr.Err, ErrNotController) {
				_, _ = ca.refreshController()
			}
			return topicErr
		}

		return nil
	})
}
499
// AlterPartitionReassignments submits a new replica assignment for
// partitions 0..len(assignment)-1 of topic; assignment[i] lists the replica
// broker IDs for partition i. All partition-level failures are collected and
// wrapped in a single ErrReassignPartitions. Retries when the cached
// controller is stale.
func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
	if topic == "" {
		return ErrInvalidTopic
	}

	request := &AlterPartitionReassignmentsRequest{
		TimeoutMs: int32(60000),
		Version:   int16(0),
	}

	// Partition index i gets the replica list assignment[i].
	for i := 0; i < len(assignment); i++ {
		request.AddBlock(topic, int32(i), assignment[i])
	}

	return ca.retryOnError(isErrNoController, func() error {
		b, err := ca.Controller()
		if err != nil {
			return err
		}

		errs := make([]error, 0)

		rsp, err := b.AlterPartitionReassignments(request)

		if err != nil {
			errs = append(errs, err)
		} else {
			// Collect both the top-level error and every per-partition error.
			if rsp.ErrorCode > 0 {
				errs = append(errs, rsp.ErrorCode)
			}

			// NOTE: the range variable shadows the `topic` parameter here;
			// only the response's own topic names are reported below.
			for topic, topicErrors := range rsp.Errors {
				for partition, partitionError := range topicErrors {
					if !errors.Is(partitionError.errorCode, ErrNoError) {
						errs = append(errs, fmt.Errorf("[%s-%d]: %w", topic, partition, partitionError.errorCode))
					}
				}
			}
		}

		if len(errs) > 0 {
			return Wrap(ErrReassignPartitions, errs...)
		}

		return nil
	})
}
547
548func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
549if topic == "" {
550return nil, ErrInvalidTopic
551}
552
553request := &ListPartitionReassignmentsRequest{
554TimeoutMs: int32(60000),
555Version: int16(0),
556}
557
558request.AddBlock(topic, partitions)
559
560b, err := ca.Controller()
561if err != nil {
562return nil, err
563}
564_ = b.Open(ca.client.Config())
565
566rsp, err := b.ListPartitionReassignments(request)
567
568if err == nil && rsp != nil {
569return rsp.TopicStatus, nil
570} else {
571return nil, err
572}
573}
574
// DeleteRecords deletes records whose offsets are below the given
// per-partition offsets. Partitions are grouped by their leader broker so
// each leader receives one request; all failures are collected and wrapped
// in a single ErrDeleteRecords.
func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
	if topic == "" {
		return ErrInvalidTopic
	}
	errs := make([]error, 0)
	partitionPerBroker := make(map[*Broker][]int32)
	// Group the requested partitions by their current leader.
	for partition := range partitionOffsets {
		broker, err := ca.client.Leader(topic, partition)
		if err != nil {
			errs = append(errs, err)
			continue
		}
		partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
	}
	for broker, partitions := range partitionPerBroker {
		topics := make(map[string]*DeleteRecordsRequestTopic)
		recordsToDelete := make(map[int32]int64)
		for _, p := range partitions {
			recordsToDelete[p] = partitionOffsets[p]
		}
		topics[topic] = &DeleteRecordsRequestTopic{
			PartitionOffsets: recordsToDelete,
		}
		request := &DeleteRecordsRequest{
			Topics:  topics,
			Timeout: ca.conf.Admin.Timeout,
		}
		rsp, err := broker.DeleteRecords(request)
		if err != nil {
			errs = append(errs, err)
			continue
		}

		deleteRecordsResponseTopic, ok := rsp.Topics[topic]
		if !ok {
			errs = append(errs, ErrIncompleteResponse)
			continue
		}

		// Collect every per-partition error from this broker's response.
		for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
			if !errors.Is(deleteRecordsResponsePartition.Err, ErrNoError) {
				errs = append(errs, deleteRecordsResponsePartition.Err)
				continue
			}
		}
	}
	if len(errs) > 0 {
		return Wrap(ErrDeleteRecords, errs...)
	}
	// todo since we are dealing with couple of partitions it would be good if we return slice of errors
	// for each partition instead of one error
	return nil
}
628
629// Returns a bool indicating whether the resource request needs to go to a
630// specific broker
631func dependsOnSpecificNode(resource ConfigResource) bool {
632return (resource.Type == BrokerResource && resource.Name != "") ||
633resource.Type == BrokerLoggerResource
634}
635
// DescribeConfig fetches the config entries for a single resource.
// Broker and broker-logger resources are routed to the broker named by the
// resource; everything else goes to an arbitrary broker.
func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
	var entries []ConfigEntry
	var resources []*ConfigResource
	resources = append(resources, &resource)

	request := &DescribeConfigsRequest{
		Resources: resources,
	}

	if ca.conf.Version.IsAtLeast(V1_1_0_0) {
		request.Version = 1
	}

	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
		request.Version = 2
	}

	var (
		b   *Broker
		err error
	)

	// DescribeConfig of broker/broker logger must be sent to the broker in question
	if dependsOnSpecificNode(resource) {
		// The resource name is the broker ID in string form.
		var id int64
		id, err = strconv.ParseInt(resource.Name, 10, 32)
		if err != nil {
			return nil, err
		}
		b, err = ca.findBroker(int32(id))
	} else {
		b, err = ca.findAnyBroker()
	}
	if err != nil {
		return nil, err
	}

	// Best-effort open; a no-op if the broker connection already exists.
	_ = b.Open(ca.client.Config())
	rsp, err := b.DescribeConfigs(request)
	if err != nil {
		return nil, err
	}

	for _, rspResource := range rsp.Resources {
		if rspResource.Name == resource.Name {
			if rspResource.ErrorMsg != "" {
				return nil, errors.New(rspResource.ErrorMsg)
			}
			if rspResource.ErrorCode != 0 {
				return nil, KError(rspResource.ErrorCode)
			}
			for _, cfgEntry := range rspResource.Configs {
				entries = append(entries, *cfgEntry)
			}
		}
	}
	return entries, nil
}
694
// AlterConfig replaces the config of the named resource with the given
// entries (non-incremental AlterConfigs protocol). Broker and broker-logger
// resources are routed to the broker named by the resource; everything else
// goes to an arbitrary broker. With validateOnly set, the broker checks the
// request without applying it.
func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
	var resources []*AlterConfigsResource
	resources = append(resources, &AlterConfigsResource{
		Type:          resourceType,
		Name:          name,
		ConfigEntries: entries,
	})

	request := &AlterConfigsRequest{
		Resources:    resources,
		ValidateOnly: validateOnly,
	}

	var (
		b   *Broker
		err error
	)

	// AlterConfig of broker/broker logger must be sent to the broker in question
	if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
		// The resource name is the broker ID in string form.
		var id int64
		id, err = strconv.ParseInt(name, 10, 32)
		if err != nil {
			return err
		}
		b, err = ca.findBroker(int32(id))
	} else {
		b, err = ca.findAnyBroker()
	}
	if err != nil {
		return err
	}

	// Best-effort open; a no-op if the broker connection already exists.
	_ = b.Open(ca.client.Config())
	rsp, err := b.AlterConfigs(request)
	if err != nil {
		return err
	}

	for _, rspResource := range rsp.Resources {
		if rspResource.Name == name {
			if rspResource.ErrorMsg != "" {
				return errors.New(rspResource.ErrorMsg)
			}
			if rspResource.ErrorCode != 0 {
				return KError(rspResource.ErrorCode)
			}
		}
	}
	return nil
}
746
// IncrementalAlterConfig applies incremental (set/delete/append/subtract)
// config changes to the named resource. Broker and broker-logger resources
// are routed to the broker named by the resource; everything else goes to an
// arbitrary broker. With validateOnly set, the broker checks the request
// without applying it. Requires brokers with version 2.3.0.0 or higher.
func (ca *clusterAdmin) IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error {
	var resources []*IncrementalAlterConfigsResource
	resources = append(resources, &IncrementalAlterConfigsResource{
		Type:          resourceType,
		Name:          name,
		ConfigEntries: entries,
	})

	request := &IncrementalAlterConfigsRequest{
		Resources:    resources,
		ValidateOnly: validateOnly,
	}

	var (
		b   *Broker
		err error
	)

	// AlterConfig of broker/broker logger must be sent to the broker in question
	if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
		// The resource name is the broker ID in string form.
		var id int64
		id, err = strconv.ParseInt(name, 10, 32)
		if err != nil {
			return err
		}
		b, err = ca.findBroker(int32(id))
	} else {
		b, err = ca.findAnyBroker()
	}
	if err != nil {
		return err
	}

	// Best-effort open; a no-op if the broker connection already exists.
	_ = b.Open(ca.client.Config())
	rsp, err := b.IncrementalAlterConfigs(request)
	if err != nil {
		return err
	}

	for _, rspResource := range rsp.Resources {
		if rspResource.Name == name {
			if rspResource.ErrorMsg != "" {
				return errors.New(rspResource.ErrorMsg)
			}
			if rspResource.ErrorCode != 0 {
				return KError(rspResource.ErrorCode)
			}
		}
	}
	return nil
}
798
799func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
800var acls []*AclCreation
801acls = append(acls, &AclCreation{resource, acl})
802request := &CreateAclsRequest{AclCreations: acls}
803
804if ca.conf.Version.IsAtLeast(V2_0_0_0) {
805request.Version = 1
806}
807
808b, err := ca.Controller()
809if err != nil {
810return err
811}
812
813_, err = b.CreateAcls(request)
814return err
815}
816
817func (ca *clusterAdmin) CreateACLs(resourceACLs []*ResourceAcls) error {
818var acls []*AclCreation
819for _, resourceACL := range resourceACLs {
820for _, acl := range resourceACL.Acls {
821acls = append(acls, &AclCreation{resourceACL.Resource, *acl})
822}
823}
824request := &CreateAclsRequest{AclCreations: acls}
825
826if ca.conf.Version.IsAtLeast(V2_0_0_0) {
827request.Version = 1
828}
829
830b, err := ca.Controller()
831if err != nil {
832return err
833}
834
835_, err = b.CreateAcls(request)
836return err
837}
838
839func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
840request := &DescribeAclsRequest{AclFilter: filter}
841
842if ca.conf.Version.IsAtLeast(V2_0_0_0) {
843request.Version = 1
844}
845
846b, err := ca.Controller()
847if err != nil {
848return nil, err
849}
850
851rsp, err := b.DescribeAcls(request)
852if err != nil {
853return nil, err
854}
855
856var lAcls []ResourceAcls
857for _, rAcl := range rsp.ResourceAcls {
858lAcls = append(lAcls, *rAcl)
859}
860return lAcls, nil
861}
862
863func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
864var filters []*AclFilter
865filters = append(filters, &filter)
866request := &DeleteAclsRequest{Filters: filters}
867
868if ca.conf.Version.IsAtLeast(V2_0_0_0) {
869request.Version = 1
870}
871
872b, err := ca.Controller()
873if err != nil {
874return nil, err
875}
876
877rsp, err := b.DeleteAcls(request)
878if err != nil {
879return nil, err
880}
881
882var mAcls []MatchingAcl
883for _, fr := range rsp.FilterResponses {
884for _, mACL := range fr.MatchingAcls {
885mAcls = append(mAcls, *mACL)
886}
887}
888return mAcls, nil
889}
890
891func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
892groupsPerBroker := make(map[*Broker][]string)
893
894for _, group := range groups {
895controller, err := ca.client.Coordinator(group)
896if err != nil {
897return nil, err
898}
899groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
900}
901
902for broker, brokerGroups := range groupsPerBroker {
903response, err := broker.DescribeGroups(&DescribeGroupsRequest{
904Groups: brokerGroups,
905})
906if err != nil {
907return nil, err
908}
909
910result = append(result, response.Groups...)
911}
912return result, nil
913}
914
// ListConsumerGroups queries every broker in parallel for its groups and
// merges the results into one group-name -> protocol-type map. Only the
// first broker error (if any) is returned.
func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
	allGroups = make(map[string]string)

	// Query brokers in parallel, since we have to query *all* brokers
	brokers := ca.client.Brokers()
	// Buffered to len(brokers) so no goroutine ever blocks on send.
	groupMaps := make(chan map[string]string, len(brokers))
	errChan := make(chan error, len(brokers))
	wg := sync.WaitGroup{}

	for _, b := range brokers {
		wg.Add(1)
		go func(b *Broker, conf *Config) {
			defer wg.Done()
			_ = b.Open(conf) // Ensure that broker is opened

			response, err := b.ListGroups(&ListGroupsRequest{})
			if err != nil {
				errChan <- err
				return
			}

			groups := make(map[string]string)
			for group, typ := range response.Groups {
				groups[group] = typ
			}

			groupMaps <- groups
		}(b, ca.conf)
	}

	// All goroutines have finished before the channels are closed, so the
	// ranged reads below drain complete results.
	wg.Wait()
	close(groupMaps)
	close(errChan)

	for groupMap := range groupMaps {
		for group, protocolType := range groupMap {
			allGroups[group] = protocolType
		}
	}

	// Intentionally return only the first error for simplicity
	// (a receive on the closed, empty channel yields nil when no broker failed).
	err = <-errChan
	return
}
959
960func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
961coordinator, err := ca.client.Coordinator(group)
962if err != nil {
963return nil, err
964}
965
966request := &OffsetFetchRequest{
967ConsumerGroup: group,
968partitions: topicPartitions,
969}
970
971if ca.conf.Version.IsAtLeast(V0_10_2_0) {
972request.Version = 2
973} else if ca.conf.Version.IsAtLeast(V0_8_2_2) {
974request.Version = 1
975}
976
977return coordinator.FetchOffset(request)
978}
979
980func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, partition int32) error {
981coordinator, err := ca.client.Coordinator(group)
982if err != nil {
983return err
984}
985
986request := &DeleteOffsetsRequest{
987Group: group,
988partitions: map[string][]int32{
989topic: {partition},
990},
991}
992
993resp, err := coordinator.DeleteOffsets(request)
994if err != nil {
995return err
996}
997
998if !errors.Is(resp.ErrorCode, ErrNoError) {
999return resp.ErrorCode
1000}
1001
1002if !errors.Is(resp.Errors[topic][partition], ErrNoError) {
1003return resp.Errors[topic][partition]
1004}
1005return nil
1006}
1007
1008func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
1009coordinator, err := ca.client.Coordinator(group)
1010if err != nil {
1011return err
1012}
1013
1014request := &DeleteGroupsRequest{
1015Groups: []string{group},
1016}
1017
1018resp, err := coordinator.DeleteGroups(request)
1019if err != nil {
1020return err
1021}
1022
1023groupErr, ok := resp.GroupErrorCodes[group]
1024if !ok {
1025return ErrIncompleteResponse
1026}
1027
1028if !errors.Is(groupErr, ErrNoError) {
1029return groupErr
1030}
1031
1032return nil
1033}
1034
1035func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) {
1036allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata)
1037
1038// Query brokers in parallel, since we may have to query multiple brokers
1039logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
1040errChan := make(chan error, len(brokerIds))
1041wg := sync.WaitGroup{}
1042
1043for _, b := range brokerIds {
1044wg.Add(1)
1045broker, err := ca.findBroker(b)
1046if err != nil {
1047Logger.Printf("Unable to find broker with ID = %v\n", b)
1048continue
1049}
1050go func(b *Broker, conf *Config) {
1051defer wg.Done()
1052_ = b.Open(conf) // Ensure that broker is opened
1053
1054response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{})
1055if err != nil {
1056errChan <- err
1057return
1058}
1059logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
1060logDirs[b.ID()] = response.LogDirs
1061logDirsMaps <- logDirs
1062}(broker, ca.conf)
1063}
1064
1065wg.Wait()
1066close(logDirsMaps)
1067close(errChan)
1068
1069for logDirsMap := range logDirsMaps {
1070for id, logDirs := range logDirsMap {
1071allLogDirs[id] = logDirs
1072}
1073}
1074
1075// Intentionally return only the first error for simplicity
1076err = <-errChan
1077return
1078}
1079
1080func (ca *clusterAdmin) DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error) {
1081req := &DescribeUserScramCredentialsRequest{}
1082for _, u := range users {
1083req.DescribeUsers = append(req.DescribeUsers, DescribeUserScramCredentialsRequestUser{
1084Name: u,
1085})
1086}
1087
1088b, err := ca.Controller()
1089if err != nil {
1090return nil, err
1091}
1092
1093rsp, err := b.DescribeUserScramCredentials(req)
1094if err != nil {
1095return nil, err
1096}
1097
1098return rsp.Results, nil
1099}
1100
1101func (ca *clusterAdmin) UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error) {
1102res, err := ca.AlterUserScramCredentials(upsert, nil)
1103if err != nil {
1104return nil, err
1105}
1106
1107return res, nil
1108}
1109
1110func (ca *clusterAdmin) DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
1111res, err := ca.AlterUserScramCredentials(nil, delete)
1112if err != nil {
1113return nil, err
1114}
1115
1116return res, nil
1117}
1118
1119func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsUpsert, d []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
1120req := &AlterUserScramCredentialsRequest{
1121Deletions: d,
1122Upsertions: u,
1123}
1124
1125b, err := ca.Controller()
1126if err != nil {
1127return nil, err
1128}
1129
1130rsp, err := b.AlterUserScramCredentials(req)
1131if err != nil {
1132return nil, err
1133}
1134
1135return rsp.Results, nil
1136}
1137
1138// Describe All : use an empty/nil components slice + strict = false
1139// Contains components: strict = false
1140// Contains only components: strict = true
1141func (ca *clusterAdmin) DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error) {
1142request := &DescribeClientQuotasRequest{
1143Components: components,
1144Strict: strict,
1145}
1146
1147b, err := ca.Controller()
1148if err != nil {
1149return nil, err
1150}
1151
1152rsp, err := b.DescribeClientQuotas(request)
1153if err != nil {
1154return nil, err
1155}
1156
1157if rsp.ErrorMsg != nil && len(*rsp.ErrorMsg) > 0 {
1158return nil, errors.New(*rsp.ErrorMsg)
1159}
1160if !errors.Is(rsp.ErrorCode, ErrNoError) {
1161return nil, rsp.ErrorCode
1162}
1163
1164return rsp.Entries, nil
1165}
1166
1167func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error {
1168entry := AlterClientQuotasEntry{
1169Entity: entity,
1170Ops: []ClientQuotasOp{op},
1171}
1172
1173request := &AlterClientQuotasRequest{
1174Entries: []AlterClientQuotasEntry{entry},
1175ValidateOnly: validateOnly,
1176}
1177
1178b, err := ca.Controller()
1179if err != nil {
1180return err
1181}
1182
1183rsp, err := b.AlterClientQuotas(request)
1184if err != nil {
1185return err
1186}
1187
1188for _, entry := range rsp.Entries {
1189if entry.ErrorMsg != nil && len(*entry.ErrorMsg) > 0 {
1190return errors.New(*entry.ErrorMsg)
1191}
1192if !errors.Is(entry.ErrorCode, ErrNoError) {
1193return entry.ErrorCode
1194}
1195}
1196
1197return nil
1198}
1199