7
@Desc : Classes and Operations Related to Files in the File System.
10
from enum import Enum
from pathlib import Path
from typing import Optional, Union

import pandas as pd
from llama_index.core import Document, SimpleDirectoryReader
from llama_index.core.node_parser import SimpleNodeParser
from llama_index.readers.file import PDFReader
from pydantic import BaseModel, ConfigDict, Field
from tqdm import tqdm

from metagpt.logs import logger
from metagpt.repo_parser import RepoParser
def validate_cols(content_col: str, df: pd.DataFrame) -> None:
    """Ensure that the content column is present in the DataFrame.

    Args:
        content_col: Name of the column expected to hold the document text.
        df: DataFrame whose columns are checked.

    Raises:
        ValueError: If ``content_col`` is not one of ``df.columns``.
    """
    if content_col not in df.columns:
        raise ValueError("Content column not found in DataFrame.")
def read_data(data_path: Path) -> Union[pd.DataFrame, "list[Document]"]:
    """Load a data file, choosing the loader from the file extension.

    The extension is compared case-insensitively, so ``report.CSV`` is
    handled like ``report.csv``.

    Args:
        data_path: Path of the file to load.

    Returns:
        A ``pandas.DataFrame`` for ``.xlsx``, ``.csv`` and ``.json`` files;
        otherwise the list produced by the llama-index reader (for ``.txt``
        the loaded documents are additionally split by ``SimpleNodeParser``).

    Raises:
        NotImplementedError: If the extension is not one of the supported ones.
    """
    suffix = data_path.suffix.lower()
    if suffix == ".xlsx":
        return pd.read_excel(data_path)
    if suffix == ".csv":
        return pd.read_csv(data_path)
    if suffix == ".json":
        return pd.read_json(data_path)
    if suffix in (".docx", ".doc"):
        return SimpleDirectoryReader(input_files=[str(data_path)]).load_data()
    if suffix == ".txt":
        documents = SimpleDirectoryReader(input_files=[str(data_path)]).load_data()
        # Plain text is split on newlines into small chunks with no overlap.
        node_parser = SimpleNodeParser.from_defaults(separator="\n", chunk_size=256, chunk_overlap=0)
        return node_parser.get_nodes_from_documents(documents)
    if suffix == ".pdf":
        # load_data is an instance method: calling it on the class would bind
        # the path string as ``self`` and leave the ``file`` argument missing.
        return PDFReader().load_data(data_path)
    raise NotImplementedError("File format not supported.")
class DocumentStatus(Enum):
    """Indicates document status, a mechanism similar to RFC/PEP"""

    # DRAFT is the default status used by Document.status.
    DRAFT = "draft"
    UNDERREVIEW = "underreview"
    # NOTE(review): the text this class was restored from had gaps around
    # UNDERREVIEW; further states may have been lost - confirm against callers.
class Document(BaseModel):
    """
    Document: Handles operations related to document files.
    """

    # Optional: a document created from text may not have a location yet, and
    # from_text passes ``path=None`` explicitly.
    path: Optional[Path] = Field(default=None)
    name: str = Field(default="")
    content: str = Field(default="")

    # metadata? in content perhaps.
    author: str = Field(default="")
    status: DocumentStatus = Field(default=DocumentStatus.DRAFT)
    reviews: list = Field(default_factory=list)

    @classmethod
    def from_path(cls, path: Path):
        """
        Create a Document instance from a file path.

        Raises:
            FileNotFoundError: If ``path`` does not exist.
        """
        if not path.exists():
            raise FileNotFoundError(f"File {path} not found.")
        # Read with the same encoding that to_path writes with.
        content = path.read_text(encoding="utf-8")
        return cls(content=content, path=path)

    @classmethod
    def from_text(cls, text: str, path: Optional[Path] = None):
        """
        Create a Document from a text string.
        """
        return cls(content=text, path=path)

    def to_path(self, path: Optional[Path] = None):
        """
        Save content to the specified file path.

        When ``path`` is given it replaces the stored path before writing.

        Raises:
            ValueError: If no path was given and none is stored.
        """
        if path is not None:
            self.path = path

        if self.path is None:
            raise ValueError("File path is not set.")

        self.path.parent.mkdir(parents=True, exist_ok=True)
        # TODO: excel, csv, json, etc.
        self.path.write_text(self.content, encoding="utf-8")

    # NOTE(review): the header of this method was missing from the text it was
    # restored from; the name is inferred from its docstring - confirm.
    def persist(self):
        """
        Persist document to disk.
        """
        return self.to_path()
class IndexableDocument(Document):
    """
    Advanced document handling: For vector databases or search engines.
    """

    # DataFrames are not pydantic-native types, so arbitrary types are allowed.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    data: Union[pd.DataFrame, list]
    content_col: Optional[str] = Field(default="")
    meta_col: Optional[str] = Field(default="")

    @classmethod
    def from_path(cls, data_path: Path, content_col="content", meta_col="metadata"):
        """Load a data file and wrap it as an indexable document.

        Args:
            data_path: File to load through ``read_data``.
            content_col: Column holding the text when the file loads as a DataFrame.
            meta_col: Column holding the metadata when the file loads as a DataFrame.

        Raises:
            FileNotFoundError: If ``data_path`` does not exist.
            ValueError: If the file loads as a DataFrame without ``content_col``.
        """
        if not data_path.exists():
            raise FileNotFoundError(f"File {data_path} not found.")
        data = read_data(data_path)
        if isinstance(data, pd.DataFrame):
            validate_cols(content_col, data)
            return cls(data=data, content=str(data), content_col=content_col, meta_col=meta_col)
        try:
            content = data_path.read_text(encoding="utf-8")
        except (OSError, UnicodeError) as e:
            # Best effort only: files that cannot be read as text keep empty content.
            logger.debug(f"Load {str(data_path)} error: {e}")
            content = ""
        return cls(data=data, content=content, content_col=content_col, meta_col=meta_col)

    def _get_docs_and_metadatas_by_df(self) -> tuple[list, list]:
        """Collect one text and one metadata dict per DataFrame row."""
        df = self.data
        contents = df[self.content_col]
        # from_path only validates the content column, so the metadata column
        # may be unset or absent; such rows get an empty metadata dict.
        has_meta = bool(self.meta_col) and self.meta_col in df.columns
        metas = df[self.meta_col] if has_meta else None
        docs = []
        metadatas = []
        for i in tqdm(range(len(df))):
            docs.append(contents.iloc[i])
            metadatas.append({self.meta_col: metas.iloc[i]} if has_meta else {})
        return docs, metadatas

    def _get_docs_and_metadatas_by_llamaindex(self) -> tuple[list, list]:
        """Collect ``text`` and ``metadata`` from every item of the loaded list."""
        data = self.data
        docs = [i.text for i in data]
        metadatas = [i.metadata for i in data]
        return docs, metadatas

    def get_docs_and_metadatas(self) -> tuple[list, list]:
        """Return parallel lists of texts and metadata for the loaded data.

        Raises:
            NotImplementedError: If ``data`` is neither a DataFrame nor a list.
        """
        if isinstance(self.data, pd.DataFrame):
            return self._get_docs_and_metadatas_by_df()
        if isinstance(self.data, list):
            return self._get_docs_and_metadatas_by_llamaindex()
        raise NotImplementedError("Data type not supported for metadata extraction.")
class RepoMetadata(BaseModel):
    """Summary figures for a repository, as produced by ``Repo.eda``."""

    # Repository name.
    name: str = Field(default="")
    # Number of entries across docs, codes and assets.
    n_docs: int = Field(default=0)
    # Total length of the content of all entries.
    n_chars: int = Field(default=0)
    # Symbols reported by RepoParser for the repository directory.
    symbols: list = Field(default_factory=list)
class Repo(BaseModel):
    """A directory of files grouped into documents, code and other assets."""

    name: str = Field(default="")
    # metadata: RepoMetadata = Field(default=RepoMetadata)
    docs: dict[Path, Document] = Field(default_factory=dict)
    codes: dict[Path, Document] = Field(default_factory=dict)
    assets: dict[Path, Document] = Field(default_factory=dict)
    # Optional so that the ``None`` default is a valid value for the field.
    path: Optional[Path] = Field(default=None)

    def _path(self, filename):
        """Return the location of ``filename`` inside the repository directory."""
        return self.path / filename

    @classmethod
    def from_path(cls, path: Path):
        """Load documents, code, and assets from a repository path."""
        path.mkdir(parents=True, exist_ok=True)
        repo = cls(path=path, name=path.name)
        for file_path in path.rglob("*"):
            # FIXME: These judgments are difficult to support multiple programming languages and need to be more general
            # Compared in lower case so the filter agrees with the classification in _set.
            if file_path.is_file() and file_path.suffix.lower() in [
                ".json",
                ".txt",
                ".md",
                ".py",
                ".js",
                ".css",
                ".html",
            ]:
                # Documents are written as UTF-8 by Document.to_path, so read them the same way.
                repo._set(file_path.read_text(encoding="utf-8"), file_path)
        return repo

    # NOTE(review): the header of this method was missing from the text it was
    # restored from; the name and the loop bodies are inferred - confirm.
    def to_path(self):
        """Persist all documents, code, and assets to the given repository path."""
        for doc in self.docs.values():
            doc.to_path()
        for code in self.codes.values():
            code.to_path()
        for asset in self.assets.values():
            asset.to_path()

    def _set(self, content: str, path: Path):
        """Add a document to the appropriate category based on its file extension."""
        suffix = path.suffix.lower()
        doc = Document(content=content, path=path, name=str(path.relative_to(self.path)))

        # FIXME: These judgments are difficult to support multiple programming languages and need to be more general
        if suffix == ".md":
            self.docs[path] = doc
        elif suffix in [".py", ".js", ".css", ".html"]:
            self.codes[path] = doc
        else:
            self.assets[path] = doc
        return doc

    def set(self, filename: str, content: str):
        """Set a document and persist it to disk."""
        path = self._path(filename)
        doc = self._set(content, path)
        doc.to_path()

    def get(self, filename: str) -> Optional[Document]:
        """Get a document by its filename."""
        path = self._path(filename)
        return self.docs.get(path) or self.codes.get(path) or self.assets.get(path)

    def get_text_documents(self) -> list[Document]:
        """Return every document and code entry; assets are not included."""
        return list(self.docs.values()) + list(self.codes.values())

    def eda(self) -> RepoMetadata:
        """Compute entry count, total content length and symbols for the repository."""
        groups = [self.docs, self.codes, self.assets]
        n_docs = sum(len(group) for group in groups)
        n_chars = sum(len(entry.content) for group in groups for entry in group.values())
        symbols = RepoParser(base_directory=self.path).generate_symbols()
        return RepoMetadata(name=self.name, n_docs=n_docs, n_chars=n_chars, symbols=symbols)