kafka-aspnet
Описание
A messaging infrastructure on top of Confluent.Kafka and AspNetCore
Языки
- C#100%
AspNetCore.Kafka
A messaging infrastructure on top of Confluent.Kafka and AspNetCore.
Features
- Declare your subscriptions
- Complex processing pipeline configuration (buffer, batch, parallel etc.)
- Shutdown/Re-balance process to correctly store all the offsets
- Interceptors
- Mock implementation for unit tests
- Flexible configuration in appsettings
Registration
Extended registration
This will register an IKafkaConsumer interface in AspNetCore DI container. But you won't need it when defining handlers with MessageHandler attribute or deriving from IMessageHandler interface.
Handlers
If you configure Kafka to scan assembly for declared handlers i.e. , the only thing you need to start consuming is the handler itself.
Declare handler with attribute
Derive your handler from IMessageHandler interface
Derive your handler from IMessageHandler<TContract> interface
In example above TContract is an actual event type after deserialization. It's wrapped into IMessage to get access to additional message properties like offset etc.
If you don't need those additional message properties it's allowed to consume raw TContract, i.e.
Derive your handler from IMessageHandler<TContract> interface without IMessage additonal properties
Message contract declaration
Having a contract type in you handler it's possible to define a topic this type is tied to with attribute as below.
In this case you don't need to repeat a topic name neither in your consumer handler or when producing message for this specific type - topic name will be retrieved from Message declaration.
IMessage wrapper
This will give you a chance to access Kafka related message properties like partition, offset, group, key etc.
Also the are two methods: and to manually commit or store (see Kafka documentation for more details) current message offset.
It's also possible to configure message processing pipeline (along with Kafka related consumer configuration) to do all the stuff automatically.
Shutdown consumption
When consumption is shutting down or broker initiated a re-balance - it is important to process and commit/store offsets correctly. Having an async processing and an internal processing pipeline when a client receives a shutdown notification (basically when an application is shutting down) or a re-balance notification - it will wait for the entire consumed messages (consumed but not processed yet) to be completely processed and their offsets stored as per configuration and only then will allow shutdown or rebalance.
In other words when you shutdown, bounce, re-deploy your service or a broker is re-balancing - all messages are supposed to be processed before the initiated action, so you have all the offsets stored correctly.
Fluent subscription
Simple handler
Pipeline
Change consumption offset
Changing consume offset to start from can be set in fluent pipeline, message Offset attributes or via configuration.
Fluent
Message Offset attribute
Configuration
appsetings.json:
Kafka:Message:Default:
Offset config will be added by default for all message subscriptions.
Kafka:Message:[MyMessage]:
Offset config will be added to messages marked
with [Message(Name = "MyMessage")] attribute only overriding any values set in Default
configuration above.
Message processing pipeline
Message pipelines are based on TPL blocks.
Batches, Buffer, Commit/Store and Parallel execution per partition
The order of attributes doesn't matter - the actual pipeline is always get built this way:
[Buffer] > [Parallel] > [Batch] > [Action] > [Commit] / [Store]
Any of the following blocks could be omitted.
[Parallel] with DegreeOfParallelism set to greater than 1 - is to lower the actual degree of parallelization, otherwise it's set to [-1] and means the degree of parallelization equals to partitions count of the target topic.
Configure all in appsettings.json
You could specify a message name to get all the configuration along with policies from message configuration in appsettings.
Actual message consumption configuration:
Kafka:Message:Default - specified blocks will be added by default for all message subscriptions overriding any values set in the code.
Kafka:Message:MyMessage - properties ans policies will be added to messages marked with [Message(Name = "MyMessage")] attribute overriding any values set by Default configuration above.
Producing messages
Message producing is available using message declaration [Message] attribute (to get topic name and format) as well as setting it inline while actual message producing.
Keys for produced messages
Keys could be set in several ways:
Explicit keys
Key property name in message declaration:
[MessageKey] attribute (preferred):
Interceptors
In-memory broker for Consumer/Producer mocking
The following setup will create a memory based broker and the actual IKafkaConsumer and IKafkaProducer are used as usual with complete support of all features.
An additional interface IKafkaMemoryBroker is available from DI container for produce/consume tracking or specific setup.
Metrics
Implemented as a MetricsInterceptor.
Configuration
Message configuration properties
| Name | Attribute | Value | Description |
| State | [MessageState] | Enabled/Disabled | Set message subscription state |
| Offset | [Offset] | [begin,end,stored] 2020-01-01 (end, -100) | Set message offset |
| Bias | [Offset] | e.g. -100 | Set message offset bias. Offset is defaulted to End |
| Batch | [Batch] | e.g. (10, 1000) | Group incoming message into batches with count 10. If less than 10 wait 1000ms and then send message for further processing. |
| Buffer | [Buffer] | e.g. 100 | Buffer incoming message during processing |
| Commit | [Commit] | - | Commit current message offset after message being processed |
| Store | [Store] | - | Store current message offset after message being processed |
| Parallel | [Parallel] | e.g. 2 | 2 - is a degree of paralelisation. Split pipeline into multiple sub-pipelines to process messages in parallel. |
"Consume/Producer" json objects contain a set of Key/Values pairs that will eventually map to Kafka Consumer/Producer configuration, that is to say you can configure any additional client configuration in here, i.e. authentication mechanism.