2026年Python网络爬虫完全指南:10个实战案例教你合法抓取数据,新手也能快速上手
在数据驱动的2026年,Python爬虫依然是最受欢迎的数据采集技术之一。无论是做市场分析、竞品调研还是学术研究,掌握Python网络爬虫技能都能让你事半功倍。本文将从新手入门开始,通过10个可直接运行的实战案例,带你系统掌握Python爬虫的核心技术,同时了解合法合规的数据采集边界。
一、新手入门指南:30分钟快速搭建开发环境
1. 环境安装与基础配置
开始学习Python爬虫前,你只需要准备好基础开发环境:
**Python版本**:推荐使用Python 3.10+,下载地址:https://www.python.org/downloads/
**必备库安装**:
“`bash
# 安装基础爬虫库
pip install requests beautifulsoup4 lxml
# 安装动态网页爬取库
pip install selenium playwright
# 安装数据解析与存储库
pip install pandas aiohttp httpx
# 初始化playwright浏览器驱动
playwright install chromium
“`
**IDE选择**:新手推荐使用VS Code + Python插件,专业开发者可以选择PyCharm。
2. 必须遵守的合规原则
在开始任何爬虫项目前,你必须了解以下规则,避免法律风险:
**robots协议**:访问目标网站的`/robots.txt`文件,明确哪些页面允许抓取,哪些禁止抓取。例如:
“`
User-agent: *
Disallow: /admin/
Allow: /public/
“`
表示所有爬虫都不能抓取`/admin/`路径下的内容,但可以抓取`/public/`路径。
**法律合规**:2025年最新《网络数据安全管理条例》明确规定,爬虫不得窃取个人信息、不得破坏网站正常运行、不得用于商业牟利的非法数据交易。建议在抓取前阅读目标网站的《服务条款》。
**合理访问频率**:控制请求间隔在1-3秒以上,避免给服务器造成过大压力,同时减少被封禁的风险。
二、10个实战Python爬虫案例(可直接运行)
所有案例均基于2026年最新的Python生态开发,兼容性强,注释完整,新手可以直接复制运行。
案例1:静态网页基础抓取(requests + BeautifulSoup)
场景:抓取静态网页的标题、正文等基础信息,适合新闻网站、博客等内容型网站。
import requests
from bs4 import BeautifulSoup
import time
def get_static_page(url: str) -> dict:
"""
抓取静态网页内容
:param url: 目标网页URL
:return: 包含标题、正文、发布时间的字典
"""
# 伪装浏览器请求头,避免被反爬识别
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
"Accept-Language": "zh-CN,zh;q=0.9"
}
try:
# 发送GET请求,超时时间10秒
response = requests.get(url, headers=headers, timeout=10)
# 检查请求是否成功
response.raise_for_status()
# 设置正确的编码
response.encoding = response.apparent_encoding
# 解析HTML内容
soup = BeautifulSoup(response.text, "lxml")
# 提取页面标题
title = soup.title.string.strip() if soup.title else "无标题"
# 提取正文(这里以p标签为例,不同网站结构不同,需要调整选择器)
content = "\n".join([p.get_text().strip() for p in soup.find_all("p") if p.get_text().strip()])
# 提取发布时间(根据实际网站调整选择器)
publish_time = soup.find("span", class_="publish-time").get_text().strip() if soup.find("span", class_="publish-time") else "未知"
return {
"title": title,
"content": content,
"publish_time": publish_time,
"url": url
}
except Exception as e:
print(f"抓取失败:{str(e)}")
return None
# 使用示例
if __name__ == "__main__":
result = get_static_page("https://example.com/article/123")
if result:
print(f"标题:{result['title']}")
print(f"发布时间:{result['publish_time']}")
print(f"正文摘要:{result['content'][:100]}...")
# 控制请求频率
time.sleep(2)
使用说明:只需修改URL和对应的HTML选择器,即可适配大多数静态网站。
案例2:动态网页爬取(Playwright)
场景:抓取需要JavaScript渲染的动态网页,例如加载更多、滚动加载的内容。
from playwright.sync_api import sync_playwright
import time
def get_dynamic_page(url: str, scroll_times: int = 3) -> list:
"""
抓取动态网页内容,支持滚动加载
:param url: 目标网页URL
:param scroll_times: 滚动次数,用于加载更多内容
:return: 抓取到的内容列表
"""
with sync_playwright() as p:
# 启动浏览器,headless=False可以看到浏览器操作过程,调试时使用
browser = p.chromium.launch(headless=True)
page = browser.new_page(user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36")
try:
# 访问页面
page.goto(url, timeout=30000)
# 等待页面加载完成
page.wait_for_load_state("networkidle")
# 模拟滚动加载更多内容
for i in range(scroll_times):
# 滚动到页面底部
page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
# 等待加载
time.sleep(2)
# 等待新内容加载完成
page.wait_for_load_state("networkidle")
print(f"第{i+1}次滚动完成")
# 提取列表内容(以商品列表为例,根据实际网站调整选择器)
items = page.query_selector_all(".product-item")
results = []
for item in items:
name = item.query_selector(".product-name").inner_text().strip() if item.query_selector(".product-name") else "无名称"
price = item.query_selector(".product-price").inner_text().strip() if item.query_selector(".product-price") else "无价格"
results.append({"name": name, "price": price})
return results
except Exception as e:
print(f"动态页面抓取失败:{str(e)}")
return []
finally:
# 关闭浏览器
browser.close()
# 使用示例
if __name__ == "__main__":
products = get_dynamic_page("https://example.com/products", scroll_times=2)
print(f"共抓取到{len(products)}个商品:")
for product in products[:5]:
print(f"{product['name']} - {product['price']}")
使用说明:Playwright是2026年最流行的动态网页爬取工具,比Selenium更稳定、速度更快。
案例3:API接口数据抓取
场景:直接调用网站的API接口获取数据,效率比解析HTML高得多。
import requests
import json
import time
def get_api_data(api_url: str, params: dict = None) -> dict:
"""
调用API接口获取数据
:param api_url: API接口地址
:param params: 请求参数
:return: 接口返回的JSON数据
"""
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
"Accept": "application/json, text/plain, */*",
"Referer": "https://example.com/" # 很多API会校验来源
}
try:
response = requests.get(api_url, headers=headers, params=params, timeout=10)
response.raise_for_status()
# 解析JSON数据
return response.json()
except json.JSONDecodeError:
print("接口返回不是JSON格式")
return None
except Exception as e:
print(f"API调用失败:{str(e)}")
return None
# 使用示例
if __name__ == "__main__":
# 示例:获取某电商平台的商品列表API
api_url = "https://api.example.com/products"
params = {
"category": "electronics",
"page": 1,
"page_size": 20
}
data = get_api_data(api_url, params)
if data and data.get("code") == 200:
products = data.get("data", {}).get("products", [])
print(f"接口返回{len(products)}个商品")
for product in products:
print(f"{product['title']} - {product['price']}元")
time.sleep(1)
使用说明:通过浏览器开发者工具的”网络”面板可以找到网站的API接口,这种方式抓取效率最高。
案例4:图片批量下载
场景:批量下载网页中的图片,适合素材收集、壁纸下载等场景。
import requests
import os
from bs4 import BeautifulSoup
from urllib.parse import urljoin
def download_images(url: str, save_dir: str = "images") -> int:
"""
批量下载网页中的图片
:param url: 目标网页URL
:param save_dir: 图片保存目录
:return: 成功下载的图片数量
"""
# 创建保存目录
os.makedirs(save_dir, exist_ok=True)
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
"Referer": url # 很多图片服务器会校验来源,避免403错误
}
try:
# 获取网页内容
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, "lxml")
# 提取所有图片标签
img_tags = soup.find_all("img")
count = 0
for img in img_tags:
img_url = img.get("src") or img.get("data-src") # 很多网站用data-src做懒加载
if not img_url:
continue
# 补全相对路径
img_url = urljoin(url, img_url)
# 过滤非图片链接
if not img_url.lower().endswith((".jpg", ".jpeg", ".png", ".gif", ".webp")):
continue
try:
# 下载图片
img_response = requests.get(img_url, headers=headers, timeout=10)
img_response.raise_for_status()
# 生成文件名
filename = os.path.join(save_dir, f"img_{count+1}_{os.path.basename(img_url).split('?')[0]}")
# 保存图片
with open(filename, "wb") as f:
f.write(img_response.content)
count += 1
print(f"已下载:{filename}")
except Exception as e:
print(f"下载图片失败{img_url}:{str(e)}")
continue
return count
except Exception as e:
print(f"页面抓取失败:{str(e)}")
return 0
# 使用示例
if __name__ == "__main__":
total = download_images("https://example.com/gallery", save_dir="travel_photos")
print(f"共成功下载{total}张图片")
使用说明:注意图片的版权问题,不要下载有版权保护的图片用于商业用途。
案例5:数据保存为CSV格式
场景:将抓取到的数据保存为CSV文件,方便用Excel或Pandas分析。
import csv
from typing import List, Dict
def save_to_csv(data: List[Dict], filename: str = "data.csv") -> bool:
"""
将数据保存为CSV文件
:param data: 数据列表,每个元素是字典
:param filename: 保存的文件名
:return: 是否保存成功
"""
if not data:
print("数据为空,无需保存")
return False
try:
# 获取所有字段名
fieldnames = data[0].keys()
# 写入CSV文件,使用utf-8编码带BOM,避免Excel打开乱码
with open(filename, "w", newline="", encoding="utf-8-sig") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
# 写入表头
writer.writeheader()
# 写入数据
writer.writerows(data)
print(f"数据已成功保存到{filename},共{len(data)}条记录")
return True
except Exception as e:
print(f"保存CSV失败:{str(e)}")
return False
# 使用示例
if __name__ == "__main__":
# 模拟抓取到的商品数据
products = [
{"name": "手机", "price": 3999, "brand": "小米", "sales": 10000},
{"name": "笔记本电脑", "price": 5999, "brand": "华为", "sales": 5000},
{"name": "平板", "price": 2999, "brand": "苹果", "sales": 8000}
]
save_to_csv(products, "products.csv")
使用说明:CSV是最通用的数据格式之一,几乎所有数据分析工具都支持。
案例6:数据保存为JSON格式
场景:保存结构化数据,适合后续程序调用或API返回。
import json
from typing import List, Dict
def save_to_json(data: List[Dict], filename: str = "data.json", indent: int = 4) -> bool:
"""
将数据保存为JSON文件
:param data: 数据列表
:param filename: 保存的文件名
:param indent: 缩进空格数,方便阅读
:return: 是否保存成功
"""
if not data:
print("数据为空,无需保存")
return False
try:
with open(filename, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=indent)
print(f"数据已成功保存到{filename}")
return True
except Exception as e:
print(f"保存JSON失败:{str(e)}")
return False
# 使用示例
if __name__ == "__main__":
articles = [
{"title": "Python入门教程", "author": "张三", "publish_time": "2026-01-01", "views": 10000},
{"title": "爬虫进阶技巧", "author": "李四", "publish_time": "2026-02-01", "views": 8000}
]
save_to_json(articles, "articles.json")
使用说明:JSON格式保留了数据的结构,适合在程序之间传递数据。
案例7:异步爬虫(aiohttp)
场景:需要批量抓取大量页面时,异步爬虫可以大幅提高效率。
import asyncio
import aiohttp
import time
from typing import List
async def fetch_url(session: aiohttp.ClientSession, url: str) -> str:
"""
异步获取单个页面内容
:param session: aiohttp会话
:param url: 目标URL
:return: 页面内容
"""
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
}
try:
async with session.get(url, headers=headers, timeout=10) as response:
if response.status == 200:
return await response.text()
else:
print(f"请求{url}失败,状态码:{response.status}")
return ""
except Exception as e:
print(f"异步请求失败{url}:{str(e)}")
return ""
async def async_crawler(urls: List[str]) -> List[str]:
"""
异步爬虫主函数
:param urls: 待抓取的URL列表
:return: 页面内容列表
"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# 使用示例
if __name__ == "__main__":
# 待抓取的URL列表
urls = [
"https://example.com/page/1",
"https://example.com/page/2",
"https://example.com/page/3",
"https://example.com/page/4",
"https://example.com/page/5"
]
start_time = time.time()
# 运行异步爬虫
contents = asyncio.run(async_crawler(urls))
end_time = time.time()
print(f"异步抓取完成,共抓取{len(contents)}个页面,耗时{end_time - start_time:.2f}秒")
# 对比:同步抓取5个页面需要至少10秒,异步只需要2-3秒
使用说明:异步爬虫适合批量抓取场景,但要注意控制并发数,避免给服务器造成过大压力。
案例8:数据去重处理
场景:避免重复抓取相同内容,节省存储空间和时间。
import hashlib
from typing import Any
class DataDeduplicator:
"""数据去重器,基于MD5哈希实现"""
def __init__(self, cache_file: str = "deduplication_cache.txt"):
self.cache_file = cache_file
# 加载已有的哈希缓存
self.cache = self._load_cache()
def _load_cache(self) -> set:
"""加载本地缓存"""
cache = set()
try:
with open(self.cache_file, "r", encoding="utf-8") as f:
for line in f:
cache.add(line.strip())
except FileNotFoundError:
pass
return cache
def _save_cache(self, md5_hash: str):
"""保存新的哈希到本地缓存"""
with open(self.cache_file, "a", encoding="utf-8") as f:
f.write(f"{md5_hash}\n")
def is_duplicate(self, content: Any) -> bool:
"""
检查内容是否重复
:param content: 待检查的内容,可以是字符串、字典等
:return: 是否重复
"""
# 将内容转换为字符串
if isinstance(content, dict):
content_str = str(sorted(content.items()))
else:
content_str = str(content)
# 计算MD5哈希
md5_hash = hashlib.md5(content_str.encode("utf-8")).hexdigest()
# 检查是否已存在
if md5_hash in self.cache:
return True
else:
self.cache.add(md5_hash)
self._save_cache(md5_hash)
return False
# 使用示例
if __name__ == "__main__":
deduplicator = DataDeduplicator()
# 模拟抓取到的内容
contents = [
"这是第一篇文章",
"这是第二篇文章",
"这是第一篇文章", # 重复内容
"这是第三篇文章"
]
unique_contents = []
for content in contents:
if not deduplicator.is_duplicate(content):
unique_contents.append(content)
else:
print(f"发现重复内容:{content}")
print(f"去重后剩余{len(unique_contents)}条内容")
使用说明:在增量抓取场景中,数据去重是必不可少的步骤,可以避免重复处理相同数据。
案例9:IP代理使用
场景:解决IP封禁问题,提高爬虫的稳定性。
import requests
from typing import Optional
def get_proxy() -> Optional[dict]:
"""
获取代理IP,这里可以替换为你的代理服务商API
:return: 代理配置字典
"""
# 示例:从代理服务商获取IP(实际使用时替换为真实的API)
# proxy_api = "https://api.proxy-provider.com/get_ip?key=your_key"
# response = requests.get(proxy_api)
# ip = response.text.strip()
# 本地测试用,实际使用时注释掉下面这行
ip = "123.45.67.89:8080"
if ip:
return {
"http": f"http://{ip}",
"https": f"http://{ip}"
}
return None
def crawl_with_proxy(url: str, use_proxy: bool = True) -> Optional[str]:
"""
使用代理IP抓取页面
:param url: 目标URL
:param use_proxy: 是否使用代理
:return: 页面内容
"""
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
}
proxies = get_proxy() if use_proxy else None
try:
response = requests.get(url, headers=headers, proxies=proxies, timeout=15)
response.raise_for_status()
response.encoding = response.apparent_encoding
return response.text
except Exception as e:
print(f"使用代理抓取失败:{str(e)}")
# 失败后尝试不使用代理
if use_proxy:
print("尝试不使用代理重新抓取...")
return crawl_with_proxy(url, use_proxy=False)
return None
# 使用示例
if __name__ == "__main__":
content = crawl_with_proxy("https://example.com")
if content:
print(f"抓取成功,内容长度:{len(content)}字符")
使用说明:选择正规的代理服务商,避免使用免费代理,稳定性差且有安全风险。
案例10:爬虫异常处理与重试机制
场景:提高爬虫的稳定性,处理网络波动、临时故障等问题。
import requests
import time
from functools import wraps
def retry(max_retries: int = 3, delay: int = 2):
"""
重试装饰器
:param max_retries: 最大重试次数
:param delay: 重试间隔(秒)
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
retries = 0
while retries < max_retries:
try:
return func(*args, **kwargs)
except Exception as e:
retries += 1
print(f"第{retries}次重试,错误:{str(e)}")
time.sleep(delay)
print(f"重试{max_retries}次后仍然失败")
return None
return wrapper
return decorator
@retry(max_retries=3, delay=2)
def robust_crawler(url: str) -> Optional[str]:
"""
健壮的爬虫函数,自带重试机制
:param url: 目标URL
:return: 页面内容
"""
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
response.encoding = response.apparent_encoding
return response.text
# 使用示例
if __name__ == "__main__":
content = robust_crawler("https://example.com")
if content:
print("抓取成功")
使用说明:合理的重试机制可以大大提高爬虫的成功率,特别是在网络不稳定的情况下。
三、新手常见踩坑指南
1. 反爬规避技巧
**请求头伪装**:务必设置完整的User-Agent、Referer、Accept-Language等请求头,模拟真实浏览器行为。
**请求间隔随机化**:不要使用固定的请求间隔,使用`time.sleep(random.uniform(1, 3))`让间隔在1-3秒之间随机。
**Cookie管理**:对于需要登录的网站,使用requests.Session()自动管理Cookie,避免频繁登录。
**避免规律操作**:不要每天固定时间抓取,不要每次都按相同顺序抓取页面,模拟人类的随机行为。
2. IP封禁处理方案
**降低请求频率**:如果出现IP封禁,首先降低请求频率,间隔改为5-10秒。
**使用代理IP**:购买正规的代理IP服务,轮流使用不同IP抓取。
**更换网络环境**:如果是家庭网络,可以重启路由器获取新的IP地址。
**使用云服务器**:在云服务器上部署爬虫,避免本地IP被封禁影响正常上网。
3. 数据去重实用技巧
**URL去重**:对于列表页抓取,首先对URL进行去重,避免重复抓取相同页面。
**内容哈希去重**:对于内容页,使用MD5哈希对正文内容进行去重,即使URL不同但内容相同也能识别。
**数据库唯一索引**:将数据存入数据库时,对关键信息(如文章ID、商品ID)设置唯一索引,避免重复插入。
四、Python爬虫变现盈利方向
1. 数据采集接单
目前各大接单平台都有大量数据采集需求,常见的订单类型包括:
电商商品数据抓取(价格、销量、评价)
企业信息、工商信息抓取
新闻资讯、行业数据抓取
点评、评论、舆情数据抓取
接单渠道:猪八戒网、一品威客、技术社群、淘宝闲鱼等,新手入门每月可以赚3000-8000元。
2. 行业报告制作
通过Python爬虫采集细分行业的公开数据,制作行业分析报告出售:
电商行业报告:分析品类销量趋势、价格走势、热门产品
房地产行业报告:分析房价走势、成交量、库存数据
餐饮行业报告:分析热门菜品、消费趋势、门店分布
盈利方式:在知识付费平台出售报告,或为企业提供定制化分析服务,单份报告售价几百到几千元不等。
3. 竞品分析服务
为企业提供竞品监测服务,帮助企业了解竞争对手动态:
竞品价格监测:实时监控竞争对手的产品价格变化,及时调整定价策略
竞品上新监测:第一时间获取竞争对手的新产品信息
营销活动监测:分析竞争对手的营销活动效果,为企业提供参考
收费模式:按月收费,每月服务费几千到几万元,适合有一定客户资源的开发者。
五、总结
Python爬虫作为2026年最实用的编程技能之一,不仅能帮助你高效获取数据,还能带来实实在在的收入。但请始终记住:合法合规是爬虫的底线,在享受技术带来的便利的同时,一定要遵守相关法律法规,尊重网站的robots协议,合理控制访问频率,做一个负责任的网络爬虫开发者。
如果你想深入学习Python爬虫,可以关注后续的进阶教程,我们会分享更多关于分布式爬虫、验证码识别、反反爬技巧等高级内容。