cubefs

Форк
0
434 строки · 21.3 Кб
1
package sarama
2

3
import (
4
	"errors"
5
	"fmt"
6
	"strings"
7

8
	"github.com/hashicorp/go-multierror"
9
)
10

11
// ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored
12
// or otherwise failed to respond.
13
var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to")
14

15
// ErrBrokerNotFound is the error returned when there's no broker found for the requested ID.
16
var ErrBrokerNotFound = errors.New("kafka: broker for ID is not found")
17

18
// ErrClosedClient is the error returned when a method is called on a client that has been closed.
19
var ErrClosedClient = errors.New("kafka: tried to use a client that was closed")
20

21
// ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does
22
// not contain the expected information.
23
var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks")
24

25
// ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index
26
// (meaning one outside of the range [0...numPartitions-1]).
27
var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index")
28

29
// ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.
30
var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated")
31

32
// ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
33
var ErrNotConnected = errors.New("kafka: broker not connected")
34

35
// ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected
36
// when requesting messages, since as an optimization the server is allowed to return a partial message at the end
37
// of the message set.
38
var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")
39

40
// ErrShuttingDown is returned when a producer receives a message during shutdown.
41
var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down")
42

43
// ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max
44
var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")
45

46
// ErrConsumerOffsetNotAdvanced is returned when a partition consumer didn't advance its offset after parsing
47
// a RecordBatch.
48
var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch")
49

50
// ErrControllerNotAvailable is returned when server didn't give correct controller id. May be kafka server's version
51
// is lower than 0.10.0.0.
52
var ErrControllerNotAvailable = errors.New("kafka: controller is not available")
53

54
// ErrNoTopicsToUpdateMetadata is returned when Meta.Full is set to false but no specific topics were found to update
55
// the metadata.
56
var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata")
57

58
// ErrUnknownScramMechanism is returned when user tries to AlterUserScramCredentials with unknown SCRAM mechanism
59
var ErrUnknownScramMechanism = errors.New("kafka: unknown SCRAM mechanism provided")
60

61
// ErrReassignPartitions is returned when altering partition assignments for a topic fails
62
var ErrReassignPartitions = errors.New("failed to reassign partitions for topic")
63

64
// ErrDeleteRecords is the type of error returned when fail to delete the required records
65
var ErrDeleteRecords = errors.New("kafka server: failed to delete records")
66

67
// ErrCreateACLs is the type of error returned when ACL creation failed
68
var ErrCreateACLs = errors.New("kafka server: failed to create one or more ACL rules")
69

70
// MultiErrorFormat specifies the formatter applied to format multierrors. The
71
// default implementation is a consensed version of the hashicorp/go-multierror
72
// default one
73
var MultiErrorFormat multierror.ErrorFormatFunc = func(es []error) string {
74
	if len(es) == 1 {
75
		return es[0].Error()
76
	}
77

78
	points := make([]string, len(es))
79
	for i, err := range es {
80
		points[i] = fmt.Sprintf("* %s", err)
81
	}
82

83
	return fmt.Sprintf(
84
		"%d errors occurred:\n\t%s\n",
85
		len(es), strings.Join(points, "\n\t"))
86
}
87

88
type sentinelError struct {
89
	sentinel error
90
	wrapped  error
91
}
92

93
func (err sentinelError) Error() string {
94
	if err.wrapped != nil {
95
		return fmt.Sprintf("%s: %v", err.sentinel, err.wrapped)
96
	} else {
97
		return fmt.Sprintf("%s", err.sentinel)
98
	}
99
}
100

101
func (err sentinelError) Is(target error) bool {
102
	return errors.Is(err.sentinel, target) || errors.Is(err.wrapped, target)
103
}
104

105
func (err sentinelError) Unwrap() error {
106
	return err.wrapped
107
}
108

109
func Wrap(sentinel error, wrapped ...error) sentinelError {
110
	return sentinelError{sentinel: sentinel, wrapped: multiError(wrapped...)}
111
}
112

113
func multiError(wrapped ...error) error {
114
	merr := multierror.Append(nil, wrapped...)
115
	if MultiErrorFormat != nil {
116
		merr.ErrorFormat = MultiErrorFormat
117
	}
118
	return merr.ErrorOrNil()
119
}
120

121
// PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
122
// if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
123
type PacketEncodingError struct {
124
	Info string
125
}
126

127
func (err PacketEncodingError) Error() string {
128
	return fmt.Sprintf("kafka: error encoding packet: %s", err.Info)
129
}
130

131
// PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response.
132
// This can be a bad CRC or length field, or any other invalid value.
133
type PacketDecodingError struct {
134
	Info string
135
}
136

137
func (err PacketDecodingError) Error() string {
138
	return fmt.Sprintf("kafka: error decoding packet: %s", err.Info)
139
}
140

141
// ConfigurationError is the type of error returned from a constructor (e.g. NewClient, or NewConsumer)
142
// when the specified configuration is invalid.
143
type ConfigurationError string
144

145
func (err ConfigurationError) Error() string {
146
	return "kafka: invalid configuration (" + string(err) + ")"
147
}
148

149
// KError is the type of error that can be returned directly by the Kafka broker.
150
// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
151
type KError int16
152

153
// Numeric error codes returned by the Kafka server.
154
const (
155
	ErrNoError                            KError = 0
156
	ErrUnknown                            KError = -1
157
	ErrOffsetOutOfRange                   KError = 1
158
	ErrInvalidMessage                     KError = 2
159
	ErrUnknownTopicOrPartition            KError = 3
160
	ErrInvalidMessageSize                 KError = 4
161
	ErrLeaderNotAvailable                 KError = 5
162
	ErrNotLeaderForPartition              KError = 6
163
	ErrRequestTimedOut                    KError = 7
164
	ErrBrokerNotAvailable                 KError = 8
165
	ErrReplicaNotAvailable                KError = 9
166
	ErrMessageSizeTooLarge                KError = 10
167
	ErrStaleControllerEpochCode           KError = 11
168
	ErrOffsetMetadataTooLarge             KError = 12
169
	ErrNetworkException                   KError = 13
170
	ErrOffsetsLoadInProgress              KError = 14
171
	ErrConsumerCoordinatorNotAvailable    KError = 15
172
	ErrNotCoordinatorForConsumer          KError = 16
173
	ErrInvalidTopic                       KError = 17
174
	ErrMessageSetSizeTooLarge             KError = 18
175
	ErrNotEnoughReplicas                  KError = 19
176
	ErrNotEnoughReplicasAfterAppend       KError = 20
177
	ErrInvalidRequiredAcks                KError = 21
178
	ErrIllegalGeneration                  KError = 22
179
	ErrInconsistentGroupProtocol          KError = 23
180
	ErrInvalidGroupId                     KError = 24
181
	ErrUnknownMemberId                    KError = 25
182
	ErrInvalidSessionTimeout              KError = 26
183
	ErrRebalanceInProgress                KError = 27
184
	ErrInvalidCommitOffsetSize            KError = 28
185
	ErrTopicAuthorizationFailed           KError = 29
186
	ErrGroupAuthorizationFailed           KError = 30
187
	ErrClusterAuthorizationFailed         KError = 31
188
	ErrInvalidTimestamp                   KError = 32
189
	ErrUnsupportedSASLMechanism           KError = 33
190
	ErrIllegalSASLState                   KError = 34
191
	ErrUnsupportedVersion                 KError = 35
192
	ErrTopicAlreadyExists                 KError = 36
193
	ErrInvalidPartitions                  KError = 37
194
	ErrInvalidReplicationFactor           KError = 38
195
	ErrInvalidReplicaAssignment           KError = 39
196
	ErrInvalidConfig                      KError = 40
197
	ErrNotController                      KError = 41
198
	ErrInvalidRequest                     KError = 42
199
	ErrUnsupportedForMessageFormat        KError = 43
200
	ErrPolicyViolation                    KError = 44
201
	ErrOutOfOrderSequenceNumber           KError = 45
202
	ErrDuplicateSequenceNumber            KError = 46
203
	ErrInvalidProducerEpoch               KError = 47
204
	ErrInvalidTxnState                    KError = 48
205
	ErrInvalidProducerIDMapping           KError = 49
206
	ErrInvalidTransactionTimeout          KError = 50
207
	ErrConcurrentTransactions             KError = 51
208
	ErrTransactionCoordinatorFenced       KError = 52
209
	ErrTransactionalIDAuthorizationFailed KError = 53
210
	ErrSecurityDisabled                   KError = 54
211
	ErrOperationNotAttempted              KError = 55
212
	ErrKafkaStorageError                  KError = 56
213
	ErrLogDirNotFound                     KError = 57
214
	ErrSASLAuthenticationFailed           KError = 58
215
	ErrUnknownProducerID                  KError = 59
216
	ErrReassignmentInProgress             KError = 60
217
	ErrDelegationTokenAuthDisabled        KError = 61
218
	ErrDelegationTokenNotFound            KError = 62
219
	ErrDelegationTokenOwnerMismatch       KError = 63
220
	ErrDelegationTokenRequestNotAllowed   KError = 64
221
	ErrDelegationTokenAuthorizationFailed KError = 65
222
	ErrDelegationTokenExpired             KError = 66
223
	ErrInvalidPrincipalType               KError = 67
224
	ErrNonEmptyGroup                      KError = 68
225
	ErrGroupIDNotFound                    KError = 69
226
	ErrFetchSessionIDNotFound             KError = 70
227
	ErrInvalidFetchSessionEpoch           KError = 71
228
	ErrListenerNotFound                   KError = 72
229
	ErrTopicDeletionDisabled              KError = 73
230
	ErrFencedLeaderEpoch                  KError = 74
231
	ErrUnknownLeaderEpoch                 KError = 75
232
	ErrUnsupportedCompressionType         KError = 76
233
	ErrStaleBrokerEpoch                   KError = 77
234
	ErrOffsetNotAvailable                 KError = 78
235
	ErrMemberIdRequired                   KError = 79
236
	ErrPreferredLeaderNotAvailable        KError = 80
237
	ErrGroupMaxSizeReached                KError = 81
238
	ErrFencedInstancedId                  KError = 82
239
	ErrEligibleLeadersNotAvailable        KError = 83
240
	ErrElectionNotNeeded                  KError = 84
241
	ErrNoReassignmentInProgress           KError = 85
242
	ErrGroupSubscribedToTopic             KError = 86
243
	ErrInvalidRecord                      KError = 87
244
	ErrUnstableOffsetCommit               KError = 88
245
)
246

247
func (err KError) Error() string {
248
	// Error messages stolen/adapted from
249
	// https://kafka.apache.org/protocol#protocol_error_codes
250
	switch err {
251
	case ErrNoError:
252
		return "kafka server: Not an error, why are you printing me?"
253
	case ErrUnknown:
254
		return "kafka server: Unexpected (unknown?) server error"
255
	case ErrOffsetOutOfRange:
256
		return "kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition"
257
	case ErrInvalidMessage:
258
		return "kafka server: Message contents does not match its CRC"
259
	case ErrUnknownTopicOrPartition:
260
		return "kafka server: Request was for a topic or partition that does not exist on this broker"
261
	case ErrInvalidMessageSize:
262
		return "kafka server: The message has a negative size"
263
	case ErrLeaderNotAvailable:
264
		return "kafka server: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes"
265
	case ErrNotLeaderForPartition:
266
		return "kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date"
267
	case ErrRequestTimedOut:
268
		return "kafka server: Request exceeded the user-specified time limit in the request"
269
	case ErrBrokerNotAvailable:
270
		return "kafka server: Broker not available. Not a client facing error, we should never receive this!!!"
271
	case ErrReplicaNotAvailable:
272
		return "kafka server: Replica information not available, one or more brokers are down"
273
	case ErrMessageSizeTooLarge:
274
		return "kafka server: Message was too large, server rejected it to avoid allocation error"
275
	case ErrStaleControllerEpochCode:
276
		return "kafka server: StaleControllerEpochCode (internal error code for broker-to-broker communication)"
277
	case ErrOffsetMetadataTooLarge:
278
		return "kafka server: Specified a string larger than the configured maximum for offset metadata"
279
	case ErrNetworkException:
280
		return "kafka server: The server disconnected before a response was received"
281
	case ErrOffsetsLoadInProgress:
282
		return "kafka server: The broker is still loading offsets after a leader change for that offset's topic partition"
283
	case ErrConsumerCoordinatorNotAvailable:
284
		return "kafka server: Offset's topic has not yet been created"
285
	case ErrNotCoordinatorForConsumer:
286
		return "kafka server: Request was for a consumer group that is not coordinated by this broker"
287
	case ErrInvalidTopic:
288
		return "kafka server: The request attempted to perform an operation on an invalid topic"
289
	case ErrMessageSetSizeTooLarge:
290
		return "kafka server: The request included message batch larger than the configured segment size on the server"
291
	case ErrNotEnoughReplicas:
292
		return "kafka server: Messages are rejected since there are fewer in-sync replicas than required"
293
	case ErrNotEnoughReplicasAfterAppend:
294
		return "kafka server: Messages are written to the log, but to fewer in-sync replicas than required"
295
	case ErrInvalidRequiredAcks:
296
		return "kafka server: The number of required acks is invalid (should be either -1, 0, or 1)"
297
	case ErrIllegalGeneration:
298
		return "kafka server: The provided generation id is not the current generation"
299
	case ErrInconsistentGroupProtocol:
300
		return "kafka server: The provider group protocol type is incompatible with the other members"
301
	case ErrInvalidGroupId:
302
		return "kafka server: The provided group id was empty"
303
	case ErrUnknownMemberId:
304
		return "kafka server: The provided member is not known in the current generation"
305
	case ErrInvalidSessionTimeout:
306
		return "kafka server: The provided session timeout is outside the allowed range"
307
	case ErrRebalanceInProgress:
308
		return "kafka server: A rebalance for the group is in progress. Please re-join the group"
309
	case ErrInvalidCommitOffsetSize:
310
		return "kafka server: The provided commit metadata was too large"
311
	case ErrTopicAuthorizationFailed:
312
		return "kafka server: The client is not authorized to access this topic"
313
	case ErrGroupAuthorizationFailed:
314
		return "kafka server: The client is not authorized to access this group"
315
	case ErrClusterAuthorizationFailed:
316
		return "kafka server: The client is not authorized to send this request type"
317
	case ErrInvalidTimestamp:
318
		return "kafka server: The timestamp of the message is out of acceptable range"
319
	case ErrUnsupportedSASLMechanism:
320
		return "kafka server: The broker does not support the requested SASL mechanism"
321
	case ErrIllegalSASLState:
322
		return "kafka server: Request is not valid given the current SASL state"
323
	case ErrUnsupportedVersion:
324
		return "kafka server: The version of API is not supported"
325
	case ErrTopicAlreadyExists:
326
		return "kafka server: Topic with this name already exists"
327
	case ErrInvalidPartitions:
328
		return "kafka server: Number of partitions is invalid"
329
	case ErrInvalidReplicationFactor:
330
		return "kafka server: Replication-factor is invalid"
331
	case ErrInvalidReplicaAssignment:
332
		return "kafka server: Replica assignment is invalid"
333
	case ErrInvalidConfig:
334
		return "kafka server: Configuration is invalid"
335
	case ErrNotController:
336
		return "kafka server: This is not the correct controller for this cluster"
337
	case ErrInvalidRequest:
338
		return "kafka server: This most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker. See the broker logs for more details"
339
	case ErrUnsupportedForMessageFormat:
340
		return "kafka server: The requested operation is not supported by the message format version"
341
	case ErrPolicyViolation:
342
		return "kafka server: Request parameters do not satisfy the configured policy"
343
	case ErrOutOfOrderSequenceNumber:
344
		return "kafka server: The broker received an out of order sequence number"
345
	case ErrDuplicateSequenceNumber:
346
		return "kafka server: The broker received a duplicate sequence number"
347
	case ErrInvalidProducerEpoch:
348
		return "kafka server: Producer attempted an operation with an old epoch"
349
	case ErrInvalidTxnState:
350
		return "kafka server: The producer attempted a transactional operation in an invalid state"
351
	case ErrInvalidProducerIDMapping:
352
		return "kafka server: The producer attempted to use a producer id which is not currently assigned to its transactional id"
353
	case ErrInvalidTransactionTimeout:
354
		return "kafka server: The transaction timeout is larger than the maximum value allowed by the broker (as configured by max.transaction.timeout.ms)"
355
	case ErrConcurrentTransactions:
356
		return "kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing"
357
	case ErrTransactionCoordinatorFenced:
358
		return "kafka server: The transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer"
359
	case ErrTransactionalIDAuthorizationFailed:
360
		return "kafka server: Transactional ID authorization failed"
361
	case ErrSecurityDisabled:
362
		return "kafka server: Security features are disabled"
363
	case ErrOperationNotAttempted:
364
		return "kafka server: The broker did not attempt to execute this operation"
365
	case ErrKafkaStorageError:
366
		return "kafka server: Disk error when trying to access log file on the disk"
367
	case ErrLogDirNotFound:
368
		return "kafka server: The specified log directory is not found in the broker config"
369
	case ErrSASLAuthenticationFailed:
370
		return "kafka server: SASL Authentication failed"
371
	case ErrUnknownProducerID:
372
		return "kafka server: The broker could not locate the producer metadata associated with the Producer ID"
373
	case ErrReassignmentInProgress:
374
		return "kafka server: A partition reassignment is in progress"
375
	case ErrDelegationTokenAuthDisabled:
376
		return "kafka server: Delegation Token feature is not enabled"
377
	case ErrDelegationTokenNotFound:
378
		return "kafka server: Delegation Token is not found on server"
379
	case ErrDelegationTokenOwnerMismatch:
380
		return "kafka server: Specified Principal is not valid Owner/Renewer"
381
	case ErrDelegationTokenRequestNotAllowed:
382
		return "kafka server: Delegation Token requests are not allowed on PLAINTEXT/1-way SSL channels and on delegation token authenticated channels"
383
	case ErrDelegationTokenAuthorizationFailed:
384
		return "kafka server: Delegation Token authorization failed"
385
	case ErrDelegationTokenExpired:
386
		return "kafka server: Delegation Token is expired"
387
	case ErrInvalidPrincipalType:
388
		return "kafka server: Supplied principalType is not supported"
389
	case ErrNonEmptyGroup:
390
		return "kafka server: The group is not empty"
391
	case ErrGroupIDNotFound:
392
		return "kafka server: The group id does not exist"
393
	case ErrFetchSessionIDNotFound:
394
		return "kafka server: The fetch session ID was not found"
395
	case ErrInvalidFetchSessionEpoch:
396
		return "kafka server: The fetch session epoch is invalid"
397
	case ErrListenerNotFound:
398
		return "kafka server: There is no listener on the leader broker that matches the listener on which metadata request was processed"
399
	case ErrTopicDeletionDisabled:
400
		return "kafka server: Topic deletion is disabled"
401
	case ErrFencedLeaderEpoch:
402
		return "kafka server: The leader epoch in the request is older than the epoch on the broker"
403
	case ErrUnknownLeaderEpoch:
404
		return "kafka server: The leader epoch in the request is newer than the epoch on the broker"
405
	case ErrUnsupportedCompressionType:
406
		return "kafka server: The requesting client does not support the compression type of given partition"
407
	case ErrStaleBrokerEpoch:
408
		return "kafka server: Broker epoch has changed"
409
	case ErrOffsetNotAvailable:
410
		return "kafka server: The leader high watermark has not caught up from a recent leader election so the offsets cannot be guaranteed to be monotonically increasing"
411
	case ErrMemberIdRequired:
412
		return "kafka server: The group member needs to have a valid member id before actually entering a consumer group"
413
	case ErrPreferredLeaderNotAvailable:
414
		return "kafka server: The preferred leader was not available"
415
	case ErrGroupMaxSizeReached:
416
		return "kafka server: Consumer group The consumer group has reached its max size. already has the configured maximum number of members"
417
	case ErrFencedInstancedId:
418
		return "kafka server: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id"
419
	case ErrEligibleLeadersNotAvailable:
420
		return "kafka server: Eligible topic partition leaders are not available"
421
	case ErrElectionNotNeeded:
422
		return "kafka server: Leader election not needed for topic partition"
423
	case ErrNoReassignmentInProgress:
424
		return "kafka server: No partition reassignment is in progress"
425
	case ErrGroupSubscribedToTopic:
426
		return "kafka server: Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it"
427
	case ErrInvalidRecord:
428
		return "kafka server: This record has failed the validation on broker and hence will be rejected"
429
	case ErrUnstableOffsetCommit:
430
		return "kafka server: There are unstable offsets that need to be cleared"
431
	}
432

433
	return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)
434
}
435

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.