Repository Reading Site
mlmodel_controller.go
ml-platform/operator/controller/mlmodel_controller.go
package controller
import (
"context"
"fmt"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
mlv1alpha1 "github.com/ataime/k8s-lab/ml-platform/operator/api/v1alpha1"
)
// MLModelReconciler reconciles MLModel custom resources: it watches for
// MLModel changes and manages the corresponding inference Deployment and Service.
type MLModelReconciler struct {
	client.Client                  // embedded client used for Get/Create/Update/Status calls
	Scheme         *runtime.Scheme // runtime scheme used by the controller-runtime machinery
	InferenceImage string          // container image for the inference server pods
}
// Reconcile drives actual state toward the desired state declared by an
// MLModel: it ensures the inference Deployment and Service exist, then
// projects the Deployment's readiness into the MLModel status.
func (r *MLModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	// Fetch the MLModel that triggered this reconcile.
	var model mlv1alpha1.MLModel
	err := r.Get(ctx, req.NamespacedName, &model)
	switch {
	case errors.IsNotFound(err):
		// Resource is gone; owned objects are garbage-collected via owner refs.
		logger.Info("MLModel 已删除,跳过")
		return ctrl.Result{}, nil
	case err != nil:
		return ctrl.Result{}, err
	}

	logger.Info("开始调和", "name", model.Name, "version", model.Spec.Version)

	// Ensure the inference Deployment, then the Service. On failure, record a
	// Failed phase (updateStatus schedules a retry rather than returning the error).
	if derr := r.reconcileDeployment(ctx, &model); derr != nil {
		return r.updateStatus(ctx, &model, "Failed", fmt.Sprintf("Deployment 失败: %v", derr))
	}
	if serr := r.reconcileService(ctx, &model); serr != nil {
		return r.updateStatus(ctx, &model, "Failed", fmt.Sprintf("Service 失败: %v", serr))
	}

	// Finally refresh MLModel status from the live Deployment.
	return r.updateStatusFromDeployment(ctx, &model)
}
// reconcileDeployment ensures the inference Deployment for the model exists
// and matches the desired state (replicas, image, model-version env).
//
// The label selector deliberately carries only stable identity labels:
// .spec.selector is immutable after creation, so the mutable version label
// lives on the pod template only. A version-only change rolls the pods via
// the MODEL_VERSION env var, not via labels.
func (r *MLModelReconciler) reconcileDeployment(ctx context.Context, model *mlv1alpha1.MLModel) error {
	logger := log.FromContext(ctx)
	deployName := model.Name + "-inference"

	// Stable labels — safe to use in the immutable selector.
	selectorLabels := map[string]string{
		"app.kubernetes.io/name":      "ml-inference",
		"app.kubernetes.io/instance":  model.Name,
		"app.kubernetes.io/component": "inference",
	}
	// Pod labels = selector labels + the mutable version label.
	podLabels := make(map[string]string, len(selectorLabels)+1)
	for k, v := range selectorLabels {
		podLabels[k] = v
	}
	podLabels["app.kubernetes.io/version"] = model.Spec.Version

	// Directory inside the container where model files are mounted.
	modelDir := "/models"

	// Desired container environment. A change to MODEL_VERSION here is what
	// triggers a rolling update when only Spec.Version changes.
	desiredEnv := []corev1.EnvVar{
		{Name: "MODEL_DIR", Value: modelDir},
		{Name: "MODEL_VERSION", Value: model.Spec.Version},
		{Name: "PORT", Value: "8080"},
	}

	deploy := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      deployName,
			Namespace: model.Namespace,
			// Owner reference: deleting the MLModel garbage-collects this Deployment.
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(model, mlv1alpha1.GroupVersion.WithKind("MLModel")),
			},
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: &model.Spec.Replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: selectorLabels,
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: podLabels,
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  "inference",
							Image: r.InferenceImage,
							Ports: []corev1.ContainerPort{
								{Name: "http", ContainerPort: 8080},
							},
							Env: desiredEnv,
							VolumeMounts: []corev1.VolumeMount{
								{Name: "models", MountPath: "/models", ReadOnly: true},
							},
							LivenessProbe: &corev1.Probe{
								ProbeHandler: corev1.ProbeHandler{
									HTTPGet: &corev1.HTTPGetAction{
										Path: "/health",
										Port: intstr.FromInt(8080),
									},
								},
								InitialDelaySeconds: 5,
								PeriodSeconds:       10,
							},
							ReadinessProbe: &corev1.Probe{
								ProbeHandler: corev1.ProbeHandler{
									HTTPGet: &corev1.HTTPGetAction{
										Path: "/ready",
										Port: intstr.FromInt(8080),
									},
								},
								InitialDelaySeconds: 3,
								PeriodSeconds:       5,
							},
						},
					},
					Volumes: []corev1.Volume{
						{
							Name: "models",
							VolumeSource: corev1.VolumeSource{
								PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
									ClaimName: "ml-models",
									ReadOnly:  true,
								},
							},
						},
					},
				},
			},
		},
	}

	// Check whether the Deployment already exists.
	var existing appsv1.Deployment
	err := r.Get(ctx, types.NamespacedName{Name: deployName, Namespace: model.Namespace}, &existing)
	if errors.IsNotFound(err) {
		logger.Info("创建推理 Deployment", "name", deployName, "replicas", model.Spec.Replicas)
		return r.Create(ctx, deploy)
	}
	if err != nil {
		return err
	}

	// envEqual reports whether two env lists match by name/value, in order.
	envEqual := func(a, b []corev1.EnvVar) bool {
		if len(a) != len(b) {
			return false
		}
		for i := range a {
			if a[i].Name != b[i].Name || a[i].Value != b[i].Value {
				return false
			}
		}
		return true
	}

	// Already exists: detect drift. Nil-safe on Replicas (can be unset) and
	// length-safe on the container list (defensive against manual edits).
	needsUpdate := existing.Spec.Replicas == nil || *existing.Spec.Replicas != model.Spec.Replicas
	if len(existing.Spec.Template.Spec.Containers) == 0 {
		needsUpdate = true
	} else if c := &existing.Spec.Template.Spec.Containers[0]; c.Image != r.InferenceImage || !envEqual(c.Env, desiredEnv) {
		needsUpdate = true
	}
	if !needsUpdate {
		return nil
	}

	logger.Info("更新推理 Deployment", "name", deployName)
	existing.Spec.Replicas = &model.Spec.Replicas
	if len(existing.Spec.Template.Spec.Containers) == 0 {
		existing.Spec.Template.Spec.Containers = deploy.Spec.Template.Spec.Containers
	} else {
		existing.Spec.Template.Spec.Containers[0].Image = r.InferenceImage
		existing.Spec.Template.Spec.Containers[0].Env = desiredEnv
	}
	// Template labels are intentionally left untouched on update: objects
	// created before this fix may carry the version label inside their
	// immutable selector, and rewriting template labels would then be
	// rejected by the API server (selector must match template labels).
	return r.Update(ctx, &existing)
}
// reconcileService guarantees a ClusterIP Service fronting the model's
// inference pods exists; it is created once and left untouched afterwards.
func (r *MLModelReconciler) reconcileService(ctx context.Context, model *mlv1alpha1.MLModel) error {
	logger := log.FromContext(ctx)
	svcName := model.Name + "-svc"

	// If the Service is already there (or the lookup fails), we are done.
	var existing corev1.Service
	getErr := r.Get(ctx, types.NamespacedName{Name: svcName, Namespace: model.Namespace}, &existing)
	if getErr == nil {
		return nil
	}
	if !errors.IsNotFound(getErr) {
		return getErr
	}

	// Not found: build and create the desired Service.
	desired := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      svcName,
			Namespace: model.Namespace,
			Labels: map[string]string{
				"app.kubernetes.io/name":      "ml-inference",
				"app.kubernetes.io/instance":  model.Name,
				"app.kubernetes.io/component": "inference",
			},
			// Owner reference: deleting the MLModel garbage-collects this Service.
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(model, mlv1alpha1.GroupVersion.WithKind("MLModel")),
			},
		},
		Spec: corev1.ServiceSpec{
			Type: corev1.ServiceTypeClusterIP,
			Selector: map[string]string{
				"app.kubernetes.io/name":     "ml-inference",
				"app.kubernetes.io/instance": model.Name,
			},
			Ports: []corev1.ServicePort{
				{Name: "http", Port: 8080, TargetPort: intstr.FromInt(8080)},
			},
		},
	}

	logger.Info("创建推理 Service", "name", svcName)
	return r.Create(ctx, desired)
}
// updateStatusFromDeployment projects the inference Deployment's readiness
// into the MLModel status (phase, ready replicas, endpoint, message) and
// requeues every 10 seconds until all replicas are ready.
func (r *MLModelReconciler) updateStatusFromDeployment(ctx context.Context, model *mlv1alpha1.MLModel) (ctrl.Result, error) {
	deployName := model.Name + "-inference"

	var deploy appsv1.Deployment
	err := r.Get(ctx, types.NamespacedName{Name: deployName, Namespace: model.Namespace}, &deploy)
	if errors.IsNotFound(err) {
		// Deployment not visible yet (just created / cache lag): keep waiting.
		return r.updateStatus(ctx, model, "Deploying", "等待 Deployment 就绪")
	}
	if err != nil {
		// Real API error — surface it instead of masking it as "deploying".
		return ctrl.Result{}, err
	}

	phase := "Deploying"
	msg := fmt.Sprintf("就绪 %d/%d", deploy.Status.ReadyReplicas, model.Spec.Replicas)
	if deploy.Status.ReadyReplicas == model.Spec.Replicas {
		phase = "Serving"
		msg = "推理服务运行中"
	}

	model.Status.Phase = phase
	model.Status.ReadyReplicas = deploy.Status.ReadyReplicas
	// Cluster-internal DNS name of the Service created by reconcileService.
	model.Status.Endpoint = fmt.Sprintf("%s-svc.%s.svc:8080", model.Name, model.Namespace)
	model.Status.Message = msg
	now := metav1.NewTime(time.Now())
	model.Status.LastUpdated = &now
	if err := r.Status().Update(ctx, model); err != nil {
		return ctrl.Result{}, err
	}

	// Not fully ready yet: check again in 10 seconds.
	if phase != "Serving" {
		return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
	}
	return ctrl.Result{}, nil
}
// updateStatus records the given phase and message on the MLModel status
// subresource, stamps LastUpdated, and schedules a re-check in 10 seconds.
func (r *MLModelReconciler) updateStatus(ctx context.Context, model *mlv1alpha1.MLModel, phase, msg string) (ctrl.Result, error) {
	ts := metav1.NewTime(time.Now())
	model.Status.Phase = phase
	model.Status.Message = msg
	model.Status.LastUpdated = &ts
	if uerr := r.Status().Update(ctx, model); uerr != nil {
		return ctrl.Result{}, uerr
	}
	return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}
// SetupWithManager registers this controller with the manager.
func (r *MLModelReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&mlv1alpha1.MLModel{}).  // reconcile on MLModel changes
		Owns(&appsv1.Deployment{}).  // also reconcile when owned Deployments change
		Owns(&corev1.Service{}).     // also reconcile when owned Services change
		Complete(r)
}