kafka-aspnet

Форк
0
/
ParallelTests.cs 
94 строки · 3.1 Кб
1
using System;
2
using System.Diagnostics;
3
using System.Linq;
4
using System.Threading;
5
using System.Threading.Tasks;
6
using AspNetCore.Kafka;
7
using AspNetCore.Kafka.Abstractions;
8
using AspNetCore.Kafka.Automation.Pipeline;
9
using AspNetCore.Kafka.Mock.Abstractions;
10
using FluentAssertions;
11
using Microsoft.Extensions.DependencyInjection;
12
using Tests.Data;
13
using Xunit;
14
using Xunit.Abstractions;
15

16
namespace Tests
17
{
18
    public class ParallelTests : IClassFixture<TestServer>
19
    {
20
        private readonly TestServer _server;
21

22
        public ParallelTests(TestServer server, ITestOutputHelper output)
23
        {
24
            _server = server.SetOutput(output);
25
        }
26

27
        [Fact]
28
        public async Task Parallel()
29
        {
30
            var broker = _server.Services.GetRequiredService<IKafkaMemoryBroker>();
31
            var producer = _server.Services.GetRequiredService<IKafkaProducer>();
32
            var consumer = _server.Services.GetRequiredService<IKafkaConsumer>();
33
            
34
            var topic = broker.GetTopic(nameof(Parallel));
35
            var stub = new Stub();
36
            const int batchSize = 5;
37
            const int batchTime = 500;
38

39
            topic.PartitionsCount = 5;
40
            
41
            var produced = await stub.Produce(producer, topic.PartitionsCount * 2, topic.Name);
42
            
43
            var subscription = consumer
44
                .Message<StubMessage>()
45
                .AsParallel()
46
                .Batch(batchSize, TimeSpan.FromMilliseconds(batchTime))
47
                .Action(stub.ConsumeBatch)
48
                .Action(async messages =>
49
                {
50
                    _server.Output.WriteLine($"Received Batch = {messages.Count()}, Partition = {messages.First().Partition}");
51
                    await Task.Delay(batchTime);
52
                })
53
                .Subscribe(topic.Name);
54

55
            await topic.WhenConsumedAll();
56
            await Task.Delay(batchTime * 3);
57
            await subscription.UnsubscribeAsync();
58

59
            stub.ConsumedBatches.Count.Should().BeGreaterThan(1);
60
            stub.Consumed.Count.Should().Be(produced.Count);
61
        }
62
        
63
        [Fact]
64
        public async Task Unsubscribe()
65
        {
66
            var producer = _server.Services.GetRequiredService<IKafkaProducer>();
67
            var consumer = _server.Services.GetRequiredService<IKafkaConsumer>();
68
            
69
            const int messageDelay = 5000;
70
            var signal = new ManualResetEvent(false);
71

72
            await producer.ProduceAsync(nameof(Unsubscribe), new StubMessage());
73
            
74
            var sw = Stopwatch.StartNew();
75
            
76
            var subscription = consumer
77
                .Message<StubMessage>()
78
                .Buffer(100)
79
                .AsParallel()
80
                .Action(async x =>
81
                {
82
                    signal.Set();
83
                    await Task.Delay(messageDelay);
84
                })
85
                .Subscribe(nameof(Unsubscribe));
86

87
            signal.WaitOne(1000).Should().Be(true);
88

89
            await subscription.UnsubscribeAsync();
90
            
91
            sw.ElapsedMilliseconds.Should().BeInRange(messageDelay, messageDelay * 15 / 10);
92
        }
93
    }
94
}

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.