9
To install: pip install -r requirements.txt
12
dragonfly --mem_defrag_threshold=0.01 --mem_defrag_waste_threshold=0.01
13
defrag_mem_test.py -k 8000000 -v 645
15
This program would try to re-create the issue with memory defragmentation.
16
See issue number 448 for more details.
18
You can just execute this from the command line without any arguments.
19
Or you can run with --help to see the options.
21
number of keys: 800,000
23
key name pattern: key-for-testing
25
port: default redis port
26
Please note that this would create 4 * number of keys entries
27
You can see the memory usage/defrag state with the monitoring task that
28
prints the current state
31
If this seems to get stuck please kill it with ctrl+c
32
This can happen in case we don't have "defrag_realloc_total > 0"
async def run_cmd(connection, cmd, sub_val):
    """Execute `cmd sub_val` on the server and return the parsed reply.

    e.g. run_cmd(conn, "info", "stats") returns the dict of INFO STATS
    fields that handle_defrag_stats/memory_stats index into.
    """
    val = await connection.execute_command(cmd, sub_val)
    # The reply must be propagated - every caller uses the return value.
    return val
async def handle_defrag_stats(connection, prev):
    """Poll INFO STATS and report the defrag counters.

    Prints the counters whenever `defrag_task_invocation_total` changed
    since `prev` (so the monitor only logs on progress).

    Returns:
        (done, invocations): `done` is True once the defrag task actually
        reallocated memory (`defrag_realloc_total` > 0); `invocations` is
        the current `defrag_task_invocation_total`, which the caller feeds
        back as `prev` on the next poll.
    """
    info = await run_cmd(connection, "info", "stats")

    if info["defrag_task_invocation_total"] != prev:
        print("--------------------------------------------------------------")
        print(f"defrag_task_invocation_total: {info['defrag_task_invocation_total']:,}")
        print(f"defrag_realloc_total: {info['defrag_realloc_total']:,}")
        print(f"defrag_attempt_total: {info['defrag_attempt_total']:,}")
        print("--------------------------------------------------------------")
    if info["defrag_realloc_total"] > 0:
        # Defrag actually moved memory - report success to the caller.
        return True, info["defrag_task_invocation_total"]
    return False, info["defrag_task_invocation_total"]
async def memory_stats(connection):
    """Print a short INFO MEMORY snapshot (currently just used_memory)."""
    separator = "--------------------------------------------------------------"
    print(separator)
    info = await run_cmd(connection, "info", "memory")
    # print(f"memory commited: {info['comitted_memory']:,}")
    print(f"memory used: {info['used_memory']:,}")
    # print(f"memory usage ratio: {info['comitted_memory']/info['used_memory']:.2f}")
    print(separator)
76
async def stats_check(connection, condition):
# Monitoring task: polls defrag stats every 0.3s until `condition` says to
# stop, then prints a final memory snapshot and one last stats poll.
# NOTE(review): several source lines are missing from this extract (the
# `try:` opener matching the `except` below, the `if done:` guards around
# the success prints, and any loop-exit bookkeeping) - commentary is hedged.
78
defrag_task_invocation_total = 0
80
while condition.dont_stop():
81
# Short poll interval so counter changes are caught promptly.
await asyncio.sleep(0.3)
82
# `done` flags that defrag reallocated memory; `d` is the invocation count.
done, d = await handle_defrag_stats(connection, defrag_task_invocation_total)
84
print("defrag task successfully found memory locations to reallocate")
88
# Remember the last seen invocation count so the next poll only logs changes.
defrag_task_invocation_total = d
91
# Final snapshot after the deletion tasks signalled completion.
await memory_stats(connection)
93
# prev=-1 forces the counter block to print one last time.
done, d = await handle_defrag_stats(connection, -1)
95
print("defrag task successfully found memory locations to reallocate")
98
await asyncio.sleep(2)
100
except Exception as e:
101
# Best-effort monitor: report and fall through rather than crash the run.
print(f"failed to run monitor task: {e}")
async def delete_keys(connection, keys):
    """Delete `keys` in a single DEL call and return how many were removed.

    Redis' DEL replies with the count of keys actually deleted; callers
    (mem_cleanup) accumulate that count, so it must be returned.
    """
    results = await connection.delete(*keys)
    return results
def generate_keys(pattern: str, count: int, batch_size: int):
    """Yield batches of key names of the form f"{pattern}{j}".

    Walks j over [1, count) in windows of `batch_size`, taking every 3rd
    index inside each window - presumably so only about a third of the
    populated keys get deleted, leaving holes that fragment memory.

    Yields:
        list of key-name strings, one batch per window.

    Note: the original annotated the return as `list`, but callers iterate
    it lazily (`for keys in generate_keys(...)`), so this is a generator.
    """
    for start in range(1, count, batch_size):
        # Every 3rd key inside the window [start, start + batch_size).
        yield [f"{pattern}{j}" for j in range(start, batch_size + start, 3)]
async def mem_cleanup(connection, pattern, num, cond, keys_count):
    """Deletion task: remove keys for `pattern` in batches until done or told to stop.

    Args:
        connection: async redis client.
        pattern: key-name prefix this task deletes.
        num: task index, used only in log messages.
        cond: cancel token; cond.dont_stop() turning False aborts the loop.
        keys_count: total number of keys populated for this pattern.

    Returns:
        The number of keys this task deleted.
    """
    counter = 0  # NOTE(review): initialization was missing from the extracted source
    for keys in generate_keys(pattern=pattern, count=keys_count, batch_size=950):
        if not cond.dont_stop():
            print(f"task number {num} that deleted keys {pattern} finished")
            return counter
        counter += await delete_keys(connection, keys)
        # Short pause so deletions interleave with the server's defrag task.
        await asyncio.sleep(0.2)
    print(f"task number {num} that deleted keys {pattern} finished")
    return counter
128
async def run_tasks(pool, key_name, value_size, keys_count):
# Orchestrator: populates 4 key groups via DEBUG POPULATE, spawns a
# mem_cleanup task per group plus a stats_check monitor, and gathers results.
# NOTE(review): many source lines are missing from this extract (the loops
# that bind `key`/`pattern`, the `tasks` list construction, the `try:`
# opener, and the stop/await of the monitor task) - comments are hedged.
129
# Four key-name prefixes, one per deletion task.
keys = [f"{key_name}-{i}" for i in range(4)]
130
stop_cond = TaskCancel()
132
connection = aioredis.Redis(connection_pool=pool)
134
# Presumably inside a `for key in keys:` loop - TODO confirm against full source.
print(f"creating key {key} with size {value_size} of count {keys_count}")
135
# DEBUG POPULATE creates `keys_count` keys named `key`:<n> of `value_size` bytes.
await connection.execute_command("DEBUG", "POPULATE", keys_count, key, value_size)
136
await asyncio.sleep(2)
141
# Presumably inside a loop that builds `tasks` from mem_cleanup(...) calls.
print(f"deleting keys from {pattern}")
144
connection=connection,
148
keys_count=int(keys_count),
152
# Background monitor that reports defrag progress while deletions run.
monitor_task = asyncio.create_task(stats_check(connection, stop_cond))
153
# Each mem_cleanup returns its deleted-key count; exceptions are kept inline.
total = await asyncio.gather(*tasks, return_exceptions=True)
154
print(f"successfully deleted {sum(total)} keys")
157
print("finish executing")
159
except Exception as e:
160
print(f"got error {e} while running delete keys")
def connect_and_run(key_name, value_size, keys_count, host="localhost", port=6379):
    """Create an async connection pool and drive run_tasks() to completion.

    Args:
        key_name: base key-name prefix for the populated/deleted keys.
        value_size: size in bytes of each populated value.
        keys_count: number of keys per pattern.
        host, port: server address.

    Returns:
        The result of run_tasks() (truthy on success).
    """
    async_pool = aioredis.ConnectionPool(
        host=host, port=port, db=0, decode_responses=True, max_connections=16
    )
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(
            run_tasks(pool=async_pool, key_name=key_name, value_size=value_size, keys_count=keys_count)
        )
    finally:
        # Release the event loop even if run_tasks raises.
        loop.close()
if __name__ == "__main__":
    # CLI entry point: parse options (all have sane defaults, so running
    # with no arguments works) and kick off the populate/delete cycle.
    parser = argparse.ArgumentParser(
        description="active memory testing", formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument("-k", "--keys", type=int, default=800000, help="total number of keys")
    parser.add_argument("-v", "--value_size", type=int, default=645, help="size of the values")
    parser.add_argument(
        "-n", "--key_name", type=str, default="key-for-testing", help="the base key name"
    )
    parser.add_argument("-s", "--server", type=str, default="localhost", help="server host name")
    parser.add_argument("-p", "--port", type=int, default=6379, help="server port number")
    args = parser.parse_args()

    keys_num = args.keys
    key_name = args.key_name
    value_size = args.value_size
    host = args.server
    port = args.port
    print(
        f"running key deletion on {host}:{port} for keys {key_name} value size of {value_size} and number of keys {keys_num}"
    )
    result = connect_and_run(
        key_name=key_name, value_size=value_size, keys_count=keys_num, host=host, port=port
    )
    if result:
        print("finished successfully")