consolidator

Форк
0
/
MainConsolidator.py 
341 строка · 17.7 Кб
1
import ntpath
2
import os
3
import sys
4
import pandas as pd
5
import openpyxl as excel
6
import json 
7
from MyLogger import MyLogger
8
from MdbHelper import MdbHelper
9

10
class MainConsolidator(object):
11

12
    version = "1.1"
13
    columns:list
14

15
    def __init__(self,logFile,logStream) -> None:
16
        self.productions={}
17
        self.guids={}
18
        self.positions:pd.DataFrame = None
19
        self.mlog=MyLogger(__name__,log_file=logFile,outStream=logStream)
20
        self.input_data:pd.DataFrame=None
21
        
22

23
    def load_guids(self,guidsDir:str):
24
        """Загрузка справочников из csv
25

26
        Args:
27
            guidsDir (str): Директория с csv-файлами
28
        """
29
        try:
30
            self.mlog.info(f"Загрузка справочников из {guidsDir}")
31
            files = os.listdir(guidsDir)
32
            for file in files:
33
                fullName = f"{guidsDir}\\{file}"
34
                key = file.replace(".csv","")
35
                df = pd.read_csv(fullName,encoding="windows-1251",delimiter=";")
36
                self.guids[key]=df
37
                # self.mlog.info(f"Справочник '{key}' загружен")
38
        except: raise
39

40
    def load_guids_x(self,guidsDir:str):
41
        """Загрузка справочников из xlsx
42

43
        Args:
44
            guidsDir (str): Директория с xlsx-файлами
45
        """
46
        try:
47
            self.mlog.info(f"Загрузка справочников из {guidsDir}")
48
            files = [f for f in os.listdir(guidsDir) if "~$" not in f]
49
            for file in files:
50
                fullName = f"{guidsDir}\\{file}"
51
                key = file.replace(".xlsx","")
52
                df = pd.read_excel(fullName)
53
                self.guids[key]=df
54
                # self.mlog.info(f"Справочник '{key}' загружен")
55
        except: raise
56
    def load_input_data(self,filesList:list):
57
        """Загрузка входных данных
58

59
        Args:
60
            filesList (list): Список файлов
61
        """
62
        try:
63
            dfs=[]
64
            for file in filesList:
65
                try:
66
                    fname,fextension = os.path.splitext(file)
67
                    if str(fextension).lower()==".xlsx": dfs.append(self.__load_excel(file))
68
                    if str(fextension).lower()==".mdb": dfs.append(self.__load_mdb(file))                    
69
                except Exception as exp:
70
                    self.mlog.warning(f"Ошибка загрузки файла '{file}'",exc_info=True)
71
            if dfs.__len__()>0:
72
                    self.input_data=pd.concat(dfs)
73
                    self.mlog.info(f"Входные данные загружены: {self.input_data.shape[0]} строк, {self.input_data.shape[1]} столбцов")
74
                    for col in self.columns:
75
                        if col not in self.input_data.columns.to_list(): self.input_data[col]=pd.NA
76
                    self.input_data = self.input_data[self.columns]
77
                    self.input_data = self.input_data.apply(self.level1_handler,axis=1)
78
                    self.mlog.info("1 обработчик отработал")
79
                    self.input_data.fillna("|",inplace=True)
80
                    self.input_data.replace("|",pd.NA,inplace=True)
81
        except: raise
82

83
    ############# handlers ##########################################
84
    def level1_handler(self,rec:pd.Series)->pd.Series:
85
        try:
86
            # for itm in self.level1Config: rec[itm["column"]] = self.scalar_in_guid(itm["guid"],eval(itm["query"]),itm["quid_column"])            
87
            # 1. Оперделение производства
88
            prod2 = self.scalar_in_guid("Маппинг производств",f"`Производство`=='{rec['Производство']}'","Производство2")
89
            if pd.isna(prod2): prod2="-" # bug fix отстутствие атрибута 'производство'
90
            rec["Производство2"] = prod2
91
            # 2. Определение Группы марок стали по потребительским свойствам (ГМСПС)
92
            # Внимание! требуется проработка и корретировка
93
            rec["ГМСПС"] = self.scalar_in_guid("Группы марок стали по потребительским свойствам", f"`Марка стали`=='{rec['Марка стали <Производство>']}'","Группа марок")
94
            if pd.isna(rec['Марка стали <Производство>']) or str(rec['Марка стали <Производство>'])=="-": rec["ГМСПС"]="ГУ"
95

96
            # 3. Определение продукта и Признака БЗ по НТД
97
            ntd_list = rec[["НТД качества (прокат) <Производство>","НТД качества (трубы стальные) <Производство>","НТД качества (колеса) <Производство>"]].to_list()
98
            ntd = pd.NA
99
            for n in ntd_list: 
100
                if not pd.isna(n): ntd=n
101
            rec["НТД"]=ntd
102
            rec["Первичный продукт"] = self.scalar_in_guid("Продукт по НТД", f"`Производство`=='{prod2}' and `НТД качества (прокат) <Производство>`=='{ntd}'" ,"Продукт")
103
            if pd.isna(rec["Первичный продукт"]):
104
                rec["Первичный продукт"] = self.scalar_in_guid("Продукт по НТД", f"`Производство`=='{prod2}' and `НТД качества (прокат) <Производство>`=='*'" ,"Продукт")
105
            if "прокат рулонный о" in str(rec["Полное наименование материала"]).lower() or "прокат рулонный o" in str(rec["Полное наименование материала"]).lower(): rec["Первичный продукт"] = "ПОЛОСА"
106
            if prod2=="Трубы ТБД": rec["Первичный продукт"] = "ТР Д"
107
            if prod2=="Трубы ТБД" and rec["Диаметр, мм <Производство>"]=="813": rec["Первичный продукт"] = "ТРК Д"
108
            if prod2=="Трубы ТБД" and rec["Диаметр, мм <Производство>"]=="508": rec["Первичный продукт"] = "ТРК Д508"
109
            rec["Признак БЗ"] = self.scalar_in_guid("Продукт по НТД",f"`Производство`=='{prod2}' and `НТД качества (прокат) <Производство>`=='{ntd}'","Доп признак")
110

111
            # 4. Костыль для ТРУБЫ Д ГЮ
112
            if str(rec["Первичный продукт"])=="ТРУБА Д" and str(rec["ГМСПС"])=="ГЮ": rec["ГМСПС"]="ГУ"
113

114

115
            return rec
116
        except Exception as exp:
117
            self.mlog.warning(f"level1_handler [{rec['ID в SAP ERP VMZ']}] не обработано. ({exp})",exc_info=True)
118
            return rec
119
    #################################################################
120

121
    def debug_stub(self)->str:
122
        q= "`Производство`=='{rec['Производство']}'"
123
        return  f'f"{q}"'
124

125
    def __load_excel(self,fileName:str)->pd.DataFrame:
126
        try:
127
            dfs=[]
128
            short_name=ntpath.basename(fileName)
129
            # direct pandas version (work only with openpyxl==3.0.7)
130
            xl = pd.ExcelFile(fileName,engine="openpyxl")
131
            for sheet in  xl.sheet_names:
132
                try:
133
                    print(sheet)
134
                    df = pd.read_excel(fileName,sheet_name=sheet)
135
                    df["Файл"]=short_name
136
                    df["Лист"]=sheet
137
                    dfs.append(df)
138
                    self.mlog.info(f"[{short_name}].[{sheet}] загружен")
139
                except Exception as exp:
140
                    self.mlog.warning(f"Ошибка загрузки листа '{short_name}'[{sheet}]",exc_info=True)
141
            
142
            if dfs.__len__()==0: raise Exception("Файл не содержит данных")
143
            return pd.concat(dfs)
144
        except: raise
145
    
146
    def __load_mdb(self,fileName:str)->pd.DataFrame:
147
        try:
148
            # mdb = MdbHelper(fileName,autoconnect=True)
149
            # df:pd.DataFrame = mdb.get_table_df("Справочник МТР1")
150
            # mdb.close_connection()
151
            ## issue http://vsys01775:8282/flea/pyexcelcons3/-/issues/1
152
            mdb = MdbHelper(fileName)
153
            df:pd.DataFrame = mdb.get_mdbt_table_df()    
154
            if df.empty: raise Exception("Файл не содержит данных или отсутствует таблица [Справочник МТР1]")
155
            return df
156
        except: raise
157

158
    def dump_input_data(self,fileName:str="input_data.xlsx"):
159
        try:
160
            if self.input_data is not None: 
161
                cols = {"columns":self.input_data.columns.to_list()}
162
                with open(fileName.replace(".xlsx",".json"),mode="w") as fs:
163
                    json.dump(cols, fs, indent=2, sort_keys=False, ensure_ascii=False)
164
                    fs.flush()
165
                self.input_data.to_excel(fileName,index=False)
166
        except: raise
167

168
    def find_in_guid(self,guidName,query:str)->tuple:
169
        """Первая строка выборки из справочника контрагентов
170

171
        Args:
172
            guidName: Имя справочника \n
173
            query: Текст запроса
174

175
        Example of query:  f"`Дивизион`=='{row['Дивизион']}' and `Группа клиентов либо поставщиков`=='{row['Группа клиентов']}' and `Место поставки`=='{row['Место поставки']}'" \n
176
        
177
        Returns:
178
            _type_: кортеж (первая строка, реальное количество строк)
179
        """        
180
        try:
181

182
            gdf:pd.DataFrame = self.guids.get(guidName,None)
183
            if gdf is None: raise Exception(f"Справочника '{guidName}' не существует")
184
            res = gdf.query(query)
185
            if res.empty: return tuple([None,0])
186
            return tuple([res.iloc[0],res.shape[0]])
187
        except: raise       
188

189
    def find_in_guid2(self,guidName,query:str,columnList:list)->pd.Series:
190
        """Первая строка выборки из справочника контрагентов
191

192
        Args:
193
            guidName: Имя справочника \n
194
            query: Текст запроса
195

196
        Example of query:  f"`Дивизион`=='{row['Дивизион']}' and `Группа клиентов либо поставщиков`=='{row['Группа клиентов']}' and `Место поставки`=='{row['Место поставки']}'" \n
197
        
198
        Returns:
199
            _type_: кортеж (первая строка, реальное количество строк)
200
        """        
201
        try:
202
            gdf:pd.DataFrame = self.guids.get(guidName,None)
203
            if gdf is None: raise Exception(f"Справочника '{guidName}' не существует")
204
            res = gdf.query(query)
205
            if res.empty:
206
                data_={}
207
                for col in columnList: data_[col]=pd.NA 
208
                return pd.Series(data=data_)
209
            return res.iloc[0][columnList]
210
        except: raise
211

212
    def scalar_in_guid(self,guidName,query:str,column:str):
213
        """Первая строка выборки из справочника контрагентов (единичное значение)
214

215
        Args:
216
            guidName: Имя справочника \n
217
            query: Текст запроса
218

219
        Example of query:  f"`Дивизион`=='{row['Дивизион']}' and `Группа клиентов либо поставщиков`=='{row['Группа клиентов']}' and `Место поставки`=='{row['Место поставки']}'" \n
220
        
221
        Returns:
222
            _type_: Значение
223
        """        
224
        try:
225
            gdf:pd.DataFrame = self.guids.get(guidName,None)
226
            if gdf is None: raise Exception(f"Справочника '{guidName}' не существует")
227
            res = gdf.query(query)
228
            if res.empty: 
229
                return pd.NA
230
            return res.iloc[0][column]
231
        except: raise
232

233
    def float_converter(self,row:pd.Series,colName:str)->pd.Series:
234
        try:
235
            row[colName]=float(str(row[colName]).replace(",","."))
236
            return row
237
        except:
238
            row[colName]=0
239
            return row
240

241
    def nearest_in_guid(self,guidName:str,query:str,nearestColumn:str,nearestValue:float,valueColumn:str):
242
        """Получить ближайшее значение из справочника 
243

244
        Args:
245
            guidName: Имя справочника \n
246
            query: Текст запроса \n
247
            nearestColumn (str): Имя колонки приближаемых значений \n
248
            nearestValue (float): Приближаемое входное значение \n
249
            valueColumn (str): Имя колонки с результатом \n
250

251
        Raises:
252
            Exception: _description_
253

254
        Returns:
255
            _type_: _description_
256
        """
257
        try:
258
            gdf:pd.DataFrame = self.guids.get(guidName,None)
259
            if gdf is None: raise Exception(f"Справочника '{guidName}' не существует")
260
            df = gdf.query(query)
261
            if df.empty: return pd.NA
262
            df = df.apply(lambda row: self.float_converter(row,nearestColumn),axis=1)
263
            df_res = df.iloc[(df[nearestColumn]-nearestValue).abs().argsort()[:1]]
264
            return df_res[valueColumn].values[0]
265
        except: raise
266

267
    ###############    General methods    #########################
268

269
    def strength_by_steel(self,steel_mark:str):
270
        """П0110_Кл_прочности_по_марке_стали
271

272
        Args:
273
            steel_mark (str): Марка стали
274
        """
275
        try:
276
            return self.scalar_in_guid("Класс прочности по марке стали",f"`Марка стали`=='{steel_mark}'","Класс прочности")
277
        except: raise
278

279
    def strength_mapping(self,strength_class:str,steel_mark:str):
280
        """П0120_Маппинг_Кл_прочности
281
        Args:
282
            strength_class (str): _description_
283
            steel_mark (str): _description_
284
        """
285
        try:
286
            
287
            strength_class_ = self.scalar_in_guid("Маппинг класса прочности",f"`ИсX Класс прочности`=='{str(strength_class)}'","Результ Класс прочности")
288
            steel_mark_     = self.scalar_in_guid("Маппинг класса прочности",f"`ИсX Класс прочности`=='{str(steel_mark)}'","Результ Класс прочности")
289
            if pd.isna(strength_class_): strength_class_=strength_class
290
            if pd.isna(steel_mark_): steel_mark_=steel_mark
291
            return (strength_class_,steel_mark_)
292
        except: raise
293

294
    def strength_led(self,strength_class:str):
295
        """ П0121_Приведение_Кл_прочности """
296
        try:
297
            result:int=0
298
            strength_class_int:int = 1000
299
            try: 
300
                strength_class_int=int(strength_class[1:])
301
            except: strength_class_int=1000
302

303
            if strength_class_int<100: result=65
304
            if strength_class_int<65: result=60
305
            if strength_class_int<60: result=55
306
            if strength_class_int<55: result=52
307
            if strength_class_int==55 or result==55: result=56
308
            return f"К{result}"
309
        except: raise
310

311
    def get_gmsps(self,steel_mark:str,product:str):
312
        """ П0130_Марка_стали_по_Гр_свойств (ГМСПС) """
313
        try:
314
            if pd.isna(steel_mark): steel_mark="+"
315
            result = self.scalar_in_guid("Группы марок стали по потребительским свойствам", f"`Марка стали`=='{steel_mark}'","Группа марок")
316
            if isinstance(steel_mark,int):
317
                print("steel is digit")
318
                result = self.scalar_in_guid("Группы марок стали по потребительским свойствам", f"`Марка стали`=={steel_mark}","Группа марок")
319
            if pd.isna(steel_mark) or str(steel_mark)=="-": result="ГУ"
320
            if pd.notna(result) and product=="ТРУБА Д" and result=="ГЮ": result="ГУ"
321
            return result
322
        except: raise
323

324
    def strength_group_by_steel(self,strength_class:str,steel_mark:str):
325
        """ П0111_Группа_по_классу_прочности_для_труб """
326
        try:
327
            result1 = self.scalar_in_guid("Группа по классу прочности для труб", f"`Марка стали`=='{strength_class}'","Группа прочности")
328
            result2 = self.scalar_in_guid("Группа по классу прочности для труб", f"`Марка стали`=='{steel_mark}'","Группа прочности")
329
            if pd.isna(result1) and pd.isna(result2): return "ПD"
330
            else:
331
                if pd.isna(result2) : return result1
332
                else: return result2
333
        except: raise
334
        
335
    def carving_type(self,carving:str):
336
        """ П0140_Резьба_типовая """
337
        try:
338
            carving_type = self.scalar_in_guid("Резьба",f"`Исх резьба`=='{carving}'","Результ Резьба")
339
            if pd.isna(carving) or pd.isna(carving_type): carving_type="БР"
340
            return carving_type
341
        except: raise

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.