consolidator
43 строки · 1.5 Кб
1import os
2import sys
3import pandas as pd
4import datetime
5from MainConsolidator import MainConsolidator
6
7class handler(object):
8@staticmethod
9def module_name():
10return 'Трубы со сторонней обработкой'
11@staticmethod
12def handle(row:pd.Series,consolidator:MainConsolidator)->pd.Series:
13try:
14row["error"] = pd.NA
15except Exception as exp:
16row["error"] = f"{exp}"
17finally:
18row["Продукт УП"]='не реализовано'
19row["Время укрупнения"]=datetime.datetime.now()
20return row
21
22def debug_handle(self,inputDf:pd.DataFrame,consolidator:MainConsolidator,module:str=None)->pd.DataFrame:
23try:
24if module is None: return inputDf.apply(lambda row: self.handle(row,consolidator), axis=1)
25else: return inputDf[inputDf["Производство2"]==module].apply(lambda row: self.handle(row,consolidator), axis=1)
26except: raise
27
28def get_compiled(self):
29code:str=f"print('{__file__}')"
30try:
31with open(__file__,encoding="utf-8",mode="r") as f:
32code=f.read()
33except Exception as exp:
34code=f"print('error: {exp}')"
35finally:
36cc = compile(code,"","exec")
37return cc
38
39if __name__=="__pipe_contractor__":
40consolidator:MainConsolidator
41record:pd.Series
42result:pd.Series
43result = handler.handle(record,consolidator)
44
45