# RepositoryAutoCheck/TfsHandler.py
# (repository viewer metadata: 94 lines, 3.6 KiB, Python)

import re
import subprocess
from ast import List
from os import chdir
from pathlib import Path

from LabelHandler import LabelHandler
class TfsHandler():
    """Validate SSIS projects that are fetched from TFS by label.

    The handler drives ``tf.exe`` to locate and download the project a label
    points at, then checks every ``.dtsx`` file below it: the first
    ``DTS:ObjectName`` attribute found in the file must equal the file name
    without its extension (compared case-insensitively).
    """

    # Server-path prefix that identifies the labelled item in the output of
    # ``tf vc labels``.
    _SERVER_ROOT = '$/Datavarehus'
    # Matches DTS:ObjectName="value" (double or single quoted) anywhere in a
    # line; group 2 is the attribute value.
    _OBJECT_NAME_RE = re.compile(r"""DTS:ObjectName\s*=\s*(["'])(.*?)\1""")

    def __init__(self, tfPath, tfsBasePath):
        """Store the tool and workspace locations.

        Args:
            tfPath: Directory that contains ``tf.exe``.
            tfsBasePath: Local directory that server paths (``$/...``) are
                resolved against.
        """
        self.tfPath = Path(tfPath)
        self.tfsBasePath = Path(tfsBasePath)

    def checkPropertyName(self):
        """Placeholder; currently does nothing."""
        pass

    def updateRepository(self):
        """Placeholder; currently does nothing."""
        pass

    def checkFile(self, file: Path) -> list:
        """Compare the ``DTS:ObjectName`` of *file* with the file's stem.

        The file is scanned line by line and the scan stops at the first line
        from which an object name can be extracted.

        Args:
            file: Path of the file to inspect (read as UTF-8).

        Returns:
            ``[]`` if no line mentions ``DTS:ObjectName``;
            ``[file, '', False]`` if the attribute is mentioned but no value
            could be extracted from any such line;
            ``[file, name, matches]`` otherwise, where ``matches`` tells
            whether ``name`` equals the file stem ignoring case.
        """
        file = Path(file)
        fileResult = []
        # `with` guarantees the handle is closed even if decoding fails.
        with open(file, "r", encoding='utf-8') as handle:
            for line in handle:
                if 'DTS:ObjectName' not in line:
                    continue
                found = self._OBJECT_NAME_RE.search(line)
                if found is None:
                    # Attribute mentioned but not in name="value" form; keep
                    # looking, a later line may still carry a usable value.
                    print(f"propertyNameToken findes ikke i {file.name}")
                    fileResult = [file, '', False]
                    continue
                objectName = found.group(2)
                fileResult = [
                    file,
                    objectName,
                    file.stem.lower() == objectName.lower(),
                ]
                break
        return fileResult

    def _runTf(self, *arguments):
        """Run ``tf.exe vc <arguments>`` and return the completed process.

        Returns ``None`` (after printing the error) when the executable cannot
        be started at all.
        """
        command = [str(self.tfPath.joinpath('tf.exe')), "vc", *arguments]
        try:
            # No shell: tf.exe is a plain executable, and bypassing the shell
            # keeps characters in the label from being interpreted by it.
            return subprocess.run(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
        except OSError as error:
            print(f"STDERR:\n{error}")
            return None

    def updateRepo(self, label, path):
        """Fetch the version of *path* identified by *label* with ``tf get``.

        Note: the process working directory is changed to *path* and is not
        restored afterwards.

        Args:
            label: Label name; passed to tf as ``/version:L<label>``.
            path: Local directory to update.

        Returns:
            ``True`` if tf produced any standard output, ``False`` otherwise.
        """
        chdir(path)
        updateStatus = self._runTf(
            "get", "/all", "/version:L" + label, str(path)
        )
        if updateStatus is None:
            return False
        if updateStatus.stdout:
            print("STDOUT:\n" + updateStatus.stdout)
            return True
        print("STDERR:\n" + updateStatus.stderr)
        return False

    def getProjectPath(self, label):
        """Resolve the local directory of the project that *label* points at.

        Runs ``tf vc labels /owner:* /format:detailed <label>`` and takes the
        first output line that contains the server root but not the word
        ``Scope``; the server path on that line is mapped below
        ``tfsBasePath``.

        Returns:
            The local directory as a ``Path``, or ``None`` if tf produced no
            output, no matching line was found, or the directory is missing.
        """
        labelInfo = self._runTf(
            "labels", "/owner:*", "/format:detailed", label
        )
        if labelInfo is None:
            return None
        if not labelInfo.stdout:
            print("STDERR:\n" + labelInfo.stderr)
            return None
        print("STDOUT:\n" + labelInfo.stdout)

        relativePath = None
        for line in labelInfo.stdout.split('\n'):
            if self._SERVER_ROOT in line and 'Scope' not in line:
                # Drop everything up to and including the leading '$'.
                start = line.index(self._SERVER_ROOT) + 1
                relativePath = line[start:].strip()
                break
        if relativePath is None:
            # No item line in the output: nothing to resolve.
            return None

        resPath = self.tfsBasePath.joinpath(relativePath.removeprefix('/'))
        return resPath if resPath.is_dir() else None

    def traverseDirectory(self, x: Path, propertyNameDiffs: list):
        """Recursively check every ``.dtsx`` file below *x*.

        Each file in a directory is printed, every ``.dtsx`` file is passed to
        ``checkFile`` and the result appended to *propertyNameDiffs* (which is
        modified in place). Sub-directories are visited afterwards, except
        those named ``obj``. The accumulated list is printed after each
        directory has been processed.

        Args:
            x: Directory to walk.
            propertyNameDiffs: List that receives one ``checkFile`` result per
                ``.dtsx`` file.
        """
        # Sorted so that the order of the results is deterministic.
        entries = sorted(x.iterdir())
        files = [entry for entry in entries if entry.is_file()]
        for entry in files:
            print(entry)
        for entry in files:
            if entry.name.endswith('.dtsx'):
                propertyNameDiffs.append(self.checkFile(entry))
        for entry in entries:
            if entry.is_dir() and entry.name != 'obj':
                self.traverseDirectory(entry, propertyNameDiffs)
        print(propertyNameDiffs)

    def checkLabel(self, label):
        """Fetch the project for *label* and report mismatching packages.

        Only labels starting with ``SSIS`` are processed.

        Returns:
            ``[]`` for labels that do not start with ``SSIS``;
            ``None`` if the project path cannot be resolved or the update
            fails; otherwise the list returned by ``checkSSISProject``.
        """
        if not label.startswith('SSIS'):
            return []
        projectPath = self.getProjectPath(label)
        if projectPath is None:
            return None
        if not self.updateRepo(label, projectPath):
            return None
        return self.checkSSISProject(projectPath)

    def checkSSISProject(self, projectPath):
        """Return the ``checkFile`` results below *projectPath* that failed.

        A result counts as failed when its last element is ``False``. Empty
        results (files without any ``DTS:ObjectName``) are skipped.
        """
        labelResult = []
        self.traverseDirectory(Path(projectPath), labelResult)
        return [
            result for result in labelResult
            if result and result[-1] is False
        ]