K8s Lab 把当前仓库文档整理成一个可阅读的网页站点

Repository Reading Site

mlmodel_controller.go

ml-platform/operator/controller/mlmodel_controller.go

Text Asset · ml-platform/operator/controller/mlmodel_controller.go · 8.2 KB · 2026-04-09 14:01 · 查看原始内容
package controller

import (
	"context"
	"fmt"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"

	mlv1alpha1 "github.com/ataime/k8s-lab/ml-platform/operator/api/v1alpha1"
)

// MLModelReconciler reconciles MLModel resources: it watches MLModel changes
// and manages the backing inference Deployment and Service for each model.
type MLModelReconciler struct {
	client.Client
	Scheme         *runtime.Scheme
	InferenceImage string // container image used for the inference server pods
}

// Reconcile drives the actual cluster state toward the state declared by an
// MLModel resource: it ensures the inference Deployment and Service exist,
// then publishes the observed readiness back onto the MLModel status.
func (r *MLModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	reqLogger := log.FromContext(ctx)

	// Fetch the MLModel that triggered this reconcile.
	var model mlv1alpha1.MLModel
	err := r.Get(ctx, req.NamespacedName, &model)
	switch {
	case errors.IsNotFound(err):
		// Resource is gone; owned children are garbage-collected via owner refs.
		reqLogger.Info("MLModel 已删除,跳过")
		return ctrl.Result{}, nil
	case err != nil:
		return ctrl.Result{}, err
	}

	reqLogger.Info("开始调和", "name", model.Name, "version", model.Spec.Version)

	// Ensure the inference Deployment, then the Service; any failure is
	// surfaced through the MLModel status with phase "Failed".
	if depErr := r.reconcileDeployment(ctx, &model); depErr != nil {
		return r.updateStatus(ctx, &model, "Failed", fmt.Sprintf("Deployment 失败: %v", depErr))
	}
	if svcErr := r.reconcileService(ctx, &model); svcErr != nil {
		return r.updateStatus(ctx, &model, "Failed", fmt.Sprintf("Service 失败: %v", svcErr))
	}

	// Mirror the Deployment's readiness into the MLModel status.
	return r.updateStatusFromDeployment(ctx, &model)
}

// reconcileDeployment ensures the inference Deployment for the given MLModel
// exists and matches the desired replica count, image and model version.
//
// The label selector is deliberately restricted to the stable identity labels
// (name/instance/component): a Deployment's .spec.selector is immutable, so
// including the mutable version label there would make every version bump fail
// with an invalid-update error. The version label lives only on the pod
// template, where it may change freely.
func (r *MLModelReconciler) reconcileDeployment(ctx context.Context, model *mlv1alpha1.MLModel) error {
	logger := log.FromContext(ctx)
	deployName := model.Name + "-inference"

	// Stable labels used for the immutable selector (and matched by the Service).
	selectorLabels := map[string]string{
		"app.kubernetes.io/name":      "ml-inference",
		"app.kubernetes.io/instance":  model.Name,
		"app.kubernetes.io/component": "inference",
	}
	// Pod labels = stable selector labels + the mutable version label.
	podLabels := make(map[string]string, len(selectorLabels)+1)
	for k, v := range selectorLabels {
		podLabels[k] = v
	}
	podLabels["app.kubernetes.io/version"] = model.Spec.Version

	// Directory inside the container where model files are mounted.
	modelDir := "/models"

	deploy := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      deployName,
			Namespace: model.Namespace,
			// Owner reference: deleting the MLModel garbage-collects the Deployment.
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(model, mlv1alpha1.GroupVersion.WithKind("MLModel")),
			},
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: &model.Spec.Replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: selectorLabels,
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: podLabels,
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  "inference",
							Image: r.InferenceImage,
							Ports: []corev1.ContainerPort{
								{Name: "http", ContainerPort: 8080},
							},
							Env: []corev1.EnvVar{
								{Name: "MODEL_DIR", Value: modelDir},
								{Name: "MODEL_VERSION", Value: model.Spec.Version},
								{Name: "PORT", Value: "8080"},
							},
							VolumeMounts: []corev1.VolumeMount{
								{Name: "models", MountPath: "/models", ReadOnly: true},
							},
							LivenessProbe: &corev1.Probe{
								ProbeHandler: corev1.ProbeHandler{
									HTTPGet: &corev1.HTTPGetAction{
										Path: "/health",
										Port: intstr.FromInt(8080),
									},
								},
								InitialDelaySeconds: 5,
								PeriodSeconds:       10,
							},
							ReadinessProbe: &corev1.Probe{
								ProbeHandler: corev1.ProbeHandler{
									HTTPGet: &corev1.HTTPGetAction{
										Path: "/ready",
										Port: intstr.FromInt(8080),
									},
								},
								InitialDelaySeconds: 3,
								PeriodSeconds:       5,
							},
						},
					},
					Volumes: []corev1.Volume{
						{
							Name: "models",
							VolumeSource: corev1.VolumeSource{
								PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
									ClaimName: "ml-models",
									ReadOnly:  true,
								},
							},
						},
					},
				},
			},
		},
	}

	// Create the Deployment if it does not exist yet.
	var existing appsv1.Deployment
	err := r.Get(ctx, types.NamespacedName{Name: deployName, Namespace: model.Namespace}, &existing)
	if errors.IsNotFound(err) {
		logger.Info("创建推理 Deployment", "name", deployName, "replicas", model.Spec.Replicas)
		return r.Create(ctx, deploy)
	}
	if err != nil {
		return err
	}

	// Defensive: if the pod template was mangled out-of-band (no containers),
	// replace the whole spec instead of indexing into an empty slice.
	if len(existing.Spec.Template.Spec.Containers) == 0 {
		logger.Info("更新推理 Deployment", "name", deployName)
		existing.Spec = deploy.Spec
		return r.Update(ctx, &existing)
	}

	// Drift detection: replicas, image, or model version changed. The version
	// is tracked via the pod-template label so a spec.version bump triggers a
	// rollout (previously only replicas/image were compared, so version
	// changes were silently ignored). Guard the Replicas pointer — it can be
	// nil if the object was edited out-of-band.
	currentVersion := existing.Spec.Template.Labels["app.kubernetes.io/version"]
	if existing.Spec.Replicas == nil ||
		*existing.Spec.Replicas != model.Spec.Replicas ||
		existing.Spec.Template.Spec.Containers[0].Image != r.InferenceImage ||
		currentVersion != model.Spec.Version {
		logger.Info("更新推理 Deployment", "name", deployName)
		existing.Spec.Replicas = &model.Spec.Replicas
		existing.Spec.Template.Labels = podLabels
		existing.Spec.Template.Spec.Containers[0].Image = r.InferenceImage
		existing.Spec.Template.Spec.Containers[0].Env = deploy.Spec.Template.Spec.Containers[0].Env
		return r.Update(ctx, &existing)
	}

	return nil
}

// reconcileService makes sure the ClusterIP Service fronting the inference
// pods exists. The Service is created once and never mutated afterwards.
func (r *MLModelReconciler) reconcileService(ctx context.Context, model *mlv1alpha1.MLModel) error {
	logger := log.FromContext(ctx)
	svcName := model.Name + "-svc"

	// Already present? Nothing to reconcile.
	var current corev1.Service
	getErr := r.Get(ctx, types.NamespacedName{Name: svcName, Namespace: model.Namespace}, &current)
	if getErr == nil {
		return nil
	}
	if !errors.IsNotFound(getErr) {
		return getErr
	}

	logger.Info("创建推理 Service", "name", svcName)
	desired := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      svcName,
			Namespace: model.Namespace,
			Labels: map[string]string{
				"app.kubernetes.io/name":      "ml-inference",
				"app.kubernetes.io/instance":  model.Name,
				"app.kubernetes.io/component": "inference",
			},
			// Owner reference: deleting the MLModel garbage-collects the Service.
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(model, mlv1alpha1.GroupVersion.WithKind("MLModel")),
			},
		},
		Spec: corev1.ServiceSpec{
			// Matches the stable pod labels set by reconcileDeployment.
			Selector: map[string]string{
				"app.kubernetes.io/name":     "ml-inference",
				"app.kubernetes.io/instance": model.Name,
			},
			Ports: []corev1.ServicePort{
				{
					Name:       "http",
					Port:       8080,
					TargetPort: intstr.FromInt(8080),
				},
			},
			Type: corev1.ServiceTypeClusterIP,
		},
	}
	return r.Create(ctx, desired)
}

// updateStatusFromDeployment derives the MLModel status (phase, ready
// replicas, endpoint, message) from the backing Deployment and requeues
// every 10 seconds until all replicas report ready.
func (r *MLModelReconciler) updateStatusFromDeployment(ctx context.Context, model *mlv1alpha1.MLModel) (ctrl.Result, error) {
	key := types.NamespacedName{Name: model.Name + "-inference", Namespace: model.Namespace}
	var deploy appsv1.Deployment
	if err := r.Get(ctx, key, &deploy); err != nil {
		// Deployment not readable yet — report Deploying and retry later.
		return r.updateStatus(ctx, model, "Deploying", "等待 Deployment 就绪")
	}

	ready := deploy.Status.ReadyReplicas
	allReady := ready == model.Spec.Replicas

	phase, msg := "Deploying", fmt.Sprintf("就绪 %d/%d", ready, model.Spec.Replicas)
	if allReady {
		phase, msg = "Serving", "推理服务运行中"
	}

	ts := metav1.NewTime(time.Now())
	model.Status.Phase = phase
	model.Status.ReadyReplicas = ready
	model.Status.Endpoint = fmt.Sprintf("%s-svc.%s.svc:8080", model.Name, model.Namespace)
	model.Status.Message = msg
	model.Status.LastUpdated = &ts

	if err := r.Status().Update(ctx, model); err != nil {
		return ctrl.Result{}, err
	}

	if allReady {
		return ctrl.Result{}, nil
	}
	// Not fully ready yet — poll again in 10 seconds.
	return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}

// updateStatus writes the given phase/message onto the MLModel status
// subresource, stamps LastUpdated, and schedules another reconcile in 10s.
func (r *MLModelReconciler) updateStatus(ctx context.Context, model *mlv1alpha1.MLModel, phase, msg string) (ctrl.Result, error) {
	ts := metav1.NewTime(time.Now())
	model.Status.Phase = phase
	model.Status.Message = msg
	model.Status.LastUpdated = &ts

	if err := r.Status().Update(ctx, model); err != nil {
		return ctrl.Result{}, err
	}
	return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}

// SetupWithManager wires this reconciler into the controller manager,
// watching MLModel resources as well as the Deployments and Services the
// reconciler owns.
func (r *MLModelReconciler) SetupWithManager(mgr ctrl.Manager) error {
	builder := ctrl.NewControllerManagedBy(mgr).For(&mlv1alpha1.MLModel{})
	// Changes to owned children also trigger a reconcile of their MLModel.
	builder = builder.Owns(&appsv1.Deployment{})
	builder = builder.Owns(&corev1.Service{})
	return builder.Complete(r)
}