1. “极验” 滑动验证码如何科学调整
-
模拟人工操作
1. 轨迹模拟:人类正常滑动滑块是先加速后减速,可通过代码模拟此轨迹。使用 Python 的selenium
库结合ActionChains
类实现滑块拖动,并随机生成轨迹模拟人类行为。
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
import random# 初始化浏览器
driver = webdriver.Chrome()
driver.get('https://example.com') # 替换为实际的验证码页面# 找到滑块元素
slider = driver.find_element_by_id('slider')# 模拟轨迹
def get_track(distance):track = []current = 0mid = distance * 3 / 4t = 0.2v = 0while current < distance:if current < mid:a = 2else:a = -3v0 = vv = v0 + a * tmove = v0 * t + 1 / 2 * a * t * tcurrent += movetrack.append(round(move))return trackdistance = 200 # 假设缺口距离为200px
track = get_track(distance)# 拖动滑块
ActionChains(driver).click_and_hold(slider).perform()
for x in track:ActionChains(driver).move_by_offset(xoffset=x, yoffset=0).perform()
ActionChains(driver).release().perform()
2. 缺口识别:利用图像识别技术,如 OpenCV 库处理验证码背景图和缺口图,计算缺口位置。
import cv2# 读取背景图和缺口图
bg_img = cv2.imread('bg.png', 0)
gap_img = cv2.imread('gap.png', 0)# 匹配缺口位置
result = cv2.matchTemplate(bg_img, gap_img, cv2.TM_CCOEFF_NORMED)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
top_left = max_loc
distance = top_left[0]
print(f"缺口距离: {distance}")
-
打码平台:将验证码图片和相关信息提交给打码平台,平台的人工或算法会完成验证并返回结果。不同打码平台的 API 使用方式不同。
import requests# 假设打码平台的API地址和密钥
api_url = 'https://api.captcha.com/solve'
api_key = 'your_api_key'# 读取验证码图片
with open('captcha.png', 'rb') as f:img_data = f.read()# 发送请求到打码平台
data = {'api_key': api_key,'captcha_type': 'geetest','img': img_data
}
response = requests.post(api_url, data=data)
result = response.json()
print(result)
2. 爬虫一般多久爬一次,爬下来的数据怎么存储
爬取频率需根据网站规则、更新频率以及避免对服务器造成过大压力来设置。数据存储方式有文件、数据库和云存储等。
-
爬取频率
-
根据网站的
robots.txt
文件和更新频率设置爬取间隔,可使用 Python 的time.sleep()
函数实现。
-
import time
import requestswhile True:try:response = requests.get('https://example.com')print(response.text)except Exception as e:print(f"请求出错: {e}")time.sleep(3600) # 每小时爬取一次
-
数据存储
-
文本文件:使用 Python 的
open
函数或pandas
库读写 TXT、CSV 文件。
-
import pandas as pddata = [{'name': 'Alice', 'age': 25}, {'name': 'Bob', 'age': 30}]
df = pd.DataFrame(data)
df.to_csv('data.csv', index=False)# 读取CSV文件
new_df = pd.read_csv('data.csv')
print(new_df)
-
数据库:以 MySQL 和 MongoDB 为例,使用
pymysql
和pymongo
库进行操作。
import pymysql
import pymongo# MySQL示例
# 连接数据库
conn = pymysql.connect(host='localhost', user='root', password='password', database='test')
cursor = conn.cursor()# 创建表
cursor.execute('CREATE TABLE IF NOT EXISTS users (id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255), age INT)')# 插入数据
data = [('Alice', 25), ('Bob', 30)]
cursor.executemany('INSERT INTO users (name, age) VALUES (%s, %s)', data)
conn.commit()# 查询数据
cursor.execute('SELECT * FROM users')
results = cursor.fetchall()
for row in results:print(row)conn.close()# MongoDB示例
# 连接数据库
client = pymongo.MongoClient('mongodb://localhost:27017/')
db = client['test']
collection = db['users']# 插入数据
data = [{'name': 'Alice', 'age': 25}, {'name': 'Bob', 'age': 30}]
collection.insert_many(data)# 查询数据
results = collection.find()
for doc in results:print(doc)
-
云存储:以阿里云 OSS 为例,使用
oss2
库进行操作。
import oss2# 阿里云OSS配置
auth = oss2.Auth('your_access_key_id', 'your_access_key_secret')
bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'your_bucket_name')# 上传文件
with open('data.csv', 'rb') as f:bucket.put_object('data.csv', f)# 下载文件
result = bucket.get_object('data.csv')
with open('downloaded_data.csv', 'wb') as f:f.write(result.read())
3. cookie 过期如何处理
处理 cookie 过期问题可采用自动刷新、定期更新和使用代理 cookie 等方法。
-
自动刷新:使用
requests
库的会话对象Session
管理 cookie,当请求返回登录页面或提示 cookie 过期信息时,执行登录操作。
import requests# 创建会话对象
session = requests.Session()# 登录函数
def login():login_url = 'https://example.com/login'data = {'username': 'your_username','password': 'your_password'}response = session.post(login_url, data=data)if response.status_code == 200:print("登录成功")# 访问需要登录的页面
url = 'https://example.com/protected'
response = session.get(url)
if '登录' in response.text: # 假设返回登录页面包含“登录”字样login()response = session.get(url)
print(response.text)
-
定期更新:使用
APScheduler
库定时执行登录操作获取新的 cookie。
from apscheduler.schedulers.blocking import BlockingScheduler
import requestssession = requests.Session()def login():login_url = 'https://example.com/login'data = {'username': 'your_username','password': 'your_password'}response = session.post(login_url, data=data)if response.status_code == 200:print("登录成功")# 创建调度器
scheduler = BlockingScheduler()
scheduler.add_job(login, 'interval', hours=1) # 每小时执行一次登录操作
scheduler.start()
-
使用代理 cookie:有多个可用 cookie 时,一个 cookie 过期后切换到另一个继续请求。
import requests# 多个cookie
cookies_list = [{'name': 'cookie1', 'value': 'value1'},{'name': 'cookie2', 'value': 'value2'}
]url = 'https://example.com'
for cookies in cookies_list:try:response = requests.get(url, cookies=cookies)if response.status_code == 200:print(response.text)breakexcept Exception as e:print(f"使用 {cookies} 出错: {e}")
4. 动态加载又对及时性要求很高怎么处理
处理动态加载且及时性要求高的页面,可使用浏览器自动化工具、分析接口请求和消息队列等方法。
-
使用浏览器自动化工具:使用 Selenium 模拟浏览器行为,设置显式等待或隐式等待确保页面元素加载完成。
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC# 初始化浏览器
driver = webdriver.Chrome()
driver.get('https://example.com')# 显式等待
try:element = WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, 'dynamic_element')))print(element.text)
finally:driver.quit()
-
分析接口请求:通过浏览器开发者工具分析页面动态加载的接口请求,直接请求接口获取数据。
import requests# 假设分析得到的接口地址
api_url = 'https://example.com/api/data'
response = requests.get(api_url)
if response.status_code == 200:data = response.json()print(data)
-
消息队列:使用 RabbitMQ 作为消息队列,将数据采集和处理分离。
import pika
import requests# 连接RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 创建队列
channel.queue_declare(queue='data_queue')# 采集数据
def collect_data():url = 'https://example.com'response = requests.get(url)if response.status_code == 200:data = response.textchannel.basic_publish(exchange='', routing_key='data_queue', body=data)print("数据已发送到队列")collect_data()# 处理数据
def callback(ch, method, properties, body):print(f"收到数据: {body.decode()}")channel.basic_consume(queue='data_queue', on_message_callback=callback, auto_ack=True)
print('等待数据...')
channel.start_consuming()
5. HTTPS 有什么优点和缺点
HTTPS 的优点包括数据加密、身份验证和完整性保证;缺点有性能开销、成本和配置复杂等问题。
-
优点
-
数据加密:使用 SSL/TLS 协议对数据加密,防止传输中被窃取或篡改。例如,用户登录网站时,账号密码等敏感信息通过加密后传输,保障信息安全。
-
身份验证:通过数字证书验证服务器身份,防止中间人攻击。用户访问银行网站时,浏览器会验证银行服务器的证书,确保访问的是真实的银行网站。
-
完整性:使用哈希算法保证数据完整性,确保数据在传输过程中未被修改。
-
-
缺点
-
性能开销:SSL/TLS 握手过程增加网络延迟,降低网站访问速度。特别是在高并发场景下,性能影响更明显。
-
成本:购买和维护数字证书需要费用,对于小型网站来说可能是一笔不小的开支。
-
配置复杂:服务器需要进行复杂配置以确保 HTTPS 正常运行,需要专业的技术知识。
-
6. HTTPS 是如何实现安全传输数据的
HTTPS 通过 SSL/TLS 握手建立安全连接,然后使用会话密钥对数据进行加密传输,确保数据的安全性。
简单模拟 SSL/TLS 握手过程:
import socket
import ssl# 服务器端代码
def server():server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server_socket.bind(('localhost', 8443))server_socket.listen(1)context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)context.load_cert_chain(certfile='server.crt', keyfile='server.key')while True:client_socket, client_address = server_socket.accept()ssl_socket = context.wrap_socket(client_socket, server_side=True)data = ssl_socket.recv(1024)print(f"收到数据: {data.decode()}")ssl_socket.sendall(b"Hello, client!")ssl_socket.close()# 客户端代码
def client():client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)context.load_verify_locations('server.crt')ssl_socket = context.wrap_socket(client_socket, server_hostname='localhost')ssl_socket.connect(('localhost', 8443))ssl_socket.sendall(b"Hello, server!")data = ssl_socket.recv(1024)print(f"收到响应: {data.decode()}")ssl_socket.close()if __name__ == "__main__":import threadingserver_thread = threading.Thread(target=server)server_thread.start()client()
7. 什么是TTL,MSL,RTT
-
TTL(Time To Live):IP 数据包中的字段,限制数据包在网络中的生存时间,防止无限循环。在网络故障排查中,TTL 值可以帮助判断数据包是否在网络中循环。如果 TTL 值异常低,可能存在网络环路问题。
-
MSL(Maximum Segment Lifetime):TCP 分段在网络中能够存在的最长时间,确保延迟分段不影响后续连接。在 TCP 连接关闭时,会进入 TIME_WAIT 状态,持续时间为 2 倍的 MSL,以确保最后一个 ACK 包能够被对方收到。
-
RTT(Round-Trip Time):从发送方发送数据到收到接收方确认信息所经历的时间,用于计算 TCP 重传超时时间。RTT 的变化可以反映网络的拥塞情况。当 RTT 突然增大时,可能表示网络出现拥塞。
8. 什么是Selenium和PhantomJS
-
Selenium:用于自动化浏览器操作,支持多种浏览器,功能强大但性能相对较低,占用系统资源较多。
-
PhantomJS:无界面的无头浏览器,性能较高,占用资源少,但已停止维护,对新 Web 标准支持可能不足。
from selenium import webdriver# 初始化浏览器
driver = webdriver.Chrome()
driver.get('https://example.com')# 找到元素并点击
element = driver.find_element_by_id('button')
element.click()# 获取页面标题
title = driver.title
print(f"页面标题: {title}")# 关闭浏览器
driver.quit()
9. 爬虫平常怎么使用代理
爬虫使用代理可以通过免费代理、付费代理和代理池等方式。
-
使用免费代理:从免费代理网站获取代理 IP,使用
requests
库的proxies
参数设置代理。
import requestsproxies = {'http': 'http://127.0.0.1:8080','https': 'http://127.0.0.1:8080'
}try:response = requests.get('http://example.com', proxies=proxies)print(response.text)
except Exception as e:print(f"请求出错: {e}")
-
使用付费代理:购买专业代理服务提供商的代理 IP,通过 API 接口获取并使用。
import requests# 假设付费代理的API地址和密钥
api_url = 'https://proxy-api.com/get_proxy'
api_key = 'your_api_key'# 获取代理IP
response = requests.get(api_url, params={'api_key': api_key})
proxy = response.json()['proxy']proxies = {'http': f'http://{proxy}','https': f'http://{proxy}'
}try:response = requests.get('http://example.com', proxies=proxies)print(response.text)
except Exception as e:print(f"请求出错: {e}")
-
代理池:使用 Redis 管理代理池,每次请求从池中随机选择代理 IP,失效时移除。
import redis
import requests
import random# 连接Redis
r = redis.Redis(host='localhost', port=63
10. 怎么监控爬虫的状态
监控爬虫状态主要通过日志记录、性能监控、任务状态监控和报警机制这几个方面来实现。日志记录用于记录爬虫运行状态和错误信息;性能监控关注爬虫的 CPU、内存、网络带宽等指标;任务状态监控借助数据库或消息队列记录任务的完成情况;报警机制在爬虫出现异常时及时通知管理员。
- 要点
-
日志记录:除了记录基本的运行状态和错误信息外,还可以记录关键步骤的执行时间,便于后续分析性能瓶颈。对于分布式爬虫,不同节点的日志可以通过日志收集工具(如 ELK 栈)进行统一管理和分析。
-
性能监控:可以根据不同的爬虫场景设置性能指标的阈值,例如对于高并发的爬虫任务,更关注网络带宽和 CPU 使用率;对于数据处理密集型的爬虫,内存使用情况可能更为关键。还可以使用可视化工具(如 Grafana)将性能指标以图表的形式展示,方便直观地观察爬虫的运行状态。
-
任务状态监控:对于复杂的爬虫任务,可能存在多个子任务,可以为每个子任务设置状态标识,便于跟踪整个任务的进度。同时,可以结合任务的优先级和依赖关系,在任务出现异常时进行更合理的处理。
-
报警机制:除了邮件和短信报警外,还可以集成即时通讯工具(如 Slack、钉钉)进行报警,提高信息传递的及时性。可以根据不同的异常级别设置不同的报警方式,例如严重错误使用短信报警,一般错误使用邮件或即时通讯工具报警。
日志记录
import logging# 配置日志
logging.basicConfig(filename='spider.log', level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s')try:# 模拟爬虫执行logging.info('开始爬取网页')# 爬虫代码...logging.info('网页爬取完成')
except Exception as e:logging.error(f'爬取过程中出现错误: {e}')
性能监控
import psutil
import time# 监控爬虫性能
def monitor_performance():while True:cpu_percent = psutil.cpu_percent(interval=1)memory_percent = psutil.virtual_memory().percentnetwork_io = psutil.net_io_counters()logging.info(f'CPU使用率: {cpu_percent}%,内存使用率: {memory_percent}%,网络IO: 发送 {network_io.bytes_sent} 字节,接收 {network_io.bytes_recv} 字节')time.sleep(60) # 每分钟监控一次# 启动性能监控线程
import threading
monitor_thread = threading.Thread(target=monitor_performance)
monitor_thread.start()
任务状态监控
import sqlite3# 初始化数据库
conn = sqlite3.connect('spider_tasks.db')
cursor = conn.cursor()
cursor.execute('CREATE TABLE IF NOT EXISTS tasks (id INTEGER PRIMARY KEY AUTOINCREMENT, url TEXT, status TEXT)')# 模拟任务执行
url = 'https://example.com'
try:# 开始任务cursor.execute('INSERT INTO tasks (url, status) VALUES (?,?)', (url, '正在进行'))conn.commit()# 爬虫代码...# 任务完成cursor.execute('UPDATE tasks SET status =? WHERE url =?', ('完成', url))conn.commit()
except Exception as e:cursor.execute('UPDATE tasks SET status =? WHERE url =?', ('失败', url))conn.commit()logging.error(f'任务 {url} 失败: {e}')# 查询任务状态
cursor.execute('SELECT * FROM tasks')
tasks = cursor.fetchall()
for task in tasks:print(f'任务ID: {task[0]},URL: {task[1]},状态: {task[2]}')conn.close()
报警机制
import smtplib
from email.mime.text import MIMEText# 邮件报警函数
def send_email_alert(subject, message):sender = 'your_email@example.com'receivers = ['recipient_email@example.com']msg = MIMEText(message)msg['Subject'] = subjectmsg['From'] = sendermsg['To'] = ', '.join(receivers)try:smtpObj = smtplib.SMTP('smtp.example.com', 587)smtpObj.starttls()smtpObj.login(sender, 'your_email_password')smtpObj.sendmail(sender, receivers, msg.as_string())logging.info('邮件报警发送成功')except Exception as e:logging.error(f'邮件报警发送失败: {e}')# 模拟异常情况触发报警
try:# 爬虫代码...raise Exception('模拟异常')
except Exception as e:send_email_alert('爬虫异常报警', f'爬虫出现异常: {e}')
代理池:使用 Redis 管理代理池,每次请求从池中随机选择代理 IP,失效时移除。
import redis
import requests
import random# 连接Redis
r = redis.Redis(host='localhost', port=63
友情提示:本文已经整理成文档,可以到如下链接免积分下载阅读
https://download.csdn.net/download/ylfhpy/90422308