# train.py
# ml-platform/training/train.py
"""
California Housing 房价预测模型训练脚本
完整流程:
1. 加载数据集 → 2. 特征工程 → 3. 训练多个模型 → 4. 评估选最优
→ 5. 导出 ONNX → 6. 存储到 NFS → 7. 创建 MLModel K8s 资源
"""
import os
import json
import time
import subprocess
import numpy as np
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
from sklearn.pipeline import Pipeline
# ========== Configuration (all values overridable via environment) ==========
MODEL_VERSION = os.getenv("MODEL_VERSION", "v1")
MODEL_OUTPUT_PATH = os.getenv("MODEL_OUTPUT_PATH", "/models")
MLMODEL_NAME = os.getenv("MLMODEL_NAME", "housing-model")
MLMODEL_NAMESPACE = os.getenv("MLMODEL_NAMESPACE", "ml-platform")
# Any value other than (case-insensitive) "true" disables CR creation.
CREATE_CR = os.getenv("CREATE_MLMODEL_CR", "true").lower() == "true"

_BANNER = "=" * 60
print(_BANNER)
print("California Housing 房价预测 - 模型训练")
print(_BANNER)
# ========== 1. Load the dataset ==========
print("\n[1/7] 加载 California Housing 数据集...")
housing = fetch_california_housing()
X, y = housing.data, housing.target
print(f" 样本数: {X.shape[0]}")
print(f" 特征数: {X.shape[1]}")
print(f" 特征名: {housing.feature_names}")
print(f" 目标: 房价中位数 (单位: $100,000)")
print(f" 目标范围: {y.min():.2f} ~ {y.max():.2f}")

# ========== 2. Quick data exploration ==========
# Per-feature summary statistics, printed one line per feature.
print("\n[2/7] 数据探索...")
print(" 特征统计:")
for col_idx, feat_name in enumerate(housing.feature_names):
    values = X[:, col_idx]
    print(f" {feat_name:12s}: min={values.min():10.2f}, max={values.max():10.2f}, "
          f"mean={values.mean():10.2f}, std={values.std():10.2f}")
# ========== 3. Preprocessing ==========
print("\n[3/7] 数据预处理...")

# Hold out 20% of the samples as a test split (fixed seed for reproducibility).
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
print(f" 训练集: {X_train.shape[0]} 条")
print(f" 测试集: {X_test.shape[0]} 条")

# Standardize features to zero mean / unit variance. Why:
# - linear regression: puts coefficients on a comparable scale
# - gradient descent: ill-scaled features make the loss surface elongated and slow to converge
# - tree models: scale-insensitive, but a uniform pipeline is cleaner
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)  # fit statistics on the training split only
X_test_scaled = scaler.transform(X_test)        # reuse training statistics on the test split
print(" 标准化完成 (StandardScaler)")
# ========== 4. Train candidate models ==========
print("\n[4/7] 训练模型...")

models = {
    "LinearRegression": LinearRegression(),
    "RandomForest": RandomForestRegressor(
        n_estimators=100,   # 100 decision trees
        max_depth=10,       # cap depth to limit overfitting
        random_state=42,
        n_jobs=-1,          # use every available CPU core
    ),
    "GradientBoosting": GradientBoostingRegressor(
        n_estimators=100,   # 100 boosting rounds
        max_depth=5,        # shallow trees: boosting relies on quantity, not depth
        learning_rate=0.1,
        random_state=42,
    ),
}

# Fit each candidate on the scaled training split and score it on the held-out
# test split; `results` keeps the fitted model alongside its metrics.
results = {}
for model_name, estimator in models.items():
    t0 = time.time()
    estimator.fit(X_train_scaled, y_train)
    elapsed = time.time() - t0

    y_pred = estimator.predict(X_test_scaled)
    mae = mean_absolute_error(y_test, y_pred)
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    r2 = r2_score(y_test, y_pred)
    results[model_name] = {
        "model": estimator,
        "mae": mae,
        "rmse": rmse,
        "r2": r2,
        "train_time": elapsed,
    }

    print(f"\n {model_name}:")
    print(f" 训练时间: {elapsed:.2f}s")
    print(f" MAE (平均绝对误差): {mae:.4f} (${mae * 100000:.0f})")
    print(f" RMSE (均方根误差): {rmse:.4f} (${rmse * 100000:.0f})")
    print(f" R² (决定系数): {r2:.4f} ({r2 * 100:.1f}% 方差被解释)")
# ========== 5. Pick the best model (highest R² on the test split) ==========
print("\n[5/7] 选择最优模型...")
best_name, best = max(results.items(), key=lambda item: item[1]["r2"])
print(f" 最优模型: {best_name} (R² = {best['r2']:.4f})")
# ========== 6. Export the model ==========
print("\n[6/7] 导出模型...")
os.makedirs(MODEL_OUTPUT_PATH, exist_ok=True)

# Fix: compute the output path up-front so later steps (ONNX validation,
# the MLModel CR) can reference `model_path`/`model_filename` even when
# every export attempt fails (previously they were only bound inside the
# try blocks, causing NameError downstream).
model_filename = f"{MLMODEL_NAME}-{MODEL_VERSION}.onnx"
model_path = os.path.join(MODEL_OUTPUT_PATH, model_filename)


def _export_onnx(estimator):
    """Wrap `scaler` + estimator in a sklearn Pipeline, convert it to ONNX
    and write the serialized model to `model_path`.

    Returns the sklearn Pipeline used for conversion (needed later to
    cross-check ONNX predictions against sklearn predictions).
    Raises whatever skl2onnx raises for unconvertible estimators.
    """
    pipe = Pipeline([("scaler", scaler), ("model", estimator)])
    initial_type = [("features", FloatTensorType([None, X.shape[1]]))]
    onnx_model = convert_sklearn(pipe, initial_types=initial_type)
    with open(model_path, "wb") as f:
        f.write(onnx_model.SerializeToString())
    return pipe


# Fix: keep `pipeline` in sync with whichever model actually got exported —
# previously the fallback path never set it, so the validation step crashed
# with NameError whenever the best model was not ONNX-convertible.
pipeline = None
try:
    pipeline = _export_onnx(best["model"])
    print(f" ONNX 模型已保存: {model_path} ({os.path.getsize(model_path) / 1024:.1f} KB)")
except Exception as e:
    # Complex ensembles may not convert; fall back to LinearRegression ONNX.
    print(f" ONNX 导出失败 ({type(e).__name__}), 使用 Go JSON 格式")
    try:
        pipeline = _export_onnx(models["LinearRegression"])
        print(f" LinearRegression ONNX 备选已保存: {model_path}")
    except Exception:
        # Best-effort: the JSON parameter files below still get written.
        print(f" ONNX 完全不可用,仅使用 JSON 格式")
# Persist human-readable training metadata alongside the model artifact.
_best_metrics = {
    "mae": round(best["mae"], 4),
    "rmse": round(best["rmse"], 4),
    "r2_score": round(best["r2"], 4),
}
_training_info = {
    "dataset": "california_housing",
    "samples": X.shape[0],
    "train_size": X_train.shape[0],
    "test_size": X_test.shape[0],
    "train_time_seconds": round(best["train_time"], 2),
}
metadata = {
    "name": MLMODEL_NAME,
    "version": MODEL_VERSION,
    "model_type": best_name,
    "features": housing.feature_names,
    "target": "MedHouseVal",
    "metrics": _best_metrics,
    "training": _training_info,
    "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
}
meta_path = os.path.join(MODEL_OUTPUT_PATH, f"{MLMODEL_NAME}-{MODEL_VERSION}.json")
with open(meta_path, "w") as fp:
    json.dump(metadata, fp, indent=2)
print(f" 元数据已保存: {meta_path}")
# Also export the JSON parameter files the Go inference service consumes.
# Linear-regression parameters (weights + intercept + scaler statistics) are
# trivially portable, so they are always written from the LinearRegression
# model even when a tree model won the comparison.
lr_model = models["LinearRegression"]
lr_result = results["LinearRegression"]

# Model parameters for the Go inference service.
go_model_params = {
    "weights": lr_model.coef_.tolist(),
    "intercept": float(lr_model.intercept_),
}
go_model_path = os.path.join(MODEL_OUTPUT_PATH, "model_params.json")
with open(go_model_path, "w") as fp:
    json.dump(go_model_params, fp, indent=2)
print(f" Go 模型参数: {go_model_path}")

# Scaler parameters (mean/std per feature, in training-feature order).
go_scaler_params = {
    "mean": scaler.mean_.tolist(),
    "std": scaler.scale_.tolist(),
}
go_scaler_path = os.path.join(MODEL_OUTPUT_PATH, "scaler_params.json")
with open(go_scaler_path, "w") as fp:
    json.dump(go_scaler_params, fp, indent=2)
print(f" Go Scaler 参数: {go_scaler_path}")

# Metadata in the exact shape the Go code expects.
go_metadata = {
    "name": MLMODEL_NAME,
    "version": MODEL_VERSION,
    "model_type": "LinearRegression",
    "features": list(housing.feature_names),
    "metrics": {
        "mae": round(lr_result["mae"], 4),
        "rmse": round(lr_result["rmse"], 4),
        "r2_score": round(lr_result["r2"], 4),
    },
    "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
}
go_meta_path = os.path.join(MODEL_OUTPUT_PATH, "metadata.json")
with open(go_meta_path, "w") as fp:
    json.dump(go_metadata, fp, indent=2)
print(f" Go 元数据: {go_meta_path}")
# ========== 7. Validate the ONNX model ==========
print("\n[7/7] 验证 ONNX 模型...")
# Fix: validation is now best-effort. Previously this section ran
# unconditionally; when the ONNX export above had failed, `model_path` /
# `pipeline` were unusable and the script crashed here, losing the remaining
# steps (CR creation and the final summary).
try:
    import onnxruntime as ort

    session = ort.InferenceSession(model_path)
    test_input = X_test[:3].astype(np.float32)
    # Run both runtimes on the same 3 samples and compare predictions.
    onnx_pred = session.run(None, {"features": test_input})[0]
    sklearn_pred = pipeline.predict(test_input)
    print(" ONNX vs sklearn 预测对比 (前 3 条):")
    for i in range(3):
        print(f" 样本{i + 1}: ONNX={onnx_pred[i][0]:.4f}, sklearn={sklearn_pred[i]:.4f}, "
              f"实际={y_test[i]:.4f}")
    # 0.01 tolerance (in $100,000 units) for float32 conversion error.
    max_diff = np.max(np.abs(onnx_pred.flatten() - sklearn_pred))
    print(f" 最大偏差: {max_diff:.6f} {'✓ 通过' if max_diff < 0.01 else '✗ 偏差过大'}")
except Exception as e:
    print(f" ONNX 验证跳过 ({type(e).__name__}: {e})")
# ========== 8. Create the MLModel K8s CRD (optional) ==========
if CREATE_CR:
    print("\n[bonus] 创建 MLModel K8s 资源...")
    # Fix: the manifest keys were emitted at column 0, so metadata.name/
    # namespace and the spec fields were not actually nested — kubectl would
    # reject the YAML. Restored conventional 2-space nesting.
    cr_yaml = f"""apiVersion: ml.k8s-lab.io/v1alpha1
kind: MLModel
metadata:
  name: {MLMODEL_NAME}-{MODEL_VERSION}
  namespace: {MLMODEL_NAMESPACE}
spec:
  modelPath: /models/{model_filename}
  version: "{MODEL_VERSION}"
  replicas: 2
  modelType: "{best_name}"
  minReplicas: 1
  maxReplicas: 5
"""
    cr_path = os.path.join(MODEL_OUTPUT_PATH, "mlmodel-cr.yaml")
    with open(cr_path, "w") as f:
        f.write(cr_yaml)
    try:
        # List-form argv (shell=False); 10s timeout so an unreachable API
        # server cannot stall the training job.
        result = subprocess.run(
            ["kubectl", "apply", "-f", cr_path],
            capture_output=True, text=True, timeout=10,
        )
        if result.returncode == 0:
            print(f" MLModel CR 创建成功: {MLMODEL_NAME}-{MODEL_VERSION}")
        else:
            print(f" MLModel CR 创建失败: {result.stderr.strip()}")
            print(f" (CR YAML 已保存到 {cr_path},可手动 kubectl apply)")
    except Exception as e:
        # kubectl missing (FileNotFoundError) or timed out — the YAML file on
        # disk can still be applied by hand.
        print(f" kubectl 不可用,跳过 CR 创建: {e}")
        print(f" CR YAML 已保存到 {cr_path}")
print("\n" + "=" * 60)
print("训练完成!")
print(f" 模型: {model_path}")
print(f" 类型: {best_name}")
print(f" R²: {best['r2']:.4f}")
print("=" * 60)