需求
在网络攻击溯源时,需要对攻击者的位置进行定位。
已知参数
已知攻击者发送攻击报文的接入基站的位置区识别码(LCA)和小区识别(CI)码
目标
获取攻击者位置
技术路线
- request 调用API查询经纬度位置
- openpyxl 读取 excel 表格
- sqlite3 读写数据库
- json 数据解析
Python 代码
import requests
from openpyxl import load_workbook
import re
import os
import json
import sqlite3# 传入一个列表,里面包含字典数据。
def writeDB(conn, cursor, lac, ci, mnc, real_data):errcode = real_data['errcode']lat = real_data['lat']lon = real_data['lon']radius = real_data['radius']address = real_data['address']cursor.execute("INSERT INTO lac_ci_cache(errcode, mnc, lac, ci, lat, lon, radius, address) VALUES (?,?,?,?,?,?,?,?)",(errcode, mnc, lac, ci, lat, lon, radius, address))conn.commit()def queryCache(cursor, lac, ci):cursor.execute("SELECT * FROM lac_ci_cache WHERE lac = ? AND ci = ?",(lac, ci))results = cursor.fetchall()# 获取查询结果的列名column_names = [desc[0] for desc in cursor.description]# 将查询结果转换为字典列表result_dict_list = []for row in results:row_dict = dict(zip(column_names, row))result_dict_list.append(row_dict)print(result_dict_list)return result_dict_listdef writeData(sheet, row, operator_code, content_str, status):print('content_str:' + str(content_str))print(type(content_str))dict_data = ["lat","lon","radius","address"]# 正常写入逻辑if(status == 0):real_data = json.loads(content_str)for number in range(4, 4 + len(dict_data)):sheet.cell(row = row, column = number, value = real_data[dict_data[number-4]])# 8 运营商operator_str = ""if(operator_code == 0):operator_str = "移动"elif(operator_code == 1):operator_str = "联通"else:operator_str = "未知"sheet.cell(row = row, column = 4 + len(dict_data), value = '移动')# 异常写入逻辑else:for number in range(4, 4 + len(dict_data) + 1):sheet.cell(row = row, column = number, value = 'N/A')def saveData(wb, file_name):wb.save(file_name)wb.close()# return sheet
def getSheet(file_name, sheet_name):# 获取当前目录current_directory = os.getcwd()# subdirectory = "xxx"# current_directory = os.path.join(current_directory, subdirectory)print(current_directory)# 读取Excel文件excel_file_path = file_name # 替换为你的Excel文件路径wb = load_workbook(filename=excel_file_path)# 选择工作表sheet = wb[sheet_name] # 替换为你的工作表名称return sheet,wb# return json text of lac ci
def getLocation(mnc, lac, ci, output):# 访问地址url = "http://api.cellocation.com:84/cell/?mcc=460&mnc={}&lac={}&ci={}&output={}".format(mnc, lac, ci, output) # 打印URLprint(url)# 定义要设置的Cookie# 发送HTTP GET请求获取页面内容response = requests.get(url)if response.status_code == 200:# 使用response.text获取页面的文本内容page_content = response.textreturn page_content;else:return 'none'if __name__ == "__main__":# 连接到 SQLite 数据库conn = sqlite3.connect('data.db')# 创建一个游标对象,用于执行 SQL 语句cursor = conn.cursor()# 创建表格cursor.execute('''CREATE TABLE IF NOT EXISTS lac_ci_cache (id INTEGER PRIMARY KEY AUTOINCREMENT,errcode INTEGER,mnc INTEGER,lac INTEGER,ci INTEGER,lat REAL,lon REAL,radius REAL,address TEXT)''') # 1 获取表单信息openpyxlsheet, wb= getSheet('lacci_test.xlsx','lacci_test')# 添加标题dict_data = ["纬度","精度","半径","地址","运营商"]for number in range(4, 4+len(dict_data)): sheet.cell(row = 1, column = number, value = dict_data[number-4])# 2 遍历查询row_index = 1;for row in sheet.iter_rows(min_row=2, max_row=201, min_col=1, max_col=2):row_index = row_index + 1;print('row_index' + str(row_index))# 获取单元格的值lac_value = row[0].valueci_value = row[1].value# 查询数据库是否已经存在cache_value = queryCache(cursor, lac_value, ci_value)# 数据库有直接填充if(len(cache_value)!=0):content_str = cache_value[0]operator_code = content_str['mnc']status = content_str['errcode']content_str = str(json.dumps(content_str))writeData(sheet, row_index, operator_code, content_str, status)continue;# 目前缓存的数据库不存在的情况 请求API 返回json字符串content_str = getLocation(0, lac_value, ci_value, 'json')real_data = json.loads(content_str)# 如果是移动的就写入if(real_data['errcode'] == 0):operator_code = 0;status = 0;writeData(sheet, row_index, operator_code, content_str, status)writeDB(conn, cursor, lac_value, ci_value, operator_code, real_data)continue;# 移动没有查到,换成联通的再调用API# 如果是联通的就写入content_str = getLocation(1, lac_value, ci_value, 'json')real_data = json.loads(content_str)if(real_data['errcode'] == 0):operator_code = 1;status = 0;writeData(sheet, row_index, operator_code, content_str, status)writeDB(conn, cursor, lac_value, ci_value, operator_code, real_data)continue;# 既不是移动也不是联通,触发异常逻辑operator_code = 0;status = 10001;writeData(sheet, row_index, operator_code, content_str, status)writeDB(conn, cursor, lac_value, ci_value, operator_code, real_data)saveData(wb,'lacci_test.xlsx')conn.close()