FlowAnalyser/sysjobs.py
Dennis Kerschus 7c6b05710c t
2023-06-13 13:19:05 +02:00

141 lines
5.5 KiB
Python

#from __future__ import annotations
from typing import List
from sqlalchemy import BOOLEAN, Column,INTEGER,NVARCHAR, ForeignKey,Select, and_, or_
from sqlalchemy.dialects.mssql import UNIQUEIDENTIFIER, TINYINT
from sqlalchemy.orm import relationship,Session,Mapped,mapped_column
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime,timedelta,time
import json
from json import JSONEncoder
#import sysjobhistory
import inn
class MyEncoder(JSONEncoder):
    """JSON encoder that knows how to serialise the ORM models of this module.

    ``Sysjobs`` and ``Sysjobhistory`` instances are encoded through their
    ``getDict()`` method. Any other object is encoded from its instance
    ``__dict__``; objects without one are handed to ``JSONEncoder.default``,
    which raises ``TypeError``.
    """

    def default(self, obj):
        """Return a JSON-serialisable representation of *obj*.

        Called by ``json`` only for objects it cannot serialise natively.
        """
        if isinstance(obj, (Sysjobs, Sysjobhistory)):
            return obj.getDict()
        try:
            # vars(obj) is the instance's own __dict__. The previous code
            # returned ``object.__dict__`` (the builtin type's namespace),
            # which is never what the caller wants to serialise.
            return vars(obj)
        except TypeError:
            # No __dict__ (e.g. slotted or builtin objects): let the base
            # class raise the standard "not JSON serializable" TypeError.
            return super().default(obj)
# Shared declarative base class; every ORM model in this module derives from it.
Base = declarative_base()
class Sysjobs(Base):
    """ORM mapping of the ``sysjobs`` table.

    Instances can be converted to a plain dict (``getDict`` / ``dict(obj)``)
    and to a JSON string (``str(obj)`` / ``repr(obj)``).
    """

    __tablename__ = "sysjobs"

    job_id: Mapped[str] = mapped_column(UNIQUEIDENTIFIER, primary_key=True)
    name = Column(NVARCHAR(128))
    enabled = Column(TINYINT)
    # History rows whose job_id points at this job.
    sysjobhistories: Mapped[List["Sysjobhistory"]] = relationship(back_populates="sysjob")
    # JobListe rows whose JobID points at this job.
    dataflow_jobs: Mapped[List["DataflowManagement_JobListe"]] = relationship(back_populates="sysjob")

    def __iter__(self):
        """Yield ``(key, value)`` pairs so that ``dict(job)`` works.

        The pairs are the same as in ``getDict()`` (``job_id`` as a string).
        """
        yield from self.getDict().items()

    def __str__(self):
        """Return the job as a JSON string."""
        return json.dumps(self.getDict())

    def getDict(self):
        """Return ``job_id`` (as a string) and ``name`` in a plain dict."""
        return {'job_id': str(self.job_id), 'name': self.name}

    def __repr__(self):
        """Return the same JSON string as ``__str__``."""
        return self.__str__()

    def getTest(self, session: Session):
        """Debug helper: print all job runs with ``run_date`` after 20230601.

        Prints the generated statement, then one line per result row with the
        job name, run date and run time.

        Args:
            session: Open SQLAlchemy session the query is executed on.
        """
        # Both entities are selected so each result row exposes
        # ``row.Sysjobs`` and ``row.Sysjobhistory``.
        stmt = (
            Select(Sysjobs, Sysjobhistory)
            .join(Sysjobs.sysjobhistories)
            .where(Sysjobhistory.run_date > 20230601)
            .distinct()
        )
        print(stmt)
        for row in session.execute(stmt).all():
            history = row.Sysjobhistory
            print(f"{row.Sysjobs.name} {history.run_date} {history.run_time}")

    @staticmethod
    def getNattensKoersel(session) -> List:
        """Return last night's job runs for the active jobs in ``JobListe``.

        A run qualifies when it started at or after 20:00 yesterday (local
        time of this process), has ``step_id == 0`` and belongs to a job whose
        ``JobListe`` row has ``Aktiv == 1``.

        Args:
            session: Open SQLAlchemy session the query is executed on.

        Returns:
            A list of result rows; each row carries the job as ``row.Sysjobs``
            and the matching history entry as ``row.Sysjobhistory``.
        """
        night_start = (datetime.today() - timedelta(days=1)).replace(
            hour=20, minute=0, second=0, microsecond=0
        )
        # run_date / run_time are stored as integers (YYYYMMDD / HHMMSS).
        start_date = int(night_start.strftime('%Y%m%d'))
        start_time = int(night_start.strftime('%H%M%S'))

        # "Since yesterday 20:00" = yesterday from 20:00 on, OR any later day.
        # The time-of-day condition must only apply to the start day itself,
        # otherwise every run after midnight would be filtered out.
        started_since_night_start = or_(
            and_(
                Sysjobhistory.run_date == start_date,
                Sysjobhistory.run_time >= start_time,
            ),
            Sysjobhistory.run_date > start_date,
        )
        stmt = (
            Select(Sysjobs, Sysjobhistory)
            .join(DataflowManagement_JobListe)
            .join(Sysjobhistory)
            .where(Sysjobhistory.step_id == 0)
            .where(DataflowManagement_JobListe.Aktiv == 1)
            .where(started_since_night_start)
            .distinct()
        )
        return session.execute(stmt).all()
class Sysjobhistory(Base):
    """ORM mapping of the ``sysjobhistory`` table.

    ``run_date`` is read as an integer ``YYYYMMDD``; ``run_time`` and
    ``run_duration`` are read as integers ``HHMMSS``. The helper methods turn
    them into ``datetime`` / seconds values.
    """

    __tablename__ = "sysjobhistory"

    instance_id = Column(INTEGER, primary_key=True)
    job_id: Mapped[str] = mapped_column(ForeignKey("sysjobs.job_id"))
    step_id = Column(INTEGER)
    step_name = Column(NVARCHAR(128))
    run_date = Column(INTEGER)
    run_time = Column(INTEGER)
    run_duration = Column(INTEGER)
    # The job this history row belongs to.
    sysjob: Mapped["Sysjobs"] = relationship(back_populates="sysjobhistories")

    def __iter__(self):
        """Yield ``(key, value)`` pairs so that ``dict(history)`` works."""
        yield from self.getDict().items()

    def __str__(self):
        """Return the history row as a JSON string."""
        return json.dumps(self.getDict())

    def getDict(self):
        """Return the row as a plain dict with start/end time as strings."""
        return {
            'instance_id': self.instance_id,
            'job_id': str(self.job_id),
            'step_id': self.step_id,
            'step_name': self.step_name,
            'getStartTime': str(self.getStartTime()),
            'getEndTime': str(self.getEndTime()),
        }

    def __repr__(self):
        """Return the same JSON string as ``__str__``."""
        return self.__str__()

    def getTest(self):
        """Debug stub: only obtains the ``msdb`` engine and returns ``None``.

        The query that was meant to select and print a history row has not
        been implemented yet.
        """
        engine = inn.getEngine("msdb")  # noqa: F841 - kept for the pending query

    def getStartTime(self) -> datetime:
        """Return the start of the run built from ``run_date`` and ``run_time``.

        Raises:
            ValueError: If the stored integers do not form a valid date/time.
        """
        year, month_day = divmod(self.run_date, 10000)
        month, day = divmod(month_day, 100)
        hours, minute_second = divmod(self.run_time, 10000)
        minutes, seconds = divmod(minute_second, 100)
        return datetime(year, month, day, hours, minutes, seconds)

    def getEndTime(self) -> datetime:
        """Return the start time plus the run duration."""
        return self.getStartTime() + timedelta(seconds=self.getRunDurationInSec())

    def getRunDurationInSec(self) -> int:
        """Return ``run_duration`` (integer ``HHMMSS``) converted to seconds.

        Integer arithmetic is used instead of string slicing so that the
        seconds digits are read correctly and durations of 100 hours or more
        are not truncated.
        """
        hours, minute_second = divmod(self.run_duration, 10000)
        minutes, seconds = divmod(minute_second, 100)
        return 3600 * hours + 60 * minutes + seconds

    def getRunDurationInHourMinSec(self) -> str:
        """Return the run duration formatted by ``str(timedelta)``."""
        return str(timedelta(seconds=self.getRunDurationInSec()))

    def getRunHistory(self, Sysjob):
        """Load all history rows of one job from the ``msdb`` engine.

        Prints the number of rows found.

        Args:
            Sysjob: Object exposing the job's ``job_id`` (e.g. a ``Sysjobs``
                instance).

        Returns:
            List of ``Sysjobhistory`` objects whose ``job_id`` matches.
        """
        engine = inn.getEngine("msdb")
        stmt = Select(Sysjobhistory).where(Sysjobhistory.job_id == Sysjob.job_id)
        with Session(engine) as session:
            res = session.execute(stmt).scalars().all()
        print(len(res))
        return res
class DataflowManagement_JobListe(Base):
    """ORM mapping of table ``JobListe`` in schema ``dataflowmanagement.flw``.

    Each row is keyed by ``JobID``, which is also a foreign key to
    ``sysjobs.job_id``, and is linked to its job through ``sysjob``.
    """

    __tablename__ = "JobListe"
    __table_args__ = dict(schema="dataflowmanagement.flw")

    # Primary key that doubles as the foreign key to sysjobs.job_id.
    JobID: Mapped[str] = mapped_column(
        ForeignKey("sysjobs.job_id"),
        primary_key=True,
    )
    # Boolean flag column ("Aktiv" is Danish for "active").
    Aktiv = Column(BOOLEAN)
    # The job this row refers to; counterpart of Sysjobs.dataflow_jobs.
    sysjob: Mapped["Sysjobs"] = relationship(
        back_populates="dataflow_jobs",
    )