WOAautowork

自动化爬虫脚本实践项目总结

此脚本用于自动化爬取WOA爽文、科技文并尝试自行总结为精简的情节与信息,从而达到防沉迷的效果

目录

  1. 导入模块
  2. 全局配置
  3. 工具项
    1. 随机延时函数
    2. 模拟人类行为函数
    3. 时间过滤器
    4. 文件保存器
    5. 浏览器锁文件清理
    6. 验证码处理守卫
    7. dd推送底层请求
    8. dd消息推送
    9. AI总结接口
  4. 自动化发动机(包含全流程运作图)
  5. 主程序
  6. 总指挥
    1. 启动浏览器
    2. 检查登录
    3. 遍历WOA
      1. 返回书架自重试
      2. 进入文章列表
      3. 循环获取该WOA上的所有文章
  7. 终极战力

导入模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio #异步编程库,用于处理异步操作
# 导入Playwright的异步API,用于模拟浏览器操作
from playwright.async_api import async_playwright, Page
# 导入随机数库,用于生成随机延时、随机坐标等模拟人类行为
import random
# 导入操作系统库,用于文件路径、目录操作
import os
# 导入正则表达式库,用于文本匹配、清洗
import re
# 导入日期时间库,用于时间处理、格式转换
from datetime import datetime
# 导入警告处理库,用于忽略指定类型的警告
import warnings
# 导入系统库,用于程序退出、系统相关操作
import sys
# 导入HTTP请求库,用于调用dd机器人、DS API
import requests
# 导入时间库,用于同步延时(与asyncio.sleep区分)
import time
# 导入JSON库,用于构造请求体、解析响应
import json

playwright:剧作家(浏览器自动化测试工具)

async:异步

asyncio:异步I/O

全局配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 1. 预设随机语料库(用于dd推送的前置消息随机化)
# 可自定义增减,提升推送内容的随机性
RANDOM_PREFIXES = [
"🔔 发现一篇新文章:",
"📖 今日份深度阅读已送达:",
"🚀 抓取到一条新动态:",
"✨ 发现宝藏内容,请查收:",
"🤖 机器人小助提醒,新文更新:",
"📺 你订阅的频道有更新啦:",
"💡 这里的思考值得一看:"
]

# dd推送的随机表情库
RANDOM_EMOJIS = ["🌟", "✅", "🔥", "📢", "👀", "☕", "🍀"]

# 忽略未检索的future异常(避免Playwright异步操作触发无关警告)
warnings.filterwarnings("ignore", category=RuntimeWarning, message=".*Future exception was never retrieved.*")

# 全局退出信号标志:用于标记是否接收到退出指令(如Ctrl+C)
SHUTDOWN_REQUESTED = False

# 全局记录上一个处理的WOA:用于本地摘要文件的分组排版
LAST_PROCESSED_MP = None

# ========== 配置项 ==========
# dd机器人Webhook地址:用于推送摘要内容
DING_WEBHOOK_URL = "xxxxxaccess_token=xxxx"
# 调试模式:True=弹出浏览器可视化操作,False=无头模式静默运行
DEBUG_MODE = True
# 每WOA最多抓取最新文章数
MAX_ARTICLE_PER_ACCOUNT = 5
# 最多处理的WOA数量
MAX_ACCOUNT_TO_PROCESS = 8
# 仅抓取当天发布的文章(北京时间)
ONLY_TODAY_ARTICLE = True
# WOA书架地址
SHELF_URL = "xxxxx"
# 项目根目录(基于当前文件路径向上两级)
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# 浏览器持久化登录数据目录:用于保存WOA登录状态,避免重复扫码
USER_DATA_DIR = os.path.join(PROJECT_ROOT, "xxxxx")

# 导入time模块~~冗余导入,历史包袱~~
import time

cookie 常指网站存储在用户浏览器中的一小段文本数据,用于记录用户的登录状态或行为,后续自动携带从而保持状态、记录偏好、跟踪浏览行为。

dir:directory 文件夹、目录、名录

工具项

随机延时函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def random_sleep(min_s=2.0, max_s=5.0, reason=""):
"""
随机延时函数(异步):模拟人类操作的无规律停顿
:param min_s: 最小延时秒数,默认2.0秒
:param max_s: 最大延时秒数,默认5.0秒
:param reason: 延时原因描述,用于日志打印
"""
# 生成指定范围内的随机延时值
delay = random.uniform(min_s, max_s)
if reason:
# 打印延时原因和具体时长(保留1位小数)
print(f"⏳ {reason},随机等待 {delay:.1f} 秒...")
# 异步休眠(不阻塞事件循环)
await asyncio.sleep(delay)

这个函数用来模拟人类操作的停顿,避免操作太规整被识别成机器人~

模拟人类行为函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
async def simulate_human_behavior(page):
"""
模拟真实用户行为:随机鼠标移动 + 随机小滚动
:param page: Playwright的Page对象,代表浏览器标签页
"""
try:
# 1. 随机鼠标移动:获取当前视口尺寸
vp = page.viewport_size
if vp:
width = vp['width']
height = vp['height']
# 随机移动1-3次
for _ in range(random.randint(1, 3)):
# 生成视口内(100px偏移)的随机坐标
x = random.randint(100, width - 100)
y = random.randint(100, height - 100)
# 模拟鼠标移动(steps越大,移动越慢,更贴近人类)
await page.mouse.move(x, y, steps=random.randint(5, 15))
# 每次移动后短暂停顿
await asyncio.sleep(random.uniform(0.1, 0.4))

# 2. 随机轻微滚动:50%概率触发,模拟人类浏览时的犹豫/无效滚动
if random.random() > 0.5:
await page.mouse.wheel(0, random.randint(-50, 150))
except Exception:
# 忽略操作异常(如页面未加载完成导致的鼠标移动失败)
pass

用来模拟真实用户的鼠标移动和滚动,尽量让操作看起来像真人,减少被反爬的概率~

时间过滤器

1
2
3
4
5
6
7
8
9
10
#接收time_text字符串参数(时间信息)
#返回bool值
def is_today_article(time_text: str) -> bool:
"""判断是否是当天发布的文章"""
if not time_text:
return False #防御性编程
# 匹配WOA的时间格式:今天、X小时前、分钟前、刚刚
if "今天" in time_text or "小时前" in time_text or "分钟前" in time_text or "刚刚" in time_text:
return True
return False

这里比原来的版本多了对“刚刚”的判断,覆盖更多的时间表述~

文件保存器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def save_article(title, content, mp_name):
"""
保存原始文章内容到本地Markdown文件
:param title: 文章标题
:param content: 文章正文
:param mp_name: WOA名称
:return: 保存成功返回True,失败返回False
"""
try:
# 构造原始文章存储目录
base_dir = os.path.join(PROJECT_ROOT, "data/raw_articles")
# 目录不存在时创建
if not os.path.exists(base_dir):
os.makedirs(base_dir)

#清洗标题:移除文件名非法字符
safe_title=re.sub(r'[\\/:*?"<>|]','_', title).strip()
# 生成当前日期字符串(YYYYMMDD)
date_str = datetime.now().strftime("%Y%m%d")

#构造文件名:WOA_日期_标题简写.md
filename=f"{mp_name}_{date_str}_{safe_title[:15]}.md"
#取前15个字防止文件名太长报错
file_path=os.path.join(base_dir,filename)
#把文件夹和文件名拼接成完整路径

#写入markdown
#以写入模式'w'打开,指定encoDING=‘utf-8’防止中文乱码
with open(file_path,'w',encoDING='utf-8') as f:
f.write(f"---\ntitle: \"{title}\"\nmp: \"{mp_name}\"\ndate: {datetime.now()}\n---\n\n")
f.write(content)

print(f"📁 已保存至: {file_path}")
return True
except Exception as e:
print(f"❌ 存储失败: {e}")
return False
    1. re.sub(A,B,C):正则替换函数,在C里面找到符合A模式的东西,替换成B
    2. .strip()去除前后空格,内部可指定去除之物;返回新字符串,原字符串不修改
    3. r'...':前面的r表示原生字符串,防止将\当成反义字符
    4. [...]表示匹配括号里的任一个字符
  • .strtime("%Y%m%d"):表示str+f(format格式化)+time=把时间对象转成特定格式的日期字符串

  • 必须掌握的语法:

    1
    2
    3
    with open(文件路径,写入模式,编码) as 变量名:
    #读写文件模板
    #'w'表示write(写入模式)。

    with自动安全地关闭文件。

    'w':写入,不存在就新建,存在就覆盖原有内容

    1
    2
    except Exception as e:
    #跟在try后面检测“出事”

​ 意思是:如果在此之前代码报错了,不要直接卡死,而是把错误原因抓住存到变量e),执行补救措施

  • mp_name代表WOA名称:media platform

浏览器锁文件清理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def clean_user_data_locks():
"""
清理Chrome用户数据目录下的崩溃锁文件:避免浏览器启动时因锁文件导致的异常
递归查找,精准匹配常见Chromium锁文件名
"""
# 目录不存在时直接返回
if not os.path.exists(USER_DATA_DIR):
return

# 常见的Chromium锁文件名(精准匹配)
lock_filenames = {"SingletonLock", "SingletonSocket", "SingletonCookie", "Lock", ".com.google.Chrome.*.lock"}

# 递归遍历目录下的所有文件
for root, dirs, files in os.walk(USER_DATA_DIR):
for filename in files:
# 匹配锁文件名(支持通配符)
if any(lock_name in filename for lock_name in lock_filenames):
filepath = os.path.join(root, filename)
try:
# 仅删除文件/软链接,不删除目录
if os.path.islink(filepath) or os.path.isfile(filepath):
os.remove(filepath)
print(f"🧹 已清除浏览器异常锁文件: {filepath}")
except Exception as e:
# 捕获删除失败异常
print(f"⚠️ 清理锁文件失败 {filepath}: {e}")

这个是用来处理浏览器异常退出后留下的锁文件,避免下次启动浏览器的时候报错,很实用的小工具~

验证码处理守卫

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
async def handle_captcha(page):
"""
检测并等待用户手动处理验证码
:param page: Playwright的Page对象
:return: 验证通过返回True,超时/失败返回False
"""
# 引用全局退出标志:验证码处理超时后标记退出
global SHUTDOWN_REQUESTED
try:
# 验证码遮罩层常见选择器(覆盖主流验证码样式)
captcha_mask = page.locator("xxxxx")

# 快速检测(2秒超时),不拖慢正常流程
if await captcha_mask.count() > 0 and await captcha_mask.first.is_visible(timeout=2000):
print("\n🚨🚨🚨 触发人机验证!程序已自动暂停 🚨🚨🚨")
print("👉 请在弹出的浏览器窗口中手动完成滑块/点选验证...")

try:
# 5分钟(300000毫秒)超时,等待验证码隐藏(验证完成)
await captcha_mask.first.wait_for(state="hidden", timeout=300000)
print("✅ 验证通过!恢复自动执行...")
await asyncio.sleep(2) # 缓冲,模拟人类验证后的停顿
return True
except TimeoutError:
# 验证码处理超时,标记退出
print("\n⏰ 验证码处理超时(5分钟),程序将退出!")
SHUTDOWN_REQUESTED = True
return False

except Exception:
# 绝大多数时候无验证码,忽略异常
pass
return False

遇到人机验证的时候,程序会自动停下来等你手动处理,处理完了就继续跑,不用重启程序

dd推送底层请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def _request_DING(payload: dict, retry_times=3):
"""
底层dd推送请求逻辑:负责实际HTTP请求,包含指数退避重试
:param payload: dd推送的JSON请求体
:param retry_times: 重试次数,默认3次
:return: ddAPI响应(JSON格式)或None
"""
# 未配置Webhook时跳过推送
if not DING_WEBHOOK_URL:
print("❌ dd Webhook未配置,跳过推送")
return None

# 构造请求头(JSON格式)
headers = {'Content-Type': 'application/json'}
# 重试逻辑:指数退避(1s→2s→4s)
for retry in range(retry_times):
try:
# 发送POST请求(同步)
response = requests.post(DING_WEBHOOK_URL, json=payload, headers=headers, timeout=10)
# 触发HTTP错误(如404/500)
response.raise_for_status()
# 返回解析后的JSON响应
return response.json()
except Exception as e:
# 非最后一次重试时,等待后重试
if retry < retry_times - 1:
wait_time = 2 ** retry # 指数退避时间
print(f"❌ dd推送失败(第{retry+1}次): {e}{wait_time}秒后重试...")
time.sleep(wait_time)
# 最后一次重试失败,打印错误
else:
print(f"❌ dd推送重试{retry_times}次均失败: {e}")
return None

这个是推送的底层逻辑,带了重试机制,网络不好的时候也能尽量推送成功,不会丢消息

dd消息推送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def send_to_DING(title: str, summary: str, mp_name: str, time_text: str):
"""
发送摘要到dd机器人:分两步(前置消息+Markdown主体)
:param title: 文章标题
:param summary: AI生成的摘要
:param mp_name: WOA名称
:param time_text: 文章发布时间
"""
# 未配置Webhook时直接返回
if not DING_WEBHOOK_URL:
return
# 1. 发送独立的前置消息(Text类型):随机化提升真实性
# 随机选择前缀语料和表情
prefix_text = random.choice(RANDOM_PREFIXES)
emoji = random.choice(RANDOM_EMOJIS)
# 拼接前置消息内容
prefix_content = f"{emoji} {prefix_text}{mp_name}】"

# 构造前置消息请求体
prefix_payload = {
"msgtype": "text",
"text": {
"content": prefix_content
}
}
# 发送前置消息
print(f"📡 正在推送前置提醒: {prefix_content}")
_request_DING(prefix_payload)

# 2. 发送主体的Markdown摘要
# 构造Markdown格式文本
markdown_text = f"### {title}\n**来源**:{mp_name} \n**时间**:{time_text} \n---\n{summary}"
# 构造Markdown请求体
main_payload = {
"msgtype": "markdown",
"markdown": {
"title": f"【摘要】{title}",
"text": markdown_text
}
}

# 发送Markdown主体消息
print(f"📡 正在推送文章摘要...")
result = _request_DING(main_payload)

# 校验推送结果
if result and result.get('errcode') == 0:
print("✅ dd全部推送成功")
else:
print(f"❌ dd摘要推送失败: {result}")

推送的时候分两步,先发个提醒,再发摘要,而且每次的提醒语都是随机的,看起来更像真人发的,不会被当成机器人消息~

AI总结接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
async def summarize_with_ai(title: str, content: str, mp_name: str, time_text: str):
"""
DS AI总结逻辑:调用API生成固定格式摘要,写入本地文件并推送dd
:param title: 文章标题
:param content: 文章正文
:param mp_name: WOA名称
:param time_text: 文章发布时间
:return: AI生成的摘要或None
"""
# 引用全局变量:记录上一个处理的WOA(用于文件分组)
global LAST_PROCESSED_MP
print(f"\n🤖 正在调用 DS 生成摘要:{title}...")

# DS API密钥
api_key = "xxxx"
# DS API地址
url = "xxxx"

# 提示词:强制固定格式,要求客观不浮夸
prompt = f"""
请阅读以下WOA文章,并生成一段客观、冷静、不浮夸的摘要。
只输出摘要内容,不要任何开场白。

格式严格如下(请替换内容):
事件:xx公司的xx模型/xxAPP/xx+突破xx领域/实现了xx功能+昭示当前xx领域处于xx态势

文章内容:
{content[:4000]}
"""

# 构造API请求体
payload = {
"model": "deepseek-chat", # 使用的模型名称
"messages": [
{"role": "user", "content": prompt} # 用户提示词
],
"temperature": 0.5 # 随机性参数(0-1,越低越固定)
}

# 构造请求头(携带API密钥)
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}

try:
# 异步调用同步的requests.post:避免阻塞Playwright事件循环
response = await asyncio.to_thread(requests.post, url, json=payload, headers=headers)
# 触发HTTP错误
response.raise_for_status()
# 解析响应JSON
result = response.json()
# 提取AI生成的摘要(去除首尾空格)
ai_summary = result['choices'][0]['message']['content'].strip()

# 构造写入本地文件的内容:按WOA分组
summary_content = ""
# 新WOA时,先写入WOA名称(分组)
if mp_name != LAST_PROCESSED_MP:
summary_content += f"\n{mp_name}:\n"
LAST_PROCESSED_MP = mp_name
# 追加文章信息和摘要
summary_content += f"文章标题:{title}\n"
summary_content += f"时间:{time_text}\n"
summary_content += f"{ai_summary}\n"

# 追加写入本地摘要文件(UTF-8编码避免中文乱码)
summary_file = os.path.join(PROJECT_ROOT, "AI_Daily_Summary.txt")
with open(summary_file, "a", encoDING="utf-8") as f:
f.write(summary_content)

print(f"📝 摘要已写入 {summary_file}")

# 异步调用dd推送(避免阻塞)
await asyncio.to_thread(send_to_DING, title, ai_summary, mp_name, time_text)

return ai_summary
except Exception as e:
# 捕获API调用异常
print(f"❌ DS 调用失败: {e}")
return None

这里调用DS的API来生成文章的摘要,生成完了会先写到本地的摘要文件里,然后推送到dd上,这样你不用看全文,就能快速知道文章讲了啥~~,防沉迷的效果~~

自动化发动机

1
2
3
4
5
6
7
8
9
10
11
12
graph TD
A[main: 程序入口] -->|调用| B[run_summarizer: 总指挥]
B -->|1.启动| C[launch_browser: 开浏览器+反爬]
B -->|2.登录| D[wait_for_shelf_login: 检查登录]
B -->|3.遍历| E[process_all_accounts: 遍历所有WOA]
E -->|3.1 回书架| F[navigate_to_shelf: 跳转回书架]
E -->|3.2 进列表| G[enter_article_list_page: 进入文章列表]
E -->|3.3 抓文章| H[process_account_articles: 抓单个WOA]
H -->|循环调用| I[process_single_article: 抓单篇文章]
I -->|调用| J[summarize_with_ai: AI总结预览]
I -->|调用| K[save_article: 保存文件]
I -->|调用| L[is_today_article: 时间过滤]

主程序

1
2
3
4
5
6
7
8
9
#标准程序入口:只有直接运行这个文件时,才执行下面代码(被import不执行)
if __name__=="__main__":
try:
#启动异步编程库,运行主程序
asyncio.run(run_summarizer()) #✨run_summarizer参见后文
except KeyboardInterrupt:
#如果检测到手动ctrl+c制止
pass
#空操作语句,作为语法占位符;表示忽略中断继续运行

关于asyncio的函数:

函数/类说明
asyncio.run()运行一个入口协程,自动管理事件循环,程序启动用。
asyncio.create_task()将协程包装为任务并立即调度,实现后台并发。
asyncio.gather()并发运行多个协程/任务,等待所有完成并收集结果。
asyncio.sleep()暂停当前协程指定时间,主动让出控制权给事件循环。
asyncio.wait()等待一组任务完成,可设置超时,返回已完成/未完成集合。
asyncio.Lock异步互斥锁,用于保护共享资源,防止数据竞争。
asyncio.Queue异步队列,用于生产者-消费者模式,安全传递数据。
asyncio.wait_for()为协程/任务设置超时,超时则取消并抛出异常。

总指挥

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
async def run_summarizer():
async with async_playwright() as p: #p:遥控器
#1.启动浏览器,拿浏览器的上下文和标签
context,page=await launch_browser(p) #🌟launch_browser()实现见后文
try:
#2.访问书架,等待页面加载完成
##访问书架网址
await page.goto(SHELF_URL,wait_until="domcontentloaded")
##检查登录,未登录提供登录渠道
await wait_for_shelf_login(page) #🌟wait_for_shelf_login()实现见后文
##遍历所有WOA
await process_all_accounts(page,context) #🌟process_all_accounts()实现参见后文

##容错模板try...except...finally
except Exception as e:
print(f"❌程序运行出错:{str(e)}")
finally: #无论如何都会做
await context.close()
print("程序运行结束,浏览器关闭")

await挂起当前协程(在async def定义的异步函数内使用)

此处await配套async使得多个请求同时发送,节省时间
wait_until后的常用取值:

取值含义说明
"load"等待 load 事件触发页面的所有资源(图片、样式、脚本等)都加载完成。这是默认值,但通常比较慢。
"domcontentloaded"等待 DOMContentLoaded 事件触发初始 HTML 文档被完全加载和解析,但可能还在加载外部资源(如图片)。速度较快。
"networkidle"等待网络空闲(没有超过 0 个连接)相当于所有网络请求都结束。有 "networkidle0""networkidle2" 两种变体,后者允许少量连接。最慢但最稳妥。
"commit"等待导航被提交(收到响应头)最快,但此时页面内容可能还没开始解析。

Playwright 的 Page 对象提供了大量方法来控制浏览器页面。以下是按功能分类的最常用函数,所有方法均为异步,调用时需加 await

分类函数名说明
导航goto(url, options)跳转到指定 URL,可设置 wait_until 等选项
go_back() / go_forward()后退/前进到历史记录中的上一页/下一页
reload()刷新当前页面
元素定位query_selector(selector)返回第一个匹配 CSS 选择器的元素(ElementHandle
query_selector_all(selector)返回所有匹配元素的列表
wait_for_selector(selector, options)等待元素出现,超时则抛出异常
交互操作click(selector, options)点击指定元素
fill(selector, value)填充输入框(先清空再输入)
type(selector, text, options)逐个字符输入(模拟真实打字)
select_option(selector, values)选择下拉框选项
check(selector) / uncheck(selector)勾选/取消勾选复选框或单选按钮
hover(selector)鼠标悬停
获取内容inner_text(selector)获取元素的可见文本
inner_html(selector)获取元素的内部 HTML
text_content(selector)获取元素的文本内容(包含隐藏元素)
get_attribute(selector, name)获取元素的指定属性值
等待wait_for_timeout(timeout)等待指定毫秒数(通常不推荐,优先使用其他等待方法)
wait_for_function(pageFunction, arg, options)等待页面内函数返回真值
wait_for_load_state(state, options)等待特定加载状态(如 loaddomcontentloaded
截图与PDFscreenshot(options)截图,可指定路径、质量、全屏等
pdf(options)将页面导出为 PDF(仅无头 Chrome 支持)
执行脚本evaluate(pageFunction, arg)在页面上下文中执行 JavaScript 函数,返回序列化的结果
evaluate_handle(pageFunction, arg)同上,但返回 JSHandle(可操作复杂对象)
Cookiescontext.cookies(urls)通过 page.context 获取 Cookies
context.add_cookies(cookies)添加 Cookies
事件监听on(event, callback)监听页面事件(如 'dialog''popup''request'
其他url 属性获取当前页面 URL
title()获取页面标题
content()获取整个页面的 HTML
set_viewport_size(size)设置视口大小
close()关闭页面,释放资源

提示:以上只是最常用的一部分,Playwright 的 API 非常丰富,完整列表可查阅 官方文档

启动浏览器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
async def launch_browser(p): #传进来的playwright遥控器
# 启动前清理锁文件:避免浏览器启动异常
clean_user_data_locks()
# 启动带持久化数据的浏览器上下文(保留登录状态)
context = await p.chromium.launch_persistent_context(
user_data_dir=USER_DATA_DIR, # 持久化登录数据目录
headless="new" if not DEBUG_MODE else False, # 调试模式下显示浏览器
args=[
'--no-sandbox', # 禁用沙箱(Linux环境必要)
'--disable-blink-features=AutomationControlled', # 禁用自动化特征
'--disable-infobars', # 禁用信息栏(如“Chrome正被自动化软件控制”)
'--window-size=1440,900', # 设置窗口尺寸
# 自定义UA:模拟真实Chrome浏览器(需与实测一致)
'--user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36'
],
ignore_https_errors=True, # 忽略HTTPS错误(如自签名证书)
viewport={"width": 1440, "height": 900}, # 设置视口尺寸
locale='zh-CN', # 设置语言为中文
timezone_id='Asia/Shanghai' # 设置时区为上海(北京时间)
)

# 抹除webdriver属性(避免被检测为自动化),保留真实window.chrome
await context.add_init_script("""
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
""")

# 获取默认页面:优先使用已存在的页面,无则新建
page = context.pages[0] if context.pages else await context.new_page()

print("🚀 浏览器已以真实指纹启动,正在访问WOA书架...")
return context, page

contextBrowserContext 类的对象,可以认为是一个独立的浏览器会话(类似于无痕模式下的隔离环境)。拥有自己的 cookies、缓存和页面,常用于模拟多用户或多标签页操作

函数说明
new_page()创建一个新页面(标签页)。
cookies([urls])获取当前上下文的 cookies。
add_cookies(cookies)添加 cookies。
clear_cookies()清除所有 cookies。
add_init_script(script)在每个页面加载前执行指定脚本(常用于绕过反爬)。
expect_page(options)监听并等待一个新页面被创建(如点击链接打开新标签页)。
pages()返回当前上下文所有打开的页面列表。
close()关闭上下文,释放资源。
storage_state(**kwargs)获取当前上下文的存储状态(cookies、localStorage),可用于保存登录态。

此外,还可以通过 context 设置全局的 timeoutviewport 等属性

  • p.chromium:「用 Playwright 启动 Chrome 浏览器」(Playwright 还支持 Firefox、Safari)

  • launch_persistent_context:记住登录状态

  • context:「浏览器上下文」整个浏览器窗口的遥控,可以管理多个标签页、设置全局反爬

  • 内部参数:

    • 对应配置项USER_DATA_DIR = "./wechat_workdir"(删去后无cookie,每次都要手动登录)
    • headless="new" if not DEBUG_MODE else False对应配置项:DEBUG_MODE = True (“new”表示无头模式)
  • add_init_script:意思是「添加初始化脚本」。

    作用:在每一个网页加载之前,先执行这段 JavaScript 代码,在网页还没反应过来先机器人标记抹掉。

    1
    Object.defineProperty(navigator, 'webdriver', {get: () => undefined})
    • 所有自动化浏览器(Playwright、Selenium),都会在网页里留一个「机器人指纹」:navigator.webdriver = true,改为undefined去除标记

检查登录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
async def wait_for_shelf_login(page):
"""
等待书架加载并处理登录逻辑:检测登录状态,未登录则等待用户扫码
:param page: Playwright的Page对象
"""
# 等待WOA卡片加载完成
print("正在检查登录状态...")
try:
# 5秒内等待书架元素出现(已登录状态)
await page.wait_for_selector("xxxxx", timeout=5000)
except:
# 未检测到书架元素,提示用户扫码登录
print("⚠️ 未检测到书架元素,请在弹出的浏览器中手动扫码登录...")
# 扫码前检查验证码
await handle_captcha(page)

# 给用户60秒扫码时间
await page.wait_for_selector("xxxxx", timeout=60000)
print("书架加载完成,正在获取WOA列表...")

如何确定xxxxx:F12找到找到“元素”<a class="xxxxx" ...>WOA名称</a>

  • 构造选择器
    • 标签名是 a(超链接)。
    • 类名是 xxxxx(注意有时可能有多个类名,用点号连接)。
    • 因此 CSS 选择器可以写成 "xxxxx",表示“所有 class 包含对应标识的 <a> 元素”。
  • 验证选择器
    在开发者工具的“控制台”(Console)中,输入 document.querySelectorAll("xxxxx"),如果返回的元素列表与页面上的WOA卡片数量一致,说明选择器正确。

遍历WOA

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
async def process_all_accounts(page,context):
#1.定义书架上的元素选择器
account_selector="xxxxx"

#2.第一次获取:统计多少WOA元素
accounts_initial=await page.query_selector_all(account_selector)
total_account=len(accounts_initial)
print(f"共找到{total_account}个元素")

#3.逐一处理每一个WOA
for account_idx in range(total_account):
#3.1 跳转回书架页面(防卡顿)
goto_success=await navigate_to_shelf(page) #🌙参见后文详解
if not goto_success:
print("false")
break

#3.2强制刷新,获取最新列表(感觉似乎没什么必要)
# await page.goto(SHELF_URL)
# await page.wait_for_selector(account_selector)

# 3.3 重新获取WOA元素(每次循环都刷新,防止报错)
accounts = await page.query_selector_all(account_selector)
if account_idx >= len(accounts):
break

#3.4 拿到当前要处理的WOA
current_account = accounts[account_idx]

# 3.5 获取WOA名称(清理换行、空格)
account_name = await current_account.inner_text()
account_name = account_name.replace("\n", "").strip()
print(f"---------- 正在处理WOA:{account_name} ----------")

# 3.6 点击进入WOA
await current_account.click()

# 3.7 检查:是否符合需求内容,如是否WOA
try:
await page.wait_for_selector("xxxxx", timeout=10000)
except:
print(f"⚠️ {account_name} 不是,跳过")
# 不是就退回书架,继续下一个
await page.goto(SHELF_URL)
await page.wait_for_selector(account_selector, timeout=5000)
continue #跳过当前循环的剩余代码,直接下一个WOA

# 3.8 进入文章列表页(处理新标签页)
article_list_page = await enter_article_list_page(page, context) #🌙详见后文

# 3.9 【交给下级】获取并处理该WOA所有文章
await process_account_articles(article_list_page, account_name) #🌙详见后文(主要处理方法)
  1. current_account 是什么类型的对象?
1
2
accounts = await page.query_selector_all(account_selector)
current_account = accounts[account_idx]
  • page.query_selector_all() 返回的是一个 ElementHandle 对象的列表
  • ElementHandle 是 Playwright 中代表页面中真实 DOM 元素的句柄,可以通过它来获取元素的属性、文本、点击元素等。
  • 这里代表当前索引的WOA卡片元素。
  1. inner_text() 是成员函数吗?
1
account_name = await current_account.inner_text()
  • 是的inner_text()ElementHandle 类的一个异步成员方法
  • 它的作用是获取该 DOM 元素的可见文本内容(JavaScript 中的 element.innerText
  1. 关于3.7
    - 关于xxxxx:文章页顶部的一个元素(通常是WOA名称或标题链接),标志一个正常的WOA文章页已经加载完成
    - await page.wait_for_selector(account_selector) 是 Playwright 中的一个等待方法,作用是一直等待,直到页面上出现能够匹配 account_selector 的 DOM 元素

  2. 关于DOM元素

    - **DOM** 的全称 Document Object Model(文档对象模型)。当浏览器打开一个网页时,它会将 HTML 代码转换成一个树形结构,这个结构就是 DOM。
    - **元素** 是 DOM 树中的基本组成部分,对应 HTML 中的各种标签。比如:
      - `<div>` 是一个元素
      - `<a>` 是一个元素
      - `<p>` 是一个元素
      - `<img>` 也是一个元素
    

返回书架自重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
async def navigate_to_shelf(page):
"""导航回书架并确保元素加载,自重试防网络波动"""
account_selector = "xxxxx"
retry_count = 0
goto_success = False
# 最多重试3次
while retry_count < 3 and not goto_success:
try:
# 跳转到书架,只等待DOM加载完成(不等图片)
await page.goto(SHELF_URL, wait_until="domcontentloaded", timeout=10000)
# 确认书架元素加载出来
await page.wait_for_selector(account_selector, timeout=5000)
goto_success = True
except Exception as e:
print(f"跳转书架失败,重试第{retry_count+1}次")
retry_count += 1
await asyncio.sleep(1) # 歇1秒再试,防被强制fh
return goto_success

进入文章列表

  • 点击小标题打开文章列表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def enter_article_list_page(page, context):
try:
# 监听:点击后是否弹出【新标签页】
async with context.expect_page(timeout=5000) as new_page_info:
# 点击WOA标题,展开文章列表
await page.click("xxxxx")
# 获取新打开的标签页
article_list_page = await new_page_info.value
except:
# 如果没有新标签页,就用当前页
article_list_page = page
# 等待文章列表元素出现(5秒超时)
await article_list_page.wait_for_selector("xxxxx", state="visible", timeout=5000)
return article_list_page
  1. 关于"xxxxx",F12检查该元素,发现标签是 <span>,并且类名包含对应标识,因此写此用于唯一标识

  2. expect_page():开始监听浏览器上下文中是否有新页面被创建(例如点击一个链接打开了新标签页)

  3. new_page_infoexpect_page 上下文管理器返回的事件信息对象,它包含新页面的相关信息

  4. .value 是该对象的一个可等待属性,最终返回新创建的 Page 对象(Playwright 的页面类),此处为了获取真正的 Page 实例,赋值给 article_list_page

循环获取该WOA上的所有文章

1
2
3
4
5
6
7
async def process_account_articles(article_list_page, account_name):
# 循环:最多的篇数
for idx in range(min(MAX_ARTICLE_PER_ACCOUNT, 20)):
should_continue = await process_single_article(article_list_page, account_name, idx) # 调用最底层函数:抓【单篇】☀️详见后文
# 如果返回False,说明文章抓完了,直接结束循环
if not should_continue:
break

终极战力

对于单篇文章的过滤处理

处理单篇文章的全流程:

打开列表(之前函数) → 选中文章 → 加载正文 → 智能提取 → 保存文件 → AI 预览→回到列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
async def process_single_article(article_list_page,account_name,idx):
#1.定义元素定位符
catalog_toggle_selector = "xxxxx" # 展开列表按钮
catalog_container_selector = "xxxxx" # 文章列表容器

#item_selector = "li.xxxxx" # 单篇文章条目,但这个是错误的,因为这只是一个包含三篇文章的大框
# 因此重新定义选择器:同时选中头条和副条
# 逗号在 CSS 选择器里代表“或”
target_selector = "xxxxx, xxxxx"


#2.保证文章列表展开
try:
# 检查列表是否隐藏
is_list_visible = await article_list_page.is_visible(catalog_container_selector)
if not is_list_visible:
# 隐藏就点击展开
await article_list_page.click(catalog_toggle_selector)
# 等待列表渲染完成
await article_list_page.wait_for_selector(catalog_container_selector, state="visible", timeout=5000)
except Exception as e:
print(f"⚠️ 列表呼出失败,跳过当前WOA | 错误: {e}")
return False # 返回False=停止抓此WOA

#3.直接获取所有文章列表
current_items = await article_list_page.query_selector_all(target_selector)
# 如果序号超过文章总数,说明抓完了
if idx >= len(current_items):
print("✅ 已遍历完所有可抓取的文章")
return False

# 定位到当前要抓的文章
current_item = current_items[idx]

# 滚动到元素位置,防止点击不到
await current_item.scroll_into_view_if_needed()

title_el = await current_item.query_selector("xxxxx, xxxxx")
if not title_el:
return True # 还是没找到就跳过

preview_title = (await title_el.inner_text()).strip()


#4.时间过滤逻辑(优化版)
if ONLY_TODAY_ARTICLE:
# 1. 先尝试在当前文章块里找时间(适用于头条)
time_el = await current_item.query_selector("xxxxx")

# 2. 如果没找到(说明是副条),则向上找父级 li 容器里的时间
if not time_el:
# 使用 evaluate 并在 JS 层级向上找最近的 .xxxxx 里的时间
time_text = await current_item.evaluate("""(node) => {
const parentLi = node.closest('xxxxx');
const timeNode = parentLi ? parentLi.querySelector('xxxxx') : null;
return timeNode ? timeNode.innerText : "";
}""")
else:
time_text = await time_el.inner_text()
#兜底代码
if not time_text:
print(f"⚠️ 无法获取时间,跳过: {preview_title}")
return True
# 3. 判断并过滤
if not is_today_article(time_text):
print(f"⏭️ 跳过旧文: {preview_title} ({time_text})")
return True


#5.点击切换正文(优化版:点击整个文章块,更稳定)
print(f"🚀 正在加载文章: {preview_title}")
try:
await current_item.click()
except Exception as e:
print(f"⚠️ 文章点击失败,跳过 | 标题: {preview_title} | 错误: {e}")
return True

#5.5验证登录文章内部
try:
# 等待文章正文加载
await article_list_page.wait_for_selector("xxxxx", timeout=10000)
print("✅ 文章加载成功,开始提取正文")
except Exception as e:
print(f"❌ 文章加载超时,跳过: {preview_title}")
return True

于是,自动小助手便大功告成啦极度疲惫