1.sonarqube是一款代码分析的工具,通过soanrScanner扫描后的数据传递给sonarqube进行分析
2.sonarqube社区版没有对c++类代码的分析,但是可以找到一个开源的包,安装即可,扫描的话可以使用cppcheck来进行扫描
- 安装python对于sonarqube的api包:python-sonarqube-api
- 建立sonarqube连接
from sonarqube import SonarQubeClient
sonar = SonarQubeClient(
sonarqube_url="http://192.168.xx.xx:9000",
username='admin',
password='admin'
)
- 使用:建议大家先参考sonarqube的python-api
- Welcome to SonarQube Client with Python’s documentation! — SonarQube Client with Python 1.3.5 documentation
- 使用示例
# 通过项目名称获取id # 传递参数:创建分析配置文件时候的项目名称 component = sonar.components.get_project_component_and_ancestors("python_test")# 获取任务 # 参数1:上一步获取的component值 # 参数2:逗号分割的状态值 tasks1 = sonar.ce.search_tasks(componentId="AX5v36mo0Npud3J2od3a",status="FAILED,CANCELED,PENDING,IN_PROGRESS,SUCCESS" )# 获取所有项目 projects = list(sonar.projects.search_projects())# 获取这个项目下最近一次分析任务的详情 """ componentKeys:某一个项目名称 types:参数类型 CODE_SMELL==异常 BUG==bug VULNERABILITY==漏洞 SECURITY_HOTSPOT== """ issues2 = list(sonar.issues.search_issues(componentKeys="python_s", types="CODE_SMELL"))
- 通过metricKeys参数获取这个项目中需要的值
# 参数1:component 项目名称
# 参数2:metricKeys 想要获取的某个值,逗号分割
component_data = sonar.measures.get_component_with_specified_measures(component="python_test",metricKeys="functions,classes"
)['component']['measures']# 目前已收集的值和含义
'''
ncloc==总代码长度
ncloc_language_distribution==其中每种语言的行数
bugs==bug数
vulnerabilities==漏洞数
code_smells==异常数
duplicated_lines_density==重复度百分比
coverage==代码覆盖率百分比
files==文件数量
functions==方法数量
classes==类数量
'''
[root@localhost data]# cat wubo.py
#!/bin/python3
# encoding: utf-8
from sonarqube import SonarQubeClient
from operator import itemgetter
import json
import csv
import os
import time
import shutil
class SonarQube:def __init__(self,url,username="admin",password="123456aA") -> None:username = usernamepassword = passwordsonarqube_url = urlself.client = SonarQubeClient(username = username,password = password,sonarqube_url = sonarqube_url)def getProjects(self):"""获取项目列表"""projects = self.client.projects.search_projects().get("components")return projectsdef getIssues(self,jettech_project_name,issues_type):"""获取项目问题列表"""#projects = self.client.issues.search_issues(componentKeys="jettoloader-pressurecommon",types="BUG",s="FILE_LINE",resolved="false",ps=1,organization="default-organization",facets="authors",additionalFields="_all")projects = self.client.issues.search_issues(componentKeys=jettech_project_name,types=issues_type,s="FILE_LINE",resolved="false",ps=1,organization="default-organization",facets="authors",additionalFields="_all")list_issues = projects["facets"][0]["values"]#list_issues_name_count = []#for list_issues_item in list_issues:# list_issues_name_count.append(list_issues_item["val"])# list_issues_name_count.append(list_issues_item["count"])#print(list_issues)#list_issues[0]["val"])#list_issues[0]["count"])return list_issuesdef getMessages(self,component):""" 获取项目各个参数数据"""#metricKeys = "alert_status,bugs,,vulnerabilities,security_rating,code_smells,duplicated_lines_density,coverage,ncloc"metricKeys = "bugs,,vulnerabilities"messages = []messages.append(self.client.measures.get_component_with_specified_measures(component, metricKeys))return messages[0]def getMeasures(self,component,message):measures = []measures.insert(0,component)measures_all = message.get("component").get("measures")for measure_item in measures_all:measures_type = measure_item.get("metric")if "bugs" == measures_type:measures.insert(1,measure_item.get("value"))if "vulnerabilities" == measures_type:measures.insert(2,measure_item.get("value"))return measuresclass CSV:def __init__(self,filepath,filename) -> None:self.filepath = filepathself.filename = filenamedef csv_write(self,project_measure_all):#header = ['1project_name','2bugs','3vulnerabilities']with open(self.filepath+"/"+self.filename, 'a') as file_obj:writer = csv.writer(file_obj)#writer.writerow(header)for p in project_measure_all:writer.writerow(p)def csv_sort(self):datas=[]with open(self.filepath+"/"+self.filename, 'r') as f:table = []for line in f:line = line.replace("\n","").replace("\r","")col = line.split(',')col[0] = str(col[0])col[1] = col[1].strip("\n")table.append(col)table_sorted = sorted(table, key=itemgetter(0), reverse=False) # 精确的按照第1列排序for row in table_sorted:datas.append(row)f.close()with open(self.filepath+"/"+self.filename,"w", newline='') as csvfile:writer = csv.writer(csvfile)for data in datas:writer.writerow(data)csvfile.close()def csv_insert(self):header = 'project_name,bugs,vulnerabilities'with open(self.filepath+"/"+self.filename, 'r+', encoding='utf-8') as f:content = f.read()f.seek(0, 0)f.write(header + '\n' + content)f.close()def csv_delete(self):if (os.path.exists(self.filepath)):shutil.rmtree(self.filepath,ignore_errors=True)def csv_sum(self):with open(self.filepath+"/"+self.filename) as fin:readline_item=fin.readline()total_bug_api = 0total_bug_manager = 0total_bug_loader = 0total_bug_ui = 0total_vulnerabilities_api = 0total_vulnerabilities_manager = 0total_vulnerabilities_loader = 0total_vulnerabilities_ui = 0for row in csv.reader(fin):row_project_name=row[0].split("-")[0]if "jettoapi" == row_project_name:total_bug_api += int(row[1])total_vulnerabilities_api += int(row[2])if "jettoloader" == row_project_name:total_bug_loader += int(row[1])total_vulnerabilities_loader += int(row[2])if "jettomanager" == row_project_name:total_bug_manager += int(row[1])total_vulnerabilities_manager += int(row[2])if "jettoui" == row_project_name:total_bug_ui += int(row[1])total_vulnerabilities_ui += int(row[2])fin.close()header_kong = ['','','']header_api = ['jettoapi','bug总数',str(total_bug_api),'vulnerabilities总数',str(total_vulnerabilities_api)]header_loader = ['jettoloader','bug总数',str(total_bug_loader),'vulnerabilities总数',str(total_vulnerabilities_loader)]header_manager = ['jettomanager','bug总数',str(total_bug_manager),'vulnerabilities总数',str(total_vulnerabilities_manager)]header_ui = ['jettoui','bug总数',str(total_bug_ui),'vulnerabilities总数',str(total_vulnerabilities_ui)]with open(self.filepath+"/"+self.filename, 'a') as file_obj:writer = csv.writer(file_obj)writer.writerow(header_kong)writer.writerow(header_api)writer.writerow(header_loader)writer.writerow(header_manager)writer.writerow(header_ui)file_obj.close()class SCP: def __init__(self,localdir,remoteip,remotedir) -> None:self.localdir = localdirself.remoteip = remoteipself.remotedir = remotedirdef scp_operate(self):os.system('scp -r "%s" "%s:%s"' % (self.localdir, self.remoteip, self.remotedir))def main():sonarQube = SonarQube(url='http://172.16.10.1:9000/')all_project_info = sonarQube.getProjects()project_measure_all=[]project_issues_all=[]for project_info in all_project_info:component = project_info.get("key")message = sonarQube.getMessages(component)measure = sonarQube.getMeasures(component,message)project_issues=[]list_issues_s = sonarQube.getIssues(component,"BUG")project_issues.append(component)for list_issues_s_item in list_issues_s:project_issues.append(list_issues_s_item["val"])project_issues.append(list_issues_s_item["count"])project_measure_all.extend([tuple(measure)])project_issues_all.extend([tuple(project_issues)])print([tuple(measure)])#print(project_issues_all)filepath=time.strftime("%Y-%m-%d")filename="jettech_sornar_"+filepath+"_projects.csv"filename_isuess="jettech_sornar_"+filepath+"_projects_iseuss.csv"if not os.path.exists(filepath):os.makedirs(filepath)if os.path.exists(filepath+"/"+filename):os.remove(filepath+"/"+filename)if os.path.exists(filepath+"/"+filename_isuess):os.remove(filepath+"/"+filename_isuess)csv_obj = CSV(filepath=filepath,filename=filename)csv_obj_isuess = CSV(filepath=filepath,filename=filename_isuess)csv_obj.csv_write(project_measure_all)csv_obj_isuess.csv_write(project_issues_all)csv_obj.csv_sort()csv_obj.csv_insert()csv_obj.csv_sum()localdir=filepathremoteip="192.168.1.99"remotedir="/product/gohttpserver/data/sornar/" scp_obj = SCP(localdir,remoteip,remotedir)scp_obj.scp_operate()csv_obj.csv_delete()if __name__== "__main__" :main()