# NOTE(review): the file previously began with a pasted extraction-artifact
# header ("141 lines / 5.5 KiB / Python", repeated) that was not valid Python;
# it has been commented out here so the module can be imported.
#from __future__ import annotations
|
|
from typing import List
|
|
|
|
from sqlalchemy import BOOLEAN, Column,INTEGER,NVARCHAR, ForeignKey,Select, and_, or_
|
|
from sqlalchemy.dialects.mssql import UNIQUEIDENTIFIER, TINYINT
|
|
from sqlalchemy.orm import relationship,Session,Mapped,mapped_column
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
from datetime import datetime,timedelta,time
|
|
import json
|
|
from json import JSONEncoder
|
|
#import sysjobhistory
|
|
import inn
|
|
|
|
class MyEncoder(JSONEncoder):
    """JSON encoder that serializes the ORM models in this module.

    Sysjobs and Sysjobhistory instances are encoded via their getDict()
    representation; everything else is handled by the base JSONEncoder.
    """

    def default(self, obj):
        if isinstance(obj, (Sysjobs, Sysjobhistory)):
            return obj.getDict()
        # Bug fix: the original returned `object.__dict__` (the builtin
        # `object` type's mappingproxy — not JSON-serializable and unrelated
        # to `obj`). Deferring to the base class raises the conventional
        # TypeError for unsupported types instead.
        return super().default(obj)
|
|
|
|
# Declarative base shared by all ORM models in this module.
# NOTE(review): `sqlalchemy.ext.declarative.declarative_base` is deprecated
# since SQLAlchemy 1.4 in favor of `sqlalchemy.orm.declarative_base` —
# confirm the installed version before changing the import.
Base = declarative_base()
|
|
|
|
class Sysjobs(Base):
    """ORM model for msdb.dbo.sysjobs (SQL Server Agent job definitions)."""

    __tablename__ = "sysjobs"

    # Agent job GUID primary key.
    job_id: Mapped[str] = mapped_column(UNIQUEIDENTIFIER, primary_key=True)
    name = Column(NVARCHAR(128))
    enabled = Column(TINYINT)

    sysjobhistories: Mapped[List["Sysjobhistory"]] = relationship(back_populates="sysjob")
    dataflow_jobs: Mapped[List["DataflowManagement_JobListe"]] = relationship(back_populates="sysjob")

    def __iter__(self):
        # Bug fix: the original defined __iter__ twice; only the second
        # definition (which stringifies the GUID) ever took effect, so the
        # shadowed duplicate has been removed.
        yield from {
            "job_id": str(self.job_id),
            "name": self.name,
        }.items()

    def __str__(self):
        return json.dumps(self.getDict())

    def getDict(self):
        """Return a JSON-serializable dict representation of this job."""
        return {'job_id': str(self.job_id), 'name': self.name}

    def __repr__(self):
        return self.__str__()

    def getTest(self, session: Session):
        """Debug helper: print each job with a history row after 2023-06-01.

        Bug fixes vs. the original:
        - uses the caller-supplied ``session`` instead of opening a new
          ``Session(engine)`` with ``engine`` undefined (NameError);
        - also selects ``Sysjobhistory``, since the print below reads
          ``row.Sysjobhistory`` (AttributeError before).
        """
        stmt = (
            Select(Sysjobs, Sysjobhistory)
            .join(Sysjobhistory)
            .where(Sysjobhistory.run_date > 20230601)
            .distinct()
        )
        print(stmt)
        row: Sysjobs
        for row in session.execute(stmt).all():
            print(row.Sysjobs.name + ' ' + str(row.Sysjobhistory.run_date)
                  + ' ' + str(row.Sysjobhistory.run_time))

    def getNattensKoersel(session) -> List['Sysjobs']:
        """Return (Sysjobs, Sysjobhistory) result rows for active jobs that
        ran since yesterday 20:00 ("last night's run").

        NOTE(review): defined without ``self``; call it via the class,
        e.g. ``Sysjobs.getNattensKoersel(session)`` — calling it on an
        instance would pass the instance as the session.
        """
        # Window start: yesterday at 20:00:00.
        nat_start = (datetime.today() - timedelta(days=1)).replace(
            hour=20, minute=0, second=0, microsecond=0)
        # run_date / run_time are stored as ints YYYYMMDD / HHMMSS, so the
        # window bounds are formatted the same way before comparing.
        stmt = (
            Select(Sysjobs, Sysjobhistory)
            .join(DataflowManagement_JobListe)
            .join(Sysjobhistory)
            .where(Sysjobhistory.step_id == 0)
            .where(DataflowManagement_JobListe.Aktiv == 1)
            .where(or_(
                and_(
                    Sysjobhistory.run_date >= int(nat_start.strftime('%Y%m%d')),
                    Sysjobhistory.run_time >= int(nat_start.strftime('%H%M%S')),
                ),
                Sysjobhistory.run_date >= int(datetime.today().strftime('%Y%m%d')),
            ))
            .distinct()
        )
        # Returns raw Row tuples, as the original did (the unused
        # list-unpacking locals were removed).
        return session.execute(stmt).all()
|
|
|
|
|
|
|
|
class Sysjobhistory(Base):
    """ORM model for msdb.dbo.sysjobhistory (SQL Server Agent job history).

    SQL Server Agent stores run_date as int YYYYMMDD, run_time as int HHMMSS
    and run_duration as int HHMMSS; the helper methods below decode them.
    """

    __tablename__ = "sysjobhistory"

    instance_id = Column(INTEGER, primary_key=True)
    job_id: Mapped[str] = mapped_column(ForeignKey("sysjobs.job_id"))
    step_id = Column(INTEGER)
    step_name = Column(NVARCHAR(128))
    run_date = Column(INTEGER)
    run_time = Column(INTEGER)
    run_duration = Column(INTEGER)

    sysjob: Mapped["Sysjobs"] = relationship(back_populates="sysjobhistories")

    def __iter__(self):
        yield from {
            "instance_id": self.instance_id,
            "job_id": str(self.job_id),
            "step_id": self.step_id,
            "step_name": self.step_name,
            "getStartTime": str(self.getStartTime()),
            "getEndTime": str(self.getEndTime()),
        }.items()

    def __str__(self):
        return json.dumps(self.getDict())

    def getDict(self):
        """Return a JSON-serializable dict representation of this row."""
        return {'instance_id': self.instance_id, 'job_id': str(self.job_id), 'step_id' : self.step_id, 'step_name': self.step_name, 'getStartTime': str(self.getStartTime()), 'getEndTime': str(self.getEndTime())}

    def __repr__(self):
        return self.__str__()

    def getTest(self):
        # Dead debug helper kept for interface compatibility; the query code
        # was already commented out in the original.
        engine = inn.getEngine("msdb")

    def getStartTime(self) -> datetime:
        """Start timestamp combining run_date (YYYYMMDD) and run_time (HHMMSS)."""
        d = str(self.run_date)
        # Adding 1,000,000 yields a fixed-width '1HHMMSS' string, i.e. a
        # cheap zero-pad for times before 10:00:00.
        t = str(1000000 + self.run_time)
        return datetime.fromisoformat(
            d[0:4] + '-' + d[4:6] + '-' + d[6:8] + ' '
            + t[1:3] + ':' + t[3:5] + ':' + t[5:7])

    def getEndTime(self) -> datetime:
        """End timestamp = start time + run duration."""
        return self.getStartTime() + timedelta(seconds=self.getRunDurationInSec())

    def getRunDurationInSec(self) -> int:
        """Convert run_duration (int HHMMSS) to a total number of seconds.

        Bug fix: the original added the minutes digits ([3:5]) twice instead
        of the seconds digits ([5:7]), so e.g. 13025 (1h 30m 25s) came out as
        3600 + 1800 + 30 rather than 3600 + 1800 + 25.
        """
        s = str(self.run_duration + 1000000)  # fixed-width '1HHMMSS'
        return 3600 * int(s[1:3]) + 60 * int(s[3:5]) + int(s[5:7])

    def getRunDurationInHourMinSec(self) -> str:
        """Run duration formatted like 'H:MM:SS' (timedelta string form)."""
        return str(timedelta(seconds=self.getRunDurationInSec()))

    def getRunHistory(self, Sysjob):
        """Debug helper: print how many history rows exist for *Sysjob*.

        Bug fixes vs. the original:
        - the query joined Sysjobhistory to itself and passed the job object
          straight to .filter(); it now filters on the job's job_id
          (presumably a Sysjobs instance — TODO confirm with callers);
        - printed the bound method ``res.__len__`` instead of calling it.
        """
        engine = inn.getEngine("msdb")
        stmt = Select(Sysjobhistory).where(Sysjobhistory.job_id == Sysjob.job_id)
        with Session(engine) as session:
            res = session.execute(stmt).all()
            print(len(res))
|
|
|
|
|
|
class DataflowManagement_JobListe(Base):
    """ORM model for the dataflowmanagement.flw.JobListe table.

    Links a SQL Server Agent job (by job_id) to a dataflow-management entry
    carrying an active flag.
    """

    __tablename__ = "JobListe"
    # Table lives in a different database/schema ("dataflowmanagement.flw"),
    # hence the explicit schema in __table_args__.
    __table_args__ = { "schema": "dataflowmanagement.flw" }

    # FK to sysjobs.job_id, also the primary key (one row per Agent job).
    JobID: Mapped[str] = mapped_column(ForeignKey("sysjobs.job_id"),primary_key=True)
    # Whether the job takes part in the nightly dataflow run
    # (compared against 1 in Sysjobs.getNattensKoersel).
    Aktiv=Column(BOOLEAN)

    # Many-to-one back to the Agent job definition.
    sysjob: Mapped["Sysjobs"] = relationship(back_populates="dataflow_jobs")