K8s Lab 把当前仓库文档整理成一个可阅读的网页站点

Repository Reading Site

train.py

ml-platform/training/train.py

Text Asset · ml-platform/training/train.py · 10.0 KB · 2026-04-09 15:17 · 查看原始内容 (view raw)
"""
California Housing 房价预测模型训练脚本

完整流程:
1. 加载数据集 → 2. 特征工程 → 3. 训练多个模型 → 4. 评估选最优
→ 5. 导出 ONNX → 6. 存储到 NFS → 7. 创建 MLModel K8s 资源
"""

import os
import json
import time
import subprocess
import numpy as np
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
from sklearn.pipeline import Pipeline

# ========== Configuration ==========
# Every setting is environment-driven, with defaults suitable for a local run.
MODEL_VERSION = os.environ.get("MODEL_VERSION", "v1")
MODEL_OUTPUT_PATH = os.environ.get("MODEL_OUTPUT_PATH", "/models")
MLMODEL_NAME = os.environ.get("MLMODEL_NAME", "housing-model")
MLMODEL_NAMESPACE = os.environ.get("MLMODEL_NAMESPACE", "ml-platform")
# Opt-out switch for the final "create MLModel CR" step (case-insensitive "true").
CREATE_CR = os.environ.get("CREATE_MLMODEL_CR", "true").lower() == "true"

print("=" * 60)
print("California Housing 房价预测 - 模型训练")
print("=" * 60)

# ========== 1. Load the dataset ==========
print("\n[1/7] 加载 California Housing 数据集...")
housing = fetch_california_housing()
X = housing.data
y = housing.target
n_samples, n_features = X.shape

print(f"  样本数: {n_samples}")
print(f"  特征数: {n_features}")
print(f"  特征名: {housing.feature_names}")
print(f"  目标: 房价中位数 (单位: $100,000)")
print(f"  目标范围: {y.min():.2f} ~ {y.max():.2f}")

# ========== 2. Data exploration ==========
print("\n[2/7] 数据探索...")
print("  特征统计:")
# Iterate feature columns directly (rows of X.T) instead of indexing by position.
for name, col in zip(housing.feature_names, X.T):
    print(f"    {name:12s}: min={col.min():10.2f}, max={col.max():10.2f}, "
          f"mean={col.mean():10.2f}, std={col.std():10.2f}")

# ========== 3. Preprocessing ==========
print("\n[3/7] 数据预处理...")
# Hold out 20% of the rows as a test split; fixed seed for reproducibility.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)
print(f"  训练集: {X_train.shape[0]} 条")
print(f"  测试集: {X_test.shape[0]} 条")

# Standardize features to zero mean / unit variance. Why standardize?
# - Linear regression: coefficients become comparable across features.
# - Gradient descent: badly-scaled features make the loss contours elliptical
#   and slow convergence.
# - Tree models: scale-insensitive, but uniform preprocessing keeps things tidy.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)  # fit statistics on the train split only
X_test_scaled = scaler.transform(X_test)        # reuse train-split statistics on the test split
print("  标准化完成 (StandardScaler)")

# ========== 4. 训练多个模型 ==========
print("\n[4/7] 训练模型...")

models = {
    "LinearRegression": LinearRegression(),
    "RandomForest": RandomForestRegressor(
        n_estimators=100,  # 100 棵决策树
        max_depth=10,      # 限制深度防止过拟合
        random_state=42,
        n_jobs=-1          # 使用所有 CPU 核
    ),
    "GradientBoosting": GradientBoostingRegressor(
        n_estimators=100,  # 100 轮迭代
        max_depth=5,       # 每棵树浅一些 (boosting 靠数量不靠深度)
        learning_rate=0.1, # 学习率
        random_state=42
    ),
}

results = {}
for name, model in models.items():
    start = time.time()
    model.fit(X_train_scaled, y_train)
    train_time = time.time() - start

    y_pred = model.predict(X_test_scaled)
    mae = mean_absolute_error(y_test, y_pred)
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    r2 = r2_score(y_test, y_pred)

    results[name] = {
        "model": model,
        "mae": mae,
        "rmse": rmse,
        "r2": r2,
        "train_time": train_time,
    }

    print(f"\n  {name}:")
    print(f"    训练时间: {train_time:.2f}s")
    print(f"    MAE  (平均绝对误差): {mae:.4f} (${mae * 100000:.0f})")
    print(f"    RMSE (均方根误差):   {rmse:.4f} (${rmse * 100000:.0f})")
    print(f"    R²   (决定系数):     {r2:.4f} ({r2 * 100:.1f}% 方差被解释)")

# ========== 5. Pick the winner by R² (higher is better) ==========
print("\n[5/7] 选择最优模型...")
best_name, best = max(results.items(), key=lambda kv: kv[1]["r2"])
print(f"  最优模型: {best_name} (R² = {best['r2']:.4f})")

# ========== 6. Export the model ==========
print("\n[6/7] 导出模型...")
os.makedirs(MODEL_OUTPUT_PATH, exist_ok=True)


def _export_onnx(pipe):
    """Convert a fitted sklearn pipeline to ONNX and write it under MODEL_OUTPUT_PATH.

    Returns (filename, path). Raises whatever skl2onnx/serialization raises on failure.
    """
    initial_type = [("features", FloatTensorType([None, X.shape[1]]))]
    onnx_model = convert_sklearn(pipe, initial_types=initial_type)
    filename = f"{MLMODEL_NAME}-{MODEL_VERSION}.onnx"
    path = os.path.join(MODEL_OUTPUT_PATH, filename)
    with open(path, "wb") as f:
        f.write(onnx_model.SerializeToString())
    return filename, path


# Try to export the winning model to ONNX (conversion may fail for some model types).
try:
    pipeline = Pipeline([("scaler", scaler), ("model", best["model"])])
    model_filename, model_path = _export_onnx(pipeline)
    print(f"  ONNX 模型已保存: {model_path} ({os.path.getsize(model_path) / 1024:.1f} KB)")
except Exception as e:
    print(f"  ONNX 导出失败 ({type(e).__name__}), 使用 Go JSON 格式")
    # Fall back to a LinearRegression pipeline, which skl2onnx handles reliably.
    try:
        lr_pipeline = Pipeline([("scaler", scaler), ("model", models["LinearRegression"])])
        model_filename, model_path = _export_onnx(lr_pipeline)
        # BUGFIX: rebind `pipeline` to the pipeline that was actually serialized,
        # so step 7's ONNX-vs-sklearn validation compares like with like
        # (previously it compared the LR ONNX file against the best model).
        pipeline = lr_pipeline
        print(f"  LinearRegression ONNX 备选已保存: {model_path}")
    except Exception:
        # No ONNX artifact at all; the JSON files written below remain usable.
        print(f"  ONNX 完全不可用,仅使用 JSON 格式")

# Persist human-readable model metadata next to the artifact.
# (Insertion order matches the serialized JSON key order.)
metadata = {
    "name": MLMODEL_NAME,
    "version": MODEL_VERSION,
    "model_type": best_name,
    "features": housing.feature_names,
    "target": "MedHouseVal",
}
metadata["metrics"] = {
    "mae": round(best["mae"], 4),
    "rmse": round(best["rmse"], 4),
    "r2_score": round(best["r2"], 4),
}
metadata["training"] = {
    "dataset": "california_housing",
    "samples": X.shape[0],
    "train_size": X_train.shape[0],
    "test_size": X_test.shape[0],
    "train_time_seconds": round(best["train_time"], 2),
}
metadata["timestamp"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

meta_path = os.path.join(MODEL_OUTPUT_PATH, f"{MLMODEL_NAME}-{MODEL_VERSION}.json")
with open(meta_path, "w") as f:
    json.dump(metadata, f, indent=2)
print(f"  元数据已保存: {meta_path}")

# Also export the JSON parameter files the Go inference service loads.
# For linear regression: weights + intercept + scaler statistics.
# For tree models: a LinearRegression was trained anyway, so Go can load it as a fallback.
lr_model = models["LinearRegression"]
lr_result = results["LinearRegression"]

# Model coefficients for the Go service.
go_model_path = os.path.join(MODEL_OUTPUT_PATH, "model_params.json")
go_model_params = {"weights": lr_model.coef_.tolist(), "intercept": float(lr_model.intercept_)}
with open(go_model_path, "w") as f:
    json.dump(go_model_params, f, indent=2)
print(f"  Go 模型参数: {go_model_path}")

# StandardScaler statistics so Go can reproduce the preprocessing.
go_scaler_path = os.path.join(MODEL_OUTPUT_PATH, "scaler_params.json")
go_scaler_params = {"mean": scaler.mean_.tolist(), "std": scaler.scale_.tolist()}
with open(go_scaler_path, "w") as f:
    json.dump(go_scaler_params, f, indent=2)
print(f"  Go Scaler 参数: {go_scaler_path}")

# Metadata for the Go inference service (field layout must match the Go structs).
go_metrics = {
    out_key: round(lr_result[src_key], 4)
    for out_key, src_key in (("mae", "mae"), ("rmse", "rmse"), ("r2_score", "r2"))
}
go_metadata = {
    "name": MLMODEL_NAME,
    "version": MODEL_VERSION,
    "model_type": "LinearRegression",
    "features": list(housing.feature_names),
    "metrics": go_metrics,
    "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
}
go_meta_path = os.path.join(MODEL_OUTPUT_PATH, "metadata.json")
with open(go_meta_path, "w") as f:
    json.dump(go_metadata, f, indent=2)
print(f"  Go 元数据: {go_meta_path}")

# ========== 7. Validate the ONNX model ==========
print("\n[7/7] 验证 ONNX 模型...")
# NOTE(review): imported mid-file rather than at the top — presumably so the
# training steps can run without onnxruntime installed; confirm before moving.
import onnxruntime as ort

# `model_path`/`pipeline` are only bound when an ONNX export in step 6 succeeded.
# BUGFIX: the original raised NameError here when both export attempts failed;
# skip validation gracefully in that case instead.
if "model_path" in globals():
    session = ort.InferenceSession(model_path)
    # Raw (unscaled) features are correct input: the exported pipeline embeds the scaler.
    test_input = X_test[:3].astype(np.float32)
    onnx_pred = session.run(None, {"features": test_input})[0]
    sklearn_pred = pipeline.predict(test_input)

    print("  ONNX vs sklearn 预测对比 (前 3 条):")
    for i in range(3):
        print(f"    样本{i + 1}: ONNX={onnx_pred[i][0]:.4f}, sklearn={sklearn_pred[i]:.4f}, "
              f"实际={y_test[i]:.4f}")

    # The two runtimes should agree to well under 0.01 ($1,000) per prediction.
    max_diff = np.max(np.abs(onnx_pred.flatten() - sklearn_pred))
    print(f"  最大偏差: {max_diff:.6f} {'✓ 通过' if max_diff < 0.01 else '✗ 偏差过大'}")
else:
    print("  跳过: 没有可用的 ONNX 模型")

# ========== 8. Optionally create the MLModel K8s custom resource ==========
if CREATE_CR:
    print("\n[bonus] 创建 MLModel K8s 资源...")
    cr_yaml = f"""apiVersion: ml.k8s-lab.io/v1alpha1
kind: MLModel
metadata:
  name: {MLMODEL_NAME}-{MODEL_VERSION}
  namespace: {MLMODEL_NAMESPACE}
spec:
  modelPath: /models/{model_filename}
  version: "{MODEL_VERSION}"
  replicas: 2
  modelType: "{best_name}"
  minReplicas: 1
  maxReplicas: 5
"""
    # Always write the manifest to disk first so a failed apply can be retried by hand.
    cr_path = os.path.join(MODEL_OUTPUT_PATH, "mlmodel-cr.yaml")
    with open(cr_path, "w") as f:
        f.write(cr_yaml)

    try:
        apply_proc = subprocess.run(
            ["kubectl", "apply", "-f", cr_path],
            capture_output=True, text=True, timeout=10
        )
    except Exception as e:
        # Best-effort: missing/timed-out kubectl must not fail the training job.
        print(f"  kubectl 不可用,跳过 CR 创建: {e}")
        print(f"  CR YAML 已保存到 {cr_path}")
    else:
        if apply_proc.returncode == 0:
            print(f"  MLModel CR 创建成功: {MLMODEL_NAME}-{MODEL_VERSION}")
        else:
            print(f"  MLModel CR 创建失败: {apply_proc.stderr.strip()}")
            print(f"  (CR YAML 已保存到 {cr_path},可手动 kubectl apply)")

print("\n" + "=" * 60)
print("训练完成!")
print(f"  模型: {model_path}")
print(f"  类型: {best_name}")
print(f"  R²:   {best['r2']:.4f}")
print("=" * 60)