This section finally brings us to the object we know best: the Deployment. It is usually the first big object we meet when learning Kubernetes, and most ordinary applications are deployed as Deployments. So for something this familiar, what should we go looking for in the source code? For me, the most intriguing part is how a Deployment updates: after I change the image version, how does the Deployment drive the existing pods to the new version step by step? In this section we uncover that process from the source code.
Prerequisites
Basic usage of Deployments
Rolling updates
Thought process
In my view, most of its fields look much like a Pod's, and a Deployment is essentially a collection of Pods. So why does a Deployment run multiple replicas of a Pod? The original motivation is obviously high availability, which means the most critical part of a Deployment is how it controls its Pods, that is, what it does when the number of Pods changes.
```go
// vendor/k8s.io/api/apps/v1/types.go:355
type Deployment struct {
	metav1.TypeMeta `json:",inline"`
	// Standard object's metadata.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
	// +optional
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

	// Specification of the desired behavior of the Deployment.
	// +optional
	Spec DeploymentSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`

	// Most recently observed status of the Deployment.
	// +optional
	Status DeploymentStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}
```
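Since the rest of this section is all about rolling updates, it helps to remember where those knobs sit inside the Spec. Below is a minimal sketch of my own (not code from the Kubernetes repo) that builds a Deployment object with an explicit RollingUpdate strategy; the name "demo" and the image "nginx:1.25" are just placeholders.

```go
// Sketch: where maxSurge/maxUnavailable live in a Deployment's Spec.
package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

func int32Ptr(i int32) *int32 { return &i }

func main() {
	maxSurge := intstr.FromInt(1)       // at most 1 extra pod during the rollout
	maxUnavailable := intstr.FromInt(0) // never drop below the desired count

	d := appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{Name: "demo"},
		Spec: appsv1.DeploymentSpec{
			Replicas: int32Ptr(3),
			Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "demo"}},
			Strategy: appsv1.DeploymentStrategy{
				Type: appsv1.RollingUpdateDeploymentStrategyType,
				RollingUpdate: &appsv1.RollingUpdateDeployment{
					MaxSurge:       &maxSurge,
					MaxUnavailable: &maxUnavailable,
				},
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "demo"}},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{Name: "app", Image: "nginx:1.25"}},
				},
			},
		},
	}

	fmt.Printf("%s: strategy=%s, maxSurge=%s\n",
		d.Name, d.Spec.Strategy.Type, d.Spec.Strategy.RollingUpdate.MaxSurge.String())
}
```

With maxSurge=1 and maxUnavailable=0, the controller is only allowed to bring pods in one at a time and must keep all three old pods available until a new one is ready, which is exactly the behavior we will trace below.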
```go
// pkg/controller/deployment/deployment_controller.go:66
type DeploymentController struct {
	// rsControl is used for adopting/releasing replica sets.
	rsControl controller.RSControlInterface
	client    clientset.Interface

	// To allow injection of syncDeployment for testing.
	syncHandler func(ctx context.Context, dKey string) error
	// used for unit testing
	enqueueDeployment func(deployment *apps.Deployment)

	// dLister can list/get deployments from the shared informer's store
	dLister appslisters.DeploymentLister
	// rsLister can list/get replica sets from the shared informer's store
	rsLister appslisters.ReplicaSetLister
	// podLister can list/get pods from the shared informer's store
	podLister corelisters.PodLister

	// dListerSynced returns true if the Deployment store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	dListerSynced cache.InformerSynced
	// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	rsListerSynced cache.InformerSynced
	// podListerSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podListerSynced cache.InformerSynced

	// Deployments that need to be synced
	queue workqueue.RateLimitingInterface
}
```
```go
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		dc.addDeployment(logger, obj)
	},
	UpdateFunc: func(oldObj, newObj interface{}) {
		dc.updateDeployment(logger, oldObj, newObj)
	},
	// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
	DeleteFunc: func(obj interface{}) {
		dc.deleteDeployment(logger, obj)
	},
})
//....
```
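Note that these handlers do no real work themselves: they only turn the event into a namespace/name key and push it onto the queue, and a worker loop later pops the key and hands it to syncHandler. The snippet below is my own stripped-down imitation of that pattern using client-go's workqueue, just to show the shape of the loop; the real controller additionally resolves owner references for ReplicaSet and Pod events and has more careful error handling and logging.

```go
// A self-contained toy of the "handler enqueues a key, worker loop syncs it" pattern.
// This illustrates the mechanism; it is not the DeploymentController's actual code.
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// What the add/update/delete handlers boil down to:
	// compute the "namespace/name" key and add it to the queue.
	enqueue := func(namespace, name string) {
		queue.Add(namespace + "/" + name)
	}
	enqueue("default", "my-deployment")
	queue.ShutDown() // let the worker loop below drain the queue and exit

	// Stand-in for syncDeployment: reconcile whatever the key points at.
	syncHandler := func(key string) error {
		fmt.Println("syncing deployment", key)
		return nil
	}

	// The worker loop: pop a key, sync it, retry with backoff on failure.
	for {
		item, quit := queue.Get()
		if quit {
			return
		}
		key := item.(string)
		if err := syncHandler(key); err != nil {
			queue.AddRateLimited(key)
		} else {
			queue.Forget(key)
		}
		queue.Done(item)
	}
}
```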
```go
deployment, err := dc.dLister.Deployments(namespace).Get(name)
if errors.IsNotFound(err) {
	logger.V(2).Info("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
	return nil
}
if err != nil {
	return err
}

// Deep-copy otherwise we are mutating our cache.
// TODO: Deep-copy only when needed.
d := deployment.DeepCopy()

//...

// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
// through adoption/orphaning.
rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
if err != nil {
	return err
}
// List all Pods owned by this Deployment, grouped by their ReplicaSet.
// Current uses of the podMap are:
//
// * check if a Pod is labeled correctly with the pod-template-hash label.
// * check that no old Pods are running in the middle of Recreate Deployments.
podMap, err := dc.getPodMapForDeployment(d, rsList)
if err != nil {
	return err
}

//...

if d.Spec.Paused {
	return dc.sync(ctx, d, rsList)
}

// rollback is not re-entrant in case the underlying replica sets are updated with a new
// revision so we should ensure that we won't proceed to update replica sets until we
// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
if getRollbackTo(d) != nil {
	return dc.rollback(ctx, d, rsList)
}
```
```go
// pkg/controller/deployment/rolling.go:31
// rolloutRolling implements the logic for rolling a new replica set.
func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)

	// Scale up, if we can.
	scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d)
	if err != nil {
		return err
	}
	if scaledUp {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	// Scale down, if we can.
	scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	if deploymentutil.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
			return err
		}
	}

	// Sync deployment status
	return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}
```
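Whether we "can" scale up or down is decided inside reconcileNewReplicaSet and reconcileOldReplicaSets (not shown here) by the maxSurge and maxUnavailable bounds. The toy functions below are my own rough approximation of those two bounds, just to convey the idea; the real deploymentutil helpers also resolve percentages, track pod availability, and handle several edge cases.

```go
// Toy approximation (not the real deploymentutil math) of the two rolling-update bounds:
// maxSurge caps the total pod count during scale-up, and maxUnavailable sets a floor of
// pods that must stay available during scale-down.
package main

import "fmt"

// desiredNewReplicas: the new ReplicaSet may grow only until total pods reach desired+maxSurge.
func desiredNewReplicas(desired, maxSurge, currentTotal, currentNew int32) int32 {
	maxTotal := desired + maxSurge
	if currentTotal >= maxTotal {
		return currentNew // no headroom, cannot scale up
	}
	headroom := maxTotal - currentTotal
	if currentNew+headroom > desired {
		return desired
	}
	return currentNew + headroom
}

// maxScaleDown: old pods can be removed only while at least desired-maxUnavailable pods stay available.
func maxScaleDown(desired, maxUnavailable, availableTotal int32) int32 {
	minAvailable := desired - maxUnavailable
	if availableTotal <= minAvailable {
		return 0
	}
	return availableTotal - minAvailable
}

func main() {
	// desired=3, maxSurge=1, maxUnavailable=0: the classic one-in, one-out rollout.
	fmt.Println(desiredNewReplicas(3, 1, 3, 0)) // 1: one new pod may surge in
	fmt.Println(maxScaleDown(3, 0, 4))          // 1: one old pod may now be removed
}
```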
The steps are actually simpler than I expected:
Get the new and old ReplicaSets and compare them.
First check whether a scale-up is needed; if so, scale up, sync the status, and return without going further.
Only then is scale-down considered; if it can act, it does so right away (and likewise syncs and returns).
Now recall how the pod count actually changes during an update. If the pods currently stand at 3/3 (desired/existing), then after a scale-up they become 3/4. On the next pass no further scale-up is possible, so only a scale-down can happen, bringing it back to 3/3, and this cycle repeats until every pod matches the desired version. It is quite elegant when you think about it: the whole rolling update falls out of simple state management, inching closer to the target each round. That is perhaps the beauty of a state machine: you just change the state, and the reconciliation takes care of the rest.
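To make that 3/3 → 3/4 → 3/3 rhythm concrete, here is a tiny simulation I wrote (not the controller's code): each reconcile call performs at most one scale-up or one scale-down, like a single pass of rolloutRolling with maxSurge=1 and maxUnavailable=0, and repeated calls converge on the new version.

```go
// Toy simulation of the rolling-update state machine: one reconcile pass either surges
// one new pod in or removes one old pod, until all pods run the new version.
package main

import "fmt"

type state struct {
	desired int // desired replica count
	oldPods int // pods still on the old version
	newPods int // pods already on the new version
}

// reconcile mimics one pass with maxSurge=1, maxUnavailable=0:
// scale up if total is below desired+1, otherwise scale down an old pod.
func reconcile(s state) (state, bool) {
	total := s.oldPods + s.newPods
	if s.newPods < s.desired && total < s.desired+1 { // room to surge
		s.newPods++
		return s, true
	}
	if s.oldPods > 0 && total > s.desired { // too many pods, drop an old one
		s.oldPods--
		return s, true
	}
	return s, false // nothing to do: rollout complete
}

func main() {
	s := state{desired: 3, oldPods: 3, newPods: 0}
	for {
		next, changed := reconcile(s)
		if !changed {
			break
		}
		s = next
		fmt.Printf("desired=%d existing=%d (old=%d, new=%d)\n",
			s.desired, s.oldPods+s.newPods, s.oldPods, s.newPods)
	}
}
```

Running it prints the existing count bouncing between 4 and 3 until all three pods are on the new version, which is exactly the oscillation described above.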
Q&A after reading the code
Which object controls a Deployment?
DeploymentController
How does a Deployment control the update process when an application is updated?
The key lies in rolloutRolling: pods for the target state are added, breaking the current equilibrium (a state change), and old pods that no longer match are removed, so the Deployment is gradually reconciled toward the final state. Put even more simply: first try scaledUp, then try scaledDown.