kafka-aspnet

Форк
0
/
BatchTests.cs 
88 строк · 3.0 Кб
1
using System;
2
using System.Threading.Tasks;
3
using AspNetCore.Kafka.Abstractions;
4
using AspNetCore.Kafka.Automation.Pipeline;
5
using AspNetCore.Kafka.Mock.Abstractions;
6
using FluentAssertions;
7
using Microsoft.Extensions.DependencyInjection;
8
using Tests.Data;
9
using Xunit;
10
using Xunit.Abstractions;
11

12
namespace Tests
13
{
14
    public class BatchTests : IClassFixture<TestServer>
15
    {
16
        private readonly TestServer _server;
17

18
        public BatchTests(TestServer server, ITestOutputHelper output)
19
        {
20
            _server = server.SetOutput(output);
21
        }
22

23
        [Fact]
24
        public async Task BatchSeries()
25
        {
26
            var broker = _server.Services.GetRequiredService<IKafkaMemoryBroker>();
27
            var producer = _server.Services.GetRequiredService<IKafkaProducer>();
28
            var consumer = _server.Services.GetRequiredService<IKafkaConsumer>();
29
            
30
            var topic = broker.GetTopic(nameof(BatchSeries));
31
            const int batchSize = 5;
32
            const int batchCount = 30;
33
            const int batchTime = 500;
34

35
            var stub = new Stub();
36
            
37
            await stub.Produce(producer, batchCount * batchSize, topic.Name);
38

39
            var subscription = consumer
40
                .Message<StubMessage>()
41
                .Batch(batchSize, TimeSpan.FromMilliseconds(batchTime))
42
                .Action(stub.ConsumeBatch)
43
                .Subscribe(topic.Name);
44
            
45
            await stub.Produce(producer, 1, topic.Name);
46

47
            await topic.WhenConsumedAll();
48
            await Task.Delay(100);
49
            await subscription.UnsubscribeAsync();
50
            
51
            stub.Consumed.Count.Should().Be(batchCount * batchSize);
52
            stub.ConsumedBatches.Count.Should().Be(batchCount);
53
        }
54

55
        [Fact]
56
        public async Task RandomBatches()
57
        {
58
            var broker = _server.Services.GetRequiredService<IKafkaMemoryBroker>();
59
            var producer = _server.Services.GetRequiredService<IKafkaProducer>();
60
            var consumer = _server.Services.GetRequiredService<IKafkaConsumer>();
61
            
62
            var topic = broker.GetTopic(nameof(RandomBatches));
63
            const int batchSize = 10;
64
            const int batchTime = 500;
65
            
66
            var stub = new Stub();
67
            
68
            var count = await Generator.Run(
69
                () => stub.Produce(producer, 1, topic.Name),
70
                TimeSpan.FromSeconds(5), 
71
                TimeSpan.FromMilliseconds(200));
72
            
73
            var subscription = consumer
74
                .Message<StubMessage>()
75
                .Batch(batchSize, TimeSpan.FromMilliseconds(batchTime))
76
                .Action(stub.ConsumeBatch)
77
                .Subscribe(topic.Name);
78

79
            await topic.WhenConsumedAll();
80
            await Task.Delay(1000);
81
            await subscription.UnsubscribeAsync();
82
            
83
            _server.Output.WriteLine($"Generated {count} calls");
84

85
            stub.Consumed.Count.Should().Be(count);
86
        }
87
    }
88
}

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.