Langchain-Chatchat
import os
from configs import (
    KB_ROOT_PATH,
    CHUNK_SIZE,
    OVERLAP_SIZE,
    ZH_TITLE_ENHANCE,
    logger,
    log_verbose,
    text_splitter_dict,
    LLM_MODELS,
    TEXT_SPLITTER_NAME,
)
import importlib
from text_splitter import zh_title_enhance as func_zh_title_enhance
import langchain.document_loaders
from langchain.docstore.document import Document
from langchain.text_splitter import TextSplitter
from pathlib import Path
from server.utils import run_in_thread_pool, get_model_worker_config
import json
from typing import List, Union, Dict, Tuple, Generator
import chardet


def validate_kb_name(knowledge_base_id: str) -> bool:
    # Check for unexpected characters or path-traversal keywords
    if "../" in knowledge_base_id:
        return False
    return True


def get_kb_path(knowledge_base_name: str):
    return os.path.join(KB_ROOT_PATH, knowledge_base_name)


def get_doc_path(knowledge_base_name: str):
    return os.path.join(get_kb_path(knowledge_base_name), "content")


def get_vs_path(knowledge_base_name: str, vector_name: str):
    return os.path.join(get_kb_path(knowledge_base_name), "vector_store", vector_name)


def get_file_path(knowledge_base_name: str, doc_name: str):
    doc_path = Path(get_doc_path(knowledge_base_name))
    file_path = doc_path / doc_name
    # Resolve both paths before comparing; otherwise a doc_name containing
    # "../" would still compare as "relative to" doc_path and escape the check.
    if file_path.resolve().is_relative_to(doc_path.resolve()):
        return str(file_path)


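# Usage sketch for the path helpers above (illustrative, not executed;
# "samples" and "test.txt" are hypothetical names):
#
#     get_kb_path("samples")                # -> {KB_ROOT_PATH}/samples
#     get_doc_path("samples")               # -> {KB_ROOT_PATH}/samples/content
#     get_file_path("samples", "test.txt")  # -> path inside the content dir
#     get_file_path("samples", "../x.txt")  # -> None: traversal is rejected
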
def list_kbs_from_folder():
    return [f for f in os.listdir(KB_ROOT_PATH)
            if os.path.isdir(os.path.join(KB_ROOT_PATH, f))]


def list_files_from_folder(kb_name: str):
    doc_path = get_doc_path(kb_name)
    result = []

    def is_skipped_path(path: str):
        tail = os.path.basename(path).lower()
        for x in ["temp", "tmp", ".", "~$"]:
            if tail.startswith(x):
                return True
        return False

    def process_entry(entry):
        if is_skipped_path(entry.path):
            return

        if entry.is_symlink():
            target_path = os.path.realpath(entry.path)
            with os.scandir(target_path) as target_it:
                for target_entry in target_it:
                    process_entry(target_entry)
        elif entry.is_file():
            # normalize paths to POSIX form
            file_path = (Path(os.path.relpath(entry.path, doc_path)).as_posix())
            result.append(file_path)
        elif entry.is_dir():
            with os.scandir(entry.path) as it:
                for sub_entry in it:
                    process_entry(sub_entry)

    with os.scandir(doc_path) as it:
        for entry in it:
            process_entry(entry)

    return result


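# Usage sketch for list_files_from_folder (illustrative, not executed):
#
#     files = list_files_from_folder("samples")
#     # e.g. ["test.txt", "wiki/intro.md"]: paths are relative to the content
#     # directory and use POSIX separators; entries whose names start with
#     # "temp", "tmp", "." or "~$" are skipped, and symlinks are followed.
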
LOADER_DICT = {"UnstructuredHTMLLoader": ['.html', '.htm'],
               "MHTMLLoader": ['.mhtml'],
               "UnstructuredMarkdownLoader": ['.md'],
               "JSONLoader": [".json"],
               "JSONLinesLoader": [".jsonl"],
               "CSVLoader": [".csv"],
               # "FilteredCSVLoader": [".csv"],  # enable when using custom CSV splitting
               "RapidOCRPDFLoader": [".pdf"],
               "RapidOCRDocLoader": ['.docx', '.doc'],
               "RapidOCRPPTLoader": ['.ppt', '.pptx'],
               "RapidOCRLoader": ['.png', '.jpg', '.jpeg', '.bmp'],
               "UnstructuredFileLoader": ['.eml', '.msg', '.rst',
                                          '.rtf', '.txt', '.xml',
                                          '.epub', '.odt', '.tsv'],
               "UnstructuredEmailLoader": ['.eml', '.msg'],
               "UnstructuredEPubLoader": ['.epub'],
               "UnstructuredExcelLoader": ['.xlsx', '.xls', '.xlsd'],
               "NotebookLoader": ['.ipynb'],
               "UnstructuredODTLoader": ['.odt'],
               "PythonLoader": ['.py'],
               "UnstructuredRSTLoader": ['.rst'],
               "UnstructuredRTFLoader": ['.rtf'],
               "SRTLoader": ['.srt'],
               "TomlLoader": ['.toml'],
               "UnstructuredTSVLoader": ['.tsv'],
               "UnstructuredWordDocumentLoader": ['.docx', '.doc'],
               "UnstructuredXMLLoader": ['.xml'],
               "UnstructuredPowerPointLoader": ['.ppt', '.pptx'],
               "EverNoteLoader": ['.enex'],
               }
SUPPORTED_EXTS = [ext for sublist in LOADER_DICT.values() for ext in sublist]


# patch json.dumps to disable ensure_ascii
def _new_json_dumps(obj, **kwargs):
    kwargs["ensure_ascii"] = False
    return _origin_json_dumps(obj, **kwargs)


if json.dumps is not _new_json_dumps:
    _origin_json_dumps = json.dumps
    json.dumps = _new_json_dumps

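# With the patch applied, non-ASCII text survives serialization unchanged:
#
#     json.dumps({"标题": "测试"})  # -> '{"标题": "测试"}' rather than '{"\u6807\u9898": ...}'
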
class JSONLinesLoader(langchain.document_loaders.JSONLoader):
    '''
    Line-oriented JSON loader; expects files with a .jsonl extension.
    '''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._json_lines = True


langchain.document_loaders.JSONLinesLoader = JSONLinesLoader


def get_LoaderClass(file_extension):
    for LoaderClass, extensions in LOADER_DICT.items():
        if file_extension in extensions:
            return LoaderClass

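# Usage sketch for get_LoaderClass (results follow LOADER_DICT insertion order):
#
#     get_LoaderClass(".pdf")   # -> "RapidOCRPDFLoader"
#     get_LoaderClass(".docx")  # -> "RapidOCRDocLoader" (listed before UnstructuredWordDocumentLoader)
#     ".md" in SUPPORTED_EXTS   # -> True
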
def get_loader(loader_name: str, file_path: str, loader_kwargs: Dict = None):
    '''
    Return a document loader for the given loader_name and file path or content.
    '''
    loader_kwargs = loader_kwargs or {}
    try:
        if loader_name in ["RapidOCRPDFLoader", "RapidOCRLoader", "FilteredCSVLoader",
                           "RapidOCRDocLoader", "RapidOCRPPTLoader"]:
            document_loaders_module = importlib.import_module('document_loaders')
        else:
            document_loaders_module = importlib.import_module('langchain.document_loaders')
        DocumentLoader = getattr(document_loaders_module, loader_name)
    except Exception as e:
        msg = f"Error finding loader {loader_name} for file {file_path}: {e}"
        logger.error(f'{e.__class__.__name__}: {msg}',
                     exc_info=e if log_verbose else None)
        document_loaders_module = importlib.import_module('langchain.document_loaders')
        DocumentLoader = getattr(document_loaders_module, "UnstructuredFileLoader")

    if loader_name == "UnstructuredFileLoader":
        loader_kwargs.setdefault("autodetect_encoding", True)
    elif loader_name == "CSVLoader":
        if not loader_kwargs.get("encoding"):
            # If no encoding is given, detect the file encoding automatically
            # so the langchain loader does not fail with an encoding error
            with open(file_path, 'rb') as struct_file:
                encode_detect = chardet.detect(struct_file.read())
            if encode_detect is None:
                encode_detect = {"encoding": "utf-8"}
            loader_kwargs["encoding"] = encode_detect["encoding"]

    elif loader_name == "JSONLoader":
        loader_kwargs.setdefault("jq_schema", ".")
        loader_kwargs.setdefault("text_content", False)
    elif loader_name == "JSONLinesLoader":
        loader_kwargs.setdefault("jq_schema", ".")
        loader_kwargs.setdefault("text_content", False)

    loader = DocumentLoader(file_path, **loader_kwargs)
    return loader


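# Usage sketch for get_loader (illustrative; "data.csv" is a hypothetical path):
#
#     loader = get_loader("CSVLoader", "data.csv")  # encoding auto-detected with chardet
#     docs = loader.load()                          # -> List[Document]
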
def make_text_splitter(
        splitter_name: str = TEXT_SPLITTER_NAME,
        chunk_size: int = CHUNK_SIZE,
        chunk_overlap: int = OVERLAP_SIZE,
        llm_model: str = LLM_MODELS[0],
):
    """
    Return the text splitter specified by the parameters.
    """
    splitter_name = splitter_name or "SpacyTextSplitter"
    try:
        if splitter_name == "MarkdownHeaderTextSplitter":  # special case for MarkdownHeaderTextSplitter
            headers_to_split_on = text_splitter_dict[splitter_name]['headers_to_split_on']
            text_splitter = langchain.text_splitter.MarkdownHeaderTextSplitter(
                headers_to_split_on=headers_to_split_on)
        else:
            try:  # prefer a user-defined text_splitter
                text_splitter_module = importlib.import_module('text_splitter')
                TextSplitter = getattr(text_splitter_module, splitter_name)
            except Exception:  # otherwise use langchain's text_splitter
                text_splitter_module = importlib.import_module('langchain.text_splitter')
                TextSplitter = getattr(text_splitter_module, splitter_name)

            if text_splitter_dict[splitter_name]["source"] == "tiktoken":  # load from tiktoken
                try:
                    text_splitter = TextSplitter.from_tiktoken_encoder(
                        encoding_name=text_splitter_dict[splitter_name]["tokenizer_name_or_path"],
                        pipeline="zh_core_web_sm",
                        chunk_size=chunk_size,
                        chunk_overlap=chunk_overlap
                    )
                except Exception:
                    text_splitter = TextSplitter.from_tiktoken_encoder(
                        encoding_name=text_splitter_dict[splitter_name]["tokenizer_name_or_path"],
                        chunk_size=chunk_size,
                        chunk_overlap=chunk_overlap
                    )
            elif text_splitter_dict[splitter_name]["source"] == "huggingface":  # load from huggingface
                if text_splitter_dict[splitter_name]["tokenizer_name_or_path"] == "":
                    config = get_model_worker_config(llm_model)
                    text_splitter_dict[splitter_name]["tokenizer_name_or_path"] = \
                        config.get("model_path")

                if text_splitter_dict[splitter_name]["tokenizer_name_or_path"] == "gpt2":
                    from transformers import GPT2TokenizerFast
                    from langchain.text_splitter import CharacterTextSplitter
                    tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")
                else:  # load tokenizer by name or local path
                    from transformers import AutoTokenizer
                    tokenizer = AutoTokenizer.from_pretrained(
                        text_splitter_dict[splitter_name]["tokenizer_name_or_path"],
                        trust_remote_code=True)
                text_splitter = TextSplitter.from_huggingface_tokenizer(
                    tokenizer=tokenizer,
                    chunk_size=chunk_size,
                    chunk_overlap=chunk_overlap
                )
            else:
                try:
                    text_splitter = TextSplitter(
                        pipeline="zh_core_web_sm",
                        chunk_size=chunk_size,
                        chunk_overlap=chunk_overlap
                    )
                except Exception:
                    text_splitter = TextSplitter(
                        chunk_size=chunk_size,
                        chunk_overlap=chunk_overlap
                    )
    except Exception as e:
        print(e)
        text_splitter_module = importlib.import_module('langchain.text_splitter')
        TextSplitter = getattr(text_splitter_module, "RecursiveCharacterTextSplitter")
        text_splitter = TextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)

    # If you use SpacyTextSplitter you can split on a GPU, see Issue #1287:
    # text_splitter._tokenizer.max_length = 37016792
    # text_splitter._tokenizer.prefer_gpu()
    return text_splitter


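# Usage sketch for make_text_splitter (assumes TEXT_SPLITTER_NAME names an
# entry of text_splitter_dict in configs):
#
#     splitter = make_text_splitter(chunk_size=250, chunk_overlap=50)
#     chunks = splitter.split_text("a long passage of text ...")  # -> List[str]
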
class KnowledgeFile:
    def __init__(
            self,
            filename: str,
            knowledge_base_name: str,
            loader_kwargs: Dict = None,
    ):
        '''
        Corresponds to a file in the knowledge base directory; the file must
        exist on disk before it can be vectorized or otherwise processed.
        '''
        self.kb_name = knowledge_base_name
        self.filename = str(Path(filename).as_posix())
        self.ext = os.path.splitext(filename)[-1].lower()
        if self.ext not in SUPPORTED_EXTS:
            raise ValueError(f"Unsupported file format {self.filename}")
        self.loader_kwargs = loader_kwargs or {}  # avoid a mutable default argument
        self.filepath = get_file_path(knowledge_base_name, filename)
        self.docs = None
        self.splited_docs = None
        self.document_loader_name = get_LoaderClass(self.ext)
        self.text_splitter_name = TEXT_SPLITTER_NAME

    def file2docs(self, refresh: bool = False):
        if self.docs is None or refresh:
            logger.info(f"{self.document_loader_name} used for {self.filepath}")
            loader = get_loader(loader_name=self.document_loader_name,
                                file_path=self.filepath,
                                loader_kwargs=self.loader_kwargs)
            self.docs = loader.load()
        return self.docs

    def docs2texts(
            self,
            docs: List[Document] = None,
            zh_title_enhance: bool = ZH_TITLE_ENHANCE,
            refresh: bool = False,
            chunk_size: int = CHUNK_SIZE,
            chunk_overlap: int = OVERLAP_SIZE,
            text_splitter: TextSplitter = None,
    ):
        docs = docs or self.file2docs(refresh=refresh)
        if not docs:
            return []
        if self.ext not in [".csv"]:
            if text_splitter is None:
                text_splitter = make_text_splitter(splitter_name=self.text_splitter_name,
                                                   chunk_size=chunk_size,
                                                   chunk_overlap=chunk_overlap)
            if self.text_splitter_name == "MarkdownHeaderTextSplitter":
                docs = text_splitter.split_text(docs[0].page_content)
            else:
                docs = text_splitter.split_documents(docs)

        if not docs:
            return []

        print(f"Document split sample: {docs[0]}")
        if zh_title_enhance:
            docs = func_zh_title_enhance(docs)
        self.splited_docs = docs
        return self.splited_docs

    def file2text(
            self,
            zh_title_enhance: bool = ZH_TITLE_ENHANCE,
            refresh: bool = False,
            chunk_size: int = CHUNK_SIZE,
            chunk_overlap: int = OVERLAP_SIZE,
            text_splitter: TextSplitter = None,
    ):
        if self.splited_docs is None or refresh:
            docs = self.file2docs()
            self.splited_docs = self.docs2texts(docs=docs,
                                                zh_title_enhance=zh_title_enhance,
                                                refresh=refresh,
                                                chunk_size=chunk_size,
                                                chunk_overlap=chunk_overlap,
                                                text_splitter=text_splitter)
        return self.splited_docs

    def file_exist(self):
        return os.path.isfile(self.filepath)

    def get_mtime(self):
        return os.path.getmtime(self.filepath)

    def get_size(self):
        return os.path.getsize(self.filepath)


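# Usage sketch for KnowledgeFile (assumes a hypothetical "test.txt" exists in
# the "samples" knowledge base):
#
#     kb_file = KnowledgeFile(filename="test.txt", knowledge_base_name="samples")
#     docs = kb_file.file2docs()    # raw Documents from the loader
#     chunks = kb_file.file2text()  # Documents split into chunks
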
def files2docs_in_thread(
        files: List[Union[KnowledgeFile, Tuple[str, str], Dict]],
        chunk_size: int = CHUNK_SIZE,
        chunk_overlap: int = OVERLAP_SIZE,
        zh_title_enhance: bool = ZH_TITLE_ENHANCE,
) -> Generator:
    '''
    Convert files on disk into langchain Documents using a thread pool.
    If an item is a Tuple, it takes the form (filename, kb_name).
    The generator yields status, (kb_name, file_name, docs | error).
    '''

    def file2docs(*, file: KnowledgeFile, **kwargs) -> Tuple[bool, Tuple[str, str, List[Document]]]:
        try:
            return True, (file.kb_name, file.filename, file.file2text(**kwargs))
        except Exception as e:
            msg = f"Error loading documents from file {file.kb_name}/{file.filename}: {e}"
            logger.error(f'{e.__class__.__name__}: {msg}',
                         exc_info=e if log_verbose else None)
            return False, (file.kb_name, file.filename, msg)

    kwargs_list = []
    for i, file in enumerate(files):
        kwargs = {}
        try:
            if isinstance(file, tuple) and len(file) >= 2:
                filename = file[0]
                kb_name = file[1]
                file = KnowledgeFile(filename=filename, knowledge_base_name=kb_name)
            elif isinstance(file, dict):
                filename = file.pop("filename")
                kb_name = file.pop("kb_name")
                kwargs.update(file)
                file = KnowledgeFile(filename=filename, knowledge_base_name=kb_name)
            kwargs["file"] = file
            kwargs["chunk_size"] = chunk_size
            kwargs["chunk_overlap"] = chunk_overlap
            kwargs["zh_title_enhance"] = zh_title_enhance
            kwargs_list.append(kwargs)
        except Exception as e:
            yield False, (kb_name, filename, str(e))

    for result in run_in_thread_pool(func=file2docs, params=kwargs_list):
        yield result


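# Usage sketch for files2docs_in_thread (hypothetical file names; yields are
# consumed in completion order from the thread pool):
#
#     for ok, (kb_name, file_name, payload) in files2docs_in_thread(
#             [("test.txt", "samples"), ("wiki/intro.md", "samples")]):
#         if ok:
#             print(kb_name, file_name, f"{len(payload)} docs")  # payload: List[Document]
#         else:
#             print("failed:", payload)                          # payload: error message
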
if __name__ == "__main__":
    from pprint import pprint

    kb_file = KnowledgeFile(
        filename="/home/congyin/Code/Project_Langchain_0814/Langchain-Chatchat/knowledge_base/csv1/content/gm.csv",
        knowledge_base_name="samples")
    # kb_file.text_splitter_name = "RecursiveCharacterTextSplitter"
    docs = kb_file.file2docs()
    # pprint(docs[-1])