import sqlite3
import json
import os
import psycopg2
class DataBaseConnector:
def __init__(self, _db_config: dict):
self.db_host = _db_config['host'] # 数据库服务器
self.db_port = _db_config['port'] # 数据库端口,MySQL默认3306,PostgreSQL默认5432
self.db_user = _db_config['user'] # 数据库用户
self.db_password = _db_config['password'] # 数据库密码
self.db_name = _db_config['db_name'] # 数据库名
self.db_table = _db_config['db_table'] # 数据表名
self.conn = self.connect_db() # 数据库连接对象
self._current_path = os.path.dirname(__file__) # 当前文件路径
self.start_year = int(_db_config['start_year']) # NVD开始年份
self.end_year = int(_db_config['end_year']) # NVD结束年份
def connect_db(self):
"""
连接数据库,返回conn
:return: 数据库连接对象
"""
conn = sqlite3.connect('test.db')
# conn = psycopg2.connect(
# host=self.db_host,
# port=self.db_port,
# user=self.db_user,
# password=self.db_password,
# database=self.db_name
# )
return conn
def fill_db(self):
cursor = self.conn.cursor()
print("正在清理过时的数据库......")
cursor.execute("drop table if exists nvd ")
#cursor.execute("drop type if exists vector")
#cursor.execute("drop type if exists level")
# 对于攻击向量的枚举类型: ('NETWORK', 'ADJACENT_NETWORK', 'LOCAL', 'PHYSICAL')
#enum_vector_sql = "create type vector as enum ('NETWORK', 'ADJACENT_NETWORK', 'LOCAL', 'PHYSICAL')"
# 对于漏洞等级的枚举类型: ('CRITICAL', 'HIGH', 'MEDIUM', 'LOW')
#enum_level_sql = "create type level as enum ('CRITICAL', 'HIGH', 'MEDIUM', 'LOW')"
# 创建nvd数据表的SQL语句
create_table_sql = f"create table {self.db_table} (cve_id varchar(20) not null, \
attack_vector vector not null, vuln_level level not null, base_score decimal not null, \
exploitability_socre decimal not null, impact_score decimal not null, primary key(cve_id))"
#cursor.execute(enum_vector_sql) # 创建vector
#cursor.execute(enum_level_sql) # 创建level
cursor.execute(create_table_sql) # 创建nvd数据表
# 向nvd数据表中插入数据
for year in range(self.start_year, self.end_year + 1):
with open(f'nvdcve-1.1-{year}.json', 'r',encoding='gb18030',errors='ignore') as f:
print(f"正在导入{year}年的CVE数据......", end="")
load_dict = json.load(f)
for cve in load_dict['CVE_Items']:
"""
因为json文件中有的项可能只是占位项,并没有真的漏洞,
因此首先要判断是不是占位项,如果是,跳过当前项。
判断的标准很简单,看"impact"字段是否为空。
"""
if cve['impact'] == {}:
continue
cve_id = cve['cve']['CVE_data_meta']['ID'] # CVE ID
"""
2015年之前的漏洞都只有CVSS2评分,因此只能使用CVSS2;
而2021年最新的几个漏洞只有CVSS3,没有CVSS2,因此只能使用CVSS3;
中间的年份既有CVSS2,又有CVSS3。因此,无法统一标准,只能妥协:
如果存在CVSS3,优先使用CVSS3;否则再使用CVSS2。
"""
if 'baseMetricV3' in cve['impact']: # CVSS3
attack_vector = cve['impact']['baseMetricV3']['cvssV3']['attackVector'] # 攻击向量
vuln_level = cve['impact']['baseMetricV3']['cvssV3']['baseSeverity'] # 漏洞等级
base_score = cve['impact']['baseMetricV3']['cvssV3']['baseScore'] # base score
exploitability_score = cve['impact']['baseMetricV3'][
'exploitabilityScore'] # exploitability score
impact_score = cve['impact']['baseMetricV3']['impactScore'] # impact score
else: # CVSS2
attack_vector = cve['impact']['baseMetricV2']['cvssV2']['accessVector']
vuln_level = cve['impact']['baseMetricV2']['severity']
base_score = cve['impact']['baseMetricV2']['cvssV2']['baseScore']
exploitability_score = cve['impact']['baseMetricV2']['exploitabilityScore']
impact_score = cve['impact']['baseMetricV2']['impactScore']
# 向数据库中插入数据的SQL语句
insert_sql = f"insert into {self.db_table} values ('{cve_id}','{attack_vector}'," \
f"'{vuln_level}',{base_score:.2f},{exploitability_score:.2f},{impact_score:.2f})"
cursor.execute(insert_sql) # 插入本条CVE数据
print("导入成功!")
self.conn.commit()
print(f"\n[SUCCESS]成功向数据库中导入{self.start_year}年到{self.end_year}年的所有漏洞数据!")
def download_nvd_json(self):
"""
从NVD下载指定年份区间内的json文件并解压
"""
#os.system(f"rm -rf {self._current_path}/nvd_json/*")
for year in range(self.start_year, self.end_year + 1):
# 下载文件到 /db/nvd_json/目录下
os.system(f"c:\sqlite3\wget https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-{year}.json.gz")
#os.system(f"c:\sqlite3\gzip {self._current_path}\nvdcve-1.1-{year}.json.gz")
#os.system(f"tar -xvf c:\个人目录\编程\nvdcve-1.1-2020.json.gz")
def close(self):
"""
关闭数据库连接
"""
self.conn.colse()
if __name__ == "__main__":
db_config = {'host': '127.0.0.1', 'port': '5432', 'user': 'nvd', 'password': '自定义密码', 'db_name': 'nvd', 'db_table': 'nvd', 'start_year': 2020, 'end_year': 2021}
db = DataBaseConnector(db_config)
#db.download_nvd_json()
db.fill_db()