from env import * from flask import Flask, render_template import os import FlowTest from sysjobs import * import sysjob2html import pandas as pd import logging from dateutil import tz from pytz import timezone import koerselsOverblikUtils def create_app(test_config=None): # create and configure the app FlowAnalyserMain = Flask(__name__, instance_relative_config=True) #app.config.from_mapping( ## SECRET_KEY='dev', # DATABASE=os.path.join(app.instance_path, 'flaskr.sqlite'), #) if test_config is None: # load the instance config, if it exists, when not testing FlowAnalyserMain.config.from_pyfile('config.py', silent=True) else: # load the test config if passed in FlowAnalyserMain.config.from_mapping(test_config) # ensure the instance folder exists try: os.makedirs(FlowAnalyserMain.instance_path) except OSError: pass @FlowAnalyserMain.context_processor def inject_debug(): return dict(debug=FlowAnalyserMain.debug) # a simple page that says hello @FlowAnalyserMain.route('/hello') def hello(): return FlowTest.test() @FlowAnalyserMain.route('/test') def test(): listSysjobs=[] listStartSlutjobs=[Sysjobs.getSysjob(session,'BI - Flow - Batch Start – Daglig kl. 20.00',True)[0][0]] listStartSlutjobs.append(Sysjobs.getSysjob(session,'BI - Flow - Batch Slut – Daglig kl. 20.00',True)[0][0]) listStartSlutjobs=koerselsOverblikUtils.convertToAlike(listStartSlutjobs,0,30,0) sysjobsAlike=koerselsOverblikUtils.timeRangeMerge(listStartSlutjobs, "Batch køretid",0,30,2) listSysjobs.append(sysjobsAlike) listSysjobs.extend(listStartSlutjobs) sysjobs=Sysjobs.getSysjob(session,'% - Admin - %',False) sysjobs=[a for a in sysjobs if a[0].name not in ["BI - Admin - Kill Blocking Queries","BI - Admin - Flow Job Restarter"]] if(sysjobs!=None and len(sysjobs)>0): sysjobs=koerselsOverblikUtils.convertToAlike(sysjobs,0,30,2) listSysjobs.extend(sysjobs) listSysjobs = [x for x in listSysjobs if len(x.getMedianDag(0,30,2))>0] listSysjobs = sorted(listSysjobs, key=lambda x: x.getMedianDag(0,30,2)[0]) return render_template('index.html', test=listSysjobs)#,startSlut=listStartSlutjobs) @FlowAnalyserMain.route('/test3') def test3(): sysjobs=(Sysjobs.getNattensKoersel(session)) return render_template('index3.html', test3=sysjobs) @FlowAnalyserMain.route('/test4') def test4(): sessions=(biadmin_log_ActiveSessionsByInspari.getSessionTimeSpan(session, datetime.fromisoformat('2023-06-01 23:14:16.817'), datetime.fromisoformat('2023-06-02 03:14:18.817'))) return render_template('index4.html', test4=sessions) @FlowAnalyserMain.route('/test4//') def test4_getSession(sessionID,logdate): sessions=(biadmin_log_ActiveSessionsByInspari.getSessionByID(session,int(sessionID), datetime.fromisoformat(logdate).astimezone(tz.gettz('Europe/Copenhagen')))) return render_template('index4.html', test4=sessions) @FlowAnalyserMain.route('/test2') def test2(): with Session(engine) as session: sysjobs=(Sysjobs.getNattensKoersel(session)) data2=json.dumps([str(b) for b in sysjobs]) return render_template('index2.html', test2=sysjobs) if __name__ == '__main__': FlowAnalyserMain.run() return FlowAnalyserMain engine=inn.getEngine("msdb") logging.basicConfig() logging.getLogger("sqlalchemy.engine").setLevel(logging.DEBUG) logging.getLogger("sqlalchemy.pool").setLevel(logging.DEBUG) with Session(engine) as session: FlowAnalyserMain=create_app()