爬取APP :https://app5.scrape.center
思路分析
遍历首页已有的所有电影条目, 一次模拟点击每个电影条目,进入详情页
爬取详情页的数据,之后模拟点击回退按钮返回首页
当首页已有的电影条目即将爬取完毕时, 模拟上拉操作, 加载更多数据
爬取过程中将已经爬取的数据记录下来,以免重复爬取
100 条数据全部爬取完毕后,终止爬取
实战爬取
确保 app5 已经安装,并且能正确启动。然后打开 AirtestIDE 切换到 Poco 模式,本节中 , AirtestIDE 仅仅是辅助我们审查节点属性的,所以界面左侧可以只显示 Poco 辅助窗, 中间栏只保留 Log查看窗。 至于代码, 可以单独在 Python 文件中编写,不一定非要在这里
首先引入一些必要的库,并初始化一些变量
from airtest.core.api import * from poco.drivers.android.uiautomation import AndroidUiautomationPocopoco = AndroidUiautomationPoco(use_airtest_input=True, screenshot_each_action=False) window_width, window_height = poco.get_screen_size() PACKAGE_NAME = 'com.goldze.mvvmhabit' TOTAL_NUMBER = 100
这里引入了 Airtest 的 API 和 AndroidUiautomationPoco 类, 然后初始化了 poco 对象。接着调用 poco 对象的 get_screen_size 方法获取了屏幕的宽高, 并分别赋值给 window_width, window_height 。 之后定义了两个常量, PACKAGE_NUMBER 代表包名, TOTAL_NUMBER 代表爬取数据的总数
接下来就是先爬取首页的所有电影数据, 用 AirtestIDE 来查看一下节点的属性,选中一个电影条目
所选中的节点的 name 是 com.goldze.mvvmhabit:id/item ,而且不会和其他层级节点的 name 有重复,所以我们可以直接使用 name 属性选择节点, 实现一个 scrape_index 方法
def scrape_index():elements = poco(f'{PACKAGE_NAME}:id/item')elements.wait_for_appearance()return elements
这里直接将 name 作为参数传给了 poco 对象, 并赋值为 elements 变量, 然后调用它的 wait_for_appearance 方法等待节点加载出来, 加载出来后返回。 在正常情况下, scrape_index 方法可以获得首页当前呈现的所有电影条目。 我们定义一个 main 方法来调用 scrape_index 方法
from loguru import loggerdef main():elements = scrape_index()for element in elements:element_data = scrape_detail(element)logger.debug(f'scrape data {element_data}')if __name__ == '__main__':init_device("Android")stop_app(PACKAGE_NAME)start_app(PACKAGE_NAME)main()
在main 方法中, 我们首先调用 scrape_index 方法提取了首页当前已有的所有电影条目, 赋值为 elements 变量。 然后就遍历这个变量中的元素, 并希望通过一个 scrape_detail 方法爬取每部电影的详细信息,之后输出日志,返回
这里提到的 scrape_detail 方法的基本实现思路
模拟点击 element , 即首页中的某个电影条目
进入电影详情页之后,爬取详情信息
点击回退按钮返回首页
在 AirtestIDE 中, 点击首页的任意一个电影条目,进入详情页, 查看节点信息
可以看到整体详情信息的最外侧是 name 为 com.goldze.mvvmhabit:id/content 的面板,内部是一个个具体的 TextVIew , 所以这里可以先选定这个面板的节点, 然后等待其加载, 加载出来之后,再依次选择标题,类别,评分等节点, 通过调用 attr 方法并传入对应的属性名称 text ,即可获取节点文本, scrape_detail 的实现方法如下
def scrape_detail(element):element.click()panel = poco(f'{PACKAGE_NAME}:id/content')panel.wait_for_appearance()title = poco(f'{PACKAGE_NAME}:id/title').attr('text')categories = poco(f'{PACKAGE_NAME}:id/categories_value').attr('text')score = poco(f'{PACKAGE_NAME}:id/score_value').attr('text')published_at = poco(f'{PACKAGE_NAME}:id/published_at_value').attr('text')drama = poco(f'{PACKAGE_NAME}:id/drama_value').attr('text')keyevent('BACK')return {'title': title,'categories': categories,'score': score,'published_at': published_at,'drama': drama}
这里的 scrape_detail 方法的 element 参数就是某个电影条目,对应一个 UIObjectProxy 对象,调用 click 方法就会跳转到对应的的详情页, 然后爬取其中的信息,爬取完毕后调用 keyevent 方法并传入 BACK 参数,返回首页,最后将爬取的信息返回即可
运行一下代码
2024-08-17 13:26:30.681 | DEBUG | __main__:main:39 - scrape data {'title': '霸王别姬', 'categories': '剧情、爱情', 'score': '9.5', 'published_at': '1993-07-26', 'drama': '影片借一出《霸王别姬》的京戏,牵扯出三个人之间一段随时代风云变幻的爱恨情仇。段小楼(张丰毅 饰)与程蝶衣(张国荣 饰)是一对打小一起长大的师兄弟,两人一个演生,一个饰旦,一向配合天衣无缝,尤其一出《霸王别姬》,更是誉满京城,为此,两人约定合演一辈子《霸王别姬》。但两人对戏剧与人生关系的理解有本质不同,段小楼深知戏非人生,程蝶衣则是人戏不分。段小楼在认为该成家立业之时迎娶了名妓菊仙(巩俐 饰),致使程蝶衣认定菊仙是可耻的第三者,使段小楼做了叛徒,自此,三人围绕一出《霸王别姬》生出的爱恨情仇战开始随着时代风云的变迁不断升级,终酿成悲剧。'}
这时我们可以获取首页最开加载的几条电影信息了
上拉加载逻辑
现在添加上拉加载逻辑-----当爬取的节点对应的电影条目差不多位于页面高度的 80% 以下时,就触发加载, 将 main 方法改写如下
def main():elements = scrape_index()for element in elements:_, element_y = element.get_position()if element_y > 0.5:scroll_up()element_data = scrape_detail(element)logger.debug(f'scrape data {element_data}')
这里调用了 element 的 get_position 方法获取了当前节点的纵坐标, 返回结果是 0 和 1 之间的数字, 而非绝对的像素点位置, 所以这里可以直接做判断, 当返回的数字大于 0.5(根据自己设备的具体情况) 时, 就调用 scroll_up 方法模拟上拉, 以加载新数据。 scroll_up 定义如下
def scroll_up():swipe((window_width * 0.5, window_height * 0.8), vector=[0, -0.5], duration=1)
这里我们直接调用 Airtest API 里的 swipe 方法, 第一个参数是初始点击位置, 第二个参数是滑动方向,第三个参数是滑动时间 (单位秒)
去重, 终止
我们需要额外添加根据标题进行去重和判断终止的逻辑, 所以在遍历首页中每个电影条目的时候,还需要爬取一下标题,并将其存入一个全局变量中, 将 main 方法改写如下
from loguru import loggerscraped_titles = [] def main():while len(scraped_titles) < TOTAL_NUMBER:elements = scrape_index()for element in elements:element_title = element.offspring((f'{PACKAGE_NAME}:id/tv_title'))if not element_title.exists():continuetitle = element_title.attr('text')logger.debug(f'get title {title}')if title in scraped_titles:continue_, element_y = element.get_position()if element_y > 0.5:scroll_up()element_data = scrape_detail(element)scraped_titles.append(title)logger.debug(f'scrape data {element_data}')
这里我们调用 element 的 offspring 方法传入了标题对应的 name , 并提取了其内容,然后声明全局变量 scraped_titles 来存储已经爬取的电影标题。 每次爬取之前, 先判断 title 是否已经存在于 scraped_titles 中, 如果已经存在,就跳过, 否则接着爬取,爬取完后将得到的标题存到 scraped_titles 里, 这样就实现去重了,另外, 我们在 main 方法中添加了 while 循环, 如果爬取的电影条目数目尚未达到目标数量 TOTAL_NUMBER, 就接着爬取,直到爬取完毕
保存数据
现在添加一个保存数据的逻辑,将爬取的数据以JSON 形式保存到本地的 movie 文件夹
import os import jsonOUTPUT_FOLDER = 'movie' os.path.exists(OUTPUT_FOLDER) or os.makedirs(OUTPUT_FOLDER)def save_data(element_data):with open(f"{OUTPUT_FOLDER}/{element_data.get('title')}.json", 'w', encoding='utf-8') as f:f.write(json.dumps(element_data, ensure_ascii=False, indent=2))logger.debug(f"saved as file {element_data.get('title')}.json")
最后在 main 方法中调用即可
整理后的全部代码
from airtest.core.api import * from poco.drivers.android.uiautomation import AndroidUiautomationPoco from loguru import logger import os import jsonpoco = AndroidUiautomationPoco(use_airtest_input=True, screenshot_each_action=False) window_width, window_height = poco.get_screen_size() PACKAGE_NAME = 'com.goldze.mvvmhabit' TOTAL_NUMBER = 100 OUTPUT_FOLDER = 'movie' os.path.exists(OUTPUT_FOLDER) or os.makedirs(OUTPUT_FOLDER) scraped_titles = []def scrape_index():elements = poco(f'{PACKAGE_NAME}:id/item')elements.wait_for_appearance()return elementsdef scrape_detail(element):element.click()panel = poco(f'{PACKAGE_NAME}:id/content')panel.wait_for_appearance()title = poco(f'{PACKAGE_NAME}:id/title').attr('text')categories = poco(f'{PACKAGE_NAME}:id/categories_value').attr('text')score = poco(f'{PACKAGE_NAME}:id/score_value').attr('text')published_at = poco(f'{PACKAGE_NAME}:id/published_at_value').attr('text')drama = poco(f'{PACKAGE_NAME}:id/drama_value').attr('text')keyevent('BACK')return {'title': title,'categories': categories,'score': score,'published_at': published_at,'drama': drama}def scroll_up():swipe((window_width * 0.5, window_height * 0.8), vector=[0, -0.5], duration=1)def save_data(element_data):with open(f"{OUTPUT_FOLDER}/{element_data.get('title')}.json", 'w', encoding='utf-8') as f:f.write(json.dumps(element_data, ensure_ascii=False, indent=2))logger.debug(f"saved as file {element_data.get('title')}.json")def main():while len(scraped_titles) < TOTAL_NUMBER:elements = scrape_index()for element in elements:element_title = element.offspring((f'{PACKAGE_NAME}:id/tv_title'))if not element_title.exists():continuetitle = element_title.attr('text')logger.debug(f'get title {title}')if title in scraped_titles:continue_, element_y = element.get_position()if element_y > 0.5:scroll_up()element_data = scrape_detail(element)scraped_titles.append(title)save_data(element_data)logger.debug(f'scrape data {element_data}')if __name__ == '__main__':init_device("Android")stop_app(PACKAGE_NAME)start_app(PACKAGE_NAME)main()