Appearance
FastAPI 面试题
1. FastAPI 基础
问题:FastAPI 的主要特点是什么?与其他 Python Web 框架有什么区别?
答案:
FastAPI 主要特点:
- 高性能:基于 Starlette 和 Pydantic,性能接近 Node.js 和 Go
- 快速开发:自动 API 文档,类型提示支持
- 类型安全:基于 Python 类型提示,自动数据验证
- 异步支持:原生支持 async/await
- 自动文档:自动生成 OpenAPI 和 Swagger UI
FastAPI vs Flask vs Django:
| 特性 | FastAPI | Flask | Django |
|---|---|---|---|
| 性能 | 高 | 中 | 中 |
| 异步支持 | 原生 | 扩展 | 部分 |
| 类型提示 | 核心特性(强烈推荐) | 可选 | 可选 |
| 自动文档 | 是 | 否 | 否 |
| 数据验证 | 自动 | 手动 | 表单 |
| 学习曲线 | 中 | 低 | 高 |
| 适用场景 | API 服务 | 小型应用 | 大型应用 |
2. FastAPI 基础用法
问题:FastAPI 的基本用法是怎样的?
答案:
python
from fastapi import FastAPI, HTTPException, Depends, status
from pydantic import BaseModel
from typing import Optional, List
# Application instance; the route decorators in this snippet register handlers on it.
app = FastAPI()
# 1. 基本路由
@app.get("/")
async def root():
    """Return a fixed greeting payload for the root path."""
    greeting = {"message": "Hello World"}
    return greeting
# 2. 路径参数
@app.get("/items/{item_id}")
async def read_item(item_id: int):
    """Echo the ``item_id`` path parameter back to the caller."""
    echoed = {"item_id": item_id}
    return echoed
# 3. 查询参数
@app.get("/items/")
async def read_items(skip: int = 0, limit: int = 10):
    """Echo the ``skip`` and ``limit`` query parameters."""
    pagination = {"skip": skip, "limit": limit}
    return pagination
# 4. 请求体
class Item(BaseModel):
    """Request-body schema for an item."""

    name: str
    description: Optional[str] = None  # may be omitted
    price: float
    tax: Optional[float] = None  # may be omitted
@app.post("/items/")
async def create_item(item: Item):
    """Return the item as a dict, adding ``price_with_tax`` when ``tax`` is truthy."""
    payload = item.dict()
    if not item.tax:
        return payload
    payload["price_with_tax"] = item.price + item.tax
    return payload
# 5. 组合使用
@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Item, q: Optional[str] = None):
    """Merge the path id, the item body and (when truthy) ``q`` into one dict."""
    merged = {"item_id": item_id}
    merged.update(item.dict())
    if q:
        merged["q"] = q
    return merged
# 6. 响应模型
class ItemResponse(BaseModel):
    """Response schema exposing only ``name`` and ``price``."""

    name: str
    price: float

    class Config:
        # Let the model be populated from attribute-style (ORM) objects.
        orm_mode = True
@app.get("/items/{item_id}", response_model=ItemResponse)
async def read_item(item_id: int):
return {"name": "Foo", "price": 50.5}3. 依赖注入
问题:FastAPI 的依赖注入系统如何使用?
答案:
python
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
# 1. 基本依赖
def common_parameters(q: Optional[str] = None, skip: int = 0, limit: int = 100):
    """Bundle the shared query parameters into a dict."""
    bundle = {"q": q}
    bundle["skip"] = skip
    bundle["limit"] = limit
    return bundle
@app.get("/items/")
async def read_items(commons: dict = Depends(common_parameters)):
    """Return the dict produced by the ``common_parameters`` dependency."""
    return commons
# 2. 类作为依赖
class CommonQueryParams:
    """Plain class holding the shared ``q``/``skip``/``limit`` query parameters."""

    def __init__(self, q: Optional[str] = None, skip: int = 0, limit: int = 100):
        self.q, self.skip, self.limit = q, skip, limit
@app.get("/items/")
async def read_items(commons: CommonQueryParams = Depends()):
    """Return the ``CommonQueryParams`` instance injected as a dependency."""
    return commons
# 3. 子依赖
def query_extractor(q: Optional[str] = None):
    """Return the ``q`` value unchanged (None when absent)."""
    extracted = q
    return extracted
from fastapi import Cookie  # used in the signature below but missing from the fastapi import line


def query_or_cookie_extractor(
    q: str = Depends(query_extractor),
    last_query: Optional[str] = Cookie(None),
):
    """Return ``q`` when it is truthy, otherwise the ``last_query`` cookie value.

    Args:
        q: Value produced by the ``query_extractor`` sub-dependency.
        last_query: Cookie value; None when the cookie is absent.
    """
    return q if q else last_query
@app.get("/items/")
async def read_query(query_or_default: str = Depends(query_or_cookie_extractor)):
    """Return the value resolved by ``query_or_cookie_extractor``."""
    resolved = {"q_or_cookie": query_or_default}
    return resolved
# 4. 数据库依赖
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
# SQLite database file located relative to the working directory.
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# check_same_thread=False lifts SQLite's same-thread restriction on connections.
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
# Session factory bound to the engine; autocommit and autoflush are disabled.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Base class for the declarative ORM models.
Base = declarative_base()
def get_db():
    """Yield a new ``SessionLocal`` session and close it once the caller is done."""
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
@app.get("/users/")
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of ``User`` rows selected by ``skip`` and ``limit``."""
    page = db.query(User).offset(skip).limit(limit)
    return page.all()
# 5. 认证依赖
# Bearer-token dependency; tokenUrl is the relative path of the token-issuing route.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Decode the bearer token and return the user it names.

    Raises:
        HTTPException: 401 when the token fails to decode, carries no ``sub``
            claim, or names a user that ``get_user`` does not find.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception
    subject = claims.get("sub")
    if subject is None:
        raise credentials_exception
    token_data = TokenData(username=subject)
    found = get_user(fake_users_db, username=token_data.username)
    if found is None:
        raise credentials_exception
    return found
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
return current_user4. 数据库集成
问题:FastAPI 如何与数据库集成?
答案:
python
from sqlalchemy import create_engine, Column, Integer, String, Boolean
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from pydantic import BaseModel
# 1. SQLAlchemy 配置
# SQLite database file located relative to the working directory.
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# check_same_thread=False lifts SQLite's same-thread restriction on connections.
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
# Session factory bound to the engine; autocommit and autoflush are disabled.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Base class for the declarative ORM models.
Base = declarative_base()
# 2. 定义模型
class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)  # unique and indexed
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)  # defaults to True
from sqlalchemy import ForeignKey  # used below but missing from the sqlalchemy import line
from sqlalchemy.orm import relationship  # likewise missing from the sqlalchemy.orm import line


class Item(Base):
    """ORM model mapped to the ``items`` table; ``owner_id`` references ``users.id``."""

    __tablename__ = "items"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))
    owner = relationship("User", back_populates="items")


# Reverse side of ``Item.owner``, attached once both classes exist.
User.items = relationship("Item", back_populates="owner")
# Create the tables for every model registered on Base.
Base.metadata.create_all(bind=engine)
# 3. Pydantic 模型
class ItemBase(BaseModel):
    """Fields shared by every item schema."""

    title: str
    description: Optional[str] = None


class ItemCreate(ItemBase):
    """Payload for creating an item; adds nothing to ``ItemBase``."""


# NOTE(review): this rebinds the name ``Item`` that the ORM model above also
# uses — presumably the two were meant to live in separate modules; confirm.
class Item(ItemBase):
    """Item schema returned by the API, including its ids."""

    id: int
    owner_id: int

    class Config:
        # Let the schema be populated from attribute-style (ORM) objects.
        orm_mode = True
class UserBase(BaseModel):
    """Fields shared by every user schema."""

    email: str


class UserCreate(UserBase):
    """Payload for creating a user; carries the plain password."""

    password: str


# NOTE(review): this rebinds the name ``User`` that the ORM model above also
# uses — presumably the two were meant to live in separate modules; confirm.
class User(UserBase):
    """User schema returned by the API, including the user's items."""

    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        # Let the schema be populated from attribute-style (ORM) objects.
        orm_mode = True
# 4. CRUD 操作
def get_user(db: Session, user_id: int):
    """Return the first ``User`` whose id equals ``user_id``, or None."""
    matching = db.query(User).filter(User.id == user_id)
    return matching.first()
def get_user_by_email(db: Session, email: str):
    """Return the first ``User`` whose email equals ``email``, or None."""
    matching = db.query(User).filter(User.email == email)
    return matching.first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
    """Return up to ``limit`` ``User`` rows after skipping ``skip`` rows."""
    page = db.query(User).offset(skip).limit(limit)
    return page.all()
def create_user(db: Session, user: UserCreate):
    """Insert a user row built from ``user`` and return the refreshed instance."""
    # Placeholder only: the password is suffixed, not actually hashed.
    placeholder_hash = user.password + "notreallyhashed"
    new_user = User(email=user.email, hashed_password=placeholder_hash)
    db.add(new_user)
    db.commit()
    db.refresh(new_user)
    return new_user
def get_items(db: Session, skip: int = 0, limit: int = 100):
    """Return up to ``limit`` ``Item`` rows after skipping ``skip`` rows."""
    page = db.query(Item).offset(skip).limit(limit)
    return page.all()
def create_user_item(db: Session, item: ItemCreate, user_id: int):
    """Insert an item owned by ``user_id`` and return the refreshed instance."""
    fields = item.dict()
    new_item = Item(**fields, owner_id=user_id)
    db.add(new_item)
    db.commit()
    db.refresh(new_item)
    return new_item
# 5. API 路由
@app.post("/users/", response_model=User)
def create_user_endpoint(user: UserCreate, db: Session = Depends(get_db)):
    """Register a new user, rejecting an e-mail address that already exists.

    The handler is deliberately not named ``create_user``: that name belongs
    to the CRUD helper it delegates to. Reusing it rebinds the name, so the
    final call would re-enter this handler instead of inserting the row.

    Raises:
        HTTPException: 400 when ``get_user_by_email`` finds an existing row.
    """
    existing = get_user_by_email(db, email=user.email)
    if existing:
        raise HTTPException(status_code=400, detail="Email already registered")
    return create_user(db=db, user=user)
@app.get("/users/", response_model=List[User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of users via ``get_users``."""
    return get_users(db, skip=skip, limit=limit)
@app.get("/users/{user_id}", response_model=User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    """Return the user with ``user_id``.

    Raises:
        HTTPException: 404 when ``get_user`` returns None.
    """
    found = get_user(db, user_id=user_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="User not found")
@app.post("/users/{user_id}/items/", response_model=Item)
def create_item_for_user(
    user_id: int, item: ItemCreate, db: Session = Depends(get_db)
):
    """Create an item owned by ``user_id`` via ``create_user_item``."""
    created = create_user_item(db=db, item=item, user_id=user_id)
    return created
@app.get("/items/", response_model=List[Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = get_items(db, skip=skip, limit=limit)
return items5. 认证与安全
问题:FastAPI 如何实现用户认证和安全?
答案:
python
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from typing import Optional
# 1. 密码加密
# Hashing context configured with the bcrypt scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password, hashed_password):
    """Return whether ``plain_password`` verifies against ``hashed_password``."""
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match


def get_password_hash(password):
    """Return the ``pwd_context`` hash of ``password``."""
    hashed = pwd_context.hash(password)
    return hashed
# 2. JWT 配置
# Key used to sign and verify JWTs.
# NOTE(review): hard-coded placeholder — presumably to be replaced by a secret
# loaded from configuration; confirm before any real use.
SECRET_KEY = "your-secret-key"
# Signing algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Token lifetime, in minutes, used by the login route.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Bearer-token dependency; tokenUrl is the relative path of the login route.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# 3. 用户模型
class Token(BaseModel):
    """Response body of the login route."""

    access_token: str
    token_type: str


class TokenData(BaseModel):
    """Data extracted from a decoded token."""

    username: Optional[str] = None
class User(BaseModel):
    """Public user fields; everything except ``username`` is optional."""

    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    """Stored user record: ``User`` plus the password hash."""

    hashed_password: str
# Simulated database: an in-memory dict of user records keyed by username.
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
}
}
# 4. 用户认证函数
def get_user(db, username: str):
    """Return a ``UserInDB`` built from ``db[username]``, or None when the key is absent."""
    if username not in db:
        return None
    return UserInDB(**db[username])
def authenticate_user(fake_db, username: str, password: str):
    """Return the user when the credentials check out, otherwise ``False``."""
    candidate = get_user(fake_db, username)
    if candidate and verify_password(password, candidate.hashed_password):
        return candidate
    return False
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed; copied, so the caller's dict is not modified.
        expires_delta: Token lifetime. ``None`` selects 15 minutes.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    from datetime import timezone  # local import: only datetime/timedelta are imported at the top

    # Compare with None rather than truthiness so an explicit zero lifetime is honoured.
    lifetime = timedelta(minutes=15) if expires_delta is None else expires_delta
    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a naive value.
    expire = datetime.now(timezone.utc) + lifetime
    claims = {**data, "exp": expire}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
# 5. 获取当前用户
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Decode the bearer token and return the user it names.

    Raises:
        HTTPException: 401 when the token fails to decode, carries no ``sub``
            claim, or names a user that ``get_user`` does not find.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception
    subject = claims.get("sub")
    if subject is None:
        raise credentials_exception
    token_data = TokenData(username=subject)
    found = get_user(fake_users_db, username=token_data.username)
    if found is None:
        raise credentials_exception
    return found
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """Return ``current_user`` unless it is flagged as disabled.

    Raises:
        HTTPException: 400 when ``current_user.disabled`` is truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
# 6. 登录路由
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """Exchange form credentials for a bearer access token.

    Raises:
        HTTPException: 401 when ``authenticate_user`` rejects the credentials.
    """
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    token = create_access_token(data={"sub": user.username}, expires_delta=lifetime)
    return {"access_token": token, "token_type": "bearer"}
# 7. 受保护的路由
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    """Return the active user resolved from the bearer token."""
    return current_user
@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    """Return a fixed one-item list attributed to the current user."""
    sample_item = {"item_id": "Foo", "owner": current_user.username}
    return [sample_item]
# 8. HTTPS 强制
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
# Register the middleware that redirects plain-HTTP requests to HTTPS.
app.add_middleware(HTTPSRedirectMiddleware)
# 9. CORS
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)6. 后台任务
问题:FastAPI 如何处理后台任务?
答案:
python
from fastapi import BackgroundTasks
import time
import asyncio
# 1. 基本后台任务
def write_notification(email: str, message=""):
    """Append one notification line for ``email`` to ``log.txt``.

    The file is opened as UTF-8 explicitly so non-ASCII text is written the
    same way regardless of the platform's default encoding.
    """
    line = f"notification for {email}: {message}\n"
    with open("log.txt", mode="a", encoding="utf-8") as log_file:
        log_file.write(line)
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    """Queue ``write_notification`` as a background task and reply immediately."""
    background_tasks.add_task(write_notification, email, message="some notification")
    reply = {"message": "Notification sent in the background"}
    return reply
# 2. 异步后台任务
async def send_email_async(email: str, message: str):
    """Wait two seconds, then print a confirmation line for the e-mail."""
    # Simulate sending the e-mail.
    await asyncio.sleep(2)
    confirmation = f"Email sent to {email}: {message}"
    print(confirmation)
@app.post("/send-email/{email}")
async def send_email(email: str, background_tasks: BackgroundTasks):
    """Queue ``send_email_async`` as a background task and reply immediately."""
    background_tasks.add_task(send_email_async, email, "Hello!")
    reply = {"message": "Email sending in background"}
    return reply
# 3. 使用 Celery
from celery import Celery
# Celery application named "tasks"; the same local Redis database is used as
# both the message broker and the result backend.
celery_app = Celery(
"tasks",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0"
)
@celery_app.task
def process_data(data_id: int):
    """Block for ten seconds, then return a completion message for ``data_id``."""
    # Stand-in for a long-running job.
    time.sleep(10)
    outcome = f"Processed data {data_id}"
    return outcome
@app.post("/process/{data_id}")
async def process(data_id: int):
    """Submit ``process_data`` to Celery and return the new task's id."""
    submitted = process_data.delay(data_id)
    return {"task_id": submitted.id, "status": "processing"}
@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
task = celery_app.AsyncResult(task_id)
return {
"task_id": task_id,
"status": task.status,
"result": task.result if task.ready() else None
}7. 文件处理
问题:FastAPI 如何处理文件上传和下载?
答案:
python
from fastapi import File, UploadFile
from fastapi.responses import FileResponse
import shutil
from pathlib import Path
# 1. 单文件上传
@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile = File(...)):
    """Return the uploaded file's name and declared content type."""
    summary = {"filename": file.filename}
    summary["content_type"] = file.content_type
    return summary
# 2. 多文件上传
@app.post("/uploadfiles/")
async def create_upload_files(files: List[UploadFile] = File(...)):
    """Return the names of all uploaded files, in upload order."""
    names = [upload.filename for upload in files]
    return {"filenames": names}
# 3. 保存上传的文件
@app.post("/upload/")
async def upload_file(file: UploadFile = File(...)):
    """Store the uploaded file under ``uploads/`` and report where it was saved.

    Only the final path component of the client-supplied filename is used, so
    a name such as ``../../x`` cannot place the file outside ``uploads/``.

    Raises:
        HTTPException: 400 when the filename is missing or is not a plain name.
    """
    safe_name = Path(file.filename or "").name
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid filename")
    file_path = Path("uploads") / safe_name
    file_path.parent.mkdir(parents=True, exist_ok=True)
    with file_path.open("wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    return {"filename": safe_name, "path": str(file_path)}
# 4. 异步保存
@app.post("/upload-async/")
async def upload_file_async(file: UploadFile = File(...)):
    """Read the upload asynchronously, store it under ``uploads/`` and report its size.

    Only the final path component of the client-supplied filename is used, so
    a name such as ``../../x`` cannot place the file outside ``uploads/``.

    Raises:
        HTTPException: 400 when the filename is missing or is not a plain name.
    """
    safe_name = Path(file.filename or "").name
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid filename")
    file_path = Path("uploads") / safe_name
    file_path.parent.mkdir(parents=True, exist_ok=True)
    content = await file.read()
    file_path.write_bytes(content)
    return {"filename": safe_name, "size": len(content)}
# 5. 文件下载
@app.get("/download/{filename}")
async def download_file(filename: str):
    """Send ``uploads/<filename>`` as a download.

    The name must be a plain file name: anything with directory components is
    treated as not found, and directories are rejected as well.

    Raises:
        HTTPException: 404 when the name is not plain or no such regular file exists.
    """
    safe_name = Path(filename).name
    file_path = Path("uploads") / safe_name
    if safe_name != filename or not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(
        path=file_path,
        filename=filename,
        media_type="application/octet-stream",
    )
# 6. 文件验证
from fastapi import HTTPException
@app.post("/upload-image/")
async def upload_image(file: UploadFile = File(...)):
# 验证文件类型
if not file.content_type.startswith("image/"):
raise HTTPException(status_code=400, detail="File must be an image")
# 验证文件大小
content = await file.read()
if len(content) > 5 * 1024 * 1024: # 5MB
raise HTTPException(status_code=400, detail="File too large")
# 保存文件
file_path = Path(f"uploads/{file.filename}")
file_path.write_bytes(content)
return {"filename": file.filename, "size": len(content)}8. WebSocket
问题:FastAPI 如何支持 WebSocket?
答案:
python
from fastapi import WebSocket, WebSocketDisconnect
from typing import List
# 1. 基本 WebSocket
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept the connection and echo every text message back to the sender."""
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message text was: {data}")
    except WebSocketDisconnect:
        # The client closing the socket is the normal way this loop ends;
        # handled here like the other WebSocket handlers in this file.
        return
# 2. WebSocket 连接管理器
class ConnectionManager:
    """Track accepted WebSocket connections and send text to one or all of them."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept ``websocket`` and start tracking it."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking ``websocket``; an untracked connection is ignored."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send ``message`` to a single connection."""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Send ``message`` to every tracked connection."""
        # Iterate over a snapshot: each await may let another task connect or
        # disconnect, and mutating the list mid-iteration would skip entries.
        for connection in list(self.active_connections):
            await connection.send_text(message)


manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    """Chat handler: echo each message to its sender and broadcast it to everyone."""
    await manager.connect(websocket)
    try:
        while True:
            text = await websocket.receive_text()
            await manager.send_personal_message(f"You wrote: {text}", websocket)
            await manager.broadcast(f"Client #{client_id} says: {text}")
    except WebSocketDisconnect:
        # Drop the connection first, then tell the remaining clients.
        manager.disconnect(websocket)
        await manager.broadcast(f"Client #{client_id} left the chat")
# 3. WebSocket 认证
@app.websocket("/ws-auth")
async def websocket_auth(websocket: WebSocket, token: str):
# 验证 token
user = await get_user_from_token(token)
if not user:
await websocket.close(code=4001)
return
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Hello {user.username}, you sent: {data}")
except WebSocketDisconnect:
pass9. 测试
问题:如何测试 FastAPI 应用?
答案:
python
from fastapi.testclient import TestClient
import pytest
from httpx import AsyncClient
import asyncio
# 1. 同步测试
# Synchronous test client bound to the application.
client = TestClient(app)


def test_read_main():
    """The root path answers 200 with the greeting payload."""
    reply = client.get("/")
    assert reply.status_code == 200
    assert reply.json() == {"message": "Hello World"}
def test_create_item():
    """Posting a minimal item answers 200 and echoes its name and price."""
    body = {"name": "Foo", "price": 50.5}
    reply = client.post("/items/", json=body)
    assert reply.status_code == 200
    echoed = reply.json()
    assert echoed["name"] == "Foo"
    assert echoed["price"] == 50.5
def test_read_item():
    """Reading item 1 answers 200."""
    reply = client.get("/items/1")
    assert reply.status_code == 200


def test_read_item_not_found():
    """Reading item 999 is expected to answer 404."""
    # NOTE(review): the read_item handlers shown earlier in this file return a
    # payload for any id — confirm which handler is expected to produce the 404.
    reply = client.get("/items/999")
    assert reply.status_code == 404
# 2. 异步测试
@pytest.fixture(scope="session")
def event_loop():
    """Provide one event loop for the whole test session and close it afterwards."""
    session_loop = asyncio.get_event_loop_policy().new_event_loop()
    yield session_loop
    session_loop.close()
@pytest.fixture
async def async_client():
    """Yield an ``httpx.AsyncClient`` that sends requests straight to ``app``.

    The app is passed through ``ASGITransport``; the ``app=`` shortcut on
    ``AsyncClient`` was deprecated in httpx 0.27 and removed in 0.28.
    """
    from httpx import ASGITransport  # local import: only AsyncClient is imported at the top

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        yield client
@pytest.mark.asyncio
async def test_read_main_async(async_client):
    """The root path answers 200 with the greeting payload via the async client."""
    reply = await async_client.get("/")
    assert reply.status_code == 200
    assert reply.json() == {"message": "Hello World"}
# 3. 依赖覆盖
def override_get_db():
    """Yield a session from ``TestingSessionLocal`` and always close it afterwards."""
    # Create the session before entering ``try``: if creation fails there is
    # nothing to close, and ``finally`` must not touch an unbound name.
    db = TestingSessionLocal()
    try:
        yield db
    finally:
        db.close()


# Route every ``get_db`` dependency to the testing session factory.
app.dependency_overrides[get_db] = override_get_db
# 4. 数据库测试
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# Separate SQLite file used by the tests.
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
# check_same_thread=False lifts SQLite's same-thread restriction on connections.
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
# Session factory bound to the test engine.
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Create the tables for every model registered on Base in the test database.
Base.metadata.create_all(bind=engine)
# 5. 测试认证
def test_protected_endpoint():
# 登录获取 token
response = client.post(
"/token",
data={"username": "johndoe", "password": "secret"}
)
token = response.json()["access_token"]
# 访问受保护端点
response = client.get(
"/users/me/",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 20010. 部署
问题:如何部署 FastAPI 应用?
答案:
python
# 1. Uvicorn 配置
# 开发
# uvicorn main:app --reload
# 生产
# uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
# 2. Gunicorn + Uvicorn
# gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker
# 3. Docker 部署
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# 4. 生产环境配置
# config.py
from pydantic import BaseSettings
class Settings(BaseSettings):
    """Application settings; the inner ``Config`` points at the ``.env`` file."""

    app_name: str = "My FastAPI App"
    debug: bool = False
    secret_key: str  # no default declared
    database_url: str  # no default declared

    class Config:
        env_file = ".env"


# Module-level settings instance.
settings = Settings()
# 5. Nginx 配置
"""
server {
listen 80;
server_name example.com;
location / {
proxy_pass http://127.0.0.1:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
"""
# 6. 环境变量
# .env
APP_NAME=MyApp
DEBUG=false
SECRET_KEY=your-secret-key
DATABASE_URL=postgresql://user:password@localhost/dbname
# 7. 日志配置
import logging
# Configure root logging at INFO with timestamp, logger name, level and message.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
@app.get("/")
async def root():
    """Log the call at INFO level and return the greeting payload."""
    logger.info("Root endpoint called")
    greeting = {"message": "Hello World"}
    return greeting