3
from pathlib import Path
6
from pytest_mock import MockerFixture
8
import autogpt.commands.file_operations as file_ops
9
from autogpt.agents.agent import Agent
10
from autogpt.agents.utils.exceptions import DuplicateOperationError
11
from autogpt.config import Config
12
from autogpt.file_storage import FileStorage
13
from autogpt.memory.vector.memory_item import MemoryItem
14
from autogpt.memory.vector.utils import Embedding
19
return "This is a test file.\n"
23
def mock_MemoryItem_from_text(
24
mocker: MockerFixture, mock_embedding: Embedding, config: Config
27
file_ops.MemoryItemFactory,
29
new=lambda content, source_type, config, metadata: MemoryItem(
31
summary=f"Summary of content '{content}'",
32
chunk_summaries=[f"Summary of content '{content}'"],
34
e_summary=mock_embedding,
35
e_chunks=[mock_embedding],
36
metadata=metadata | {"source_type": source_type},
43
return Path("test_file.txt")
47
def test_file_path(test_file_name: Path, storage: FileStorage):
48
return storage.get_path(test_file_name)
52
def test_directory(storage: FileStorage):
53
return storage.get_path("test_directory")
57
def test_nested_file(storage: FileStorage):
58
return storage.get_path("nested/test_file.txt")
61
def test_file_operations_log():
63
"File Operation Logger\n"
64
"write: path/to/file1.txt #checksum1\n"
65
"write: path/to/file2.txt #checksum2\n"
66
"write: path/to/file3.txt #checksum3\n"
67
"append: path/to/file2.txt #checksum4\n"
68
"delete: path/to/file3.txt\n"
70
logs = all_logs.split("\n")
73
("write", "path/to/file1.txt", "checksum1"),
74
("write", "path/to/file2.txt", "checksum2"),
75
("write", "path/to/file3.txt", "checksum3"),
76
("append", "path/to/file2.txt", "checksum4"),
77
("delete", "path/to/file3.txt", None),
79
assert list(file_ops.operations_from_log(logs)) == expected
82
def test_is_duplicate_operation(agent: Agent, mocker: MockerFixture):
85
"path/to/file1.txt": "checksum1",
86
"path/to/file2.txt": "checksum2",
88
mocker.patch.object(file_ops, "file_operations_state", lambda _: state)
92
file_ops.is_duplicate_operation(
93
"write", Path("path/to/file1.txt"), agent, "checksum1"
98
file_ops.is_duplicate_operation(
99
"write", Path("path/to/file1.txt"), agent, "checksum2"
104
file_ops.is_duplicate_operation(
105
"write", Path("path/to/file3.txt"), agent, "checksum3"
111
file_ops.is_duplicate_operation(
112
"append", Path("path/to/file1.txt"), agent, "checksum1"
118
file_ops.is_duplicate_operation("delete", Path("path/to/file1.txt"), agent)
122
file_ops.is_duplicate_operation("delete", Path("path/to/file3.txt"), agent)
129
async def test_log_operation(agent: Agent):
130
await file_ops.log_operation("log_test", Path("path/to/test"), agent=agent)
131
log_entry = agent.get_file_operation_lines()[-1]
132
assert "log_test: path/to/test" in log_entry
135
def test_text_checksum(file_content: str):
136
checksum = file_ops.text_checksum(file_content)
137
different_checksum = file_ops.text_checksum("other content")
138
assert re.match(r"^[a-fA-F0-9]+$", checksum) is not None
139
assert checksum != different_checksum
143
async def test_log_operation_with_checksum(agent: Agent):
144
await file_ops.log_operation(
145
"log_test", Path("path/to/test"), agent=agent, checksum="ABCDEF"
147
log_entry = agent.get_file_operation_lines()[-1]
148
assert "log_test: path/to/test #ABCDEF" in log_entry
152
async def test_read_file(
153
mock_MemoryItem_from_text,
154
test_file_path: Path,
158
await agent.workspace.write_file(test_file_path.name, file_content)
159
await file_ops.log_operation(
160
"write", Path(test_file_path.name), agent, file_ops.text_checksum(file_content)
162
content = file_ops.read_file(test_file_path.name, agent=agent)
163
assert content.replace("\r", "") == file_content
166
def test_read_file_not_found(agent: Agent):
167
filename = "does_not_exist.txt"
168
with pytest.raises(FileNotFoundError):
169
file_ops.read_file(filename, agent=agent)
173
async def test_write_to_file_relative_path(test_file_name: Path, agent: Agent):
174
new_content = "This is new content.\n"
175
await file_ops.write_to_file(test_file_name, new_content, agent=agent)
176
with open(agent.workspace.get_path(test_file_name), "r", encoding="utf-8") as f:
178
assert content == new_content
182
async def test_write_to_file_absolute_path(test_file_path: Path, agent: Agent):
183
new_content = "This is new content.\n"
184
await file_ops.write_to_file(test_file_path, new_content, agent=agent)
185
with open(test_file_path, "r", encoding="utf-8") as f:
187
assert content == new_content
191
async def test_write_file_logs_checksum(test_file_name: Path, agent: Agent):
192
new_content = "This is new content.\n"
193
new_checksum = file_ops.text_checksum(new_content)
194
await file_ops.write_to_file(test_file_name, new_content, agent=agent)
195
log_entry = agent.get_file_operation_lines()[-1]
196
assert log_entry == f"write: {test_file_name} #{new_checksum}"
200
async def test_write_file_fails_if_content_exists(test_file_name: Path, agent: Agent):
201
new_content = "This is new content.\n"
202
await file_ops.log_operation(
206
checksum=file_ops.text_checksum(new_content),
208
with pytest.raises(DuplicateOperationError):
209
await file_ops.write_to_file(test_file_name, new_content, agent=agent)
213
async def test_write_file_succeeds_if_content_different(
214
test_file_path: Path, file_content: str, agent: Agent
216
await agent.workspace.write_file(test_file_path.name, file_content)
217
await file_ops.log_operation(
218
"write", Path(test_file_path.name), agent, file_ops.text_checksum(file_content)
220
new_content = "This is different content.\n"
221
await file_ops.write_to_file(test_file_path.name, new_content, agent=agent)
225
async def test_list_files(agent: Agent):
227
file_a_name = "file_a.txt"
228
file_b_name = "file_b.txt"
229
test_directory = Path("test_directory")
231
await agent.workspace.write_file(file_a_name, "This is file A.")
232
await agent.workspace.write_file(file_b_name, "This is file B.")
235
agent.workspace.make_dir(test_directory)
236
await agent.workspace.write_file(
237
test_directory / file_a_name, "This is file A in the subdirectory."
240
files = file_ops.list_folder(".", agent=agent)
241
assert file_a_name in files
242
assert file_b_name in files
243
assert os.path.join(test_directory, file_a_name) in files
246
agent.workspace.delete_file(file_a_name)
247
agent.workspace.delete_file(file_b_name)
248
agent.workspace.delete_file(test_directory / file_a_name)
249
agent.workspace.delete_dir(test_directory)
252
non_existent_file = "non_existent_file.txt"
253
files = file_ops.list_folder("", agent=agent)
254
assert non_existent_file not in files