kafka-aspnet
94 строки · 3.1 Кб
1using System;2using System.Diagnostics;3using System.Linq;4using System.Threading;5using System.Threading.Tasks;6using AspNetCore.Kafka;7using AspNetCore.Kafka.Abstractions;8using AspNetCore.Kafka.Automation.Pipeline;9using AspNetCore.Kafka.Mock.Abstractions;10using FluentAssertions;11using Microsoft.Extensions.DependencyInjection;12using Tests.Data;13using Xunit;14using Xunit.Abstractions;15
16namespace Tests17{
18public class ParallelTests : IClassFixture<TestServer>19{20private readonly TestServer _server;21
22public ParallelTests(TestServer server, ITestOutputHelper output)23{24_server = server.SetOutput(output);25}26
27[Fact]28public async Task Parallel()29{30var broker = _server.Services.GetRequiredService<IKafkaMemoryBroker>();31var producer = _server.Services.GetRequiredService<IKafkaProducer>();32var consumer = _server.Services.GetRequiredService<IKafkaConsumer>();33
34var topic = broker.GetTopic(nameof(Parallel));35var stub = new Stub();36const int batchSize = 5;37const int batchTime = 500;38
39topic.PartitionsCount = 5;40
41var produced = await stub.Produce(producer, topic.PartitionsCount * 2, topic.Name);42
43var subscription = consumer44.Message<StubMessage>()45.AsParallel()46.Batch(batchSize, TimeSpan.FromMilliseconds(batchTime))47.Action(stub.ConsumeBatch)48.Action(async messages =>49{50_server.Output.WriteLine($"Received Batch = {messages.Count()}, Partition = {messages.First().Partition}");51await Task.Delay(batchTime);52})53.Subscribe(topic.Name);54
55await topic.WhenConsumedAll();56await Task.Delay(batchTime * 3);57await subscription.UnsubscribeAsync();58
59stub.ConsumedBatches.Count.Should().BeGreaterThan(1);60stub.Consumed.Count.Should().Be(produced.Count);61}62
63[Fact]64public async Task Unsubscribe()65{66var producer = _server.Services.GetRequiredService<IKafkaProducer>();67var consumer = _server.Services.GetRequiredService<IKafkaConsumer>();68
69const int messageDelay = 5000;70var signal = new ManualResetEvent(false);71
72await producer.ProduceAsync(nameof(Unsubscribe), new StubMessage());73
74var sw = Stopwatch.StartNew();75
76var subscription = consumer77.Message<StubMessage>()78.Buffer(100)79.AsParallel()80.Action(async x =>81{82signal.Set();83await Task.Delay(messageDelay);84})85.Subscribe(nameof(Unsubscribe));86
87signal.WaitOne(1000).Should().Be(true);88
89await subscription.UnsubscribeAsync();90
91sw.ElapsedMilliseconds.Should().BeInRange(messageDelay, messageDelay * 15 / 10);92}93}94}