274 lines
12 KiB
Python
274 lines
12 KiB
Python
#from __future__ import annotations
|
||
from typing import List
|
||
|
||
from sqlalchemy import BOOLEAN, Column,INTEGER,NVARCHAR, ForeignKey,Select, and_, or_,DateTime,text,VARCHAR
|
||
from sqlalchemy.dialects.mssql import UNIQUEIDENTIFIER, TINYINT
|
||
from sqlalchemy.orm import relationship,Session,Mapped,mapped_column,contains_eager
|
||
from sqlalchemy.ext.declarative import declarative_base
|
||
from datetime import datetime,timedelta,time
|
||
import json
|
||
from json import JSONEncoder
|
||
#import sysjobhistory
|
||
import inn
|
||
|
||
class MyEncoder(JSONEncoder):
    """JSON encoder for the ORM models defined in this module.

    Objects that expose a ``getDict()`` method (``Sysjobs`` and
    ``Sysjobhistory`` both do) are serialized through it.  Any other object
    is serialized from its own instance ``__dict__``.
    """

    def default(self, obj):
        """Return a JSON-serializable stand-in for *obj*.

        Raises:
            TypeError: if *obj* has neither ``getDict()`` nor a ``__dict__``.
        """
        getDict = getattr(obj, "getDict", None)
        if callable(getDict):
            return getDict()
        try:
            # The original returned ``object.__dict__`` (the namespace of the
            # builtin ``object`` type), which is never serializable; the
            # instance's own attributes are what is meant here.
            return vars(obj)
        except TypeError:
            # No ``__dict__`` at all: let the base class raise the standard
            # "is not JSON serializable" error.
            return super().default(obj)
|
||
|
||
# Shared declarative base class; every ORM model below derives from it.
Base = declarative_base()
|
||
|
||
# Name of the job that starts the nightly batch.  It is the only job whose
# median times are NOT rolled over to the next day in getMedianDag, and it
# never reports parents in printParent.
_BATCH_START_JOB_NAME = 'BI - Flow - Batch Start – Daglig kl. 20.00'


class Sysjobs(Base):
    """ORM mapping of the ``sysjobs`` table (one row per job).

    Relationships:
        sysjobhistories: history rows of this job.
        dataflow_jobs:   matching ``JobListe`` rows.
        sysjobsteps:     step definitions of this job.
        parents:         dependency rows in which this job is the child
                         (joined on ``ChildJobID``).
        children:        dependency rows in which this job is the parent
                         (joined on ``ParentJobID``).
    """

    __tablename__ = "sysjobs"

    job_id: Mapped[str] = mapped_column(UNIQUEIDENTIFIER, primary_key=True)
    name = Column(NVARCHAR(128))
    enabled = Column(TINYINT)

    sysjobhistories: Mapped[List["Sysjobhistory"]] = relationship(back_populates="sysjob")
    dataflow_jobs: Mapped[List["DataflowManagement_JobListe"]] = relationship(back_populates="sysjob")
    sysjobsteps: Mapped[List["msdb_sysjobsteps"]] = relationship(back_populates="sysjob")

    parents: Mapped[List["DataflowManagement_JobAfhaengighed"]] = relationship(
        back_populates="child",
        foreign_keys="DataflowManagement_JobAfhaengighed.ChildJobID",
    )
    children: Mapped[List["DataflowManagement_JobAfhaengighed"]] = relationship(
        back_populates="parent",
        foreign_keys="DataflowManagement_JobAfhaengighed.ParentJobID",
    )

    def __iter__(self):
        """Yield ``(key, value)`` pairs, so ``dict(job)`` equals ``getDict()``."""
        # An earlier duplicate __iter__ (which yielded the raw job_id) was
        # silently overridden by this definition and has been removed.
        yield from self.getDict().items()

    def __str__(self):
        """Return the job as a JSON object string."""
        return json.dumps(self.getDict())

    def getDict(self):
        """Return ``{'job_id': <str>, 'name': <name>}`` for this job."""
        return {'job_id': str(self.job_id), 'name': self.name}

    def __repr__(self):
        return self.__str__()

    def getTest(self, session: Session):
        """Debug helper: print every (job, history) pair with run_date > 20230601.

        Args:
            session: open session used to run the query.
        """
        # Both entities are selected so that ``row.Sysjobs`` and
        # ``row.Sysjobhistory`` exist on the result rows.  The caller's session
        # is used; the original opened a new one on an undefined ``engine``.
        stmt = (
            Select(Sysjobs, Sysjobhistory)
            .join(Sysjobs.sysjobhistories)
            .where(Sysjobhistory.run_date > 20230601)
            .distinct()
        )
        print(stmt)
        for row in session.execute(stmt).all():
            print(row.Sysjobs.name + ' ' + str(row.Sysjobhistory.run_date) + ' ' + str(row.Sysjobhistory.run_time))

    def getMedianDag(self, stepID: int, antalDage: int, ugeDag: int = 0):
        """Return the median start and end time of one step of this job.

        The query looks at the ``antalDage`` most recent history rows (by
        ``run_date``) of the given step, maps every start time onto the
        date 1970-01-01 and picks the discrete median of start time and of
        duration.

        Args:
            stepID:    ``step_id`` of the history rows to consider.
            antalDage: number of most recent history rows to include (TOP n).
            ugeDag:    when non-zero, only rows whose SQL ``DATEPART(WEEKDAY)``
                       equals this value are considered.

        Returns:
            ``[]`` when there is no matching history, otherwise
            ``[median_start, median_start + median_duration]`` as datetimes
            on 1970-01-01.  For every job except the batch-start job, a value
            earlier than 20:00 is moved to 1970-01-02.

        Note:
            The job name is matched with ``LIKE``, so ``%`` / ``_`` in the
            name act as wildcards.  The session is taken from
            ``Session.object_session(self)``; the instance must be attached.
        """
        session = Session.object_session(self)
        sqlStr = '''DECLARE @dage int = :antalDage
DECLARE @job VARCHAR(200) = :selfName
DECLARE @ugeDag INT = :ugeDag
DECLARE @stepID INT = :stepID
;WITH ctedateconversion AS(
    SELECT PERCENTILE_DISC(0.5) WITHIN GROUP (ORDER BY run_datetime) OVER() AS MedianRun_datetime
    , [t].[run_datetime]
    , PERCENTILE_DISC(0.5) WITHIN GROUP (ORDER BY [t].[run_duration]) OVER() AS run_duration
    FROM
    (
        SELECT job_name, run_datetime, run_duration
        from
        (
            SELECT TOP (@dage)
            j.name as job_name,
            run_datetime = CONVERT(DATETIME, '1970-01-01') +
            (run_time * 9 + run_time % 10000 * 6 + run_time % 100 * 10) / 216e4,
            run_duration = (run_duration/10000*3600 + (run_duration/100)%100*60 + run_duration%100)
            from msdb..sysjobhistory h
            inner join msdb..sysjobs j
            ON h.job_id = j.job_id
            WHERE
            [j].[name] LIKE @job AND [h].[step_id]=@stepID
'''
        if ugeDag != 0:
            # The trailing newline matters: it terminates the SQL line comment.
            sqlStr += ''' AND DATEPART(WEEKDAY,CONVERT(DATETIME, RTRIM(run_date))
            + (run_time * 9 + run_time % 10000 * 6 + run_time % 100 * 10) / 216e4)=@ugeDag --1 = mandag
'''
        sqlStr += ''' ORDER BY [h].[run_date] DESC
        ) t
    ) t

)
SELECT [ctedateconversion].[MedianRun_datetime] AS MedianRun_datetime_SpecificDate,DATEADD(SECOND,[ctedateconversion].[run_duration],[ctedateconversion].[MedianRun_datetime])
FROM [ctedateconversion]
WHERE [ctedateconversion].[MedianRun_datetime] = [ctedateconversion].[run_datetime]
GROUP BY [ctedateconversion].[MedianRun_datetime],
[ctedateconversion].[run_duration]'''
        stmt = text(sqlStr).params(antalDage=antalDage, selfName=self.name, ugeDag=ugeDag, stepID=stepID)
        res = session.execute(stmt).all()
        if not res:
            return []

        resResult = list(res[0])
        if self.name != _BATCH_START_JOB_NAME:
            # Times before 20:00 are moved to the following day.
            batchStart = datetime(1970, 1, 1, 20, 0, 0)
            for idx in (0, 1):
                if resResult[idx] < batchStart:
                    resResult[idx] += timedelta(days=1)
        return resResult

    def printParent(self, sysjobs: List['Sysjobs']):
        """Return the comma-separated names of this job's parents found in *sysjobs*.

        Args:
            sysjobs: candidates; a parent is reported when
                ``parent in element`` is true for at least one element.
                NOTE(review): this presumably expects the result rows of
                getNattensKoersel rather than bare Sysjobs objects - confirm.

        Returns:
            ``''`` for the batch-start job or when no parent matches.
        """
        if self.name == _BATCH_START_JOB_NAME:
            return ''
        return ','.join(
            dependency.parent.name
            for dependency in self.parents
            if any(dependency.parent in sysjob for sysjob in sysjobs)
        )

    @staticmethod
    def _ranSince(start: datetime):
        """Build a filter matching history rows that started at or after *start*.

        ``run_date`` / ``run_time`` are compared as the integers ``YYYYMMDD``
        and ``HHMMSS``: a row qualifies when it ran on the start date at or
        after the start time, or on any later date.
        """
        startDate = int(start.strftime('%Y%m%d'))
        startTime = int(start.strftime('%H%M%S'))
        return or_(
            and_(Sysjobhistory.run_date == startDate, Sysjobhistory.run_time >= startTime),
            Sysjobhistory.run_date > startDate,
        )

    @staticmethod
    def getNattensKoersel(session) -> List['Sysjobs']:
        """Return the active jobs that have run since yesterday 20:00.

        Only job-outcome history rows (``step_id == 0``) of jobs that are
        flagged ``Aktiv`` in ``JobListe`` and present in ``JobsForExecution``
        are considered.

        Args:
            session: open session used to run the query.

        Returns:
            The distinct result rows of ``session.execute(...).all()``; each
            row carries the job as its single element.
        """
        natStat = (datetime.today() - timedelta(days=1)).replace(hour=20, minute=0, second=0, microsecond=0)
        stmt = (
            Select(Sysjobs)
            .join(DataflowManagement_JobListe)
            .join(Sysjobhistory)
            .join(DataflowManagement_JobsForExecution)
            .where(Sysjobhistory.step_id == 0)
            .where(DataflowManagement_JobListe.Aktiv == 1)
            .where(Sysjobs._ranSince(natStat))
            .distinct()
        )
        # NOTE(review): the original called ``stmt.options(contains_eager(...))``
        # here but discarded the returned statement, so no eager-loading option
        # was ever applied.  The no-op call has been removed; really enabling
        # contains_eager for parents would need matching joins and ``.unique()``.
        return session.execute(stmt).all()

    @staticmethod
    def getSysjob(session, sysjobName: str, fullName: bool = True, historikDage: int = 0):
        """Look up jobs by name, optionally together with their recent history.

        Args:
            session:      open session used to run the query.
            sysjobName:   exact name, or a ``LIKE`` pattern when
                          ``fullName`` is false.
            fullName:     ``True`` for an exact match, ``False`` for ``LIKE``.
            historikDage: when > 0, only jobs with history since 20:00
                          ``historikDage`` days ago are returned, and
                          ``sysjobhistories`` is populated with exactly
                          those rows.

        Returns:
            The unique result rows, or ``None`` when ``sysjobName`` is
            ``None`` or the query failed (the session is rolled back then).
        """
        if sysjobName is None:
            return None

        stmt = Select(Sysjobs)
        if historikDage > 0:
            natStat = (datetime.today() - timedelta(days=historikDage)).replace(hour=20, minute=0, second=0, microsecond=0)
            # The previous filter required run_time >= 20:00 on every day
            # before today, which dropped daytime runs on the intermediate
            # days whenever historikDage > 1.
            stmt = stmt.join(Sysjobhistory).where(Sysjobs._ranSince(natStat))

        if fullName:
            stmt = stmt.where(Sysjobs.name == sysjobName)
        else:
            stmt = stmt.where(Sysjobs.name.like(sysjobName))

        if historikDage > 0:
            stmt = stmt.options(contains_eager(Sysjobs.sysjobhistories))

        try:
            return session.execute(stmt).unique().all()
        except Exception:
            # Best effort by design: leave the session usable and signal the
            # failure with None (KeyboardInterrupt/SystemExit now propagate).
            session.rollback()
            return None

    def getSmallestWaitOfParent(self):
        """Placeholder; not implemented, always returns ``None``."""
        return
|
||
|
||
class Sysjobhistory(Base):
    """ORM mapping of the ``sysjobhistory`` table (one row per executed step).

    ``run_date`` is stored as the integer ``YYYYMMDD``; ``run_time`` and
    ``run_duration`` are stored as the integer ``HHMMSS``.
    """

    __tablename__ = "sysjobhistory"

    instance_id = Column(INTEGER, primary_key=True)
    job_id: Mapped[str] = mapped_column(ForeignKey("sysjobs.job_id"))
    step_id = Column(INTEGER)
    step_name = Column(NVARCHAR(128))
    run_date = Column(INTEGER)
    run_time = Column(INTEGER)
    run_duration = Column(INTEGER)

    sysjob: Mapped["Sysjobs"] = relationship(back_populates="sysjobhistories")

    def __iter__(self):
        """Yield ``(key, value)`` pairs, so ``dict(row)`` equals ``getDict()``."""
        yield from self.getDict().items()

    def __str__(self):
        """Return the history row as a JSON object string."""
        return json.dumps(self.getDict())

    def getDict(self):
        """Return the JSON-friendly dict form (start/end times as strings)."""
        return {
            'instance_id': self.instance_id,
            'job_id': str(self.job_id),
            'step_id': self.step_id,
            'step_name': self.step_name,
            'getStartTime': str(self.getStartTime()),
            'getEndTime': str(self.getEndTime()),
        }

    def __repr__(self):
        return self.__str__()

    def getTest(self):
        """Debug stub: only obtains the ``msdb`` engine; the query is disabled."""
        engine = inn.getEngine("msdb")

    def getStartTime(self) -> datetime:
        """Return the start of the run, decoded from ``run_date``/``run_time``."""
        year, monthDay = divmod(self.run_date, 10000)
        month, day = divmod(monthDay, 100)
        hour, minSec = divmod(self.run_time, 10000)
        minute, second = divmod(minSec, 100)
        return datetime(year, month, day, hour, minute, second)

    def getEndTime(self) -> datetime:
        """Return start time plus run duration."""
        return self.getStartTime() + timedelta(seconds=self.getRunDurationInSec())

    def getRunDurationInSec(self) -> int:
        """Return ``run_duration`` (``HHMMSS`` integer) converted to seconds.

        The original added the minutes field a second time instead of the
        seconds field; integer arithmetic also keeps durations of 100 hours
        or more correct, which the fixed-width string slicing did not.
        """
        hours, minSec = divmod(self.run_duration, 10000)
        minutes, seconds = divmod(minSec, 100)
        return 3600 * hours + 60 * minutes + seconds

    def getRunDurationInHourMinSec(self) -> str:
        """Return the run duration formatted by ``str(timedelta)`` (``H:MM:SS``)."""
        return str(timedelta(seconds=self.getRunDurationInSec()))

    def getRunHistory(self, Sysjob):
        """Debug helper: print how many history rows exist for a job.

        Args:
            Sysjob: object whose ``job_id`` selects the history rows
                (presumably a ``Sysjobs`` instance - TODO confirm).
        """
        engine = inn.getEngine("msdb")
        # The original joined Sysjobhistory to itself and passed the job
        # object straight to filter(), which cannot be compiled; filtering on
        # the job id is the evident intent.
        stmt = Select(Sysjobhistory).where(Sysjobhistory.job_id == Sysjob.job_id)
        with Session(engine) as session:
            res = session.execute(stmt).all()
            # len(res), not the bound method object ``res.__len__``.
            print(len(res))
|
||
|
||
|
||
class DataflowManagement_JobListe(Base):
    """ORM mapping of ``JobListe`` in schema ``dataflowmanagement.flw``.

    One row per job, keyed by the job's id, carrying the ``Aktiv`` flag.
    """

    __tablename__ = "JobListe"
    __table_args__ = {"schema": "dataflowmanagement.flw"}

    # Primary key that doubles as the foreign key to sysjobs.job_id.
    JobID: Mapped[str] = mapped_column(
        ForeignKey("sysjobs.job_id"),
        primary_key=True,
    )
    Aktiv = Column(BOOLEAN)

    # The job this row belongs to (reverse side of Sysjobs.dataflow_jobs).
    sysjob: Mapped["Sysjobs"] = relationship(back_populates="dataflow_jobs")
|
||
|
||
|
||
class DataflowManagement_JobAfhaengighed(Base):
    """ORM mapping of ``JobAfhaengighed`` in schema ``dataflowmanagement.flw``.

    Each row is one dependency edge between two jobs: ``ParentJobID`` and
    ``ChildJobID`` both reference ``sysjobs.job_id`` and together form the
    primary key.
    """

    __tablename__ = "JobAfhaengighed"
    __table_args__ = {"schema": "dataflowmanagement.flw"}

    ParentJobID: Mapped[str] = mapped_column(ForeignKey("sysjobs.job_id"), primary_key=True)
    ChildJobID: Mapped[str] = mapped_column(ForeignKey("sysjobs.job_id"), primary_key=True)
    OprettetDato: Mapped[datetime] = Column(DateTime)
    AendretDato: Mapped[datetime] = Column(DateTime)

    # Each side of an edge is a single job, so both relationships are scalar
    # many-to-one (the original ``Mapped[List[...]]`` annotation turned them
    # into collections, which breaks ``dependency.parent.name``).
    #
    # back_populates must name the Sysjobs relationship that uses the SAME
    # foreign key: Sysjobs.children joins on ParentJobID and Sysjobs.parents
    # joins on ChildJobID.  The original had the two names swapped.
    parent: Mapped["Sysjobs"] = relationship(back_populates="children", foreign_keys=[ParentJobID])
    child: Mapped["Sysjobs"] = relationship(back_populates="parents", foreign_keys=[ChildJobID])
|
||
|
||
class DataflowManagement_JobMasterSetup(Base):
    """ORM mapping of ``JobMasterSetup`` in schema ``dataflowmanagement.flw``."""

    __tablename__ = "JobMasterSetup"
    __table_args__ = {"schema": "dataflowmanagement.flw"}

    JobID: Mapped[str] = mapped_column(UNIQUEIDENTIFIER, primary_key=True)
    CurrentBatchID: Mapped[int] = mapped_column(INTEGER)
    MinMellemAfvikling: Mapped[int] = mapped_column(INTEGER)

    @staticmethod
    def getCurrentBatchId(session):
        """Return all ``JobMasterSetup`` rows.

        The function takes no ``self``; marking it ``@staticmethod`` keeps
        ``DataflowManagement_JobMasterSetup.getCurrentBatchId(session)``
        working and makes calls through an instance behave the same way.

        Args:
            session: open session used to run the query.

        Returns:
            The result rows of ``session.execute(...).all()``; the caller
            reads ``CurrentBatchID`` from the entity in each row.
        """
        stmt = Select(DataflowManagement_JobMasterSetup)
        return session.execute(stmt).all()
|
||
|
||
class DataflowManagement_JobsForExecution(Base):
    """ORM mapping of ``JobsForExecution`` in schema ``dataflowmanagement.flw``.

    Composite primary key: ``(JobID, ExecutionID)``.
    """

    __tablename__ = "JobsForExecution"
    __table_args__ = {"schema": "dataflowmanagement.flw"}

    # Foreign key to sysjobs.job_id; first half of the primary key.
    JobID: Mapped[str] = mapped_column(
        ForeignKey("sysjobs.job_id"),
        primary_key=True,
    )
    BatchID: Mapped[int] = mapped_column(INTEGER)
    # Second half of the primary key.
    ExecutionID: Mapped[int] = mapped_column(primary_key=True)
|
||
|
||
class msdb_sysjobsteps(Base):
    """ORM mapping of ``sysjobsteps`` in schema ``msdb.dbo``.

    Composite primary key: ``(job_id, step_id)``.
    """

    __tablename__ = "sysjobsteps"
    __table_args__ = {"schema": "msdb.dbo"}

    # Foreign key to sysjobs.job_id; first half of the primary key.
    job_id: Mapped[str] = mapped_column(
        ForeignKey("sysjobs.job_id"),
        primary_key=True,
    )
    # Second half of the primary key.
    step_id: Mapped[int] = mapped_column(INTEGER, primary_key=True)
    step_uid: Mapped[str] = mapped_column(UNIQUEIDENTIFIER)
    command: Mapped[str] = mapped_column(NVARCHAR)

    # The job this step belongs to (reverse side of Sysjobs.sysjobsteps).
    sysjob: Mapped["Sysjobs"] = relationship(back_populates="sysjobsteps")
|
||
|
||
|
||
|
||
|
||
#class AllExecutionMessages(Base):
|
||
|