一、项目需求
在一个业务网站有可以一个个打开有相关内容的文本,需要逐个保存为TXT,数据量是以千为单位,人工操作会麻木到崩溃。
二、解决方案
目前的基础办法就是使用python+selenium自动化来代替人工去操作,虽然效率比其他爬虫低,但是也防止被封IP的风险。也能满足项目的需求。准备工作,先从网站下载项目清单xls文件,里面会有对应的唯一识别码,就是编号。
三、写代码具体技术路线
1. init一个初始化函数,把selenium的驱动相关参数设置好。
2.get_html函数,实现打开浏览器登录进入需要爬取的主页面。
3.excel_to_dict函数,实现将前面准备的xls文件转为字典。
4.read_leftover函数,实现将尚未下载和下载失败的清单单独提取出来。
5.text_write函数,实现把页面上的文本内容保存到指定路径下的TXT文件中。
6.download函数,实现从主页面进入需要爬取的页面,完成导出后,再恢复初始状态。
7.run函数,是把所有功能函数驱动起来,实现爬虫过程。
8.讲以上函数定义到一个类里去,实现代码规范化。
四、具体代码如下:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
import time
import xlrd
from xlutils.copy import copy
import osclass Zbph_Spider:def __init__(self):self.url = 'https://******'self.options = webdriver.FirefoxOptions()# 设置无头模式# self.options.add_argument("--headless")self.options.add_argument("--disable-gpu")# 隐身模式(无痕模式)self.driver = webdriver.Firefox(options=self.options)def get_html(self):# 调用WebDriver 对象的get方法 可以让浏览器打开指定网址self.driver.get(self.url)# 设置最大等待时长为 3秒,全局的。self.driver.implicitly_wait(15)# 最大化窗口self.driver.maximize_window()# 设置为本台电脑最大的分辨率,不然有头模式能运行,无头模式将无法运行。self.driver.set_window_rect(x=0, y=0, width=1920, height=1080)# 把弹窗点击确认让其消失self.driver.find_element(By.XPATH, '//span[contains(text(),"确 定")]').click()# 输入账号self.driver.find_element(By.XPATH, '//input[@id="txtLoginId"]').send_keys('****')# 输入密码self.driver.find_element(By.XPATH, '//input[@id="txtPassword"]').send_keys('*****')# 输入行政区代码self.driver.find_element(By.XPATH, '//input[@id="txtXzqdm"]').send_keys('522700')# 点击获取(手机)验证码,然后等待页面提示弹窗出来self.driver.find_element(By.XPATH, '//input[@id="btn_getValidCode"]').click()time.sleep(2)# 切换到alert弹窗并点击确定self.driver.switch_to.alert.accept()sj_yzm = input('请输入手机验证码:')time.sleep(0.5)self.driver.find_element(By.XPATH, '//input[@id="CheckCode"]').send_keys(sj_yzm)self.driver.find_element(By.XPATH, '//div/input[@value="登 录"]').click()time.sleep(3)# 把excel内容读取转为字典的函数def excel_to_dict(self, path):# 打开excel表格data_excel = xlrd.open_workbook(path)# 获取book中的第一sheet工作表法,返回一个xlrd.sheet.Sheet()对象table = data_excel.sheets()[0] # 通过索引顺序获取sheetkeys = table.row_values(0)# 获取总行数rowNum = table.nrows# 获取总列数colNum = table.ncols# 需要读取的列,这里选择读取第二列,和第六列和第17列design_col_num = [0, 1, 5, 16]if rowNum <= 1:print("总行数小于1")else:r = []j = 1for i in range(rowNum - 1):s = {}# 从第二行获取对应的values值,方便下一步建立Keys使用values = table.row_values(j)for x in design_col_num:s[keys[x]] = values[x]r.append(s)j += 1return r# 把下载失败的项目清单写入exceldef append_to_excel(self, words, row, column, filename):# 打开excelword_book = xlrd.open_workbook(filename)# 把xlrw对象变成xlwt对象,压入内存,使之可编辑new_work_book = copy(word_book)# 把可编辑的第一张表拿到new_sheet = new_work_book.get_sheet(0)# 设置开始写入的行数# 把文本写入对应的单元格内,参数分别是行、列和值,行列数从0起算。new_sheet.write(row + 1, column, words)# 从内存中把修改后的数据保存覆盖原来的文件new_work_book.save(filename)# print('下载状态追加写入成功!')# 读取仍需下载的项目清单。通过删掉下载状态为”下载成功“的记录实现,下一步只针对未下载或者下载失败的条目进行操作def read_leftover(self, list):i = 0while i < len(list):if list[i]['下载状态'] == '下载成功':del list[i]else:i += 1return list# 创建txt文件并并写入文本def text_write(self, name, msg, directory):full_path = directory + '\\TXT\\' + name + '.txt'file = open(full_path, 'w')file.write(msg)file.close()# 调用项目清单逐个下载坐标def download(self, xmmc, directory):# 点击左侧功能列表if xmmc['项目阶段'] == '验收项目':self.driver.find_element(By.XPATH, '//li/div[@title="项目验收阶段"]').click()time.sleep(0.5)self.driver.find_element(By.XPATH, '//span[text()="验收项目查询"]').click()time.sleep(0.5)else:self.driver.find_element(By.XPATH, '//li/div[@title="项目立项阶段"]').click()time.sleep(0.5)self.driver.find_element(By.XPATH, '//span[text()="立项项目查询"]').click()time.sleep(0.5)# 切入默认框架self.driver.switch_to.parent_frame()# 切入页面内嵌的第一层框架self.driver.switch_to.frame('u0')# 清除并输入新的备案编号self.driver.find_element(By.XPATH, '//*[@id="inp_xmbh"]').clear()self.driver.find_element(By.XPATH, '//*[@id="inp_xmbh"]').send_keys(xmmc['备案编号'])time.sleep(1)# 点击查询self.driver.find_element(By.XPATH, '//*[@id="btn_query"]').click()time.sleep(2.5)# 显性等待,等查询结果第一条是被查询对象,一直等到后面的条件成立(能定位)。WebDriverWait(self.driver, 20).until(EC.presence_of_element_located((By.XPATH, "//table/tbody/tr[1]/td[3][text()='" + xmmc['备案编号'] + "']")))# 定位到项目名称并双击element = self.driver.find_element(By.XPATH, '//*[@id="tlw_list_table"]/tbody/tr[1]/td[4]/input')ActionChains(self.driver).double_click(element).perform()time.sleep(1)# 等待新增耕地坐标按钮出现WebDriverWait(self.driver, 20).until(EC.element_to_be_clickable((By.XPATH, '//div[(text()="新增耕地坐标")]')))# 点击新增耕地坐标,使用JS脚本来点击,能解决需要点击的要素被遮挡无法点击的情况。xzgdzb = self.driver.find_element(By.XPATH, '//div[(text()="新增耕地坐标")]')self.driver.execute_script('arguments[0].click()', xzgdzb)time.sleep(3)# 切入内嵌的第二层框架self.driver.switch_to.frame('iframe_tab')time.sleep(0.2)dc_zz = self.driver.find_element(By.XPATH, '//div[(text()="导出")]')self.driver.execute_script('arguments[0].click()', dc_zz)time.sleep(0.5)# 切换到最新打开的标签页self.driver.switch_to.window(self.driver.window_handles[-1])try:# 获取页面文本words = self.driver.find_element(By.XPATH, '/html/body/pre').text# 调用写入txt函数把页面文本内容写入txt文件self.text_write(xmmc['备案编号'], words, directory)time.sleep(0.2)# 关闭当前标签页self.driver.close()print('---下载成功---')self.append_to_excel('下载成功', int(xmmc['序号']) - 1, 16, 'demo.xls')except:print('---下载失败---')self.append_to_excel('下载失败', int(xmmc['序号'] - 1), 16, 'demo.xls')self.driver.switch_to.window(self.driver.window_handles[0])# 切入页面内嵌的第一层框架self.driver.switch_to.frame('u0')# 点击返回self.driver.find_element(By.XPATH, '//div[text()="返回"]').click()time.sleep(0.5)# 把左侧列表返回默认状态if xmmc['项目阶段'] == '验收项目':# 退出框架,回到主页面self.driver.switch_to.parent_frame()self.driver.find_element(By.XPATH, '//li/div[@title="项目验收阶段"]').click()time.sleep(0.2)else:# 退出框架,回到主页面self.driver.switch_to.parent_frame()self.driver.find_element(By.XPATH, '//li/div[@title="项目立项阶段"]').click()time.sleep(0.2)# 驱动函数def run(self, path):# 调用读取xls函数读取项目清单list = self.excel_to_dict(path)new_list = self.read_leftover(list)# 打开、登录进入内部的页面self.get_html()# 调用下载函数根据项目清单依次下载所有项目坐标directory = os.path.dirname(path) # 提取xls表格所在文件夹i = 1numbers_sum = len(new_list)for Id in range(numbers_sum):print("{}/{} {}".format(i, numbers_sum, new_list[Id]['备案编号']))self.download(new_list[Id], directory)i += 1print(20*"-", '下载完毕', 20*"-")# 下载完毕后退出驱动self.driver.quit()# 主函数
if __name__ == '__main__':# 调用读取xls函数读取项目清单Path = r"D:\demo\批量下载\demo.xls"spider = Zbph_Spider()spider.run(Path)