#!/usr/bin/env python3
import asyncio
import aioredis
import async_timeout
import sys
import argparse

"""
9
To install: pip install -r requirements.txt
10

11
Run
12
dragonfly --mem_defrag_threshold=0.01 --mem_defrag_waste_threshold=0.01
13
defrag_mem_test.py -k 8000000 -v 645
14

15
This program would try to re-create the issue with memory defragmentation.
16
See issue number 448 for more details.
17
To run this:
18
    You can just execute this from the command line without any arguemnts.
19
    Or you can run with --help to see the options.
20
    The defaults are:
21
    number of keys: 800,000
22
    value size: 64 bytes
23
    key name pattern: key-for-testing
24
    host: localhost
25
    port: default redis port
26
    Please note that this would create 4 * number of keys entries
27
    You can see the memory usage/defrag state with the monitoring task that
28
    prints the current state
29

30
NOTE:
31
    If this seems to get stuck please kill it with ctrl+c
32
    This can happen in case we don't have "defrag_realloc_total > 0"
33
"""
34

35

36
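# Cooperative stop flag shared between the deletion tasks and the monitor
# task: the monitor calls stop() once defrag progress is observed, and the
# deletion tasks poll dont_stop() to decide whether to keep running.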
class TaskCancel:
    def __init__(self):
        self.run = True

    def dont_stop(self):
        return self.run

    def stop(self):
        self.run = False


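# Thin wrapper so commands like "info stats" can be awaited in one call.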
async def run_cmd(connection, cmd, sub_val):
    val = await connection.execute_command(cmd, sub_val)
    return val


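# Poll the defrag counters from INFO STATS and print them whenever
# defrag_task_invocation_total changed since `prev`. Returns a (done, count)
# pair: done is True once the defrag task has reallocated memory at least
# once; count is the new invocation total when it changed, else None.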
async def handle_defrag_stats(connection, prev):
    info = await run_cmd(connection, "info", "stats")
    if info is not None:
        if info["defrag_task_invocation_total"] != prev:
            print("--------------------------------------------------------------")
            print(f"defrag_task_invocation_total: {info['defrag_task_invocation_total']:,}")
            print(f"defrag_realloc_total: {info['defrag_realloc_total']:,}")
            print(f"defrag_attempt_total: {info['defrag_attempt_total']:,}")
            print("--------------------------------------------------------------")
            if info["defrag_realloc_total"] > 0:
                return True, None
            return False, info["defrag_task_invocation_total"]
    return False, None


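# Print the server's current memory usage from INFO MEMORY.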
async def memory_stats(connection):
    print("--------------------------------------------------------------")
    info = await run_cmd(connection, "info", "memory")
    # print(f"memory commited: {info['comitted_memory']:,}")
    print(f"memory used: {info['used_memory']:,}")
    # print(f"memory usage ratio: {info['comitted_memory']/info['used_memory']:.2f}")
    print("--------------------------------------------------------------")


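# Monitor loop: while the deletion tasks run, poll the defrag stats every
# 0.3 seconds and print memory usage on every third poll. After the stop
# condition fires, give the defrag task up to five more checks, two seconds
# apart, to report reallocations.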
async def stats_check(connection, condition):
    try:
        defrag_task_invocation_total = 0
        runs = 0
        while condition.dont_stop():
            await asyncio.sleep(0.3)
            done, d = await handle_defrag_stats(connection, defrag_task_invocation_total)
            if done:
                print("defrag task successfully found memory locations to reallocate")
                condition.stop()
            elif d is not None:
                defrag_task_invocation_total = d
            runs += 1
            if runs % 3 == 0:
                await memory_stats(connection)
        for _ in range(5):
            done, _d = await handle_defrag_stats(connection, -1)
            if done:
                print("defrag task successfully found memory locations to reallocate")
                return True
            await asyncio.sleep(2)
        return True
    except Exception as e:
        print(f"failed to run monitor task: {e}")
    return False


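# Delete a batch of keys with a single DEL and return how many were removed.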
async def delete_keys(connection, keys):
    results = await connection.delete(*keys)
    return results


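# Yield batches of key names to delete. Note the step of 3 inside each
# batch: only every third key is deleted, which leaves the keyspace
# fragmented and gives the defrag task something to compact.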
def generate_keys(pattern: str, count: int, batch_size: int):
    for i in range(1, count, batch_size):
        batch = [f"{pattern}{j}" for j in range(i, batch_size + i, 3)]
        yield batch


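# Delete one populated keyspace batch by batch, stopping early if the stop
# condition fires. Returns the number of keys deleted.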
async def mem_cleanup(connection, pattern, num, cond, keys_count):
    counter = 0
    for keys in generate_keys(pattern=pattern, count=keys_count, batch_size=950):
        if not cond.dont_stop():
            print(f"task number {num} that deleted keys {pattern} finished")
            return counter
        counter += await delete_keys(connection, keys)
        await asyncio.sleep(0.2)
    print(f"task number {num} that deleted keys {pattern} finished")
    return counter


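# Main workload: populate four keyspaces ({key_name}-0 .. {key_name}-3) with
# DEBUG POPULATE, then run one deletion task per keyspace concurrently while
# the monitor task watches the defrag counters.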
async def run_tasks(pool, key_name, value_size, keys_count):
    keys = [f"{key_name}-{i}" for i in range(4)]
    stop_cond = TaskCancel()
    try:
        connection = aioredis.Redis(connection_pool=pool)
        for key in keys:
            print(f"creating key {key} with size {value_size} of count {keys_count}")
            await connection.execute_command("DEBUG", "POPULATE", keys_count, key, value_size)
            await asyncio.sleep(2)
        tasks = []
        for num, key in enumerate(keys):
            pattern = f"{key}:"
            print(f"deleting keys from {pattern}")
            tasks.append(
                mem_cleanup(
                    connection=connection,
                    pattern=pattern,
                    num=num,
                    cond=stop_cond,
                    keys_count=int(keys_count),
                )
            )
        monitor_task = asyncio.create_task(stats_check(connection, stop_cond))
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # with return_exceptions=True, failed tasks come back as exception
        # objects, so only sum the integer results
        total = sum(r for r in results if isinstance(r, int))
        print(f"successfully deleted {total} keys")
        stop_cond.stop()
        await monitor_task
        print("finished executing")
        return True
    except Exception as e:
        print(f"got error {e} while running delete keys")
        return False


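# Build the connection pool and drive the async workload on a fresh event loop.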
def connect_and_run(key_name, value_size, keys_count, host="localhost", port=6379):
    async_pool = aioredis.ConnectionPool(
        host=host, port=port, db=0, decode_responses=True, max_connections=16
    )

    loop = asyncio.new_event_loop()
    success = loop.run_until_complete(
        run_tasks(pool=async_pool, key_name=key_name, value_size=value_size, keys_count=keys_count)
    )
    return success


if __name__ == "__main__":
177
    parser = argparse.ArgumentParser(
178
        description="active memory testing", formatter_class=argparse.ArgumentDefaultsHelpFormatter
179
    )
180
    parser.add_argument("-k", "--keys", type=int, default=800000, help="total number of keys")
181
    parser.add_argument("-v", "--value_size", type=int, default=645, help="size of the values")
182
    parser.add_argument(
183
        "-n", "--key_name", type=str, default="key-for-testing", help="the base key name"
184
    )
185
    parser.add_argument("-s", "--server", type=str, default="localhost", help="server host name")
186
    parser.add_argument("-p", "--port", type=int, default=6379, help="server port number")
187
    args = parser.parse_args()
188
    keys_num = args.keys
189
    key_name = args.key_name
190
    value_size = args.value_size
191
    host = args.server
192
    port = args.port
193
    print(
194
        f"running key deletion on {host}:{port} for keys {key_name} value size of {value_size} and number of keys {keys_num}"
195
    )
196
    result = connect_and_run(
197
        key_name=key_name, value_size=value_size, keys_count=keys_num, host=host, port=port
198
    )
199
    if result == True:
200
        print("finished successfully")
201
    else:
202
        print("failed")
203
