Appearance
第60天:边缘部署
学习目标
- 理解边缘计算概述
- 掌握边缘设备选择
- 学习模型压缩技术
- 理解边缘推理优化
- 掌握边缘部署实践
边缘计算概述
什么是边缘计算
边缘计算是将计算和数据存储移至接近数据源或用户的设备上,而不是依赖集中式云服务。
核心概念:
云端计算 → 边缘计算 → 终端设备
边缘计算优势:
- 低延迟:数据在本地处理,减少网络延迟
- 隐私保护:数据不离开设备,保护用户隐私
- 离线能力:无需网络连接即可运行
- 降低成本:减少云服务使用成本
- 带宽节省:只传输必要数据
边缘vs云端
| 特性 | 边缘计算 | 云端计算 |
|---|---|---|
| 延迟 | 极低 | 中等 |
| 隐私 | 高 | 中等 |
| 离线能力 | 支持 | 不支持 |
| 计算能力 | 有限 | 强大 |
| 存储容量 | 有限 | 海量 |
| 成本 | 较低 | 较高 |
| 可扩展性 | 有限 | 高 |
边缘设备选择
设备类型
1. 移动设备
python
class MobileDevice:
def __init__(self):
self.specs = {
"cpu": "ARM",
"gpu": "Mali/Adreno",
"ram": "4-8GB",
"storage": "64-512GB",
"power": "Low",
"cost": "Low"
}
def get_capabilities(self) -> Dict:
return {
"model_size": "< 1GB",
"batch_size": 1,
"precision": "INT8",
"frameworks": ["TensorFlow Lite", "Core ML", "ONNX"]
}2. 嵌入式设备
python
class EmbeddedDevice:
def __init__(self):
self.specs = {
"cpu": "ARM/RISC-V",
"gpu": "None/Integrated",
"ram": "512MB-4GB",
"storage": "8-64GB",
"power": "Very Low",
"cost": "Very Low"
}
def get_capabilities(self) -> Dict:
return {
"model_size": "< 100MB",
"batch_size": 1,
"precision": "INT8/INT4",
"frameworks": ["TensorFlow Lite Micro", "TFLite", "ONNX"]
}3. 边缘服务器
python
class EdgeServer:
    """Profile of an on-premises edge server (workstation-class) target."""

    def __init__(self):
        # Server-grade hardware located close to the data source.
        self.specs = {
            "cpu": "x86/ARM",
            "gpu": "NVIDIA/Intel",
            "ram": "16-64GB",
            "storage": "512GB-4TB",
            "power": "Medium",
            "cost": "Medium"
        }

    def get_capabilities(self) -> dict:
        """Return the deployment constraints this device class can support.

        FIX: return annotation changed from undefined `Dict` (never
        imported, so the class definition raised NameError) to builtin
        `dict`.
        """
        return {
            "model_size": "1-10GB",
            "batch_size": "4-16",
            "precision": "FP16/INT8",
            "frameworks": ["TensorRT", "ONNX Runtime", "OpenVINO"]
        }
设备选择指南
python
class DeviceSelector:
    """Pick the most suitable edge-device class for a set of requirements."""

    def __init__(self):
        # Candidate device profiles, keyed by the name select_device returns.
        self.devices = {
            "mobile": MobileDevice(),
            "embedded": EmbeddedDevice(),
            "edge_server": EdgeServer()
        }

    def select_device(self, requirements: dict) -> str:
        """Return the name of the highest-scoring device for `requirements`."""
        scores = {
            name: self._calculate_score(device, requirements)
            for name, device in self.devices.items()
        }
        return max(scores, key=scores.get)

    def _calculate_score(self, device, requirements: dict) -> float:
        """Score one device: +1 for each requirement its profile satisfies.

        BUG FIX: the original fetched the device's capabilities and never
        read them — the score depended only on `requirements`, so every
        device tied and select_device returned an arbitrary entry.  The
        score now matches requirement values against the device's combined
        specs/capabilities with a case-insensitive substring test.
        (Also: `Dict` annotation replaced with builtin `dict`; `typing.Dict`
        was never imported.)
        """
        profile = {**device.specs, **device.get_capabilities()}
        score = 0.0
        for key in ("precision", "power", "cost"):
            wanted = str(requirements.get(key, "")).lower()
            offered = str(profile.get(key, "")).lower()
            if wanted and wanted in offered:
                score += 1.0
        # Small-model / single-sample workloads fit every device class.
        if requirements.get("model_size", "large") == "small":
            score += 1.0
        if requirements.get("batch_size", 1) == 1:
            score += 1.0
        return score
模型压缩
知识蒸馏
python
import torch
import torch.nn as nn
class KnowledgeDistillation:
    """Train a small student network to mimic a larger teacher network.

    The student learns from a blend of:
    - soft targets: the teacher's temperature-smoothed output distribution
    - hard targets: the ground-truth labels
    """

    def __init__(self, teacher_model: nn.Module,
                 student_model: nn.Module,
                 temperature: float = 3.0):
        # Higher temperature -> softer teacher distribution, exposing more
        # inter-class similarity ("dark knowledge") to the student.
        self.teacher_model = teacher_model
        self.student_model = student_model
        self.temperature = temperature

    def distillation_loss(self, student_logits: torch.Tensor,
                          teacher_logits: torch.Tensor,
                          labels: torch.Tensor,
                          alpha: float = 0.5) -> torch.Tensor:
        """Return alpha * soft (KD) loss + (1 - alpha) * hard (CE) loss."""
        # KL divergence between temperature-scaled distributions; the T^2
        # factor compensates for the 1/T^2 scaling of soft-target gradients
        # so the soft loss stays comparable in magnitude to the hard loss.
        soft_loss = nn.KLDivLoss(reduction="batchmean")(
            nn.functional.log_softmax(student_logits / self.temperature, dim=1),
            nn.functional.softmax(teacher_logits / self.temperature, dim=1)
        ) * (self.temperature ** 2)
        hard_loss = nn.CrossEntropyLoss()(student_logits, labels)
        return alpha * soft_loss + (1 - alpha) * hard_loss

    def train_student(self, train_loader,
                      optimizer, n_epochs: int = 10):
        """Run the distillation loop and return the trained student model.

        The teacher stays frozen in eval mode; only the student is updated
        by `optimizer` (which should hold the student's parameters).
        """
        self.teacher_model.eval()
        self.student_model.train()
        for epoch in range(n_epochs):
            total_loss = 0
            for batch in train_loader:
                # assumes each batch is an (inputs, labels) pair — standard
                # classification DataLoader layout.
                inputs, labels = batch
                # no_grad: the teacher only provides targets; no gradients
                # should flow into (or be stored for) it.
                with torch.no_grad():
                    teacher_logits = self.teacher_model(inputs)
                student_logits = self.student_model(inputs)
                loss = self.distillation_loss(
                    student_logits,
                    teacher_logits,
                    labels
                )
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
            print(f"Epoch {epoch+1}: Loss: {total_loss / len(train_loader):.4f}")
        return self.student_model剪枝
python
class ModelPruner:
    """Apply unstructured L1 magnitude pruning to Conv2d/Linear weights."""

    def __init__(self, model: nn.Module,
                 pruning_ratio: float = 0.3):
        self.model = model
        # Fraction of weights to zero out in each prunable layer.
        self.pruning_ratio = pruning_ratio

    def prune_model(self):
        """Attach L1-unstructured pruning masks to every Conv2d/Linear layer."""
        # FIX: `import torch` alone does not guarantee the
        # `torch.nn.utils.prune` submodule is loaded; import it explicitly
        # (matches the official PyTorch pruning tutorial).
        import torch.nn.utils.prune as prune
        parameters_to_prune = [
            (module, 'weight')
            for module in self.model.modules()
            if isinstance(module, (nn.Conv2d, nn.Linear))
        ]
        for module, param_name in parameters_to_prune:
            prune.l1_unstructured(
                module,
                name=param_name,
                amount=self.pruning_ratio
            )
        return self.model

    def remove_pruning(self):
        """Make pruning permanent by folding each mask into its weight tensor."""
        import torch.nn.utils.prune as prune
        for module in self.model.modules():
            # `weight_orig` exists only on modules that currently carry a
            # pruning reparametrization.
            if hasattr(module, 'weight_orig'):
                prune.remove(module, 'weight')
        return self.model

    def fine_tune_pruned(self, train_loader,
                         optimizer, n_epochs: int = 5):
        """Fine-tune the pruned model to recover accuracy; return the model."""
        self.model.train()
        # Hoisted out of the loop: no need to rebuild the criterion per batch.
        criterion = nn.CrossEntropyLoss()
        for epoch in range(n_epochs):
            total_loss = 0
            for batch in train_loader:
                inputs, labels = batch
                outputs = self.model(inputs)
                loss = criterion(outputs, labels)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
            print(f"Epoch {epoch+1}: Loss: {total_loss / len(train_loader):.4f}")
        return self.model
结构化剪枝
python
class StructuredPruner:
    """Channel-level (structured) pruning: mark whole channels/features to drop."""

    def __init__(self, model: nn.Module,
                 pruning_ratio: float = 0.3):
        self.model = model
        # Fraction of output channels/features to drop per layer.
        self.pruning_ratio = pruning_ratio

    def prune_channels(self):
        """Attach keep-masks to every Conv2d (out-channels) and Linear (out-features)."""
        for name, module in self.model.named_modules():
            if isinstance(module, nn.Conv2d):
                self._prune_conv_channels(module)
            elif isinstance(module, nn.Linear):
                self._prune_linear_channels(module)
        return self.model

    def _prune_conv_channels(self, conv: nn.Conv2d):
        """Register a boolean `channel_mask` keeping the top output channels."""
        n_channels = conv.out_channels
        n_pruned = int(n_channels * self.pruning_ratio)
        importance = self._calculate_channel_importance(conv)
        # Keep the (n_channels - n_pruned) highest-importance channels.
        _, indices = torch.topk(importance, n_channels - n_pruned)
        mask = torch.zeros(n_channels, dtype=torch.bool)
        mask[indices] = True
        conv.register_buffer('channel_mask', mask)

    def _prune_linear_channels(self, linear: nn.Linear):
        """Register a boolean `feature_mask` keeping the top output features."""
        n_features = linear.out_features
        n_pruned = int(n_features * self.pruning_ratio)
        importance = self._calculate_feature_importance(linear)
        _, indices = torch.topk(importance, n_features - n_pruned)
        mask = torch.zeros(n_features, dtype=torch.bool)
        mask[indices] = True
        linear.register_buffer('feature_mask', mask)

    def _calculate_channel_importance(self, conv: nn.Conv2d) -> torch.Tensor:
        """L1 importance per OUTPUT channel of a Conv2d layer.

        BUG FIX: Conv2d weights have shape (out_channels, in_channels, kH, kW).
        The original summed over dims (0, 2, 3), producing a vector of length
        `in_channels` that mis-sizes the mask whenever in_channels !=
        out_channels (and scores the wrong axis even when they match).
        Summing over dims (1, 2, 3) yields one score per output channel, as
        `_prune_conv_channels` expects.
        """
        weight = conv.weight
        return weight.abs().sum(dim=(1, 2, 3))

    def _calculate_feature_importance(self, linear: nn.Linear) -> torch.Tensor:
        """L1 importance per output feature (row) of a Linear weight matrix."""
        # Linear weight shape is (out_features, in_features); sum each row.
        weight = linear.weight
        return weight.abs().sum(dim=1)
边缘推理优化
模型优化
python
class EdgeModelOptimizer:
def __init__(self, model: nn.Module):
self.model = model
def optimize_for_edge(self):
self.model.eval()
for param in self.model.parameters():
param.requires_grad = False
self._fuse_layers()
self._optimize_memory()
return self.model
def _fuse_layers(self):
torch.quantization.fuse_modules(
self.model,
[['conv', 'bn', 'relu']],
inplace=True
)
def _optimize_memory(self):
if hasattr(self.model, 'gradient_checkpointing_enable'):
self.model.gradient_checkpointing_enable()TensorFlow Lite转换
python
class TFLiteConverter:
def __init__(self, model_path: str):
self.model_path = model_path
def convert_to_tflite(self, output_path: str):
try:
import tensorflow as tf
except ImportError:
raise ImportError("Install tensorflow: pip install tensorflow")
converter = tf.lite.TFLiteConverter.from_saved_model(self.model_path)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.float16]
tflite_model = converter.convert()
with open(output_path, 'wb') as f:
f.write(tflite_model)
return output_path
def convert_to_int8(self, output_path: str,
representative_dataset):
try:
import tensorflow as tf
except ImportError:
raise ImportError("Install tensorflow: pip install tensorflow")
converter = tf.lite.TFLiteConverter.from_saved_model(self.model_path)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8
tflite_model = converter.convert()
with open(output_path, 'wb') as f:
f.write(tflite_model)
return output_pathCore ML转换
python
class CoreMLConverter:
    """Convert a serialized TorchScript model to Core ML for iOS deployment."""

    def __init__(self, model_path: str):
        # Path to a TorchScript file produced by torch.jit.save().
        self.model_path = model_path

    def convert_to_coreml(self, output_path: str):
        """Convert the model, save it as a Core ML model, return the path."""
        try:
            import coremltools as ct
        except ImportError:
            raise ImportError("Install coremltools: pip install coremltools")
        import torch
        model = torch.jit.load(self.model_path)
        model.eval()
        # NOTE(review): the 1x3x224x224 shape assumes an ImageNet-style
        # vision model — adjust for other architectures.
        example_input = torch.rand(1, 3, 224, 224)
        # FIX: torch.jit.load() already returns a ScriptModule, which
        # coremltools accepts directly; re-tracing it with torch.jit.trace
        # was redundant and can fail for models with data-dependent control
        # flow.
        mlmodel = ct.convert(
            model,
            inputs=[ct.TensorType(name="input", shape=example_input.shape)],
            minimum_deployment_target=ct.target.iOS13
        )
        mlmodel.save(output_path)
        return output_path
边缘部署实践
移动端部署
python
class MobileDeployment:
    """Package a trained model for Android (TFLite) or iOS (Core ML)."""

    def __init__(self, model_path: str,
                 device_type: str = "android"):
        self.model_path = model_path
        # Target platform: "android" or "ios".
        self.device_type = device_type

    def prepare_model(self) -> str:
        """Convert the model for the target platform; return the new path."""
        if self.device_type == "android":
            return self._prepare_android_model()
        elif self.device_type == "ios":
            return self._prepare_ios_model()
        else:
            raise ValueError(f"Unsupported device type: {self.device_type}")

    def _prepare_android_model(self) -> str:
        """Android path: convert the model to TensorFlow Lite."""
        converter = TFLiteConverter(self.model_path)
        tflite_path = self.model_path.replace(".h5", ".tflite")
        converter.convert_to_tflite(tflite_path)
        return tflite_path

    def _prepare_ios_model(self) -> str:
        """iOS path: convert the model to Core ML."""
        converter = CoreMLConverter(self.model_path)
        coreml_path = self.model_path.replace(".h5", ".mlmodel")
        converter.convert_to_coreml(coreml_path)
        return coreml_path

    def generate_app_code(self, model_path: str) -> str:
        """Return sample client code for loading/running the converted model.

        FIX: the original fell off the end and returned None for an unknown
        device type; it now raises ValueError, matching prepare_model.
        """
        if self.device_type == "android":
            return self._generate_android_code(model_path)
        elif self.device_type == "ios":
            return self._generate_ios_code(model_path)
        else:
            raise ValueError(f"Unsupported device type: {self.device_type}")

    def _generate_android_code(self, model_path: str) -> str:
        # NOTE(review): this emits Python TFLite-interpreter code as a
        # stand-in; a real Android app would use the Java/Kotlin TFLite API.
        return f"""
import tensorflow as tf
interpreter = tf.lite.Interpreter(model_path="{model_path}")
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
def predict(input_data):
    interpreter.set_tensor(input_details[0]['index'], input_data)
    interpreter.invoke()
    output_data = interpreter.get_tensor(output_details[0]['index'])
    return output_data
"""

    def _generate_ios_code(self, model_path: str) -> str:
        # Swift snippet; literal braces are escaped ({{ }}) inside the f-string.
        return f"""
import CoreML
model = CoreML.MLModel(contentsOf: URL(fileURLWithPath: "{model_path}"))
func predict(input: [Float]) -> [Float] {{
    let inputML = try! MLMultiArray(shape: [1, input.count], dataType: .float32)
    for (index, value) in input.enumerated() {{
        inputML[index] = value
    }}
    let output = try! model.prediction(from: ["input": inputML])
    return output.featureValue(for: "output").multiArrayValue!.array.map {{ $0 as! Float }}
}}
"""
嵌入式部署
python
class EmbeddedDeployment:
    """Prepare an INT8 TFLite model and C glue code for embedded targets."""

    def __init__(self, model_path: str):
        self.model_path = model_path

    def prepare_for_embedded(self) -> str:
        """Quantize the model to INT8 and return the .tflite path."""
        converter = TFLiteConverter(self.model_path)
        representative_dataset = self._create_representative_dataset()
        tflite_path = self.model_path.replace(".h5", "_int8.tflite")
        converter.convert_to_int8(tflite_path, representative_dataset)
        return tflite_path

    def _create_representative_dataset(self):
        """Return a generator factory yielding calibration samples.

        NOTE(review): random data calibrates quantization ranges poorly —
        real deployments should yield samples from the true input
        distribution.
        """
        import numpy as np

        def representative_data():
            for _ in range(100):
                yield [np.random.rand(224, 224, 3).astype(np.float32)]
        return representative_data

    def generate_c_code(self, tflite_path: str) -> str:
        """Embed the .tflite file as a C byte array with TFLite C API glue.

        FIX: the original did `import xxd` — xxd is a command-line tool, not
        a Python module — and then called the module object as a function,
        so this method could never produce output.  The byte array is now
        generated in pure Python.  The C template is also corrected to the
        real TFLite C API: TfLiteInterpreterCreate RETURNS the interpreter,
        and tensors are obtained via TfLiteInterpreterGet{Input,Output}Tensor.
        """
        with open(tflite_path, 'rb') as f:
            data = f.read()
        # xxd-style hex initializer, 12 bytes per line.
        rows = []
        for offset in range(0, len(data), 12):
            chunk = data[offset:offset + 12]
            rows.append('    ' + ', '.join(f'0x{b:02x}' for b in chunk) + ',')
        byte_array = '\n'.join(rows)
        return f"""
#include <string.h>
#include "tensorflow/lite/c/c_api.h"
const unsigned char model_data[] = {{
{byte_array}
}};
const int model_size = sizeof(model_data);
static TfLiteInterpreter* interpreter = NULL;
void setup_model() {{
    TfLiteModel* model = TfLiteModelCreate(model_data, model_size);
    TfLiteInterpreterOptions* options = TfLiteInterpreterOptionsCreate();
    interpreter = TfLiteInterpreterCreate(model, options);
    TfLiteInterpreterAllocateTensors(interpreter);
}}
void run_inference(float* input, float* output) {{
    TfLiteTensor* input_tensor = TfLiteInterpreterGetInputTensor(interpreter, 0);
    const TfLiteTensor* output_tensor = TfLiteInterpreterGetOutputTensor(interpreter, 0);
    memcpy(input_tensor->data.f, input, input_tensor->bytes);
    TfLiteInterpreterInvoke(interpreter);
    memcpy(output, output_tensor->data.f, output_tensor->bytes);
}}
"""
边缘服务器部署
python
class EdgeServerDeployment:
    """Optimize a model, containerize it, and emit an edge-server deployment spec."""

    def __init__(self, model_path: str):
        self.model_path = model_path

    def deploy_to_edge_server(self, server_config: dict):
        """Full pipeline: optimize -> build container -> deployment config.

        (Annotation changed from undefined `Dict` to builtin `dict`;
        `typing.Dict` was never imported in this file.)
        """
        optimized_model = self._optimize_model()
        container_image = self._build_container(optimized_model)
        deployment = self._deploy_container(container_image, server_config)
        return deployment

    def _optimize_model(self):
        """Load the model and apply edge optimizations.

        FIX: the original built EdgeModelOptimizer(None) and then called
        optimize_for_edge(model) with an argument the method does not accept
        (TypeError at runtime).  The optimizer is now constructed with the
        loaded model and optimize_for_edge() is called with no arguments,
        matching its signature.
        """
        # NOTE(review): load_model is not defined in this file — presumably a
        # framework-specific loader supplied elsewhere; confirm before use.
        model = load_model(self.model_path)
        optimizer = EdgeModelOptimizer(model)
        return optimizer.optimize_for_edge()

    def _build_container(self, model):
        """Return a Dockerfile for serving the model (model arg reserved)."""
        dockerfile = f"""
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["python", "server.py"]
"""
        return dockerfile

    def _deploy_container(self, image, config: dict):
        """Build a simple Kubernetes-style deployment description dict."""
        deployment_config = {
            "image": image,
            "replicas": config.get("replicas", 1),
            "resources": {
                "cpu": config.get("cpu", "2"),
                "memory": config.get("memory", "4Gi"),
                "gpu": config.get("gpu", "1")
            }
        }
        return deployment_config
实践练习
练习1:实现知识蒸馏
python
def distill_knowledge(teacher_model, student_model,
train_loader, n_epochs=10):
distiller = KnowledgeDistillation(teacher_model, student_model)
optimizer = torch.optim.Adam(student_model.parameters(), lr=0.001)
trained_student = distiller.train_student(
train_loader,
optimizer,
n_epochs
)
return trained_student练习2:转换为TensorFlow Lite
python
def convert_to_tflite(model_path, output_path):
    """Convert a SavedModel at `model_path` to an FP16-optimized .tflite file.

    Returns `output_path` after writing the converted flatbuffer to it.
    """
    import tensorflow as tf

    tfl = tf.lite.TFLiteConverter.from_saved_model(model_path)
    tfl.optimizations = [tf.lite.Optimize.DEFAULT]
    tfl.target_spec.supported_types = [tf.float16]
    flatbuffer = tfl.convert()
    with open(output_path, 'wb') as out:
        out.write(flatbuffer)
    return output_path
总结
本节我们学习了边缘部署:
- 边缘计算概述和优势
- 边缘设备选择
- 模型压缩技术(知识蒸馏、剪枝)
- 边缘推理优化
- 边缘部署实践(移动端、嵌入式、边缘服务器)
边缘部署是AI应用落地的重要方向。
