cubefs
434 строки · 21.3 Кб
1package sarama
2
3import (
4"errors"
5"fmt"
6"strings"
7
8"github.com/hashicorp/go-multierror"
9)
10
// Sentinel errors produced by the client, brokers, producers, consumers and
// admin operations of this package.
var (
	// ErrOutOfBrokers reports that every broker the client could talk to has
	// errored or otherwise failed to respond, leaving none available.
	ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to")

	// ErrBrokerNotFound reports that no broker exists for the requested ID.
	ErrBrokerNotFound = errors.New("kafka: broker for ID is not found")

	// ErrClosedClient reports that a method was invoked on a client that has
	// already been closed.
	ErrClosedClient = errors.New("kafka: tried to use a client that was closed")

	// ErrIncompleteResponse reports that the server's response was
	// syntactically valid but lacked the expected information.
	ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks")

	// ErrInvalidPartition reports that a partitioner produced a partition
	// index outside of the range [0...numPartitions-1].
	ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index")

	// ErrAlreadyConnected reports that Open() was called on a Broker that is
	// already connected or connecting.
	ErrAlreadyConnected = errors.New("kafka: broker connection already initiated")

	// ErrNotConnected reports an attempt to send on, or call Close() on, a
	// Broker that is not connected.
	ErrNotConnected = errors.New("kafka: broker not connected")

	// ErrInsufficientData reports that a packet was truncated while decoding.
	// This can be expected when requesting messages: as an optimization the
	// server is allowed to return a partial message at the end of the
	// message set.
	ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")

	// ErrShuttingDown reports that a producer received a message while it was
	// shutting down.
	ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down")

	// ErrMessageTooLarge reports that the next message to consume is larger
	// than the configured Consumer.Fetch.Max.
	ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")

	// ErrConsumerOffsetNotAdvanced reports that a partition consumer did not
	// advance its offset after parsing a RecordBatch.
	ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch")

	// ErrControllerNotAvailable reports that the server did not give a correct
	// controller id. The Kafka server's version may be lower than 0.10.0.0.
	ErrControllerNotAvailable = errors.New("kafka: controller is not available")

	// ErrNoTopicsToUpdateMetadata reports that Meta.Full is set to false but
	// no specific topics were found to update the metadata for.
	ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata")

	// ErrUnknownScramMechanism reports that AlterUserScramCredentials was
	// attempted with an unknown SCRAM mechanism.
	ErrUnknownScramMechanism = errors.New("kafka: unknown SCRAM mechanism provided")

	// ErrReassignPartitions reports that altering the partition assignments
	// for a topic failed.
	ErrReassignPartitions = errors.New("failed to reassign partitions for topic")

	// ErrDeleteRecords reports that the required records could not be deleted.
	ErrDeleteRecords = errors.New("kafka server: failed to delete records")

	// ErrCreateACLs reports that ACL creation failed.
	ErrCreateACLs = errors.New("kafka server: failed to create one or more ACL rules")
)
69
70// MultiErrorFormat specifies the formatter applied to format multierrors. The
71// default implementation is a consensed version of the hashicorp/go-multierror
72// default one
73var MultiErrorFormat multierror.ErrorFormatFunc = func(es []error) string {
74if len(es) == 1 {
75return es[0].Error()
76}
77
78points := make([]string, len(es))
79for i, err := range es {
80points[i] = fmt.Sprintf("* %s", err)
81}
82
83return fmt.Sprintf(
84"%d errors occurred:\n\t%s\n",
85len(es), strings.Join(points, "\n\t"))
86}
87
// sentinelError pairs a sentinel error with an optional underlying error so
// that both can be matched by errors.Is.
type sentinelError struct {
	sentinel error // the well-known error callers match against
	wrapped  error // optional underlying cause; may be nil
}

// Error renders the sentinel's message, followed by ": " and the wrapped
// error when one is present.
func (e sentinelError) Error() string {
	if e.wrapped == nil {
		return fmt.Sprintf("%s", e.sentinel)
	}
	return fmt.Sprintf("%s: %v", e.sentinel, e.wrapped)
}

// Is reports whether target matches either the sentinel or the wrapped error.
func (e sentinelError) Is(target error) bool {
	if errors.Is(e.sentinel, target) {
		return true
	}
	return errors.Is(e.wrapped, target)
}

// Unwrap returns the wrapped error, which is nil when nothing was wrapped.
func (e sentinelError) Unwrap() error {
	return e.wrapped
}
108
109func Wrap(sentinel error, wrapped ...error) sentinelError {
110return sentinelError{sentinel: sentinel, wrapped: multiError(wrapped...)}
111}
112
113func multiError(wrapped ...error) error {
114merr := multierror.Append(nil, wrapped...)
115if MultiErrorFormat != nil {
116merr.ErrorFormat = MultiErrorFormat
117}
118return merr.ErrorOrNil()
119}
120
// PacketEncodingError describes a failure to encode a Kafka packet. One way to
// trigger it is to encode a string longer than 2^15 characters, which Kafka's
// encoding rules do not permit.
type PacketEncodingError struct {
	// Info is the detail text appended to the error message.
	Info string
}

// Error returns the encoding failure message followed by Info.
func (e PacketEncodingError) Error() string {
	const format = "kafka: error encoding packet: %s"
	return fmt.Sprintf(format, e.Info)
}
130
// PacketDecodingError describes a failure, other than truncated data, to
// decode the Kafka broker's response. Causes include a bad CRC or length
// field, or any other invalid value.
type PacketDecodingError struct {
	// Info is the detail text appended to the error message.
	Info string
}

// Error returns the decoding failure message followed by Info.
func (e PacketDecodingError) Error() string {
	const format = "kafka: error decoding packet: %s"
	return fmt.Sprintf(format, e.Info)
}
140
// ConfigurationError is the error type a constructor (e.g. NewClient or
// NewConsumer) returns when the configuration it was given is invalid. The
// string value is the reason, which Error places between parentheses.
type ConfigurationError string

// Error returns the invalid-configuration message with the reason embedded.
func (c ConfigurationError) Error() string {
	const (
		prefix = "kafka: invalid configuration ("
		suffix = ")"
	)
	return prefix + string(c) + suffix
}
148
// KError is the type of error that can be returned directly by the Kafka broker.
// Its value is the numeric error code.
// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
type KError int16

// Numeric error codes returned by the Kafka server.
const (
	ErrNoError                            KError = 0
	ErrUnknown                            KError = -1
	ErrOffsetOutOfRange                   KError = 1
	ErrInvalidMessage                     KError = 2
	ErrUnknownTopicOrPartition            KError = 3
	ErrInvalidMessageSize                 KError = 4
	ErrLeaderNotAvailable                 KError = 5
	ErrNotLeaderForPartition              KError = 6
	ErrRequestTimedOut                    KError = 7
	ErrBrokerNotAvailable                 KError = 8
	ErrReplicaNotAvailable                KError = 9
	ErrMessageSizeTooLarge                KError = 10
	ErrStaleControllerEpochCode           KError = 11
	ErrOffsetMetadataTooLarge             KError = 12
	ErrNetworkException                   KError = 13
	ErrOffsetsLoadInProgress              KError = 14
	ErrConsumerCoordinatorNotAvailable    KError = 15
	ErrNotCoordinatorForConsumer          KError = 16
	ErrInvalidTopic                       KError = 17
	ErrMessageSetSizeTooLarge             KError = 18
	ErrNotEnoughReplicas                  KError = 19
	ErrNotEnoughReplicasAfterAppend       KError = 20
	ErrInvalidRequiredAcks                KError = 21
	ErrIllegalGeneration                  KError = 22
	ErrInconsistentGroupProtocol          KError = 23
	ErrInvalidGroupId                     KError = 24
	ErrUnknownMemberId                    KError = 25
	ErrInvalidSessionTimeout              KError = 26
	ErrRebalanceInProgress                KError = 27
	ErrInvalidCommitOffsetSize            KError = 28
	ErrTopicAuthorizationFailed           KError = 29
	ErrGroupAuthorizationFailed           KError = 30
	ErrClusterAuthorizationFailed         KError = 31
	ErrInvalidTimestamp                   KError = 32
	ErrUnsupportedSASLMechanism           KError = 33
	ErrIllegalSASLState                   KError = 34
	ErrUnsupportedVersion                 KError = 35
	ErrTopicAlreadyExists                 KError = 36
	ErrInvalidPartitions                  KError = 37
	ErrInvalidReplicationFactor           KError = 38
	ErrInvalidReplicaAssignment           KError = 39
	ErrInvalidConfig                      KError = 40
	ErrNotController                      KError = 41
	ErrInvalidRequest                     KError = 42
	ErrUnsupportedForMessageFormat        KError = 43
	ErrPolicyViolation                    KError = 44
	ErrOutOfOrderSequenceNumber           KError = 45
	ErrDuplicateSequenceNumber            KError = 46
	ErrInvalidProducerEpoch               KError = 47
	ErrInvalidTxnState                    KError = 48
	ErrInvalidProducerIDMapping           KError = 49
	ErrInvalidTransactionTimeout          KError = 50
	ErrConcurrentTransactions             KError = 51
	ErrTransactionCoordinatorFenced       KError = 52
	ErrTransactionalIDAuthorizationFailed KError = 53
	ErrSecurityDisabled                   KError = 54
	ErrOperationNotAttempted              KError = 55
	ErrKafkaStorageError                  KError = 56
	ErrLogDirNotFound                     KError = 57
	ErrSASLAuthenticationFailed           KError = 58
	ErrUnknownProducerID                  KError = 59
	ErrReassignmentInProgress             KError = 60
	ErrDelegationTokenAuthDisabled        KError = 61
	ErrDelegationTokenNotFound            KError = 62
	ErrDelegationTokenOwnerMismatch       KError = 63
	ErrDelegationTokenRequestNotAllowed   KError = 64
	ErrDelegationTokenAuthorizationFailed KError = 65
	ErrDelegationTokenExpired             KError = 66
	ErrInvalidPrincipalType               KError = 67
	ErrNonEmptyGroup                      KError = 68
	ErrGroupIDNotFound                    KError = 69
	ErrFetchSessionIDNotFound             KError = 70
	ErrInvalidFetchSessionEpoch           KError = 71
	ErrListenerNotFound                   KError = 72
	ErrTopicDeletionDisabled              KError = 73
	ErrFencedLeaderEpoch                  KError = 74
	ErrUnknownLeaderEpoch                 KError = 75
	ErrUnsupportedCompressionType         KError = 76
	ErrStaleBrokerEpoch                   KError = 77
	ErrOffsetNotAvailable                 KError = 78
	ErrMemberIdRequired                   KError = 79
	ErrPreferredLeaderNotAvailable        KError = 80
	ErrGroupMaxSizeReached                KError = 81
	ErrFencedInstancedId                  KError = 82
	ErrEligibleLeadersNotAvailable        KError = 83
	ErrElectionNotNeeded                  KError = 84
	ErrNoReassignmentInProgress           KError = 85
	ErrGroupSubscribedToTopic             KError = 86
	ErrInvalidRecord                      KError = 87
	ErrUnstableOffsetCommit               KError = 88
)

// Error returns a human-readable description of the error code. Codes that
// have no case below yield a generic message that includes the numeric value.
func (err KError) Error() string {
	// Error messages stolen/adapted from
	// https://kafka.apache.org/protocol#protocol_error_codes
	switch err {
	case ErrNoError:
		return "kafka server: Not an error, why are you printing me?"
	case ErrUnknown:
		return "kafka server: Unexpected (unknown?) server error"
	case ErrOffsetOutOfRange:
		return "kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition"
	case ErrInvalidMessage:
		return "kafka server: Message contents does not match its CRC"
	case ErrUnknownTopicOrPartition:
		return "kafka server: Request was for a topic or partition that does not exist on this broker"
	case ErrInvalidMessageSize:
		return "kafka server: The message has a negative size"
	case ErrLeaderNotAvailable:
		return "kafka server: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes"
	case ErrNotLeaderForPartition:
		return "kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date"
	case ErrRequestTimedOut:
		return "kafka server: Request exceeded the user-specified time limit in the request"
	case ErrBrokerNotAvailable:
		return "kafka server: Broker not available. Not a client facing error, we should never receive this!!!"
	case ErrReplicaNotAvailable:
		return "kafka server: Replica information not available, one or more brokers are down"
	case ErrMessageSizeTooLarge:
		return "kafka server: Message was too large, server rejected it to avoid allocation error"
	case ErrStaleControllerEpochCode:
		return "kafka server: StaleControllerEpochCode (internal error code for broker-to-broker communication)"
	case ErrOffsetMetadataTooLarge:
		return "kafka server: Specified a string larger than the configured maximum for offset metadata"
	case ErrNetworkException:
		return "kafka server: The server disconnected before a response was received"
	case ErrOffsetsLoadInProgress:
		return "kafka server: The broker is still loading offsets after a leader change for that offset's topic partition"
	case ErrConsumerCoordinatorNotAvailable:
		return "kafka server: Offset's topic has not yet been created"
	case ErrNotCoordinatorForConsumer:
		return "kafka server: Request was for a consumer group that is not coordinated by this broker"
	case ErrInvalidTopic:
		return "kafka server: The request attempted to perform an operation on an invalid topic"
	case ErrMessageSetSizeTooLarge:
		return "kafka server: The request included message batch larger than the configured segment size on the server"
	case ErrNotEnoughReplicas:
		return "kafka server: Messages are rejected since there are fewer in-sync replicas than required"
	case ErrNotEnoughReplicasAfterAppend:
		return "kafka server: Messages are written to the log, but to fewer in-sync replicas than required"
	case ErrInvalidRequiredAcks:
		return "kafka server: The number of required acks is invalid (should be either -1, 0, or 1)"
	case ErrIllegalGeneration:
		return "kafka server: The provided generation id is not the current generation"
	case ErrInconsistentGroupProtocol:
		return "kafka server: The provider group protocol type is incompatible with the other members"
	case ErrInvalidGroupId:
		return "kafka server: The provided group id was empty"
	case ErrUnknownMemberId:
		return "kafka server: The provided member is not known in the current generation"
	case ErrInvalidSessionTimeout:
		return "kafka server: The provided session timeout is outside the allowed range"
	case ErrRebalanceInProgress:
		return "kafka server: A rebalance for the group is in progress. Please re-join the group"
	case ErrInvalidCommitOffsetSize:
		return "kafka server: The provided commit metadata was too large"
	case ErrTopicAuthorizationFailed:
		return "kafka server: The client is not authorized to access this topic"
	case ErrGroupAuthorizationFailed:
		return "kafka server: The client is not authorized to access this group"
	case ErrClusterAuthorizationFailed:
		return "kafka server: The client is not authorized to send this request type"
	case ErrInvalidTimestamp:
		return "kafka server: The timestamp of the message is out of acceptable range"
	case ErrUnsupportedSASLMechanism:
		return "kafka server: The broker does not support the requested SASL mechanism"
	case ErrIllegalSASLState:
		return "kafka server: Request is not valid given the current SASL state"
	case ErrUnsupportedVersion:
		return "kafka server: The version of API is not supported"
	case ErrTopicAlreadyExists:
		return "kafka server: Topic with this name already exists"
	case ErrInvalidPartitions:
		return "kafka server: Number of partitions is invalid"
	case ErrInvalidReplicationFactor:
		return "kafka server: Replication-factor is invalid"
	case ErrInvalidReplicaAssignment:
		return "kafka server: Replica assignment is invalid"
	case ErrInvalidConfig:
		return "kafka server: Configuration is invalid"
	case ErrNotController:
		return "kafka server: This is not the correct controller for this cluster"
	case ErrInvalidRequest:
		return "kafka server: This most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker. See the broker logs for more details"
	case ErrUnsupportedForMessageFormat:
		return "kafka server: The requested operation is not supported by the message format version"
	case ErrPolicyViolation:
		return "kafka server: Request parameters do not satisfy the configured policy"
	case ErrOutOfOrderSequenceNumber:
		return "kafka server: The broker received an out of order sequence number"
	case ErrDuplicateSequenceNumber:
		return "kafka server: The broker received a duplicate sequence number"
	case ErrInvalidProducerEpoch:
		return "kafka server: Producer attempted an operation with an old epoch"
	case ErrInvalidTxnState:
		return "kafka server: The producer attempted a transactional operation in an invalid state"
	case ErrInvalidProducerIDMapping:
		return "kafka server: The producer attempted to use a producer id which is not currently assigned to its transactional id"
	case ErrInvalidTransactionTimeout:
		return "kafka server: The transaction timeout is larger than the maximum value allowed by the broker (as configured by max.transaction.timeout.ms)"
	case ErrConcurrentTransactions:
		return "kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing"
	case ErrTransactionCoordinatorFenced:
		return "kafka server: The transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer"
	case ErrTransactionalIDAuthorizationFailed:
		return "kafka server: Transactional ID authorization failed"
	case ErrSecurityDisabled:
		return "kafka server: Security features are disabled"
	case ErrOperationNotAttempted:
		return "kafka server: The broker did not attempt to execute this operation"
	case ErrKafkaStorageError:
		return "kafka server: Disk error when trying to access log file on the disk"
	case ErrLogDirNotFound:
		return "kafka server: The specified log directory is not found in the broker config"
	case ErrSASLAuthenticationFailed:
		return "kafka server: SASL Authentication failed"
	case ErrUnknownProducerID:
		return "kafka server: The broker could not locate the producer metadata associated with the Producer ID"
	case ErrReassignmentInProgress:
		return "kafka server: A partition reassignment is in progress"
	case ErrDelegationTokenAuthDisabled:
		return "kafka server: Delegation Token feature is not enabled"
	case ErrDelegationTokenNotFound:
		return "kafka server: Delegation Token is not found on server"
	case ErrDelegationTokenOwnerMismatch:
		return "kafka server: Specified Principal is not valid Owner/Renewer"
	case ErrDelegationTokenRequestNotAllowed:
		return "kafka server: Delegation Token requests are not allowed on PLAINTEXT/1-way SSL channels and on delegation token authenticated channels"
	case ErrDelegationTokenAuthorizationFailed:
		return "kafka server: Delegation Token authorization failed"
	case ErrDelegationTokenExpired:
		return "kafka server: Delegation Token is expired"
	case ErrInvalidPrincipalType:
		return "kafka server: Supplied principalType is not supported"
	case ErrNonEmptyGroup:
		return "kafka server: The group is not empty"
	case ErrGroupIDNotFound:
		return "kafka server: The group id does not exist"
	case ErrFetchSessionIDNotFound:
		return "kafka server: The fetch session ID was not found"
	case ErrInvalidFetchSessionEpoch:
		return "kafka server: The fetch session epoch is invalid"
	case ErrListenerNotFound:
		return "kafka server: There is no listener on the leader broker that matches the listener on which metadata request was processed"
	case ErrTopicDeletionDisabled:
		return "kafka server: Topic deletion is disabled"
	case ErrFencedLeaderEpoch:
		return "kafka server: The leader epoch in the request is older than the epoch on the broker"
	case ErrUnknownLeaderEpoch:
		return "kafka server: The leader epoch in the request is newer than the epoch on the broker"
	case ErrUnsupportedCompressionType:
		return "kafka server: The requesting client does not support the compression type of given partition"
	case ErrStaleBrokerEpoch:
		return "kafka server: Broker epoch has changed"
	case ErrOffsetNotAvailable:
		return "kafka server: The leader high watermark has not caught up from a recent leader election so the offsets cannot be guaranteed to be monotonically increasing"
	case ErrMemberIdRequired:
		return "kafka server: The group member needs to have a valid member id before actually entering a consumer group"
	case ErrPreferredLeaderNotAvailable:
		return "kafka server: The preferred leader was not available"
	case ErrGroupMaxSizeReached:
		// The previous text was two sentences spliced into each other; use the
		// single sentence from the Kafka protocol error table instead.
		return "kafka server: The consumer group has reached its max size"
	case ErrFencedInstancedId:
		return "kafka server: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id"
	case ErrEligibleLeadersNotAvailable:
		return "kafka server: Eligible topic partition leaders are not available"
	case ErrElectionNotNeeded:
		return "kafka server: Leader election not needed for topic partition"
	case ErrNoReassignmentInProgress:
		return "kafka server: No partition reassignment is in progress"
	case ErrGroupSubscribedToTopic:
		return "kafka server: Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it"
	case ErrInvalidRecord:
		return "kafka server: This record has failed the validation on broker and hence will be rejected"
	case ErrUnstableOffsetCommit:
		return "kafka server: There are unstable offsets that need to be cleared"
	}

	// Any code without a case above is reported with its numeric value.
	return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)
}
435