consolidator
/
MainConsolidator.py
341 строка · 17.7 Кб
1import ntpath2import os3import sys4import pandas as pd5import openpyxl as excel6import json7from MyLogger import MyLogger8from MdbHelper import MdbHelper9
10class MainConsolidator(object):11
12version = "1.1"13columns:list14
15def __init__(self,logFile,logStream) -> None:16self.productions={}17self.guids={}18self.positions:pd.DataFrame = None19self.mlog=MyLogger(__name__,log_file=logFile,outStream=logStream)20self.input_data:pd.DataFrame=None21
22
23def load_guids(self,guidsDir:str):24"""Загрузка справочников из csv25
26Args:
27guidsDir (str): Директория с csv-файлами
28"""
29try:30self.mlog.info(f"Загрузка справочников из {guidsDir}")31files = os.listdir(guidsDir)32for file in files:33fullName = f"{guidsDir}\\{file}"34key = file.replace(".csv","")35df = pd.read_csv(fullName,encoding="windows-1251",delimiter=";")36self.guids[key]=df37# self.mlog.info(f"Справочник '{key}' загружен")38except: raise39
40def load_guids_x(self,guidsDir:str):41"""Загрузка справочников из xlsx42
43Args:
44guidsDir (str): Директория с xlsx-файлами
45"""
46try:47self.mlog.info(f"Загрузка справочников из {guidsDir}")48files = [f for f in os.listdir(guidsDir) if "~$" not in f]49for file in files:50fullName = f"{guidsDir}\\{file}"51key = file.replace(".xlsx","")52df = pd.read_excel(fullName)53self.guids[key]=df54# self.mlog.info(f"Справочник '{key}' загружен")55except: raise56def load_input_data(self,filesList:list):57"""Загрузка входных данных58
59Args:
60filesList (list): Список файлов
61"""
62try:63dfs=[]64for file in filesList:65try:66fname,fextension = os.path.splitext(file)67if str(fextension).lower()==".xlsx": dfs.append(self.__load_excel(file))68if str(fextension).lower()==".mdb": dfs.append(self.__load_mdb(file))69except Exception as exp:70self.mlog.warning(f"Ошибка загрузки файла '{file}'",exc_info=True)71if dfs.__len__()>0:72self.input_data=pd.concat(dfs)73self.mlog.info(f"Входные данные загружены: {self.input_data.shape[0]} строк, {self.input_data.shape[1]} столбцов")74for col in self.columns:75if col not in self.input_data.columns.to_list(): self.input_data[col]=pd.NA76self.input_data = self.input_data[self.columns]77self.input_data = self.input_data.apply(self.level1_handler,axis=1)78self.mlog.info("1 обработчик отработал")79self.input_data.fillna("|",inplace=True)80self.input_data.replace("|",pd.NA,inplace=True)81except: raise82
83############# handlers ##########################################84def level1_handler(self,rec:pd.Series)->pd.Series:85try:86# for itm in self.level1Config: rec[itm["column"]] = self.scalar_in_guid(itm["guid"],eval(itm["query"]),itm["quid_column"])87# 1. Оперделение производства88prod2 = self.scalar_in_guid("Маппинг производств",f"`Производство`=='{rec['Производство']}'","Производство2")89if pd.isna(prod2): prod2="-" # bug fix отстутствие атрибута 'производство'90rec["Производство2"] = prod291# 2. Определение Группы марок стали по потребительским свойствам (ГМСПС)92# Внимание! требуется проработка и корретировка93rec["ГМСПС"] = self.scalar_in_guid("Группы марок стали по потребительским свойствам", f"`Марка стали`=='{rec['Марка стали <Производство>']}'","Группа марок")94if pd.isna(rec['Марка стали <Производство>']) or str(rec['Марка стали <Производство>'])=="-": rec["ГМСПС"]="ГУ"95
96# 3. Определение продукта и Признака БЗ по НТД97ntd_list = rec[["НТД качества (прокат) <Производство>","НТД качества (трубы стальные) <Производство>","НТД качества (колеса) <Производство>"]].to_list()98ntd = pd.NA99for n in ntd_list:100if not pd.isna(n): ntd=n101rec["НТД"]=ntd102rec["Первичный продукт"] = self.scalar_in_guid("Продукт по НТД", f"`Производство`=='{prod2}' and `НТД качества (прокат) <Производство>`=='{ntd}'" ,"Продукт")103if pd.isna(rec["Первичный продукт"]):104rec["Первичный продукт"] = self.scalar_in_guid("Продукт по НТД", f"`Производство`=='{prod2}' and `НТД качества (прокат) <Производство>`=='*'" ,"Продукт")105if "прокат рулонный о" in str(rec["Полное наименование материала"]).lower() or "прокат рулонный o" in str(rec["Полное наименование материала"]).lower(): rec["Первичный продукт"] = "ПОЛОСА"106if prod2=="Трубы ТБД": rec["Первичный продукт"] = "ТР Д"107if prod2=="Трубы ТБД" and rec["Диаметр, мм <Производство>"]=="813": rec["Первичный продукт"] = "ТРК Д"108if prod2=="Трубы ТБД" and rec["Диаметр, мм <Производство>"]=="508": rec["Первичный продукт"] = "ТРК Д508"109rec["Признак БЗ"] = self.scalar_in_guid("Продукт по НТД",f"`Производство`=='{prod2}' and `НТД качества (прокат) <Производство>`=='{ntd}'","Доп признак")110
111# 4. Костыль для ТРУБЫ Д ГЮ112if str(rec["Первичный продукт"])=="ТРУБА Д" and str(rec["ГМСПС"])=="ГЮ": rec["ГМСПС"]="ГУ"113
114
115return rec116except Exception as exp:117self.mlog.warning(f"level1_handler [{rec['ID в SAP ERP VMZ']}] не обработано. ({exp})",exc_info=True)118return rec119#################################################################120
121def debug_stub(self)->str:122q= "`Производство`=='{rec['Производство']}'"123return f'f"{q}"'124
125def __load_excel(self,fileName:str)->pd.DataFrame:126try:127dfs=[]128short_name=ntpath.basename(fileName)129# direct pandas version (work only with openpyxl==3.0.7)130xl = pd.ExcelFile(fileName,engine="openpyxl")131for sheet in xl.sheet_names:132try:133print(sheet)134df = pd.read_excel(fileName,sheet_name=sheet)135df["Файл"]=short_name136df["Лист"]=sheet137dfs.append(df)138self.mlog.info(f"[{short_name}].[{sheet}] загружен")139except Exception as exp:140self.mlog.warning(f"Ошибка загрузки листа '{short_name}'[{sheet}]",exc_info=True)141
142if dfs.__len__()==0: raise Exception("Файл не содержит данных")143return pd.concat(dfs)144except: raise145
146def __load_mdb(self,fileName:str)->pd.DataFrame:147try:148# mdb = MdbHelper(fileName,autoconnect=True)149# df:pd.DataFrame = mdb.get_table_df("Справочник МТР1")150# mdb.close_connection()151## issue http://vsys01775:8282/flea/pyexcelcons3/-/issues/1152mdb = MdbHelper(fileName)153df:pd.DataFrame = mdb.get_mdbt_table_df()154if df.empty: raise Exception("Файл не содержит данных или отсутствует таблица [Справочник МТР1]")155return df156except: raise157
158def dump_input_data(self,fileName:str="input_data.xlsx"):159try:160if self.input_data is not None:161cols = {"columns":self.input_data.columns.to_list()}162with open(fileName.replace(".xlsx",".json"),mode="w") as fs:163json.dump(cols, fs, indent=2, sort_keys=False, ensure_ascii=False)164fs.flush()165self.input_data.to_excel(fileName,index=False)166except: raise167
168def find_in_guid(self,guidName,query:str)->tuple:169"""Первая строка выборки из справочника контрагентов170
171Args:
172guidName: Имя справочника \n
173query: Текст запроса
174
175Example of query: f"`Дивизион`=='{row['Дивизион']}' and `Группа клиентов либо поставщиков`=='{row['Группа клиентов']}' and `Место поставки`=='{row['Место поставки']}'" \n
176
177Returns:
178_type_: кортеж (первая строка, реальное количество строк)
179"""180try:181
182gdf:pd.DataFrame = self.guids.get(guidName,None)183if gdf is None: raise Exception(f"Справочника '{guidName}' не существует")184res = gdf.query(query)185if res.empty: return tuple([None,0])186return tuple([res.iloc[0],res.shape[0]])187except: raise188
189def find_in_guid2(self,guidName,query:str,columnList:list)->pd.Series:190"""Первая строка выборки из справочника контрагентов191
192Args:
193guidName: Имя справочника \n
194query: Текст запроса
195
196Example of query: f"`Дивизион`=='{row['Дивизион']}' and `Группа клиентов либо поставщиков`=='{row['Группа клиентов']}' and `Место поставки`=='{row['Место поставки']}'" \n
197
198Returns:
199_type_: кортеж (первая строка, реальное количество строк)
200"""201try:202gdf:pd.DataFrame = self.guids.get(guidName,None)203if gdf is None: raise Exception(f"Справочника '{guidName}' не существует")204res = gdf.query(query)205if res.empty:206data_={}207for col in columnList: data_[col]=pd.NA208return pd.Series(data=data_)209return res.iloc[0][columnList]210except: raise211
212def scalar_in_guid(self,guidName,query:str,column:str):213"""Первая строка выборки из справочника контрагентов (единичное значение)214
215Args:
216guidName: Имя справочника \n
217query: Текст запроса
218
219Example of query: f"`Дивизион`=='{row['Дивизион']}' and `Группа клиентов либо поставщиков`=='{row['Группа клиентов']}' and `Место поставки`=='{row['Место поставки']}'" \n
220
221Returns:
222_type_: Значение
223"""224try:225gdf:pd.DataFrame = self.guids.get(guidName,None)226if gdf is None: raise Exception(f"Справочника '{guidName}' не существует")227res = gdf.query(query)228if res.empty:229return pd.NA230return res.iloc[0][column]231except: raise232
233def float_converter(self,row:pd.Series,colName:str)->pd.Series:234try:235row[colName]=float(str(row[colName]).replace(",","."))236return row237except:238row[colName]=0239return row240
241def nearest_in_guid(self,guidName:str,query:str,nearestColumn:str,nearestValue:float,valueColumn:str):242"""Получить ближайшее значение из справочника243
244Args:
245guidName: Имя справочника \n
246query: Текст запроса \n
247nearestColumn (str): Имя колонки приближаемых значений \n
248nearestValue (float): Приближаемое входное значение \n
249valueColumn (str): Имя колонки с результатом \n
250
251Raises:
252Exception: _description_
253
254Returns:
255_type_: _description_
256"""
257try:258gdf:pd.DataFrame = self.guids.get(guidName,None)259if gdf is None: raise Exception(f"Справочника '{guidName}' не существует")260df = gdf.query(query)261if df.empty: return pd.NA262df = df.apply(lambda row: self.float_converter(row,nearestColumn),axis=1)263df_res = df.iloc[(df[nearestColumn]-nearestValue).abs().argsort()[:1]]264return df_res[valueColumn].values[0]265except: raise266
267############### General methods #########################268
269def strength_by_steel(self,steel_mark:str):270"""П0110_Кл_прочности_по_марке_стали271
272Args:
273steel_mark (str): Марка стали
274"""
275try:276return self.scalar_in_guid("Класс прочности по марке стали",f"`Марка стали`=='{steel_mark}'","Класс прочности")277except: raise278
279def strength_mapping(self,strength_class:str,steel_mark:str):280"""П0120_Маппинг_Кл_прочности281Args:
282strength_class (str): _description_
283steel_mark (str): _description_
284"""
285try:286
287strength_class_ = self.scalar_in_guid("Маппинг класса прочности",f"`ИсX Класс прочности`=='{str(strength_class)}'","Результ Класс прочности")288steel_mark_ = self.scalar_in_guid("Маппинг класса прочности",f"`ИсX Класс прочности`=='{str(steel_mark)}'","Результ Класс прочности")289if pd.isna(strength_class_): strength_class_=strength_class290if pd.isna(steel_mark_): steel_mark_=steel_mark291return (strength_class_,steel_mark_)292except: raise293
294def strength_led(self,strength_class:str):295""" П0121_Приведение_Кл_прочности """296try:297result:int=0298strength_class_int:int = 1000299try:300strength_class_int=int(strength_class[1:])301except: strength_class_int=1000302
303if strength_class_int<100: result=65304if strength_class_int<65: result=60305if strength_class_int<60: result=55306if strength_class_int<55: result=52307if strength_class_int==55 or result==55: result=56308return f"К{result}"309except: raise310
311def get_gmsps(self,steel_mark:str,product:str):312""" П0130_Марка_стали_по_Гр_свойств (ГМСПС) """313try:314if pd.isna(steel_mark): steel_mark="+"315result = self.scalar_in_guid("Группы марок стали по потребительским свойствам", f"`Марка стали`=='{steel_mark}'","Группа марок")316if isinstance(steel_mark,int):317print("steel is digit")318result = self.scalar_in_guid("Группы марок стали по потребительским свойствам", f"`Марка стали`=={steel_mark}","Группа марок")319if pd.isna(steel_mark) or str(steel_mark)=="-": result="ГУ"320if pd.notna(result) and product=="ТРУБА Д" and result=="ГЮ": result="ГУ"321return result322except: raise323
324def strength_group_by_steel(self,strength_class:str,steel_mark:str):325""" П0111_Группа_по_классу_прочности_для_труб """326try:327result1 = self.scalar_in_guid("Группа по классу прочности для труб", f"`Марка стали`=='{strength_class}'","Группа прочности")328result2 = self.scalar_in_guid("Группа по классу прочности для труб", f"`Марка стали`=='{steel_mark}'","Группа прочности")329if pd.isna(result1) and pd.isna(result2): return "ПD"330else:331if pd.isna(result2) : return result1332else: return result2333except: raise334
335def carving_type(self,carving:str):336""" П0140_Резьба_типовая """337try:338carving_type = self.scalar_in_guid("Резьба",f"`Исх резьба`=='{carving}'","Результ Резьба")339if pd.isna(carving) or pd.isna(carving_type): carving_type="БР"340return carving_type341except: raise