AutoGPT

Форк
0
/
test_file_operations.py 
254 строки · 8.0 Кб
1
import os
2
import re
3
from pathlib import Path
4

5
import pytest
6
from pytest_mock import MockerFixture
7

8
import autogpt.commands.file_operations as file_ops
9
from autogpt.agents.agent import Agent
10
from autogpt.agents.utils.exceptions import DuplicateOperationError
11
from autogpt.config import Config
12
from autogpt.file_storage import FileStorage
13
from autogpt.memory.vector.memory_item import MemoryItem
14
from autogpt.memory.vector.utils import Embedding
15

16

17
@pytest.fixture()
def file_content():
    """Provide the single-line text used as sample file content by the tests."""
    sample_text = "This is a test file.\n"
    return sample_text
20

21

22
@pytest.fixture()
def mock_MemoryItem_from_text(
    mocker: MockerFixture, mock_embedding: Embedding, config: Config
):
    """Patch ``file_ops.MemoryItemFactory.from_text`` with a canned builder.

    The replacement constructs a ``MemoryItem`` directly from its arguments and
    uses ``mock_embedding`` for both the summary and the chunk embeddings.
    """

    def fake_from_text(content, source_type, config, metadata):
        # One chunk holding the whole content, with a fixed-format summary.
        canned_summary = f"Summary of content '{content}'"
        return MemoryItem(
            raw_content=content,
            summary=canned_summary,
            chunk_summaries=[canned_summary],
            chunks=[content],
            e_summary=mock_embedding,
            e_chunks=[mock_embedding],
            metadata=metadata | {"source_type": source_type},
        )

    mocker.patch.object(file_ops.MemoryItemFactory, "from_text", new=fake_from_text)
39

40

41
@pytest.fixture()
def test_file_name():
    """Provide the relative path of the file used by the write/read tests."""
    file_name = Path("test_file.txt")
    return file_name
44

45

46
@pytest.fixture
def test_file_path(test_file_name: Path, storage: FileStorage):
    """Provide the path that ``storage.get_path`` returns for ``test_file_name``."""
    resolved_path = storage.get_path(test_file_name)
    return resolved_path
49

50

51
@pytest.fixture()
def test_directory(storage: FileStorage):
    """Provide the path that ``storage.get_path`` returns for "test_directory"."""
    directory_path = storage.get_path("test_directory")
    return directory_path
54

55

56
@pytest.fixture()
def test_nested_file(storage: FileStorage):
    """Provide the path ``storage.get_path`` returns for a file in "nested/"."""
    nested_path = storage.get_path("nested/test_file.txt")
    return nested_path
59

60

61
def test_file_operations_log():
    """Check that ``operations_from_log`` yields the expected tuples.

    The input is a header line followed by write/append/delete entries; the
    expected output is one ``(operation, path, checksum)`` tuple per entry,
    with ``None`` as the checksum of the delete entry.
    """
    raw_log = (
        "File Operation Logger\n"
        "write: path/to/file1.txt #checksum1\n"
        "write: path/to/file2.txt #checksum2\n"
        "write: path/to/file3.txt #checksum3\n"
        "append: path/to/file2.txt #checksum4\n"
        "delete: path/to/file3.txt\n"
    )

    parsed = list(file_ops.operations_from_log(raw_log.split("\n")))

    assert parsed == [
        ("write", "path/to/file1.txt", "checksum1"),
        ("write", "path/to/file2.txt", "checksum2"),
        ("write", "path/to/file3.txt", "checksum3"),
        ("append", "path/to/file2.txt", "checksum4"),
        ("delete", "path/to/file3.txt", None),
    ]
80

81

82
def test_is_duplicate_operation(agent: Agent, mocker: MockerFixture):
    """Exercise ``is_duplicate_operation`` against a patched operations state.

    ``file_ops.file_operations_state`` is replaced so that it always returns a
    fixed mapping of two paths to checksums; write, append and delete
    operations are then checked against that mapping.
    """
    # Fake state handed back by the patched `file_operations_state`
    fake_state = {
        "path/to/file1.txt": "checksum1",
        "path/to/file2.txt": "checksum2",
    }
    mocker.patch.object(file_ops, "file_operations_state", lambda _: fake_state)

    tracked = Path("path/to/file1.txt")  # present in the fake state
    untracked = Path("path/to/file3.txt")  # absent from the fake state
    is_duplicate = file_ops.is_duplicate_operation

    # Write operations
    outcome = is_duplicate("write", tracked, agent, "checksum1")
    assert outcome is True
    outcome = is_duplicate("write", tracked, agent, "checksum2")
    assert outcome is False
    outcome = is_duplicate("write", untracked, agent, "checksum3")
    assert outcome is False

    # Append operations
    outcome = is_duplicate("append", tracked, agent, "checksum1")
    assert outcome is False

    # Delete operations
    outcome = is_duplicate("delete", tracked, agent)
    assert outcome is False
    outcome = is_duplicate("delete", untracked, agent)
    assert outcome is True
125

126

127
# Test logging a file operation
128
@pytest.mark.asyncio
async def test_log_operation(agent: Agent):
    """Log an operation and check that it shows up in the last recorded line."""
    logged_path = Path("path/to/test")
    await file_ops.log_operation("log_test", logged_path, agent=agent)
    recorded_lines = agent.get_file_operation_lines()
    assert "log_test: path/to/test" in recorded_lines[-1]
133

134

135
def test_text_checksum(file_content: str):
    """Check that checksums look hexadecimal and differ for different text."""
    hex_only = re.compile(r"^[a-fA-F0-9]+$")
    sample_checksum = file_ops.text_checksum(file_content)
    other_checksum = file_ops.text_checksum("other content")
    assert hex_only.match(sample_checksum) is not None
    assert sample_checksum != other_checksum
140

141

142
@pytest.mark.asyncio
async def test_log_operation_with_checksum(agent: Agent):
    """Log an operation with a checksum and check the last recorded line."""
    logged_path = Path("path/to/test")
    await file_ops.log_operation(
        "log_test", logged_path, agent=agent, checksum="ABCDEF"
    )
    recorded_lines = agent.get_file_operation_lines()
    assert "log_test: path/to/test #ABCDEF" in recorded_lines[-1]
149

150

151
@pytest.mark.asyncio
async def test_read_file(
    mock_MemoryItem_from_text,
    test_file_path: Path,
    file_content,
    agent: Agent,
):
    """Write a file, log the write, then read it back through ``read_file``."""
    file_name = test_file_path.name
    await agent.workspace.write_file(file_name, file_content)
    checksum = file_ops.text_checksum(file_content)
    await file_ops.log_operation("write", Path(file_name), agent, checksum)

    read_back = file_ops.read_file(file_name, agent=agent)

    # Carriage returns are stripped before comparing with the written text.
    assert read_back.replace("\r", "") == file_content
164

165

166
def test_read_file_not_found(agent: Agent):
    """Check that reading a file that was never written raises an error."""
    missing_name = "does_not_exist.txt"
    expect_not_found = pytest.raises(FileNotFoundError)
    with expect_not_found:
        file_ops.read_file(missing_name, agent=agent)
170

171

172
@pytest.mark.asyncio
async def test_write_to_file_relative_path(test_file_name: Path, agent: Agent):
    """Write via a relative path and check the text found in the workspace."""
    expected_text = "This is new content.\n"
    await file_ops.write_to_file(test_file_name, expected_text, agent=agent)

    written_path = agent.workspace.get_path(test_file_name)
    with open(written_path, "r", encoding="utf-8") as handle:
        on_disk = handle.read()
    assert on_disk == expected_text
179

180

181
@pytest.mark.asyncio
async def test_write_to_file_absolute_path(test_file_path: Path, agent: Agent):
    """Write via the storage-resolved path and check the text found there."""
    expected_text = "This is new content.\n"
    await file_ops.write_to_file(test_file_path, expected_text, agent=agent)

    with open(test_file_path, "r", encoding="utf-8") as handle:
        on_disk = handle.read()
    assert on_disk == expected_text
188

189

190
@pytest.mark.asyncio
async def test_write_file_logs_checksum(test_file_name: Path, agent: Agent):
    """Check that ``write_to_file`` logs a write entry carrying the checksum."""
    content = "This is new content.\n"
    checksum = file_ops.text_checksum(content)

    await file_ops.write_to_file(test_file_name, content, agent=agent)

    last_line = agent.get_file_operation_lines()[-1]
    expected_line = f"write: {test_file_name} #{checksum}"
    assert last_line == expected_line
197

198

199
@pytest.mark.asyncio
async def test_write_file_fails_if_content_exists(test_file_name: Path, agent: Agent):
    """Check that rewriting already-logged content raises a duplicate error.

    A write of the same content to the same path is logged first, so the
    following ``write_to_file`` call is expected to raise
    ``DuplicateOperationError``.
    """
    repeated_content = "This is new content.\n"
    logged_checksum = file_ops.text_checksum(repeated_content)
    await file_ops.log_operation(
        "write", test_file_name, agent=agent, checksum=logged_checksum
    )

    expect_duplicate = pytest.raises(DuplicateOperationError)
    with expect_duplicate:
        await file_ops.write_to_file(test_file_name, repeated_content, agent=agent)
210

211

212
@pytest.mark.asyncio
async def test_write_file_succeeds_if_content_different(
    test_file_path: Path, file_content: str, agent: Agent
):
    """Check that a file can be overwritten when the new content differs.

    The file is first written and logged with ``file_content``; a second write
    with different content must not raise, and the workspace file must then
    hold the new content.
    """
    await agent.workspace.write_file(test_file_path.name, file_content)
    await file_ops.log_operation(
        "write", Path(test_file_path.name), agent, file_ops.text_checksum(file_content)
    )
    new_content = "This is different content.\n"
    await file_ops.write_to_file(test_file_path.name, new_content, agent=agent)

    # Verify the overwrite actually happened, not merely that nothing raised.
    written_path = agent.workspace.get_path(test_file_path.name)
    with open(written_path, "r", encoding="utf-8") as f:
        content = f.read()
    assert content == new_content
222

223

224
@pytest.mark.asyncio
async def test_list_files(agent: Agent):
    """Check ``list_folder`` output for top-level and nested files.

    Two files are written at the top level and one inside a subdirectory; all
    three must be listed. After they are removed again, a name that was never
    written must not be listed.
    """
    workspace = agent.workspace
    name_a = "file_a.txt"
    name_b = "file_b.txt"
    subdir = Path("test_directory")
    nested_a = subdir / name_a

    # Two top-level files, plus a copy of file A inside a subdirectory
    await workspace.write_file(name_a, "This is file A.")
    await workspace.write_file(name_b, "This is file B.")
    workspace.make_dir(subdir)
    await workspace.write_file(nested_a, "This is file A in the subdirectory.")

    listing = file_ops.list_folder(".", agent=agent)
    for expected_entry in (name_a, name_b, os.path.join(subdir, name_a)):
        assert expected_entry in listing

    # Clean up
    for leftover in (name_a, name_b, nested_a):
        workspace.delete_file(leftover)
    workspace.delete_dir(subdir)

    # Case 2: look for a file that does not exist and make sure nothing throws
    listing = file_ops.list_folder("", agent=agent)
    assert "non_existent_file.txt" not in listing
255

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.