1
"""Storage for saving and reading files."""
4
import os
from contextlib import asynccontextmanager
from pathlib import Path
from typing import AsyncIterator, Optional, Protocol
from uuid import UUID, uuid4

import aiofiles
from aiofiles import os as aio_os
from aiofiles.tempfile.temptypes import AsyncSpooledTemporaryFile
14
class AsyncBufferBase(Protocol):
    """Structural type for async buffers that can be repositioned.

    Any object exposing a compatible awaitable ``seek`` satisfies this
    protocol; no inheritance is required.
    """

    async def seek(self, cursor: int, whence: int = os.SEEK_SET) -> int:
        """Move the buffer position to ``cursor`` relative to ``whence``.

        The default ``whence`` is ``os.SEEK_SET``. This is a protocol stub:
        it only declares the signature and has no implementation.
        NOTE(review): the returned int is presumably the new absolute
        position, as with ``io`` file objects -- confirm with implementers.
        """
        ...
19
# ``Protocol`` is listed explicitly: a class that merely subclasses a protocol
# is a regular (nominal) class, not a protocol, so structural matching of
# file-like objects would otherwise not apply to this type.
class AsyncBufferReadable(AsyncBufferBase, Protocol):
    """Structural type for seekable async buffers that can also be read."""

    async def read(self, bytes_to_read: Optional[int] = None) -> bytes:
        """Read bytes from the buffer.

        This is a protocol stub: it only declares the signature and has no
        implementation.
        NOTE(review): ``bytes_to_read=None`` presumably means "read to end",
        as with ``io`` file objects -- confirm with implementers.
        """
        ...
25
def __init__(self, storage_path: Path) -> None:
    """Create new file storage.

    `storage_path` should be path to existing directory.

    Raises:
        AssertionError: if `storage_path` does not exist.
    """
    # Raised explicitly instead of via `assert` so the check still runs when
    # Python is started with -O; exception type and message are unchanged.
    # Only existence is verified here, not that the path is a directory.
    if not storage_path.exists():
        raise AssertionError("`storage_path` dir should exists")

    self._storage_path = storage_path
36
@asynccontextmanager
async def file(self, file_uuid: UUID) -> AsyncIterator[AsyncBufferReadable]:
    """Get file object in storage by its UUID.

    Intended to be used as ``async with storage.file(uuid) as fo:``. The file
    is opened in binary read mode and closed when the ``async with`` block
    exits.

    Raises:
        AssertionError: if no file for `file_uuid` exists in the storage.
    """
    file_path = self._get_path_to_file(file_uuid)

    # Explicit raise (not `assert`) so the check is not stripped under -O;
    # exception type and message are unchanged.
    if not file_path.exists():
        raise AssertionError(f"File with uuid {file_uuid} not exists")

    async with aiofiles.open(file_path, "rb") as fo:
        yield fo
46
async def save(self, file: AsyncSpooledTemporaryFile) -> UUID:
    """Save file to storage using its file object.

    The content of `file` is copied piece by piece into a new file inside
    the storage directory, named after a freshly generated UUID.

    NOTE(review): iteration starts at the current position of `file`;
    presumably the caller has rewound it beforehand -- confirm at call sites.

    Returns:
        UUID under which the content was stored.
    """
    file_uuid = uuid4()
    file_path = self._get_path_to_file(file_uuid)

    async with aiofiles.open(file_path, "wb") as target_fo:
        async for chunk in file:
            await target_fo.write(chunk)

    return file_uuid
61
async def remove(self, file_uuid: UUID) -> None:
    """Remove the file stored under `file_uuid` from the storage.

    Raises:
        AssertionError: if no file for `file_uuid` exists in the storage.
    """
    file_path = self._get_path_to_file(file_uuid)

    # Explicit raise (not `assert`) so the check is not stripped under -O;
    # exception type and message are unchanged.
    if not file_path.exists():
        raise AssertionError(f"File with uuid {file_uuid} not exists")

    await aio_os.remove(file_path)
68
def _get_path_to_file(self, file_uuid: UUID) -> Path:
    """Return the path inside the storage directory for `file_uuid`.

    The file name is the string form of the UUID. The path is only built
    here; nothing is checked or created on disk.
    """
    return self._storage_path.joinpath(str(file_uuid))