Langchain-Chatchat
256 строк · 10.4 Кб
import os
import shutil
from typing import Any, Dict, List, Optional

from elasticsearch import BadRequestError, Elasticsearch
from langchain.embeddings.base import Embeddings
from langchain.schema import Document
from langchain.vectorstores.elasticsearch import ElasticsearchStore

from configs import (
    CACHED_VS_NUM,
    EMBEDDING_DEVICE,
    EMBEDDING_MODEL,
    KB_ROOT_PATH,
    kbs_config,
    logger,
)
from server.knowledge_base.kb_service.base import KBService, SupportedVSType
from server.knowledge_base.utils import KnowledgeFile
from server.utils import load_local_embeddings
14
class ESKBService(KBService):
    """Knowledge-base service that stores document vectors in Elasticsearch.

    Two handles to the same cluster are kept:

    * ``es_client_python`` - the raw ``elasticsearch`` client, used for index
      management, lookups by id and deletions.
    * ``db_init`` - a LangChain ``ElasticsearchStore`` bound to the same
      index, used for similarity search.

    Document text is read from the ``context`` field and vectors are stored
    in the ``dense_vector`` field of each indexed chunk.
    """

    # Number of hits requested per search when looking chunks up by source.
    _SOURCE_QUERY_SIZE = 50

    def _es_url(self) -> str:
        """Return the HTTP URL of the configured Elasticsearch node."""
        return f"http://{self.IP}:{self.PORT}"

    def _auth_kwargs(self) -> Dict[str, str]:
        """Return ``ElasticsearchStore`` credential kwargs, or ``{}`` if unset."""
        if self.user != "" and self.password != "":
            return {"es_user": self.user, "es_password": self.password}
        return {}

    def do_init(self):
        """Read the ES settings, connect, and make sure the index exists.

        Raises:
            ConnectionError: re-raised (after logging) if building either the
                raw client or the LangChain store raises it.
            Exception: any other error from building the client or the store
                is logged and re-raised unchanged.
        """
        self.kb_path = self.get_kb_path(self.kb_name)
        self.index_name = os.path.split(self.kb_path)[-1]
        es_config = kbs_config[self.vs_type()]
        self.IP = es_config['host']
        self.PORT = es_config['port']
        self.user = es_config.get("user", '')
        self.password = es_config.get("password", '')
        self.dims_length = es_config.get("dims_length", None)
        self.embeddings_model = load_local_embeddings(self.embed_model, EMBEDDING_DEVICE)
        auth_kwargs = self._auth_kwargs()

        try:
            # Raw ES python client (connection only).
            if auth_kwargs:
                self.es_client_python = Elasticsearch(self._es_url(),
                                                      basic_auth=(self.user, self.password))
            else:
                logger.warning("ES未配置用户名和密码")
                self.es_client_python = Elasticsearch(self._es_url())
        except ConnectionError:
            logger.error("连接到 Elasticsearch 失败!")
            # Bare raise keeps the original exception and its traceback.
            raise
        except Exception as e:
            logger.error(f"Error 发生 : {e}")
            raise

        try:
            # First try to create the index through the raw client. Skip the
            # call when the index is already there so that a normal restart
            # does not log a spurious "already exists" error.
            if not self.es_client_python.indices.exists(index=self.index_name):
                mappings = {
                    "properties": {
                        "dense_vector": {
                            "type": "dense_vector",
                            "dims": self.dims_length,
                            "index": True
                        }
                    }
                }
                self.es_client_python.indices.create(index=self.index_name, mappings=mappings)
        except BadRequestError as e:
            logger.error("创建索引失败,重新")
            logger.error(e)

        try:
            # LangChain ES connection bound to the same index.
            if not auth_kwargs:
                logger.warning("ES未配置用户名和密码")
            self.db_init = ElasticsearchStore(
                es_url=self._es_url(),
                index_name=self.index_name,
                query_field="context",
                vector_query_field="dense_vector",
                embedding=self.embeddings_model,
                **auth_kwargs
            )
        except ConnectionError:
            logger.error("### 初始化 Elasticsearch 失败!")
            raise
        except Exception as e:
            logger.error(f"Error 发生 : {e}")
            raise

        try:
            # Fallback: let the LangChain store create the index if needed.
            self.db_init._create_index_if_not_exists(
                index_name=self.index_name,
                dims_length=self.dims_length
            )
        except Exception as e:
            # Best effort: a failure here is logged but does not abort init.
            logger.error("创建索引失败...")
            logger.error(e)

    @staticmethod
    def get_kb_path(knowledge_base_name: str):
        """Return the directory of the named knowledge base under KB_ROOT_PATH."""
        return os.path.join(KB_ROOT_PATH, knowledge_base_name)

    @staticmethod
    def get_vs_path(knowledge_base_name: str):
        """Return the ``vector_store`` directory of the named knowledge base."""
        return os.path.join(ESKBService.get_kb_path(knowledge_base_name), "vector_store")

    def do_create_kb(self):
        """Create the local ``vector_store`` directory if the doc path exists."""
        if not os.path.exists(self.doc_path):
            return
        vs_dir = os.path.join(self.kb_path, "vector_store")
        if os.path.exists(vs_dir):
            logger.warning("directory `vector_store` already exists.")
        else:
            os.makedirs(vs_dir)

    def vs_type(self) -> str:
        """Return the vector-store type identifier for Elasticsearch."""
        return SupportedVSType.ES

    def _load_es(self, docs, embed_model) -> bool:
        """Embed ``docs`` and write them to the index.

        Errors are logged rather than raised (best effort).

        Returns:
            True if the documents were written, False if an error was logged.
        """
        try:
            # Connects and writes the documents in one call.
            self.db = ElasticsearchStore.from_documents(
                documents=docs,
                embedding=embed_model,
                es_url=self._es_url(),
                index_name=self.index_name,
                distance_strategy="COSINE",
                query_field="context",
                vector_query_field="dense_vector",
                verify_certs=False,
                **self._auth_kwargs()
            )
        except ConnectionError as ce:
            logger.error("连接到 Elasticsearch 失败!")
            logger.error(ce)
            return False
        except Exception as e:
            logger.error(f"Error 发生 : {e}")
            return False
        return True

    def do_search(self, query: str, top_k: int, score_threshold: float):
        """Return the ``top_k`` most similar chunks together with their scores.

        ``score_threshold`` is accepted for interface compatibility but is
        not applied by this implementation.
        """
        # Text similarity retrieval.
        return self.db_init.similarity_search_with_score(query=query, k=top_k)

    def get_doc_by_ids(self, ids: List[str]) -> List[Document]:
        """Fetch documents by id; ids that cannot be fetched are logged and skipped."""
        results = []
        for doc_id in ids:
            try:
                response = self.es_client_python.get(index=self.index_name, id=doc_id)
                source = response["_source"]
                # The chunk text lives in "context", its metadata in "metadata".
                text = source.get("context", "")
                metadata = source.get("metadata", {})
                results.append(Document(page_content=text, metadata=metadata))
            except Exception as e:
                logger.error(f"Error retrieving document from Elasticsearch! {e}")
        return results

    def del_doc_by_ids(self, ids: List[str]) -> bool:
        """Delete documents by id.

        Returns:
            True if every delete call succeeded, False if any of them failed
            (failures are logged and the remaining ids are still processed).
        """
        all_deleted = True
        for doc_id in ids:
            try:
                self.es_client_python.delete(index=self.index_name,
                                             id=doc_id,
                                             refresh=True)
            except Exception as e:
                logger.error(f"ES Docs Delete Error! {e}")
                all_deleted = False
        return all_deleted

    def do_delete_doc(self, kb_file, **kwargs):
        """Delete every indexed chunk whose source is ``kb_file``.

        The search is restricted to this knowledge base's index, and it is
        repeated until no matching chunk is left, so files with more chunks
        than one result page are removed completely.
        """
        if not self.es_client_python.indices.exists(index=self.index_name):
            return None
        # Chunks are matched on the keyword form of their source path.
        query = {
            "query": {
                "term": {
                    "metadata.source.keyword": self.get_relative_source_path(kb_file.filepath)
                }
            }
        }
        while True:
            # Set size explicitly: ES returns only 10 hits by default.
            search_results = self.es_client_python.search(index=self.index_name,
                                                          body=query,
                                                          size=self._SOURCE_QUERY_SIZE)
            delete_list = [hit["_id"] for hit in search_results['hits']['hits']]
            if not delete_list:
                return None
            deleted = 0
            for doc_id in delete_list:
                try:
                    # refresh=True makes the delete visible to the next search.
                    self.es_client_python.delete(index=self.index_name,
                                                 id=doc_id,
                                                 refresh=True)
                    deleted += 1
                except Exception as e:
                    logger.error(f"ES Docs Delete Error! {e}")
            if deleted == 0:
                # Nothing could be removed in this pass; stop instead of
                # retrying the same failing ids forever.
                return None

    def do_add_doc(self, docs: List[Document], **kwargs) -> Optional[List[Dict[str, Any]]]:
        """Add document chunks to the knowledge base.

        Returns:
            A list of ``{"id": str, "metadata": dict}`` entries for the
            indexed chunks whose source equals that of ``docs[0]``; an empty
            list when ``docs`` is empty; ``None`` when the index does not
            exist.

        Raises:
            ValueError: if no chunk with that source is found after writing.
        """
        if not docs:
            # Nothing to index, and no first document to read a source from.
            return []
        logger.info(f"server.knowledge_base.kb_service.es_kb_service.do_add_doc 输入的docs参数长度为:{len(docs)}")
        if self._load_es(docs=docs, embed_model=self.embeddings_model):
            logger.info("写入数据成功.")

        if not self.es_client_python.indices.exists(index=self.index_name):
            return None
        file_path = docs[0].metadata.get("source")
        # One term clause on the source; the index is selected through the
        # ``index`` argument. (A dict literal with two "term" keys keeps only
        # the last one, which would drop the source filter.)
        query = {
            "query": {
                "term": {
                    "metadata.source.keyword": file_path
                }
            }
        }
        # Set size explicitly: ES returns only 10 hits by default.
        # NOTE(review): at most _SOURCE_QUERY_SIZE chunks are reported back;
        # confirm whether callers need every chunk id of large files.
        search_results = self.es_client_python.search(index=self.index_name,
                                                      body=query,
                                                      size=self._SOURCE_QUERY_SIZE)
        hits = search_results["hits"]["hits"]
        if not hits:
            raise ValueError("召回元素个数为0")
        return [{"id": hit["_id"], "metadata": hit["_source"]["metadata"]} for hit in hits]

    def do_clear_vs(self):
        """Delete all vectors of the knowledge base by dropping its index."""
        if self.es_client_python.indices.exists(index=self.index_name):
            self.es_client_python.indices.delete(index=self.index_name)

    def do_drop_kb(self):
        """Delete the knowledge base's local directory tree, if present."""
        # self.kb_path: path of the knowledge base on disk.
        if os.path.exists(self.kb_path):
            shutil.rmtree(self.kb_path)
249
250
if __name__ == '__main__':
    # Manual smoke test: index README.md into the "test" knowledge base and
    # run one similarity query against it.
    service = ESKBService("test")
    # service.clear_vs()
    # service.create_kb()
    readme_file = KnowledgeFile(filename="README.md", knowledge_base_name="test")
    service.add_doc(readme_file)
    answer = service.search_docs("如何启动api服务")
    print(answer)
257
258
259
260
261
262