func NewDeploymentController(ctx context.Context, dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
	...
	// Instantiate the DeploymentController
	dc := &DeploymentController{
		client:           client,
		eventBroadcaster: eventBroadcaster,
		eventRecorder:    eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
	}
	dc.rsControl = controller.RealRSControl{
		KubeClient: client,
		Recorder:   dc.eventRecorder,
	}
	...
	// Watch add/update/delete events on Deployment objects
	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			dc.addDeployment(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			dc.updateDeployment(logger, oldObj, newObj)
		},
		// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
		DeleteFunc: func(obj interface{}) {
			dc.deleteDeployment(logger, obj)
		},
	})
	// Watch add/update/delete events on ReplicaSet objects
	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			dc.addReplicaSet(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			dc.updateReplicaSet(logger, oldObj, newObj)
		},
		DeleteFunc: func(obj interface{}) {
			dc.deleteReplicaSet(logger, obj)
		},
	})
	// Watch delete events on Pod objects
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: func(obj interface{}) {
			dc.deletePod(logger, obj)
		},
	})

	// Core logic: syncDeployment processes the items on the work queue
	dc.syncHandler = dc.syncDeployment
	dc.enqueueDeployment = dc.enqueue

	dc.dLister = dInformer.Lister()
	dc.rsLister = rsInformer.Lister()
	dc.podLister = podInformer.Lister()
	dc.dListerSynced = dInformer.Informer().HasSynced
	dc.rsListerSynced = rsInformer.Informer().HasSynced
	dc.podListerSynced = podInformer.Informer().HasSynced
	return dc, nil
}
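Every handler registered above ultimately enqueues a "namespace/name" key onto dc.queue; worker goroutines then drain the queue and invoke dc.syncHandler. A simplified sketch of that loop, paraphrasing the controller's worker/processNextWorkItem (the real code routes retries through a handleErr helper; the inline Forget/AddRateLimited here is an assumption-level simplification):

func (dc *DeploymentController) worker(ctx context.Context) {
	for dc.processNextWorkItem(ctx) {
	}
}

func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool {
	// Block until a key is available (or the queue is shutting down).
	key, quit := dc.queue.Get()
	if quit {
		return false
	}
	defer dc.queue.Done(key)

	if err := dc.syncHandler(ctx, key.(string)); err != nil {
		// Simplified retry handling: re-queue with exponential backoff.
		dc.queue.AddRateLimited(key)
		return true
	}
	// Success: reset this key's rate-limiter backoff.
	dc.queue.Forget(key)
	return true
}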
func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
	logger := klog.FromContext(ctx)
	// Parse the namespace and name out of the key
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		logger.Error(err, "Failed to split meta namespace cache key", "cacheKey", key)
		return err
	}

	startTime := time.Now()
	logger.V(4).Info("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
	defer func() {
		logger.V(4).Info("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
	}()

	// Fetch the deployment with this namespace/name from the informer cache
	deployment, err := dc.dLister.Deployments(namespace).Get(name)
	if errors.IsNotFound(err) {
		logger.V(2).Info("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
		return nil
	}
	if err != nil {
		return err
	}

	// Deep-copy otherwise we are mutating our cache.
	// TODO: Deep-copy only when needed.
	d := deployment.DeepCopy()

	// Check whether the selector is empty
	everything := metav1.LabelSelector{}
	if reflect.DeepEqual(d.Spec.Selector, &everything) {
		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
		if d.Status.ObservedGeneration < d.Generation {
			d.Status.ObservedGeneration = d.Generation
			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
		}
		return nil
	}

	// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
	// through adoption/orphaning.
	// Get all ReplicaSets for this deployment, matched via the label selector
	rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
	if err != nil {
		return err
	}

	// List all Pods owned by this Deployment, grouped by their ReplicaSet.
	// Current uses of the podMap are:
	//
	// * check if a Pod is labeled correctly with the pod-template-hash label.
	// * check that no old Pods are running in the middle of Recreate Deployments.
	podMap, err := dc.getPodMapForDeployment(d, rsList)
	if err != nil {
		return err
	}

	// If the deployment is being deleted, only sync its status
	if d.DeletionTimestamp != nil {
		return dc.syncStatusOnly(ctx, d, rsList)
	}

	// Update deployment conditions with an Unknown condition when pausing/resuming
	// a deployment. In this way, we can be sure that we won't timeout when a user
	// resumes a Deployment with a set progressDeadlineSeconds.
	// Check the paused conditions
	if err = dc.checkPausedConditions(ctx, d); err != nil {
		return err
	}
	// If paused, only sync (scale and status) and return
	if d.Spec.Paused {
		return dc.sync(ctx, d, rsList)
	}

	// rollback is not re-entrant in case the underlying replica sets are updated with a new
	// revision so we should ensure that we won't proceed to update replica sets until we
	// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
	// Check whether this is a rollback
	if getRollbackTo(d) != nil {
		return dc.rollback(ctx, d, rsList)
	}

	// Check whether the deployment is in the middle of a scaling event
	scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
	if err != nil {
		return err
	}
	if scalingEvent {
		return dc.sync(ctx, d, rsList)
	}

	// Roll out the update
	switch d.Spec.Strategy.Type {
	case apps.RecreateDeploymentStrategyType:
		// Recreate strategy
		return dc.rolloutRecreate(ctx, d, rsList, podMap)
	case apps.RollingUpdateDeploymentStrategyType:
		// Rolling-update strategy
		return dc.rolloutRolling(ctx, d, rsList)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
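The key argument is the default "namespace/name" string that client-go's cache.MetaNamespaceKeyFunc produced when the object was enqueued. A tiny standalone example of the round trip syncDeployment relies on:

package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

func main() {
	// The controller enqueues deployments under "namespace/name" keys;
	// syncDeployment reverses the encoding with SplitMetaNamespaceKey.
	ns, name, err := cache.SplitMetaNamespaceKey("default/nginx")
	fmt.Println(ns, name, err) // default nginx <nil>
}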
func (dc *DeploymentController) sync(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
	if err != nil {
		return err
	}
	// Scale the replica sets up/down
	if err := dc.scale(ctx, d, newRS, oldRSs); err != nil {
		// If we get an error while trying to scale, the deployment will be requeued
		// so we can abort this resync
		return err
	}

	// Clean up the deployment when it's paused and no rollback is in flight.
	if d.Spec.Paused && getRollbackTo(d) == nil {
		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
			return err
		}
	}

	allRSs := append(oldRSs, newRS)
	// Update the status
	return dc.syncDeploymentStatus(ctx, allRSs, newRS, d)
}
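cleanupDeployment enforces spec.revisionHistoryLimit on old ReplicaSets. The following standalone sketch paraphrases that policy under stated assumptions (only fully scaled-down ReplicaSets are deletion candidates, oldest revisions go first); the rs type and sample data are purely illustrative, not the controller's types:

package main

import (
	"fmt"
	"sort"
)

// rs is an illustrative stand-in for *apps.ReplicaSet, not a controller type.
type rs struct {
	name     string
	revision int
	replicas int32
}

func main() {
	historyLimit := 2 // spec.revisionHistoryLimit
	old := []rs{{"rs-a", 1, 0}, {"rs-b", 2, 0}, {"rs-c", 3, 0}, {"rs-d", 4, 3}}

	// Only fully scaled-down old ReplicaSets are deletion candidates.
	var candidates []rs
	for _, r := range old {
		if r.replicas == 0 {
			candidates = append(candidates, r)
		}
	}
	// Oldest revisions are deleted first.
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].revision < candidates[j].revision
	})
	if extra := len(candidates) - historyLimit; extra > 0 {
		fmt.Println("would delete:", candidates[:extra]) // would delete: [{rs-a 1 0}]
	}
}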
func (dc *DeploymentController) scale(ctx context.Context, deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error {
	// If there is only one active replica set then we should scale that up to the full count of the
	// deployment. If there is no active replica set, then we should scale up the newest replica set.
	// FindActiveOrLatest returns the only active, or the latest, replica set when there is at
	// most one active replica set; if there are more, they are scaled proportionally below.
	if activeOrLatest := deploymentutil.FindActiveOrLatest(newRS, oldRSs); activeOrLatest != nil {
		if *(activeOrLatest.Spec.Replicas) == *(deployment.Spec.Replicas) {
			return nil
		}
		_, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, activeOrLatest, *(deployment.Spec.Replicas), deployment)
		return err
	}

	// If the new replica set is saturated, old replica sets should be fully scaled down.
	// This case handles replica set adoption during a saturated new replica set.
	// IsSaturated checks whether the new replica set is saturated by comparing its size with
	// the deployment's size. Both the deployment and the replica set have to agree that the
	// replica set can own all of the desired replicas, expressed through the
	// "deployment.kubernetes.io/desired-replicas" annotation, and all of the replica set's
	// pods must be available.
	if deploymentutil.IsSaturated(deployment, newRS) {
		for _, old := range controller.FilterActiveReplicaSets(oldRSs) {
			if _, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, old, 0, deployment); err != nil {
				return err
			}
		}
		return nil
	}

	// There are old replica sets with pods and the new replica set is not saturated.
	// We need to proportionally scale all replica sets (new and old) in case of a
	// rolling deployment.
	// If the Deployment uses the rolling-update strategy, scale the replica sets proportionally
	if deploymentutil.IsRollingUpdate(deployment) {
		allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
		allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)

		// Compute the maximum number of pods that may exist
		allowedSize := int32(0)
		if *(deployment.Spec.Replicas) > 0 {
			// The desired replica count plus maxSurge is the allowed maximum
			allowedSize = *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
		}

		// Number of additional replicas that can be either added or removed from the total
		// replicas count. These replicas should be distributed proportionally to the active
		// replica sets.
		// Compute how many replicas need to be added (may be negative)
		deploymentReplicasToAdd := allowedSize - allRSsReplicas

		// The additional replicas should be distributed proportionally amongst the active
		// replica sets from the larger to the smaller in size replica set. Scaling direction
		// drives what happens in case we are trying to scale replica sets of the same size.
		// In such a case when scaling up, we should scale up newer replica sets first, and
		// when scaling down, we should scale down older replica sets first.
		var scalingOperation string
		switch {
		case deploymentReplicasToAdd > 0:
			// If replicas need to be added, sort newest-first so newer replica sets
			// receive more of the new pods
			sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))
			// Scaling up
			scalingOperation = "up"
		case deploymentReplicasToAdd < 0:
			// If replicas need to be removed, sort oldest-first so older pods are
			// removed first
			sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
			// Scaling down
			scalingOperation = "down"
		}

		// Iterate over all active replica sets and estimate proportions for each of them.
		// The absolute value of deploymentReplicasAdded should never exceed the absolute
		// value of deploymentReplicasToAdd.
		deploymentReplicasAdded := int32(0)
		nameToSize := make(map[string]int32)
		logger := klog.FromContext(ctx)
		// Iterate over all replica sets and compute each one's target replica count
		for i := range allRSs {
			rs := allRSs[i]

			// Estimate proportions if we have replicas to add, otherwise simply populate
			// nameToSize with the current sizes for each replica set.
			if deploymentReplicasToAdd != 0 {
				// Estimate how many replicas this replica set should gain or lose
				proportion := deploymentutil.GetProportion(logger, rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)
				// Accumulate the computed proportion into deploymentReplicasAdded
				nameToSize[rs.Name] = *(rs.Spec.Replicas) + proportion
				deploymentReplicasAdded += proportion
			} else {
				nameToSize[rs.Name] = *(rs.Spec.Replicas)
			}
		}

		// Update all replica sets
		// Iterate over all replica sets again: the first (largest) one absorbs any rounding
		// leftover on top of the proportions computed above, then every replica set's
		// rs.Spec.Replicas is updated to its target size
		for i := range allRSs {
			rs := allRSs[i]

			// Add/remove any leftovers to the largest replica set.
			if i == 0 && deploymentReplicasToAdd != 0 {
				leftover := deploymentReplicasToAdd - deploymentReplicasAdded
				nameToSize[rs.Name] = nameToSize[rs.Name] + leftover
				if nameToSize[rs.Name] < 0 {
					nameToSize[rs.Name] = 0
				}
			}

			// TODO: Use transactions when we have them.
			// Scale the replica set to its computed target size
			if _, _, err := dc.scaleReplicaSet(ctx, rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil {
				// Return as soon as we fail, the deployment is requeued
				return err
			}
		}
	}
	return nil
}
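To make the proportional math concrete, here is a standalone sketch with made-up sizes. The real deploymentutil.GetProportion additionally consults the deployment.kubernetes.io/max-replicas annotation and clamps by scaling direction; this only demonstrates the truncating division plus the leftover bookkeeping performed in the second loop above:

package main

import "fmt"

func main() {
	// Hypothetical active replica sets, already sorted newest-first as in the
	// scaling-up branch above.
	sizes := []int32{5, 3, 2} // current Spec.Replicas of each RS
	var total int32 = 10      // allRSsReplicas
	var toAdd int32 = 3       // deploymentReplicasToAdd = allowedSize - allRSsReplicas

	var added int32
	newSizes := make([]int32, len(sizes))
	for i, s := range sizes {
		p := s * toAdd / total // proportional share, truncated toward zero
		newSizes[i] = s + p
		added += p
	}
	// Any rounding leftover goes to the first (largest/newest) replica set.
	newSizes[0] += toAdd - added

	fmt.Println(newSizes) // [8 3 2]: sums to allowedSize (13)
}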
// rolloutRecreate implements the logic for recreating a replica set.
func (dc *DeploymentController) rolloutRecreate(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) error {
	// Don't create a new RS if not already existed, so that we avoid scaling up before scaling down.
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)
	activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)

	// Scale down old replica sets.
	scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(ctx, activeOldRSs, d)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus.
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	// Do not process a deployment when it has old pods running.
	if oldPodsRunning(newRS, oldRSs, podMap) {
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	// If we need to create a new RS, create it now.
	if newRS == nil {
		newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
		if err != nil {
			return err
		}
		allRSs = append(oldRSs, newRS)
	}

	// Scale up the new replica set.
	if _, err := dc.scaleUpNewReplicaSetForRecreate(ctx, newRS, d); err != nil {
		return err
	}

	// Clean up stale RSes once the rollout is complete
	if util.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
			return err
		}
	}

	// Sync deployment status.
	return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}
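The cleanup above is gated on util.DeploymentComplete. Paraphrased into a standalone helper (a sketch of the condition, not the upstream function itself): a deployment counts as complete when every replica has been updated and is available, and the controller has observed the latest spec generation.

// deploymentComplete paraphrases the deploymentutil.DeploymentComplete check;
// the parameter names here are illustrative, mapping to Spec.Replicas and the
// corresponding DeploymentStatus fields.
func deploymentComplete(specReplicas, updated, total, available int32, observedGen, generation int64) bool {
	return updated == specReplicas &&
		total == specReplicas &&
		available == specReplicas &&
		observedGen >= generation
}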
func (dc *DeploymentController) reconcileOldReplicaSets(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	logger := klog.FromContext(ctx)
	// Compute oldPodsCount
	oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
	if oldPodsCount == 0 {
		// Can't scale down further
		return false, nil
	}

	// Compute allPodsCount
	allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
	logger.V(4).Info("New replica set", "replicaSet", klog.KObj(newRS), "availableReplicas", newRS.Status.AvailableReplicas)
	maxUnavailable := deploymentutil.MaxUnavailable(*deployment)

	// Check if we can scale down. We can scale down in the following 2 cases:
	// * Some old replica sets have unhealthy replicas, we could safely scale down those unhealthy replicas since that won't further
	//   increase unavailability.
	// * New replica set has scaled up and its replicas become ready, then we can scale down old replica sets in a further step.
	//
	// maxScaledDown := allPodsCount - minAvailable - newReplicaSetPodsUnavailable
	// take into account not only maxUnavailable and any surge pods that have been created, but also unavailable pods from
	// the newRS, so that the unavailable pods from the newRS would not make us scale down old replica sets in a further
	// step (that will increase unavailability).
	//
	// Concrete example:
	//
	// * 10 replicas
	// * 2 maxUnavailable (absolute number, not percent)
	// * 3 maxSurge (absolute number, not percent)
	//
	// case 1:
	// * Deployment is updated, newRS is created with 3 replicas, oldRS is scaled down to 8, and newRS is scaled up to 5.
	// * The new replica set pods crashloop and never become available.
	// * allPodsCount is 13. minAvailable is 8. newRSPodsUnavailable is 5.
	// * A node fails and causes one of the oldRS pods to become unavailable. However, 13 - 8 - 5 = 0, so the oldRS won't be scaled down.
	// * The user notices the crashloop and does kubectl rollout undo to rollback.
	// * newRSPodsUnavailable is 1, since we rolled back to the good replica set, so maxScaledDown = 13 - 8 - 1 = 4. 4 of the crashlooping pods will be scaled down.
	// * The total number of pods will then be 9 and the newRS can be scaled up to 10.
	//
	// case 2:
	// Same example, but pushing a new pod template instead of rolling back (aka "roll over"):
	// * The new replica set created must start with 0 replicas because allPodsCount is already at 13.
	// * However, newRSPodsUnavailable would also be 0, so the 2 old replica sets could be scaled down by 5 (13 - 8 - 0), which would then
	//   allow the new replica set to be scaled up by 5.
	// Compute maxScaledDown
	minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
	newRSUnavailablePodCount := *(newRS.Spec.Replicas) - newRS.Status.AvailableReplicas
	maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount
	if maxScaledDown <= 0 {
		return false, nil
	}

	// Clean up unhealthy replicas first, otherwise unhealthy replicas will block deployment
	// and cause timeout. See https://github.com/kubernetes/kubernetes/issues/16737
	oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(ctx, oldRSs, deployment, maxScaledDown)
	if err != nil {
		return false, nil
	}
	logger.V(4).Info("Cleaned up unhealthy replicas from old RSes", "count", cleanupCount)

	// Scale down old replica sets, need check maxUnavailable to ensure we can scale down
	allRSs = append(oldRSs, newRS)
	scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(ctx, allRSs, oldRSs, deployment)
	if err != nil {
		return false, nil
	}
	logger.V(4).Info("Scaled down old RSes", "deployment", klog.KObj(deployment), "count", scaledDownCount)

	totalScaledDown := cleanupCount + scaledDownCount
	return totalScaledDown > 0, nil
}
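Plugging the numbers from "case 1" of the comment into the formula shows why the crashlooping rollout blocks scale-down until the rollback happens:

package main

import "fmt"

func main() {
	// Numbers from "case 1" above: 10 replicas, maxUnavailable=2,
	// 13 total pods, newRS at 5 replicas with 0 available.
	var (
		specReplicas   int32 = 10
		maxUnavailable int32 = 2
		allPodsCount   int32 = 13
		newRSReplicas  int32 = 5
		newRSAvailable int32 = 0
	)
	minAvailable := specReplicas - maxUnavailable      // 8
	newRSUnavailable := newRSReplicas - newRSAvailable // 5
	maxScaledDown := allPodsCount - minAvailable - newRSUnavailable
	fmt.Println(maxScaledDown) // 0: the old RS cannot be scaled down yet

	// After the rollback, the good replica set becomes newRS with only 1
	// unavailable pod: 13 - 8 - 1 = 4 crashlooping pods may be scaled down.
	fmt.Println(allPodsCount - minAvailable - 1) // 4
}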