kafka-aspnet


AspNetCore.Kafka

A messaging infrastructure on top of Confluent.Kafka and AspNetCore.

Features

  • Declare your subscriptions
  • Complex processing pipeline configuration (buffer, batch, parallel etc.)
  • Shutdown/Re-balance process to correctly store all the offsets
  • Interceptors
  • Mock implementation for unit tests
  • Flexible configuration in appsettings

Registration

Extended registration

This registers an IKafkaConsumer interface in the AspNetCore DI container. You won't need it, however, when defining handlers with the MessageHandler attribute or implementing the IMessageHandler interface.

Handlers

If you configure Kafka to scan an assembly for declared handlers, e.g.

services.AddKafka(Configuration).Subscribe(x => x.AddAssembly(ServiceLifetime.Transient));

then the only thing you need to start consuming is the handler itself.

Declare handler with attribute

Derive your handler from the IMessageHandler interface

Derive your handler from the IMessageHandler<TContract> interface

In the example above, TContract is the actual event type after deserialization. It is wrapped in IMessage to give access to additional message properties such as the offset.

If you don't need those additional message properties, you can consume the raw TContract, i.e.

Derive your handler from the IMessageHandler<TContract> interface without the additional IMessage properties

Message contract declaration

With a contract type in your handler, you can define the topic this type is tied to with the Message attribute, as below. In this case you don't need to repeat the topic name either in your consumer handler or when producing a message of this type: the topic name is retrieved from the Message declaration.

IMessage wrapper

This gives you access to Kafka-related message properties such as partition, offset, group, key etc. There are also two methods, Commit() and Store(), to manually commit or store (see the Kafka documentation for more details) the current message offset. It's also possible to configure the message processing pipeline (along with Kafka-related consumer configuration) to do all of this automatically.

Shutdown consumption

When consumption is shutting down or the broker initiates a re-balance, it is important to process messages and commit/store offsets correctly. Because processing is asynchronous and runs through an internal pipeline, when a client receives a shutdown notification (basically when the application is shutting down) or a re-balance notification, it waits until all messages that were consumed but not yet processed have been completely processed and their offsets stored as per configuration, and only then allows the shutdown or re-balance to proceed.

In other words, when you shut down, bounce or re-deploy your service, or a broker is re-balancing, all messages are supposed to be processed before the initiated action, so you have all the offsets stored correctly.
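The drain-before-shutdown idea can be illustrated with plain TPL Dataflow. This is only a minimal sketch of the general technique, not the library's implementation: DrainSketch, DrainAsync and the lastStored variable are hypothetical names, and the delay stands in for real handler work.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical illustration only: none of these names come from AspNetCore.Kafka.
public static class DrainSketch
{
    // Queues already-consumed offsets, then simulates a shutdown notification
    // and returns the last offset that was "stored" before shutdown completed.
    public static async Task<long> DrainAsync(IEnumerable<long> consumedOffsets)
    {
        long lastStored = -1;

        // Processes one message at a time (default parallelism is 1), in order.
        var processor = new ActionBlock<long>(async offset =>
        {
            await Task.Delay(10);  // simulated handler work
            lastStored = offset;   // stands in for storing the offset
        });

        foreach (var offset in consumedOffsets)
            processor.Post(offset);  // consumed, but not processed yet

        // Shutdown or re-balance notification arrives here:
        processor.Complete();        // stop accepting new messages
        await processor.Completion;  // wait until everything queued is processed

        return lastStored;
    }
}
```

Calling DrainSketch.DrainAsync with offsets 0 to 4 returns 4: the call does not finish until every queued message has been handled, which is the property the shutdown logic relies on.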

Fluent subscription

Simple handler

Pipeline

Change consumption offset

The offset to start consuming from can be set in the fluent pipeline, with message Offset attributes, or via configuration.

Fluent

Message Offset attribute

Configuration

appsettings.json:

Kafka:Message:Default: the offset config is applied by default to all message subscriptions.

Kafka:Message:[MyMessage]: the offset config is applied only to messages marked with the [Message(Name = "MyMessage")] attribute, overriding any values set in the Default configuration above.

Message processing pipeline

Message pipelines are based on TPL blocks.

Batches, Buffer, Commit/Store and Parallel execution per partition

The order of attributes doesn't matter - the actual pipeline is always built this way:

[Buffer] > [Parallel] > [Batch] > [Action] > [Commit] / [Store]

Any of these blocks can be omitted.

[Parallel] with DegreeOfParallelism set to a value greater than 1 lowers the actual degree of parallelization; otherwise it is set to -1, which means the degree of parallelization equals the partition count of the target topic.
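To make the block order more concrete, here is a minimal sketch of a Buffer > Batch > Action chain built directly on TPL Dataflow. It is an illustration under assumptions, not the library's code: PipelineSketch and RunAsync are hypothetical names, the [Parallel] stage and the batch timeout are left out for brevity, and the commit/store step is only indicated by a comment.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical illustration only: none of these names come from AspNetCore.Kafka.
public static class PipelineSketch
{
    // Pushes offsets through Buffer > Batch > Action and returns the batches
    // in the order the action stage handled them.
    public static async Task<List<long[]>> RunAsync(
        IEnumerable<long> offsets, int bufferSize, int batchSize)
    {
        var handled = new List<long[]>();

        // [Buffer]: bounded queue that applies back-pressure to the consuming side.
        var buffer = new BufferBlock<long>(
            new DataflowBlockOptions { BoundedCapacity = bufferSize });

        // [Batch]: groups items into arrays of up to batchSize.
        var batch = new BatchBlock<long>(batchSize);

        // [Action]: the handler; a real pipeline would commit or store offsets after it.
        var action = new ActionBlock<long[]>(items => handled.Add(items));

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        buffer.LinkTo(batch, link);
        batch.LinkTo(action, link);

        foreach (var offset in offsets)
            await buffer.SendAsync(offset);  // waits while the buffer is full

        buffer.Complete();        // remaining items are flushed as a final, smaller batch
        await action.Completion;  // completes once every batch has been handled
        return handled;
    }
}
```

With 25 offsets and a batch size of 10, the action stage receives three batches of 10, 10 and 5 items, in input order.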

Configure all in appsettings.json

You can specify a message name to pick up all the configuration, along with policies, from the message configuration in appsettings.

Actual message consumption configuration:

Kafka:Message:Default - specified blocks will be added by default for all message subscriptions overriding any values set in the code.

Kafka:Message:MyMessage - properties and policies will be added to messages marked with the [Message(Name = "MyMessage")] attribute, overriding any values set by the Default configuration above.

Producing messages

Messages can be produced using the [Message] attribute on the message declaration (to get the topic name and format), or by setting these inline when producing the message.

Keys for produced messages

Keys can be set in several ways:

Explicit keys

Key property name in message declaration:

[MessageKey] attribute (preferred):

Interceptors

In-memory broker for Consumer/Producer mocking

The following setup creates a memory-based broker; the actual IKafkaConsumer and IKafkaProducer are used as usual, with complete support for all features.

An additional interface IKafkaMemoryBroker is available from DI container for produce/consume tracking or specific setup.

Metrics

Implemented as a MetricsInterceptor.

Configuration

Message configuration properties

| Name | Attribute | Value | Description |
|---|---|---|---|
| State | [MessageState] | Enabled/Disabled | Set message subscription state |
| Offset | [Offset] | [begin,end,stored], 2020-01-01, (end, -100) | Set message offset |
| Bias | [Offset] | e.g. -100 | Set message offset bias. Offset is defaulted to End |
| Batch | [Batch] | e.g. (10, 1000) | Group incoming messages into batches of 10. If fewer than 10 arrive, wait 1000 ms and then send the batch for further processing. |
| Buffer | [Buffer] | e.g. 100 | Buffer incoming messages during processing |
| Commit | [Commit] | - | Commit the current message offset after the message has been processed |
| Store | [Store] | - | Store the current message offset after the message has been processed |
| Parallel | [Parallel] | e.g. 2 | 2 is the degree of parallelization. Split the pipeline into multiple sub-pipelines to process messages in parallel. |

"Consume/Producer" JSON objects contain a set of key/value pairs that eventually map to the Kafka Consumer/Producer configuration; that is, you can configure any additional client setting here, e.g. the authentication mechanism.

Sample