kafka-aspnet
88 строк · 3.0 Кб
1using System;2using System.Threading.Tasks;3using AspNetCore.Kafka.Abstractions;4using AspNetCore.Kafka.Automation.Pipeline;5using AspNetCore.Kafka.Mock.Abstractions;6using FluentAssertions;7using Microsoft.Extensions.DependencyInjection;8using Tests.Data;9using Xunit;10using Xunit.Abstractions;11
12namespace Tests13{
14public class BatchTests : IClassFixture<TestServer>15{16private readonly TestServer _server;17
18public BatchTests(TestServer server, ITestOutputHelper output)19{20_server = server.SetOutput(output);21}22
23[Fact]24public async Task BatchSeries()25{26var broker = _server.Services.GetRequiredService<IKafkaMemoryBroker>();27var producer = _server.Services.GetRequiredService<IKafkaProducer>();28var consumer = _server.Services.GetRequiredService<IKafkaConsumer>();29
30var topic = broker.GetTopic(nameof(BatchSeries));31const int batchSize = 5;32const int batchCount = 30;33const int batchTime = 500;34
35var stub = new Stub();36
37await stub.Produce(producer, batchCount * batchSize, topic.Name);38
39var subscription = consumer40.Message<StubMessage>()41.Batch(batchSize, TimeSpan.FromMilliseconds(batchTime))42.Action(stub.ConsumeBatch)43.Subscribe(topic.Name);44
45await stub.Produce(producer, 1, topic.Name);46
47await topic.WhenConsumedAll();48await Task.Delay(100);49await subscription.UnsubscribeAsync();50
51stub.Consumed.Count.Should().Be(batchCount * batchSize);52stub.ConsumedBatches.Count.Should().Be(batchCount);53}54
55[Fact]56public async Task RandomBatches()57{58var broker = _server.Services.GetRequiredService<IKafkaMemoryBroker>();59var producer = _server.Services.GetRequiredService<IKafkaProducer>();60var consumer = _server.Services.GetRequiredService<IKafkaConsumer>();61
62var topic = broker.GetTopic(nameof(RandomBatches));63const int batchSize = 10;64const int batchTime = 500;65
66var stub = new Stub();67
68var count = await Generator.Run(69() => stub.Produce(producer, 1, topic.Name),70TimeSpan.FromSeconds(5),71TimeSpan.FromMilliseconds(200));72
73var subscription = consumer74.Message<StubMessage>()75.Batch(batchSize, TimeSpan.FromMilliseconds(batchTime))76.Action(stub.ConsumeBatch)77.Subscribe(topic.Name);78
79await topic.WhenConsumedAll();80await Task.Delay(1000);81await subscription.UnsubscribeAsync();82
83_server.Output.WriteLine($"Generated {count} calls");84
85stub.Consumed.Count.Should().Be(count);86}87}88}