from ast import List from pathlib import Path from os import chdir import subprocess from LabelHandler import LabelHandler class TfsHandler(): def __init__(self,tfPath,tfsBasePath): self.tfPath=Path(tfPath) self.tfsBasePath=Path(tfsBasePath) def checkPropertyName(self): pass def updateRepository(self): pass def checkFile(self, file:Path) -> List: fileResult=[] f=open(file,"r", encoding='utf-8') for x in f: if(x.find("DTS:ObjectName")>-1): fileResult=[file] propertyNameTokens=x.split('=') count=0 propertyNameToken=None while count < len(propertyNameTokens): if(propertyNameTokens[count].find("DTS:ObjectName")>-1): propertyNameToken=propertyNameTokens[count+1].split('"')[1] break count+=2 if(propertyNameToken==None): print(f"propertyNameToken findes ikke i {file.name}") fileResult+=['',False] continue fileResult+=[propertyNameToken] if( str.lower(file.stem)!=str.lower(propertyNameToken)): fileResult+=[False] else: fileResult+=[True] break else: continue f.close() return fileResult def updateRepo(self,label,path): chdir(path) updateStatus=subprocess.run([str(self.tfPath.joinpath('tf.exe')),"vc","get","/all","/version:L"+label,str(path)],stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True,text=True) if(len(updateStatus.stdout)>0): print("STDOUT:\n" + updateStatus.stdout) return True else: print("STDERR:\n" + updateStatus.stdout) return False def getProjectPath(self,label): labelInfo=subprocess.run([str(self.tfPath.joinpath('tf.exe')),"vc","labels","/owner:*","/format:detailed",label],stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True,text=True) if(len(labelInfo.stdout)>0): print("STDOUT:\n" + labelInfo.stdout) for a in labelInfo.stdout.split('\n'): if(a.find('$/Datavarehus')!=-1 and a.find('Scope')==-1): relativePath=a.split('$')[1] break else: print("STDERR:\n" + labelInfo.stdout) return None resPath=self.tfsBasePath.joinpath(relativePath.removeprefix('/')) if(resPath.is_dir() == False): return None else: return resPath def traverseDirectory(self, x:Path,propertyNameDiffs:List): [print(z) for z in x.iterdir() if z.is_file and z.name] [propertyNameDiffs.append(self.checkFile(z)) for z in x.iterdir() if z.is_file and z.name.endswith('.dtsx')] [self.traverseDirectory(y,propertyNameDiffs) for y in x.iterdir() if y.is_dir() and y.name!='obj'] print(propertyNameDiffs) def checkLabel(self,label): labelResult=[] if(label[0:4]=='SSIS'): projectPath=self.getProjectPath(label) if projectPath == None: return updated=self.updateRepo(label,projectPath) if(updated==False): return labelResult=self.checkSSISProject(projectPath) return labelResult def checkSSISProject(self, projectPath): labelResult=[] self.traverseDirectory(Path(projectPath),labelResult) failed=[a for a in labelResult if a[-1]==False] return failed