Langchain-Chatchat

Форк
0
256 строк · 10.4 Кб
1
from typing import List
2
import os
3
import shutil
4
from langchain.embeddings.base import Embeddings
5
from langchain.schema import Document
6
from langchain.vectorstores.elasticsearch import ElasticsearchStore
7
from configs import KB_ROOT_PATH, EMBEDDING_MODEL, EMBEDDING_DEVICE, CACHED_VS_NUM
8
from server.knowledge_base.kb_service.base import KBService, SupportedVSType
9
from server.knowledge_base.utils import KnowledgeFile
10
from server.utils import load_local_embeddings
11
from elasticsearch import Elasticsearch,BadRequestError
12
from configs import logger
13
from configs import kbs_config
14

15
class ESKBService(KBService):
16

17
    def do_init(self):
        """Read the ES settings, build the clients and make sure the index exists.

        Populates ``kb_path``, ``index_name``, ``IP``, ``PORT``, ``user``,
        ``password``, ``dims_length`` and ``embeddings_model`` from
        ``kbs_config`` / the embedding loader, then creates:

        * ``self.es_client_python`` - the plain ``Elasticsearch`` client;
        * ``self.db_init`` - the langchain ``ElasticsearchStore`` bound to
          the same index.

        Raises:
            Exception: whatever client/store construction raises is logged
                and re-raised unchanged (original type, message, traceback).
        """
        self.kb_path = self.get_kb_path(self.kb_name)
        self.index_name = os.path.split(self.kb_path)[-1]
        es_config = kbs_config[self.vs_type()]
        self.IP = es_config['host']
        self.PORT = es_config['port']
        self.user = es_config.get("user", '')
        self.password = es_config.get("password", '')
        self.dims_length = es_config.get("dims_length", None)
        self.embeddings_model = load_local_embeddings(self.embed_model, EMBEDDING_DEVICE)

        es_url = f"http://{self.IP}:{self.PORT}"
        # Credentials are only used when both user and password are set.
        has_credentials = self.user != "" and self.password != ""

        try:
            # Plain ES python client (connection only).
            if has_credentials:
                self.es_client_python = Elasticsearch(
                    es_url,
                    basic_auth=(self.user, self.password),
                )
            else:
                logger.warning("ES未配置用户名和密码")
                self.es_client_python = Elasticsearch(es_url)
        except ConnectionError:
            logger.error("连接到 Elasticsearch 失败!")
            # Bare raise keeps the original exception instead of replacing it
            # with an empty ConnectionError.
            raise
        except Exception as e:
            logger.error(f"Error 发生 : {e}")
            raise

        try:
            # First try to create the index through the plain client. Skip the
            # call when the index is already there so a normal restart does
            # not log a "create failed" error every time.
            if not self.es_client_python.indices.exists(index=self.index_name):
                mappings = {
                    "properties": {
                        "dense_vector": {
                            "type": "dense_vector",
                            "dims": self.dims_length,
                            "index": True,
                        }
                    }
                }
                self.es_client_python.indices.create(index=self.index_name, mappings=mappings)
        except BadRequestError as e:
            # Best effort: the langchain store below gets a second chance.
            logger.error("创建索引失败,重新")
            logger.error(e)

        # langchain ES connection; identical settings with or without auth.
        store_kwargs = {
            "es_url": es_url,
            "index_name": self.index_name,
            "query_field": "context",
            "vector_query_field": "dense_vector",
            "embedding": self.embeddings_model,
        }
        if has_credentials:
            store_kwargs["es_user"] = self.user
            store_kwargs["es_password"] = self.password
        try:
            self.db_init = ElasticsearchStore(**store_kwargs)
        except ConnectionError:
            logger.error("### 初始化 Elasticsearch 失败!")
            raise
        except Exception as e:
            logger.error(f"Error 发生 : {e}")
            raise

        try:
            # Second attempt at index creation, this time through db_init.
            self.db_init._create_index_if_not_exists(
                index_name=self.index_name,
                dims_length=self.dims_length,
            )
        except Exception as e:
            # Deliberately not re-raised: initialisation continues even if
            # this step fails.
            logger.error("创建索引失败...")
            logger.error(e)
94

95

96

97
    @staticmethod
    def get_kb_path(knowledge_base_name: str):
        """Return the knowledge base directory: ``KB_ROOT_PATH`` joined with the name."""
        kb_dir = os.path.join(KB_ROOT_PATH, knowledge_base_name)
        return kb_dir
100

101
    @staticmethod
    def get_vs_path(knowledge_base_name: str):
        """Return the ``vector_store`` sub-directory of the knowledge base directory."""
        kb_dir = ESKBService.get_kb_path(knowledge_base_name)
        return os.path.join(kb_dir, "vector_store")
104

105
    def do_create_kb(self):
106
        if os.path.exists(self.doc_path):
107
            if not os.path.exists(os.path.join(self.kb_path, "vector_store")):
108
                os.makedirs(os.path.join(self.kb_path, "vector_store"))
109
            else:
110
                logger.warning("directory `vector_store` already exists.")
111

112
    def vs_type(self) -> str:
        """Return the vector-store type handled by this service (``SupportedVSType.ES``)."""
        backend = SupportedVSType.ES
        return backend
114

115
    def _load_es(self, docs, embed_model):
        """Write ``docs`` into the ES index via ``ElasticsearchStore.from_documents``.

        The resulting store is kept in ``self.db``. Credentials are passed
        only when both ``self.user`` and ``self.password`` are non-empty.
        Failures are logged and printed, not raised.
        """
        try:
            # Connect and write the documents in one call.
            with_auth = self.user != "" and self.password != ""
            store_kwargs = {
                "documents": docs,
                "embedding": embed_model,
                "es_url": f"http://{self.IP}:{self.PORT}",
                "index_name": self.index_name,
                "distance_strategy": "COSINE",
                "query_field": "context",
                "vector_query_field": "dense_vector",
                "verify_certs": False,
            }
            if with_auth:
                store_kwargs["es_user"] = self.user
                store_kwargs["es_password"] = self.password
            self.db = ElasticsearchStore.from_documents(**store_kwargs)
        except ConnectionError as conn_err:
            print(conn_err)
            print("连接到 Elasticsearch 失败!")
            logger.error("连接到 Elasticsearch 失败!")
        except Exception as err:
            logger.error(f"Error 发生 : {err}")
            print(err)
149

150

151

152
    def do_search(self, query:str, top_k: int, score_threshold: float):
        """Run a text-similarity search and return the store's scored results.

        Delegates to ``self.db_init.similarity_search_with_score`` with
        ``k=top_k``. ``score_threshold`` is accepted but not used here.
        """
        scored_hits = self.db_init.similarity_search_with_score(k=top_k, query=query)
        return scored_hits
157

158
    def get_doc_by_ids(self, ids: List[str]) -> List[Document]:
159
        results = []
160
        for doc_id in ids:
161
            try:
162
                response = self.es_client_python.get(index=self.index_name, id=doc_id)
163
                source = response["_source"]
164
                # Assuming your document has "text" and "metadata" fields
165
                text = source.get("context", "")
166
                metadata = source.get("metadata", {})
167
                results.append(Document(page_content=text, metadata=metadata))
168
            except Exception as e:
169
                logger.error(f"Error retrieving document from Elasticsearch! {e}")
170
        return results
171

172
    def del_doc_by_ids(self, ids: List[str]) -> bool:
173
        for doc_id in ids:
174
            try:
175
                self.es_client_python.delete(index=self.index_name,
176
                                            id=doc_id,
177
                                            refresh=True)
178
            except Exception as e:
179
                logger.error(f"ES Docs Delete Error! {e}")
180

181
    def do_delete_doc(self, kb_file, **kwargs):
        """Delete every indexed chunk that belongs to ``kb_file``.

        Chunks are matched on the keyword field ``metadata.source.keyword``
        against the file's relative source path. The delete is one
        ``delete_by_query`` limited to ``self.index_name``, so it is neither
        capped at a fixed number of hits nor able to match documents from
        other indices.

        Returns:
            None. Nothing is done when the index does not exist.
        """
        if not self.es_client_python.indices.exists(index=self.index_name):
            return None

        # The document name is stored as a keyword, hence the term query.
        query = {
            "query": {
                "term": {
                    "metadata.source.keyword": self.get_relative_source_path(kb_file.filepath)
                }
            }
        }
        response = self.es_client_python.delete_by_query(
            index=self.index_name,
            body=query,
            # Keep going on version conflicts instead of aborting the request.
            conflicts="proceed",
            refresh=True,
        )
        # Per-document failures are reported in the response; log them the
        # same way the per-id delete errors were logged before.
        for failure in response.get("failures") or []:
            logger.error(f"ES Docs Delete Error! {failure}")
        return None
207

208

209
    def do_add_doc(self, docs: List[Document], **kwargs):
210
        '''向知识库添加文件'''
211
        print(f"server.knowledge_base.kb_service.es_kb_service.do_add_doc 输入的docs参数长度为:{len(docs)}")
212
        print("*"*100)
213
        self._load_es(docs=docs, embed_model=self.embeddings_model)
214
        # 获取 id 和 source , 格式:[{"id": str, "metadata": dict}, ...]
215
        print("写入数据成功.")
216
        print("*"*100)
217

218
        if self.es_client_python.indices.exists(index=self.index_name):
219
            file_path = docs[0].metadata.get("source")
220
            query = {
221
                "query": {
222
                    "term": {
223
                        "metadata.source.keyword": file_path
224
                    },
225
                    "term": {
226
                        "_index": self.index_name
227
                    }
228
                }
229
            }
230
            # 注意设置size,默认返回10个。
231
            search_results = self.es_client_python.search(body=query, size=50)
232
            if len(search_results["hits"]["hits"]) == 0:
233
                raise ValueError("召回元素个数为0")
234
        info_docs = [{"id":hit["_id"], "metadata": hit["_source"]["metadata"]} for hit in search_results["hits"]["hits"]]
235
        return info_docs
236

237

238
    def do_clear_vs(self):
        """Delete all vectors of the knowledge base by dropping its ES index.

        Uses ``self.index_name``, the same index every other method of this
        service reads from and writes to.
        """
        if self.es_client_python.indices.exists(index=self.index_name):
            self.es_client_python.indices.delete(index=self.index_name)
242

243

244
    def do_drop_kb(self):
245
        """删除知识库"""
246
        # self.kb_file: 知识库路径
247
        if os.path.exists(self.kb_path):
248
            shutil.rmtree(self.kb_path)
249

250

251
if __name__ == '__main__':
    # Manual smoke test against the "test" knowledge base: index README.md,
    # then run one query. clear_vs() / create_kb() can be called first when
    # a fresh knowledge base is wanted.
    es_service = ESKBService("test")
    readme = KnowledgeFile(filename="README.md", knowledge_base_name="test")
    es_service.add_doc(readme)
    print(es_service.search_docs("如何启动api服务"))
257

258

259

260

261

262

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.