
Python Web Crawler Interview Questions

1. Crawler Basics

Question: What is a web crawler, and what is its basic workflow?

Answer

A web crawler is a program that automatically fetches web pages. By imitating browser behavior, it can collect information from the internet at scale.

Basic workflow

  1. Send an HTTP request to fetch the page
  2. Parse the HTML and extract data
  3. Store the data
  4. Discover new links and keep crawling (see the sketch at the end of the code below)
python
import requests
from bs4 import BeautifulSoup

# Basic crawler example
url = 'https://example.com'
response = requests.get(url)
soup = BeautifulSoup(response.text, 'html.parser')

# Extract the title
title = soup.find('title').text

# Extract all links
links = [a['href'] for a in soup.find_all('a', href=True)]
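
# A minimal sketch of step 4 (following discovered links); not part of the
# original answer. max_pages, the seen set, and the timeout are illustrative choices.
from collections import deque
from urllib.parse import urljoin, urlparse

def simple_crawl(start_url, max_pages=50):
    """Breadth-first crawl that stays on the start URL's site."""
    seen = {start_url}
    queue = deque([start_url])
    pages = {}
    while queue and len(pages) < max_pages:
        url = queue.popleft()
        resp = requests.get(url, timeout=10)
        pages[url] = resp.text
        soup = BeautifulSoup(resp.text, 'html.parser')
        for a in soup.find_all('a', href=True):
            link = urljoin(url, a['href'])  # resolve relative links
            if urlparse(link).netloc == urlparse(start_url).netloc and link not in seen:
                seen.add(link)
                queue.append(link)
    return pages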

2. The Requests Library

Question: How do you use the Requests library? What are its common features?

Answer

python
import requests

# 1. Basic requests
response = requests.get('https://api.github.com')
response = requests.post('https://httpbin.org/post', data={'key': 'value'})
response = requests.put('https://httpbin.org/put', data={'key': 'value'})
response = requests.delete('https://httpbin.org/delete')

# 2. Query parameters
params = {'key1': 'value1', 'key2': 'value2'}
response = requests.get('https://httpbin.org/get', params=params)

# 3. Request headers
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'
}
response = requests.get('https://example.com', headers=headers)

# 4. Form data
data = {'username': 'admin', 'password': '123456'}
response = requests.post('https://httpbin.org/post', data=data)

# 5. JSON payload
json_data = {'key': 'value'}
response = requests.post('https://httpbin.org/post', json=json_data)

# 6. File upload (open the file in a with block so it gets closed)
with open('report.xls', 'rb') as f:
    response = requests.post('https://httpbin.org/post', files={'file': f})

# 7. Cookies
cookies = {'session_id': '123456'}
response = requests.get('https://httpbin.org/cookies', cookies=cookies)

# Inspect the response cookies
print(response.cookies)

# 8. Session (keeps cookies across requests)
session = requests.Session()
session.headers.update({'User-Agent': 'MyBot/1.0'})

# Log in
session.post('https://example.com/login', data={'user': 'admin', 'pass': '123'})

# Subsequent requests automatically carry the session cookies
response = session.get('https://example.com/profile')

# 9. Proxies
proxies = {
    'http': 'http://10.10.1.10:3128',
    'https': 'http://10.10.1.10:1080'
}
response = requests.get('https://example.com', proxies=proxies)

# 10. Timeouts and retries
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retries = Retry(total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
session.mount('http://', HTTPAdapter(max_retries=retries))
session.mount('https://', HTTPAdapter(max_retries=retries))

response = session.get('https://example.com', timeout=5)

# 11. SSL verification
response = requests.get('https://example.com', verify=False)  # disable SSL verification (avoid in production)
response = requests.get('https://example.com', verify='/path/to/certfile')  # custom CA bundle

# 12. Streaming download for large files
response = requests.get('https://example.com/largefile.zip', stream=True)
with open('largefile.zip', 'wb') as f:
    for chunk in response.iter_content(chunk_size=8192):
        f.write(chunk)

3. Parsing with BeautifulSoup

Question: How do you parse HTML with BeautifulSoup?

Answer

python
from bs4 import BeautifulSoup
import requests

html = """
<html>
<head><title>Example</title></head>
<body>
    <div class="content">
        <h1 id="title">Hello World</h1>
        <p class="text">This is a paragraph.</p>
        <a href="https://example.com">Link</a>
        <ul>
            <li>Item 1</li>
            <li>Item 2</li>
        </ul>
    </div>
</body>
</html>
"""

soup = BeautifulSoup(html, 'html.parser')
# Or use the lxml parser (faster)
# soup = BeautifulSoup(html, 'lxml')

# 1. Basic lookups
soup.title              # <title>Example</title>
soup.title.text         # Example
soup.h1                 # first h1
soup.find('h1')         # first h1
soup.find_all('p')      # all p tags

# 2. Finding by attribute
soup.find('div', class_='content')      # note the trailing underscore in class_
soup.find('h1', id='title')
soup.find(attrs={'data-id': '123'})

# 3. CSS selectors
soup.select('div.content')              # div elements with class "content"
soup.select('#title')                   # element with id "title"
soup.select('div > p')                  # p elements that are direct children of a div
soup.select('a[href]')                  # a tags that have an href attribute
soup.select('li:nth-of-type(2)')        # the second li

# 4. Extracting content
element = soup.find('h1')
element.text            # text content
element.get_text()      # text content (a separator can be passed)
element.string          # direct child text only

# Extracting attributes
link = soup.find('a')
link['href']            # https://example.com
link.get('href')        # https://example.com
link.attrs              # dict of all attributes

# 5. Navigating the tree
soup.find('div').children       # direct children
soup.find('div').descendants    # all descendants
soup.find('div').parent         # parent node
soup.find('div').parents        # all ancestors
soup.find('li').next_sibling    # next sibling node (may be a whitespace text node)
soup.find('li').previous_sibling # previous sibling node

# 6. Practical example: extract structured data
def extract_data(html):
    soup = BeautifulSoup(html, 'html.parser')
    
    data = {
        'title': soup.find('h1', class_='title').text.strip(),
        'content': soup.find('div', class_='content').text.strip(),
        'links': [
            {
                'text': a.text,
                'href': a['href']
            }
            for a in soup.find_all('a', href=True)
        ]
    }
    
    return data

4. XPath and CSS Selectors

Question: How do you use XPath and CSS selectors?

Answer

python
from lxml import etree

html = """
<html>
<body>
    <div class="container">
        <h1>Title</h1>
        <ul class="list">
            <li data-id="1">Item 1</li>
            <li data-id="2">Item 2</li>
            <li data-id="3">Item 3</li>
        </ul>
        <div class="item" data-price="100">Product 1</div>
        <div class="item" data-price="200">Product 2</div>
    </div>
</body>
</html>
"""

tree = etree.HTML(html)

# 1. Basic XPath
tree.xpath('//h1')                      # all h1 elements
tree.xpath('//h1/text()')               # h1 text
tree.xpath('//div[@class="container"]') # div with class "container"
tree.xpath('//li[@data-id="1"]')        # li with data-id="1"
tree.xpath('//li[1]')                   # first li
tree.xpath('//li[last()]')              # last li
tree.xpath('//li[position()<=2]')       # first two li

# 2. XPath axes
tree.xpath('//ul/li')                   # li children of ul
tree.xpath('//ul//li')                  # all li descendants of ul
tree.xpath('//li/..')                   # parent of li
tree.xpath('//li/following-sibling::*') # following siblings
tree.xpath('//li/preceding-sibling::*') # preceding siblings

# 3. XPath functions (used inside a predicate so they select elements)
tree.xpath('//div[contains(@class, "item")]')    # divs whose class contains "item"
tree.xpath('//a[starts-with(@href, "http")]')    # a tags whose href starts with "http"
tree.xpath('//li[text()="Item 1"]')     # li whose text is "Item 1"
tree.xpath('//div[@data-price>100]')    # divs with data-price greater than 100

# 4. Extracting attributes
tree.xpath('//div/@class')              # all class attributes of divs
tree.xpath('//li/@data-id')             # all data-id attributes

# 5. CSS selectors (via cssselect)
from lxml.cssselect import CSSSelector

sel = CSSSelector('div.container')
results = sel(tree)

# 6. Practical example
def parse_with_xpath(html):
    tree = etree.HTML(html)
    
    items = []
    for li in tree.xpath('//ul[@class="list"]/li'):
        item = {
            'id': li.get('data-id'),
            'text': li.xpath('./text()')[0]
        }
        items.append(item)
    
    return items

5. Data Storage

Question: How do you store scraped data?

Answer

python
import json
import csv
import sqlite3
import pymongo
import pandas as pd

# 1. JSON files
data = [{'name': 'Alice', 'age': 25}, {'name': 'Bob', 'age': 30}]

# Write
with open('data.json', 'w', encoding='utf-8') as f:
    json.dump(data, f, ensure_ascii=False, indent=2)

# Append (JSON Lines format)
with open('data.jsonl', 'a', encoding='utf-8') as f:
    for item in data:
        f.write(json.dumps(item, ensure_ascii=False) + '\n')

# 2. CSV files
# Write
with open('data.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.DictWriter(f, fieldnames=['name', 'age'])
    writer.writeheader()
    writer.writerows(data)

# With pandas
df = pd.DataFrame(data)
df.to_csv('data.csv', index=False, encoding='utf-8')

# 3. SQLite
conn = sqlite3.connect('data.db')
cursor = conn.cursor()

# Create the table
cursor.execute('''
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT,
        age INTEGER
    )
''')

# Insert rows
for item in data:
    cursor.execute('INSERT INTO users (name, age) VALUES (?, ?)', 
                   (item['name'], item['age']))

conn.commit()
conn.close()

# 4. MongoDB
client = pymongo.MongoClient('mongodb://localhost:27017/')
db = client['mydb']
collection = db['users']

# Insert
collection.insert_many(data)

# Query
for doc in collection.find():
    print(doc)

# 5. Redis (caching, deduplication, queues)
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# Store values
r.set('key', 'value')
r.setex('temp_key', 3600, 'value')  # with an expiry time

# Sets (deduplication)
r.sadd('urls', 'http://example.com')
is_member = r.sismember('urls', 'http://example.com')

# Lists (task queue)
r.lpush('queue', 'task1')
task = r.brpop('queue', timeout=5)

# 6. A simple storage pipeline class
class DataPipeline:
    def __init__(self):
        self.conn = sqlite3.connect('data.db')
        self.cursor = self.conn.cursor()
        self._create_table()
    
    def _create_table(self):
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS items (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT,
                url TEXT UNIQUE,
                content TEXT,
                crawl_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        self.conn.commit()
    
    def process_item(self, item):
        try:
            self.cursor.execute('''
                INSERT OR IGNORE INTO items (title, url, content)
                VALUES (?, ?, ?)
            ''', (item['title'], item['url'], item['content']))
            self.conn.commit()
        except Exception as e:
            print(f"Error: {e}")
    
    def close(self):
        self.conn.close()
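
# Usage sketch for the pipeline class above (the field values are placeholders)
pipeline = DataPipeline()
pipeline.process_item({'title': 'Example', 'url': 'https://example.com', 'content': 'page text'})
pipeline.close()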

6. Asynchronous Crawlers

Question: How do you implement an asynchronous crawler?

Answer

python
import asyncio
import aiohttp
from bs4 import BeautifulSoup

# 1. Basic async requests
async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ['http://example.com/page1', 'http://example.com/page2']
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        for url, html in zip(urls, results):
            print(f"Fetched {url}: {len(html)} bytes")

asyncio.run(main())

# 2. Async crawler with a concurrency limit
import aiohttp
from asyncio import Semaphore

class AsyncCrawler:
    def __init__(self, max_concurrent=5):
        self.semaphore = Semaphore(max_concurrent)
        self.session = None
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            headers={'User-Agent': 'MyBot/1.0'},
            timeout=aiohttp.ClientTimeout(total=30)
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
    
    async def fetch(self, url):
        async with self.semaphore:
            try:
                async with self.session.get(url) as response:
                    if response.status == 200:
                        return await response.text()
                    else:
                        print(f"Error {response.status}: {url}")
                        return None
            except Exception as e:
                print(f"Exception {e}: {url}")
                return None
    
    async def crawl(self, urls):
        tasks = [self.fetch(url) for url in urls]
        return await asyncio.gather(*tasks)

# Usage
async def main():
    urls = [f'http://example.com/page/{i}' for i in range(100)]
    
    async with AsyncCrawler(max_concurrent=10) as crawler:
        results = await crawler.crawl(urls)
        
        for url, html in zip(urls, results):
            if html:
                # Parse the data
                soup = BeautifulSoup(html, 'html.parser')
                title = soup.find('title')
                print(f"{url}: {title.text if title else 'No title'}")

asyncio.run(main())

# 3. Async queue (producer/consumer)
async def producer(queue, urls):
    for url in urls:
        await queue.put(url)
    await queue.put(None)  # shutdown sentinel

async def consumer(queue, session):
    while True:
        url = await queue.get()
        if url is None:
            queue.task_done()  # mark the sentinel as done so queue.join() can finish
            break
        
        try:
            async with session.get(url) as response:
                html = await response.text()
                print(f"Fetched: {url}")
        except Exception as e:
            print(f"Error {url}: {e}")
        
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=100)
    urls = [f'http://example.com/page/{i}' for i in range(100)]
    
    async with aiohttp.ClientSession() as session:
        producers = asyncio.create_task(producer(queue, urls))
        consumers = [asyncio.create_task(consumer(queue, session)) 
                     for _ in range(5)]
        
        await producers
        await queue.join()
        
        for c in consumers:
            c.cancel()
        await asyncio.gather(*consumers, return_exceptions=True)  # wait for consumers to exit

asyncio.run(main())

7. Dealing with Anti-Scraping Measures

Question: How do you handle common anti-scraping mechanisms?

Answer

python
import requests
import time
import random
from fake_useragent import UserAgent

# 1. Rotating User-Agents
ua = UserAgent()

headers = {
    'User-Agent': ua.random,
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    'Accept-Encoding': 'gzip, deflate, br',
    'Connection': 'keep-alive'
}

# 2. Proxy IPs
proxies = [
    {'http': 'http://proxy1.com:8080', 'https': 'https://proxy1.com:8080'},
    {'http': 'http://proxy2.com:8080', 'https': 'https://proxy2.com:8080'}
]

def get_proxy():
    return random.choice(proxies)

# 3. Request spacing (throttling)
def crawl_with_delay(urls, delay=1):
    for url in urls:
        response = requests.get(url, headers=headers)
        # process the response here
        time.sleep(delay + random.uniform(0, 1))  # randomized delay

# 4. Cookies and sessions
session = requests.Session()

# Visit the home page first to pick up cookies
session.get('https://example.com')

# Subsequent requests carry the cookies automatically
response = session.get('https://example.com/data')

# 5. Captcha handling
# Usually delegated to a captcha-solving service (e.g., Chaojiying)
def solve_captcha(image_path):
    # Call the solving service's API here
    # and return the recognized text
    pass

# 6. Dynamic pages (Selenium)
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

def selenium_crawl(url):
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')  # headless mode
    options.add_argument('--disable-gpu')
    options.add_argument(f'--user-agent={ua.random}')
    
    driver = webdriver.Chrome(options=options)
    
    try:
        driver.get(url)
        
        # Wait for the target element to load
        wait = WebDriverWait(driver, 10)
        element = wait.until(
            EC.presence_of_element_located((By.CLASS_NAME, 'content'))
        )
        
        # Grab the rendered page source
        html = driver.page_source
        
        return html
    finally:
        driver.quit()

# 7. Request retries (tenacity)
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def fetch_with_retry(url):
    response = requests.get(url, headers=headers, timeout=10)
    response.raise_for_status()
    return response

# 8. Distributed crawling (Scrapy-Redis)
# settings.py
"""
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
REDIS_URL = 'redis://localhost:6379'
"""

8. The Scrapy Framework

Question: What are Scrapy's core components, and how do you use them?

Answer

python
# 1. Project layout
"""
myproject/
    scrapy.cfg
    myproject/
        __init__.py
        items.py
        middlewares.py
        pipelines.py
        settings.py
        spiders/
            __init__.py
            example.py
"""

# 2. Defining an Item
import scrapy

class ProductItem(scrapy.Item):
    name = scrapy.Field()
    price = scrapy.Field()
    description = scrapy.Field()
    url = scrapy.Field()

# 3. Writing a Spider
import scrapy
from myproject.items import ProductItem

class ExampleSpider(scrapy.Spider):
    name = 'example'
    allowed_domains = ['example.com']
    start_urls = ['http://example.com/products']
    
    custom_settings = {
        'DOWNLOAD_DELAY': 1,
        'CONCURRENT_REQUESTS': 16
    }
    
    def parse(self, response):
        # Extract the product list
        for product in response.css('div.product'):
            item = ProductItem()
            item['name'] = product.css('h2.name::text').get()
            item['price'] = product.css('span.price::text').get()
            item['description'] = product.css('p.description::text').get()
            item['url'] = response.urljoin(product.css('a::attr(href)').get())
            
            yield item
            
            # Follow the detail page (response.follow needs a str or Selector, not a SelectorList)
            yield response.follow(product.css('a::attr(href)').get(), self.parse_detail)
        
        # Follow the next page
        next_page = response.css('a.next::attr(href)').get()
        if next_page:
            yield response.follow(next_page, self.parse)
    
    def parse_detail(self, response):
        # Parse the detail page here
        pass

# 4. Processing items in a Pipeline
import json

class ProductPipeline:
    def __init__(self):
        self.file = open('products.json', 'w')
    
    def process_item(self, item, spider):
        line = json.dumps(dict(item), ensure_ascii=False) + '\n'
        self.file.write(line)
        return item
    
    def close_spider(self, spider):
        self.file.close()

# 5. Middleware (random User-Agent)
from fake_useragent import UserAgent

class RandomUserAgentMiddleware:
    def __init__(self):
        self.ua = UserAgent()
    
    def process_request(self, request, spider):
        request.headers['User-Agent'] = self.ua.random

# 6. Settings
"""
# settings.py
BOT_NAME = 'myproject'

SPIDER_MODULES = ['myproject.spiders']
NEWSPIDER_MODULE = 'myproject.spiders'

# Obey robots.txt
ROBOTSTXT_OBEY = True

# Number of concurrent requests
CONCURRENT_REQUESTS = 16

# Download delay
DOWNLOAD_DELAY = 1

# Default request headers
DEFAULT_REQUEST_HEADERS = {
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'en',
}

# Pipelines
ITEM_PIPELINES = {
    'myproject.pipelines.ProductPipeline': 300,
}

# Middlewares
DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.RandomUserAgentMiddleware': 400,
}

# Auto-throttling
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 10
"""

# 7. Running the spider
"""
# From the command line
scrapy crawl example
scrapy crawl example -o products.json
scrapy crawl example -o products.csv

# From a script
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

process = CrawlerProcess(get_project_settings())
process.crawl('example')
process.start()
"""

9. Crawler Ethics and Legal Considerations

Question: What ethical and legal issues should you keep in mind when building crawlers?

Answer

python
"""
1. 遵守 robots.txt
   - 检查网站的 robots.txt 文件
   - 遵守 Disallow 规则
   - 尊重 Crawl-delay 设置

2. 控制请求频率
   - 不要对目标网站造成过大压力
   - 设置合理的下载延迟
   - 避免在高峰期爬取

3. 数据使用规范
   - 不要爬取个人隐私数据
   - 遵守网站的服务条款
   - 尊重版权和知识产权

4. 技术实现建议
"""

import time
import random
import requests
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser

def check_robots_txt(url, user_agent='*'):
    """Check whether robots.txt allows fetching this URL."""
    parsed = urlparse(url)
    rp = RobotFileParser()
    rp.set_url(f"{parsed.scheme}://{parsed.netloc}/robots.txt")
    rp.read()
    return rp.can_fetch(user_agent, url)

def respectful_crawl(urls, min_delay=1, max_delay=3):
    """Crawl politely: honor robots.txt and pause between requests."""
    headers = {'User-Agent': 'MyBot/1.0 (contact: admin@example.com)'}
    for url in urls:
        # Check robots.txt
        if not check_robots_txt(url):
            print(f"Skipping {url} (disallowed by robots.txt)")
            continue
        
        # Send the request
        try:
            response = requests.get(url, headers=headers, timeout=10)
            # Process the data
            yield response.text
        except Exception as e:
            print(f"Error crawling {url}: {e}")
        
        # Random delay between requests
        delay = random.uniform(min_delay, max_delay)
        time.sleep(delay)

"""
5. 其他注意事项:
   - 设置合理的 User-Agent,包含联系方式
   - 处理 429 (Too Many Requests) 状态码
   - 使用缓存避免重复请求
   - 监控目标网站的负载情况
   - 准备好应对反爬措施
   - 遵守相关法律法规(如 GDPR)
"""