Python Web Scraping Interview Questions

1. Crawler Basics
Question: What is a web crawler, and what is its basic workflow?
Answer:
A web crawler is a program that automatically fetches web page content; by simulating browser behavior, it can collect information from the internet in bulk.
Basic workflow:
- Send an HTTP request and fetch the page
- Parse the HTML and extract the data
- Store the data
- Discover new links and keep crawling (a loop sketch follows the basic example below)
```python
import requests
from bs4 import BeautifulSoup

# Basic crawler example
url = 'https://example.com'
response = requests.get(url)
soup = BeautifulSoup(response.text, 'html.parser')

# Extract the title
title = soup.find('title').text

# Extract all links
links = [a['href'] for a in soup.find_all('a', href=True)]
```
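The example above covers fetching, parsing, and extraction for a single page. A minimal sketch of the full loop (follow newly discovered links, avoid revisits) might look like the following; the seed URL and page limit are placeholders:

```python
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin

def simple_crawl(seed_url, max_pages=10):
    """Minimal breadth-first crawl: fetch, parse, store, follow links."""
    to_visit = [seed_url]
    visited = set()
    results = []
    while to_visit and len(visited) < max_pages:
        url = to_visit.pop(0)
        if url in visited:
            continue
        visited.add(url)
        try:
            response = requests.get(url, timeout=10)
        except requests.RequestException:
            continue  # skip pages that fail to load
        soup = BeautifulSoup(response.text, 'html.parser')
        title = soup.find('title')
        results.append({'url': url, 'title': title.text if title else ''})  # "store" step
        # Discover new links and queue them
        for a in soup.find_all('a', href=True):
            to_visit.append(urljoin(url, a['href']))
    return results
```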
2. The Requests Library
Question: How do you use the Requests library? What are its commonly used features?
Answer:
```python
import requests

# 1. Basic requests
response = requests.get('https://api.github.com')
response = requests.post('https://httpbin.org/post', data={'key': 'value'})
response = requests.put('https://httpbin.org/put', data={'key': 'value'})
response = requests.delete('https://httpbin.org/delete')

# 2. Query parameters
params = {'key1': 'value1', 'key2': 'value2'}
response = requests.get('https://httpbin.org/get', params=params)

# 3. Request headers
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'
}
response = requests.get('https://example.com', headers=headers)

# 4. Form data
data = {'username': 'admin', 'password': '123456'}
response = requests.post('https://httpbin.org/post', data=data)

# 5. JSON data
json_data = {'key': 'value'}
response = requests.post('https://httpbin.org/post', json=json_data)

# 6. File upload
files = {'file': open('report.xls', 'rb')}
response = requests.post('https://httpbin.org/post', files=files)

# 7. Cookies
cookies = {'session_id': '123456'}
response = requests.get('https://httpbin.org/cookies', cookies=cookies)
# Read cookies from the response
print(response.cookies)

# 8. Session (keeps cookies and headers across requests)
session = requests.Session()
session.headers.update({'User-Agent': 'MyBot/1.0'})
# Log in
session.post('https://example.com/login', data={'user': 'admin', 'pass': '123'})
# Subsequent requests automatically carry the session cookies
response = session.get('https://example.com/profile')

# 9. Proxies
proxies = {
    'http': 'http://10.10.1.10:3128',
    'https': 'http://10.10.1.10:1080'
}
response = requests.get('https://example.com', proxies=proxies)

# 10. Timeouts and retries
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retries = Retry(total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
session.mount('http://', HTTPAdapter(max_retries=retries))
session.mount('https://', HTTPAdapter(max_retries=retries))
response = session.get('https://example.com', timeout=5)

# 11. SSL verification
response = requests.get('https://example.com', verify=False)                # disable SSL verification
response = requests.get('https://example.com', verify='/path/to/certfile')  # custom CA bundle

# 12. Streaming download
response = requests.get('https://example.com/largefile.zip', stream=True)
with open('largefile.zip', 'wb') as f:
    for chunk in response.iter_content(chunk_size=8192):
        f.write(chunk)
```
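A way to combine several of the features above (a shared Session, retries, default headers, timeouts, and status checking) into one helper; the function names and values here are illustrative rather than part of the Requests API:

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def make_session(user_agent='MyBot/1.0', retries=3):
    """Build a Session with retry behaviour and a default User-Agent."""
    session = requests.Session()
    session.headers.update({'User-Agent': user_agent})
    retry = Retry(total=retries, backoff_factor=1,
                  status_forcelist=[500, 502, 503, 504])
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session

def fetch(session, url, **kwargs):
    """GET a URL with a timeout and raise for HTTP error status codes."""
    response = session.get(url, timeout=10, **kwargs)
    response.raise_for_status()  # raises requests.HTTPError on 4xx/5xx
    return response

# Usage (placeholder URL)
# session = make_session()
# html = fetch(session, 'https://example.com').text
```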
3. Parsing with BeautifulSoup
Question: How do you parse HTML with BeautifulSoup?
Answer:
```python
from bs4 import BeautifulSoup
import requests

html = """
<html>
<head><title>Example</title></head>
<body>
  <div class="content">
    <h1 id="title">Hello World</h1>
    <p class="text">This is a paragraph.</p>
    <a href="https://example.com">Link</a>
    <ul>
      <li>Item 1</li>
      <li>Item 2</li>
    </ul>
  </div>
</body>
</html>
"""

soup = BeautifulSoup(html, 'html.parser')
# Or use the lxml parser (faster)
# soup = BeautifulSoup(html, 'lxml')

# 1. Basic lookups
soup.title          # <title>Example</title>
soup.title.text     # Example
soup.h1             # first h1
soup.find('h1')     # first h1
soup.find_all('p')  # all p tags

# 2. Find by attribute
soup.find('div', class_='content')   # note the trailing underscore: class is a Python keyword
soup.find('h1', id='title')
soup.find(attrs={'data-id': '123'})

# 3. CSS selectors
soup.select('div.content')        # div with class "content"
soup.select('#title')             # element with id "title"
soup.select('div > p')            # p that is a direct child of div
soup.select('a[href]')            # a tags that have an href attribute
soup.select('li:nth-of-type(2)')  # second li

# 4. Getting content
element = soup.find('h1')
element.text        # text content
element.get_text()  # text content (a separator can be passed)
element.string      # the element's single string child (None if it has several children)

# Getting attributes
link = soup.find('a')
link['href']        # https://example.com
link.get('href')    # https://example.com
link.attrs          # dict of all attributes

# 5. Navigating the tree
soup.find('div').children          # direct children
soup.find('div').descendants       # all descendants
soup.find('div').parent            # parent node
soup.find('div').parents           # all ancestors
soup.find('li').next_sibling       # next sibling
soup.find('li').previous_sibling   # previous sibling

# 6. Practical example: extracting data
def extract_data(html):
    soup = BeautifulSoup(html, 'html.parser')
    data = {
        'title': soup.find('h1', class_='title').text.strip(),
        'content': soup.find('div', class_='content').text.strip(),
        'links': [
            {'text': a.text, 'href': a['href']}
            for a in soup.find_all('a', href=True)
        ]
    }
    return data
```
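One caveat worth raising in an interview: find() returns None when nothing matches, so chained calls such as soup.find('h1', class_='title').text in extract_data above raise AttributeError on pages that lack the element. A defensive variant (the helper name is illustrative):

```python
from bs4 import BeautifulSoup

def safe_text(soup, *args, default='', **kwargs):
    """Return the stripped text of the first match, or a default if absent."""
    element = soup.find(*args, **kwargs)
    return element.get_text(strip=True) if element else default

def extract_data_safe(html):
    soup = BeautifulSoup(html, 'html.parser')
    return {
        'title': safe_text(soup, 'h1', class_='title'),
        'content': safe_text(soup, 'div', class_='content'),
        'links': [
            {'text': a.get_text(strip=True), 'href': a['href']}
            for a in soup.find_all('a', href=True)
        ],
    }
```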
4. XPath and CSS Selectors
Question: How do you use XPath and CSS selectors?
Answer:
```python
from lxml import etree

html = """
<html>
<body>
  <div class="container">
    <h1>Title</h1>
    <ul class="list">
      <li data-id="1">Item 1</li>
      <li data-id="2">Item 2</li>
      <li data-id="3">Item 3</li>
    </ul>
    <div class="item" data-price="100">Product 1</div>
    <div class="item" data-price="200">Product 2</div>
  </div>
</body>
</html>
"""

tree = etree.HTML(html)

# 1. Basic XPath
tree.xpath('//h1')                        # all h1 elements
tree.xpath('//h1/text()')                 # h1 text
tree.xpath('//div[@class="container"]')   # div with class "container"
tree.xpath('//li[@data-id="1"]')          # li with data-id="1"
tree.xpath('//li[1]')                     # first li
tree.xpath('//li[last()]')                # last li
tree.xpath('//li[position()<=2]')         # first two li

# 2. XPath axes
tree.xpath('//ul/li')                     # li directly under ul
tree.xpath('//ul//li')                    # all li descendants of ul
tree.xpath('//li/..')                     # parent of li
tree.xpath('//li/following-sibling::*')   # following siblings
tree.xpath('//li/preceding-sibling::*')   # preceding siblings

# 3. XPath functions
tree.xpath('//div[contains(@class, "item")]')   # class contains "item"
tree.xpath('//a[starts-with(@href, "http")]')   # href starts with "http"
tree.xpath('//li[text()="Item 1"]')             # li whose text is "Item 1"
tree.xpath('//div[@data-price>100]')            # data-price greater than 100

# 4. Extracting attributes
tree.xpath('//div/@class')    # all class attributes
tree.xpath('//li/@data-id')   # all data-id attributes

# 5. CSS selectors (via cssselect)
from lxml.cssselect import CSSSelector
sel = CSSSelector('div.container')
results = sel(tree)

# 6. Practical example
def parse_with_xpath(html):
    tree = etree.HTML(html)
    items = []
    for li in tree.xpath('//ul[@class="list"]/li'):
        item = {
            'id': li.get('data-id'),
            'text': li.xpath('./text()')[0]
        }
        items.append(item)
    return items
```
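For comparison, the same items extracted with XPath and with a CSS selector; this is a small self-contained sketch whose markup is a trimmed version of the example above:

```python
from lxml import etree
from lxml.cssselect import CSSSelector

html = '<ul class="list"><li>Item 1</li><li>Item 2</li><li>Item 3</li></ul>'
tree = etree.HTML(html)

# XPath version
xpath_texts = [li.text for li in tree.xpath('//ul[@class="list"]/li')]

# CSS selector version (cssselect compiles the selector to XPath internally)
select_li = CSSSelector('ul.list > li')
css_texts = [li.text for li in select_li(tree)]

assert xpath_texts == css_texts == ['Item 1', 'Item 2', 'Item 3']
```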
5. Data Storage
Question: How do you store scraped data?
Answer:
```python
import json
import csv
import sqlite3
import pymongo
import pandas as pd

# 1. JSON file
data = [{'name': 'Alice', 'age': 25}, {'name': 'Bob', 'age': 30}]

# Save
with open('data.json', 'w', encoding='utf-8') as f:
    json.dump(data, f, ensure_ascii=False, indent=2)

# Append (JSON Lines format)
with open('data.jsonl', 'a', encoding='utf-8') as f:
    for item in data:
        f.write(json.dumps(item, ensure_ascii=False) + '\n')

# 2. CSV file
# Save
with open('data.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.DictWriter(f, fieldnames=['name', 'age'])
    writer.writeheader()
    writer.writerows(data)

# With pandas
df = pd.DataFrame(data)
df.to_csv('data.csv', index=False, encoding='utf-8')

# 3. SQLite
conn = sqlite3.connect('data.db')
cursor = conn.cursor()

# Create the table
cursor.execute('''
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT,
        age INTEGER
    )
''')

# Insert rows
for item in data:
    cursor.execute('INSERT INTO users (name, age) VALUES (?, ?)',
                   (item['name'], item['age']))
conn.commit()
conn.close()

# 4. MongoDB
client = pymongo.MongoClient('mongodb://localhost:27017/')
db = client['mydb']
collection = db['users']

# Insert
collection.insert_many(data)

# Query
for doc in collection.find():
    print(doc)

# 5. Redis (caching)
import redis
r = redis.Redis(host='localhost', port=6379, db=0)

# Key/value
r.set('key', 'value')
r.setex('temp_key', 3600, 'value')  # with an expiry time

# Set (deduplication)
r.sadd('urls', 'http://example.com')
is_member = r.sismember('urls', 'http://example.com')

# List (queue)
r.lpush('queue', 'task1')
task = r.brpop('queue', timeout=5)

# 6. A storage pipeline class
class DataPipeline:
    def __init__(self):
        self.conn = sqlite3.connect('data.db')
        self.cursor = self.conn.cursor()
        self._create_table()

    def _create_table(self):
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS items (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT,
                url TEXT UNIQUE,
                content TEXT,
                crawl_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        self.conn.commit()

    def process_item(self, item):
        try:
            self.cursor.execute('''
                INSERT OR IGNORE INTO items (title, url, content)
                VALUES (?, ?, ?)
            ''', (item['title'], item['url'], item['content']))
            self.conn.commit()
        except Exception as e:
            print(f"Error: {e}")

    def close(self):
        self.conn.close()
```
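One way the DataPipeline class above might be used during a crawl; the item values are placeholders shaped to match what process_item expects:

```python
pipeline = DataPipeline()
try:
    item = {
        'title': 'Example page',
        'url': 'https://example.com/page/1',   # UNIQUE column, so duplicates are ignored
        'content': 'Page body text...',
    }
    pipeline.process_item(item)
finally:
    pipeline.close()
```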
6. Asynchronous Crawlers
Question: How do you implement an asynchronous crawler?
Answer:
```python
import asyncio
import aiohttp
from bs4 import BeautifulSoup

# 1. Basic async requests
async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ['http://example.com/page1', 'http://example.com/page2']
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for url, html in zip(urls, results):
            print(f"Fetched {url}: {len(html)} bytes")

asyncio.run(main())

# 2. An async crawler with a concurrency limit
from asyncio import Semaphore

class AsyncCrawler:
    def __init__(self, max_concurrent=5):
        self.semaphore = Semaphore(max_concurrent)
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            headers={'User-Agent': 'MyBot/1.0'},
            timeout=aiohttp.ClientTimeout(total=30)
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()

    async def fetch(self, url):
        async with self.semaphore:
            try:
                async with self.session.get(url) as response:
                    if response.status == 200:
                        return await response.text()
                    else:
                        print(f"Error {response.status}: {url}")
                        return None
            except Exception as e:
                print(f"Exception {e}: {url}")
                return None

    async def crawl(self, urls):
        tasks = [self.fetch(url) for url in urls]
        return await asyncio.gather(*tasks)

# Usage
async def main():
    urls = [f'http://example.com/page/{i}' for i in range(100)]
    async with AsyncCrawler(max_concurrent=10) as crawler:
        results = await crawler.crawl(urls)
    for url, html in zip(urls, results):
        if html:
            # Parse the page
            soup = BeautifulSoup(html, 'html.parser')
            title = soup.find('title')
            print(f"{url}: {title.text if title else 'No title'}")

asyncio.run(main())

# 3. Async producer/consumer queue
async def producer(queue, urls):
    for url in urls:
        await queue.put(url)
    await queue.put(None)  # end-of-work signal

async def consumer(queue, session):
    while True:
        url = await queue.get()
        if url is None:
            queue.task_done()  # account for the sentinel so queue.join() can finish
            break
        try:
            async with session.get(url) as response:
                html = await response.text()
                print(f"Fetched: {url}")
        except Exception as e:
            print(f"Error {url}: {e}")
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=100)
    urls = [f'http://example.com/page/{i}' for i in range(100)]
    async with aiohttp.ClientSession() as session:
        producers = asyncio.create_task(producer(queue, urls))
        consumers = [asyncio.create_task(consumer(queue, session))
                     for _ in range(5)]
        await producers
        await queue.join()
        for c in consumers:
            c.cancel()

asyncio.run(main())
```
7. Countering Anti-Scraping Measures
Question: How do you deal with common anti-scraping mechanisms?
Answer:
```python
import requests
import time
import random
from fake_useragent import UserAgent

# 1. User-Agent rotation
ua = UserAgent()
headers = {
    'User-Agent': ua.random,
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    'Accept-Encoding': 'gzip, deflate, br',
    'Connection': 'keep-alive'
}

# 2. Proxy IPs
proxies = [
    {'http': 'http://proxy1.com:8080', 'https': 'https://proxy1.com:8080'},
    {'http': 'http://proxy2.com:8080', 'https': 'https://proxy2.com:8080'}
]

def get_proxy():
    return random.choice(proxies)

# 3. Request spacing (rate limiting)
def crawl_with_delay(urls, delay=1):
    for url in urls:
        response = requests.get(url, headers=headers)
        # process the response
        time.sleep(delay + random.uniform(0, 1))  # randomized delay
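
# Putting items 1-3 together: rotate the User-Agent and proxy on each request and
# add a randomized delay. This reuses ua, headers, and get_proxy from above and is
# only a sketch, not a hardened implementation.
def polite_fetch(urls):
    for url in urls:
        request_headers = dict(headers, **{'User-Agent': ua.random})  # fresh UA each time
        try:
            response = requests.get(url, headers=request_headers,
                                    proxies=get_proxy(), timeout=10)
            response.raise_for_status()
            yield url, response.text
        except requests.RequestException as e:
            print(f"Failed {url}: {e}")
        time.sleep(random.uniform(1, 3))  # random pause between requests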

# 4. Cookies and sessions
session = requests.Session()
# Visit the home page first to pick up cookies
session.get('https://example.com')
# Later requests automatically carry those cookies
response = session.get('https://example.com/data')

# 5. Captcha handling
# Use a captcha-solving service (e.g. Chaojiying); this is only a placeholder
def solve_captcha(image_path):
    # Call the service's API and return the recognized text
    pass

# 6. Dynamic pages (Selenium)
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

def selenium_crawl(url):
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')   # headless mode
    options.add_argument('--disable-gpu')
    options.add_argument(f'--user-agent={ua.random}')
    driver = webdriver.Chrome(options=options)
    try:
        driver.get(url)
        # Wait for the target element to appear
        wait = WebDriverWait(driver, 10)
        element = wait.until(
            EC.presence_of_element_located((By.CLASS_NAME, 'content'))
        )
        # Grab the rendered page source
        html = driver.page_source
        return html
    finally:
        driver.quit()

# 7. Request retries
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def fetch_with_retry(url):
    response = requests.get(url, headers=headers, timeout=10)
    response.raise_for_status()
    return response

# 8. Distributed crawling (with Scrapy-Redis)
# settings.py
"""
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
REDIS_URL = 'redis://localhost:6379'
"""
```
8. The Scrapy Framework
Question: What are the core components of the Scrapy framework, and how do you use it?
Answer:
Core components: the Engine, the Scheduler, the Downloader, Spiders, Item Pipelines, and the Downloader/Spider middlewares. A typical project and usage look like this:
```python
# 1. Project layout
"""
myproject/
    scrapy.cfg
    myproject/
        __init__.py
        items.py
        middlewares.py
        pipelines.py
        settings.py
        spiders/
            __init__.py
            example.py
"""

# 2. Item definition
import scrapy

class ProductItem(scrapy.Item):
    name = scrapy.Field()
    price = scrapy.Field()
    description = scrapy.Field()
    url = scrapy.Field()

# 3. Writing a Spider
import scrapy
from myproject.items import ProductItem

class ExampleSpider(scrapy.Spider):
    name = 'example'
    allowed_domains = ['example.com']
    start_urls = ['http://example.com/products']
    custom_settings = {
        'DOWNLOAD_DELAY': 1,
        'CONCURRENT_REQUESTS': 16
    }

    def parse(self, response):
        # Extract the product list
        for product in response.css('div.product'):
            item = ProductItem()
            item['name'] = product.css('h2.name::text').get()
            item['price'] = product.css('span.price::text').get()
            item['description'] = product.css('p.description::text').get()
            item['url'] = response.urljoin(product.css('a::attr(href)').get())
            yield item
            # Follow the detail page
            yield response.follow(product.css('a::attr(href)').get(), self.parse_detail)

        # Follow the next page
        next_page = response.css('a.next::attr(href)').get()
        if next_page:
            yield response.follow(next_page, self.parse)

    def parse_detail(self, response):
        # Parse the detail page
        pass

# 4. Pipeline for processing items
import json

class ProductPipeline:
    def __init__(self):
        self.file = open('products.json', 'w')

    def process_item(self, item, spider):
        line = json.dumps(dict(item), ensure_ascii=False) + '\n'
        self.file.write(line)
        return item

    def close_spider(self, spider):
        self.file.close()

# 5. Middleware
from fake_useragent import UserAgent

class RandomUserAgentMiddleware:
    def __init__(self):
        self.ua = UserAgent()

    def process_request(self, request, spider):
        request.headers['User-Agent'] = self.ua.random

# 6. Settings
"""
# settings.py
BOT_NAME = 'myproject'
SPIDER_MODULES = ['myproject.spiders']
NEWSPIDER_MODULE = 'myproject.spiders'

# Obey robots.txt
ROBOTSTXT_OBEY = True

# Number of concurrent requests
CONCURRENT_REQUESTS = 16

# Download delay
DOWNLOAD_DELAY = 1

# Default request headers
DEFAULT_REQUEST_HEADERS = {
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'en',
}

# Pipelines
ITEM_PIPELINES = {
    'myproject.pipelines.ProductPipeline': 300,
}

# Middlewares
DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.RandomUserAgentMiddleware': 400,
}

# Auto-throttling
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 10
"""

# 7. Running the spider
"""
# Command line
scrapy crawl example
scrapy crawl example -o products.json
scrapy crawl example -o products.csv

# From a script
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

process = CrawlerProcess(get_project_settings())
process.crawl('example')
process.start()
"""
```
9. Crawler Ethics and Law
Question: What ethical and legal issues should be considered when developing crawlers?
Answer:
```python
"""
1. Respect robots.txt
   - Check the site's robots.txt file
   - Follow its Disallow rules
   - Honor any Crawl-delay setting

2. Control the request rate
   - Do not put excessive load on the target site
   - Set a reasonable download delay
   - Avoid crawling during the site's peak hours

3. Use data responsibly
   - Do not scrape personal or private data
   - Follow the site's terms of service
   - Respect copyright and intellectual property

4. Suggested technical practices
"""
import time
import random
import requests
from urllib.robotparser import RobotFileParser

def check_robots_txt(url, user_agent='*'):
    """Check robots.txt (url is expected to be the site root, e.g. https://example.com)."""
    rp = RobotFileParser()
    rp.set_url(f"{url}/robots.txt")
    rp.read()
    return rp.can_fetch(user_agent, url)

def respectful_crawl(urls, min_delay=1, max_delay=3):
    """Crawl politely: check robots.txt and sleep between requests."""
    for url in urls:
        # Check robots.txt
        if not check_robots_txt(url):
            print(f"Skipping {url} (disallowed by robots.txt)")
            continue
        # Send the request (`headers` as defined in the anti-scraping section above)
        try:
            response = requests.get(url, headers=headers)
            # Process the data
            yield response.text
        except Exception as e:
            print(f"Error crawling {url}: {e}")
        # Random delay
        delay = random.uniform(min_delay, max_delay)
        time.sleep(delay)
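
# Handling 429 (Too Many Requests), referenced in point 5 below: back off and honor
# the Retry-After header when present. A minimal sketch; the URL and attempt count
# are placeholders.
def fetch_with_backoff(url, max_attempts=5):
    for attempt in range(max_attempts):
        response = requests.get(url, timeout=10)
        if response.status_code != 429:
            response.raise_for_status()
            return response
        # Server asked us to slow down: use Retry-After if it is a number of seconds
        retry_after = response.headers.get('Retry-After')
        wait = int(retry_after) if retry_after and retry_after.isdigit() else 2 ** attempt
        print(f"429 received, sleeping {wait}s before retrying {url}")
        time.sleep(wait)
    raise RuntimeError(f"Giving up on {url} after {max_attempts} attempts")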
"""
5. Other points to keep in mind:
   - Set an informative User-Agent that includes contact details
   - Handle the 429 (Too Many Requests) status code (see fetch_with_backoff above)
   - Use caching to avoid duplicate requests
   - Monitor the load you place on the target site
   - Be prepared to deal with anti-scraping measures
   - Comply with applicable laws and regulations (such as the GDPR)
"""
```