03-Kube-Controller-Manager源码分析(Deployment控制器)

本文基于1.29.0版本

kube-controller-manager

pkg/controller目录结构,包含各控制器的具体实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
pkg/controller
├── OWNERS
├── apis
├── bootstrap
├── certificates
├── clusterroleaggregation
├── controller_ref_manager.go
├── controller_utils.go
├── cronjob
├── daemon
├── deployment
├── disruption
├── doc.go
├── endpoint
├── endpointslice
├── endpointslicemirroring
├── garbagecollector
├── history
├── job
├── namespace
├── nodeipam
├── nodelifecycle
├── podautoscaler
├── podgc
├── replicaset
├── replication
├── resourceclaim
├── resourcequota
├── serviceaccount
├── servicecidrs
├── statefulset
├── storageversiongc
├── tainteviction
├── ttl
├── ttlafterfinished
├── util
├── validatingadmissionpolicystatus
└── volume
34 directories, 6 files

cmd/kube-controller-manager/app/apps.go

newDeploymentControllerDescriptor

1
2
3
4
5
6
7
func newDeploymentControllerDescriptor() *ControllerDescriptor {
 return &ControllerDescriptor{
  name:     names.DeploymentController,
  aliases:  []string{"deployment"},
  initFunc: startDeploymentController,
 }
}

startDeploymentController

startDeploymentController 里实例化 deployment controller 控制器对象, 并且启动控制器.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func startDeploymentController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
 dc, err := deployment.NewDeploymentController(
  ctx,
  controllerContext.InformerFactory.Apps().V1().Deployments(),
  controllerContext.InformerFactory.Apps().V1().ReplicaSets(),
  controllerContext.InformerFactory.Core().V1().Pods(),
  controllerContext.ClientBuilder.ClientOrDie("deployment-controller"),
 )
 if err != nil {
  return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
 }
 go dc.Run(ctx, int(controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs))
 return nil, true, nil
}

NewDeploymentController

DeploymentController 控制器内通过 informer 监听 deployment, replicaset, pod 三个资源的事件.

pkg/controller/deployment/deployment_controller.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func NewDeploymentController(ctx context.Context, dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
 ...
    // 实例化 `DeploymentController` 资源对象
 dc := &DeploymentController{
  client:           client,
  eventBroadcaster: eventBroadcaster,
  eventRecorder:    eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
  queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
 }
 dc.rsControl = controller.RealRSControl{
  KubeClient: client,
  Recorder:   dc.eventRecorder,
 }
    ...
    //  监听Deployment资源的增删改事件变更
 dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  AddFunc: func(obj interface{}) {
   dc.addDeployment(logger, obj)
  },
  UpdateFunc: func(oldObj, newObj interface{}) {
   dc.updateDeployment(logger, oldObj, newObj)
  },
  // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
  DeleteFunc: func(obj interface{}) {
   dc.deleteDeployment(logger, obj)
  },
 })
    // 监听ReplicaSet资源的增删改事件变更
 rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  AddFunc: func(obj interface{}) {
   dc.addReplicaSet(logger, obj)
  },
  UpdateFunc: func(oldObj, newObj interface{}) {
   dc.updateReplicaSet(logger, oldObj, newObj)
  },
  DeleteFunc: func(obj interface{}) {
   dc.deleteReplicaSet(logger, obj)
  },
 })
    // 监听Pod资源的删除事件变更
 podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  DeleteFunc: func(obj interface{}) {
   dc.deletePod(logger, obj)
  },
 })

//  核心逻辑, 处理队列里的数据
 dc.syncHandler = dc.syncDeployment
 dc.enqueueDeployment = dc.enqueue

 dc.dLister = dInformer.Lister()
 dc.rsLister = rsInformer.Lister()
 dc.podLister = podInformer.Lister()
 dc.dListerSynced = dInformer.Informer().HasSynced
 dc.rsListerSynced = rsInformer.Informer().HasSynced
 dc.podListerSynced = podInformer.Informer().HasSynced
 return dc, nil
}

Run

Run 启动控制器, 阻塞等待 informer 的缓存同步完毕, 启动 workers 数量的 worker 协程, 默认为 5 个.

worker 循环的从队列获取数据, 然后交给 syncHandler 处理. 队列里的数据是由 informer 注册的 eventHandler 写入的.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (dc *DeploymentController) Run(ctx context.Context, workers int) {
 ...
 // 等待 `informer cache` 更新完毕
 if !cache.WaitForNamedCacheSync("deployment", ctx.Done(), dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
  return
 }

// 默认启动 5 个 `worker`
 for i := 0; i < workers; i++ {
  go wait.UntilWithContext(ctx, dc.worker, time.Second)
 }

 <-ctx.Done()
}

func (dc *DeploymentController) worker(ctx context.Context) {
 for dc.processNextWorkItem(ctx) {
 }
}

func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool {
 // 从队列获取任务, 拿不到任务就阻塞在条件变量上.
 key, quit := dc.queue.Get()
 if quit {
  return false
 }
 defer dc.queue.Done(key)
 // 调用 `syncHandler` 处理
 err := dc.syncHandler(ctx, key.(string))
 dc.handleErr(ctx, err, key)

 return true
}

核心方法 syncDeployment

syncDeployment 在每个阶段都定义了相关操作, syncStatusOnly 处理删除操作, sync 处理状态为 pause 暂停的操作, rollback 处理回滚版本的操作, 最后由 rolloutRecreaterolloutRolling 实现升级操作

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
 logger := klog.FromContext(ctx)
 // 从 `key` 中解析出 `namespace` 和 `name` 信息
 namespace, name, err := cache.SplitMetaNamespaceKey(key)
 if err != nil {
  logger.Error(err, "Failed to split meta namespace cache key", "cacheKey", key)
  return err
 }

 startTime := time.Now()
 logger.V(4).Info("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
 defer func() {
  logger.V(4).Info("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
 }()

 // 从 informer cache 中获取指定 `namespace` 和 `name` 的 `deployment` 对象
 deployment, err := dc.dLister.Deployments(namespace).Get(name)
 if errors.IsNotFound(err) {
  logger.V(2).Info("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
  return nil
 }
 if err != nil {
  return err
 }

 // Deep-copy otherwise we are mutating our cache.
 // TODO: Deep-copy only when needed.
 // 拷贝 `deployment` 对象
 d := deployment.DeepCopy()

 // 判断 `Selector` 是否为空
 everything := metav1.LabelSelector{}
 if reflect.DeepEqual(d.Spec.Selector, &everything) {
  dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
  if d.Status.ObservedGeneration < d.Generation {
   d.Status.ObservedGeneration = d.Generation
   dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
  }
  return nil
 }

 // List ReplicaSets owned by this Deployment, while reconciling ControllerRef
 // through adoption/orphaning.
 // 获取 deployment 对应的所有 rs, 通过 LabelSelector 进行匹配
 rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
 if err != nil {
  return err
 }
 // List all Pods owned by this Deployment, grouped by their ReplicaSet.
 // Current uses of the podMap are:
 //
 // * check if a Pod is labeled correctly with the pod-template-hash label.
 // * check that no old Pods are running in the middle of Recreate Deployments.
 // 获取当前 Deployment 对象关联的 pod
 podMap, err := dc.getPodMapForDeployment(d, rsList)
 if err != nil {
  return err
 }

 // 如果该 deployment 处于删除状态,则更新其 status
 if d.DeletionTimestamp != nil {
  return dc.syncStatusOnly(ctx, d, rsList)
 }

 // Update deployment conditions with an Unknown condition when pausing/resuming
 // a deployment. In this way, we can be sure that we won't timeout when a user
 // resumes a Deployment with a set progressDeadlineSeconds.
 // 检查 pause 状态
 if err = dc.checkPausedConditions(ctx, d); err != nil {
  return err
 }

 // 如果是 pause 状态则进行 sync 同步.
 if d.Spec.Paused {
  return dc.sync(ctx, d, rsList)
 }

 // rollback is not re-entrant in case the underlying replica sets are updated with a new
 // revision so we should ensure that we won't proceed to update replica sets until we
 // make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
 // 检查是否为回滚操作
 if getRollbackTo(d) != nil {
  return dc.rollback(ctx, d, rsList)
 }
 // 检查 deployment 是否处于 scale 状态
 scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
 if err != nil {
  return err
 }
 if scalingEvent {
  return dc.sync(ctx, d, rsList)
 }

 // 更新操作
 switch d.Spec.Strategy.Type {
 case apps.RecreateDeploymentStrategyType:
  // 重建模式
  return dc.rolloutRecreate(ctx, d, rsList, podMap)
 case apps.RollingUpdateDeploymentStrategyType:
  // 滚动更新模式
  return dc.rolloutRolling(ctx, d, rsList)
 }
 return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}

删除 Deployment 操作

DeletionTimestamp 不为 nil时,就意味着需要删除该 deployment,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (dc *DeploymentController) syncStatusOnly(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
 newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
 if err != nil {
  return err
 }

 allRSs := append(oldRSs, newRS)
 return dc.syncDeploymentStatus(ctx, allRSs, newRS, d)
}

func (dc *DeploymentController) syncDeploymentStatus(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
 newStatus := calculateStatus(allRSs, newRS, d)

 if reflect.DeepEqual(d.Status, newStatus) {
  return nil
 }

 newDeployment := d
 newDeployment.Status = newStatus
 _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{})
 return err
}
  1. 获取此部署目标的所有旧RS,并计算其中的最大修订号(maxOldV)。
  2. 获取此部署目标的新 RS(其 Pod 模板与部署的匹配),并将新 RS 的修订号更新为 (maxOldV + 1),仅当其修订号小于 (maxOldV + 1) 时。如果此步骤失败,我们将在下一个部署同步循环中更新它。
  3. 将新 RS 的修订号复制到部署的状态中(更新部署的修订版)。如果此步骤失败,我们将在下一个部署同步循环中更新它。
  4. syncDeploymentStatus 通过 newRSallRSs 检查 Deployment 状态是否是最新,如果有差异则更新Deployment的状态。

注意: 真正的删除 deployment, rs, pods 操作是放在垃圾回收控制器 garbagecollector controller 完成的. 在其他控制里的删除只是配置 DeletionTimestamp 字段,并标记 orphan、background 或者 foreground 删除标签.

扩缩容 Deployment

当执行 scale 操作时,首先会通过 isScalingEvent 方法判断是否为扩缩容操作,然后通过 dc.sync 方法来执行实际的扩缩容动作。

  1. 获取所有的 rs
  2. 过滤出 activeRS, rs.Spec.Replicas > 0 的为 activeRS
  3. 判断 rs 的 desired 值是否等于 deployment.Spec.Replicas, 若不等于则需要为 rs 进行 scale 操作.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
func (dc *DeploymentController) isScalingEvent(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) (bool, error) {
 // 获取新旧所有 `RS` 对象
 newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
 if err != nil {
  return false, err
 }
 allRSs := append(oldRSs, newRS)
 logger := klog.FromContext(ctx)
 for _, rs := range controller.FilterActiveReplicaSets(allRSs) {
  desired, ok := deploymentutil.GetDesiredReplicasAnnotation(logger, rs)
  if !ok {
   continue
  }
  // 如果不是 `Replicas` 结构,则需要扩锁容
  if desired != *(d.Spec.Replicas) {
   return true, nil
  }
 }
 return false, nil
}

获取新旧两个 replicaset 副本集, newRs 是预期的, oldRss 为当前的存在副本集, 调用 scale 扩缩容方法, 最后同步当前的状态.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (dc *DeploymentController) sync(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
 newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
 if err != nil {
  return err
 }
 // `Scale` 扩缩容
 if err := dc.scale(ctx, d, newRS, oldRSs); err != nil {
  // If we get an error while trying to scale, the deployment will be requeued
  // so we can abort this resync
  return err
 }

 // Clean up the deployment when it's paused and no rollback is in flight.
 if d.Spec.Paused && getRollbackTo(d) == nil {
  if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
   return err
  }
 }

 allRSs := append(oldRSs, newRS)
 // 更新状态
 return dc.syncDeploymentStatus(ctx, allRSs, newRS, d)
}

scale 方法首先求出需要扩缩容的 pod 数量, 按照策略对 rs 数组进行新旧排序, 为了让每个 rs 都扩缩点, 经过一轮 proportion 计算再对 rs 进行 scale 扩缩容.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
func (dc *DeploymentController) scale(ctx context.Context, deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error {
 // If there is only one active replica set then we should scale that up to the full count of the
 // deployment. If there is no active replica set, then we should scale up the newest replica set.
 //返回唯一的活动或最新副本集(如果最多有一个活动副本集)
 //副本集。如果有更多的活动副本集,那么我们应该按比例缩放它们。
 if activeOrLatest := deploymentutil.FindActiveOrLatest(newRS, oldRSs); activeOrLatest != nil {
  if *(activeOrLatest.Spec.Replicas) == *(deployment.Spec.Replicas) {
   return nil
  }
  _, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, activeOrLatest, *(deployment.Spec.Replicas), deployment)
  return err
 }

 // If the new replica set is saturated, old replica sets should be fully scaled down.
 // This case handles replica set adoption during a saturated new replica set.
 //通过将新副本集的大小与其部署大小进行比较来检查新副本集是否饱和。
 //部署和副本集都必须相信该副本集可以拥有所有所需的`Replicas`,
 // 可以通过注解"deployment.kubernetes.io/desired-replicas"来实现。
 //ReplicaSet 的所有 pod需要可用。
 if deploymentutil.IsSaturated(deployment, newRS) {
  for _, old := range controller.FilterActiveReplicaSets(oldRSs) {
   if _, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, old, 0, deployment); err != nil {
    return err
   }
  }
  return nil
 }

 // There are old replica sets with pods and the new replica set is not saturated.
 // We need to proportionally scale all replica sets (new and old) in case of a
 // rolling deployment.
 // 如果 `Deployment` 有配置滚动更新策略, 那么就需要按照策略去对 `rs` 进行扩缩容
 if deploymentutil.IsRollingUpdate(deployment) {
  allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
  allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)

  // 计算最大可以创建出的 pod 数
  allowedSize := int32(0)
  if *(deployment.Spec.Replicas) > 0 {
   // 预期的副本数加上 maxSurge 为最大允许数
   allowedSize = *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
  }

  // Number of additional replicas that can be either added or removed from the total
  // replicas count. These replicas should be distributed proportionally to the active
  // replica sets.
  // 计算需要扩容的 pod 数
  deploymentReplicasToAdd := allowedSize - allRSsReplicas

  // The additional replicas should be distributed proportionally amongst the active
  // replica sets from the larger to the smaller in size replica set. Scaling direction
  // drives what happens in case we are trying to scale replica sets of the same size.
  // In such a case when scaling up, we should scale up newer replica sets first, and
  // when scaling down, we should scale down older replica sets first.
  var scalingOperation string
  switch {
  case deploymentReplicasToAdd > 0:
   // 若需要添加的副本数大于 0, 按照新旧进行排序,把新的 rs 放到前面, 这样对较新的 rs 扩容更多的 pod
   sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))
   // 扩容操作
   scalingOperation = "up"

  case deploymentReplicasToAdd < 0:
   // 若需要添加的副本数大于 0, 按照新旧进行排序,旧的`RS`放前面, 这样可以删除一些较旧的 pod
   sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
   // 缩容操作
   scalingOperation = "down"
  }

  // Iterate over all active replica sets and estimate proportions for each of them.
  // The absolute value of deploymentReplicasAdded should never exceed the absolute
  // value of deploymentReplicasToAdd.
  deploymentReplicasAdded := int32(0)
  nameToSize := make(map[string]int32)
  logger := klog.FromContext(ctx)
  // 遍历所有的 rs, 计算每个 rs 需要扩容或者缩容到的期望副本数
  for i := range allRSs {
   rs := allRSs[i]

   // Estimate proportions if we have replicas to add, otherwise simply populate
   // nameToSize with the current sizes for each replica set.
   if deploymentReplicasToAdd != 0 {
    // 估算出 rs 需要扩容或者缩容的副本数
    proportion := deploymentutil.GetProportion(logger, rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)

    // 把计算出来的 `proportion` 累加到 `added`
    nameToSize[rs.Name] = *(rs.Spec.Replicas) + proportion
    deploymentReplicasAdded += proportion
   } else {
    nameToSize[rs.Name] = *(rs.Spec.Replicas)
   }
  }

  // Update all replica sets
  // 遍历所有的 rs, 第一个最活跃的 rs.Spec.Replicas 加上上面循环中计算出
  // 其他 rs 要加或者减的副本数,然后更新所有 rs 的 rs.Spec.Replicas
  for i := range allRSs {
   rs := allRSs[i]

   // Add/remove any leftovers to the largest replica set.
   if i == 0 && deploymentReplicasToAdd != 0 {
    leftover := deploymentReplicasToAdd - deploymentReplicasAdded
    nameToSize[rs.Name] = nameToSize[rs.Name] + leftover
    if nameToSize[rs.Name] < 0 {
     nameToSize[rs.Name] = 0
    }
   }

   // TODO: Use transactions when we have them.
   // 按照扩容的数量去进行扩缩容
   if _, _, err := dc.scaleReplicaSet(ctx, rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil {
    // Return as soon as we fail, the deployment is requeued
    return err
   }
  }
 }
 return nil
}

升级Deployment

kubernetes 里更新策略有两种, 一种为重建模式, 另一种为滚动更新模式. 当没有定义 .spec.strategy 策略时, 默认会给填充 RollingUpdate

1
2
3
4
5
6
switch d.Spec.Strategy.Type {
case apps.RecreateDeploymentStrategyType:
 return dc.rolloutRecreate(ctx, d, rsList, podMap)
case apps.RollingUpdateDeploymentStrategyType:
 return dc.rolloutRolling(ctx, d, rsList)
}

重新创建

Recreate 的逻辑不复杂, 首先对旧的 rs 缩容到 0, 等待所有 pods 状态为 not running 后, 再创建新的 rs, 副本数跟 deployment 期望的值一致.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// rolloutRecreate implements the logic for recreating a replica set.
func (dc *DeploymentController) rolloutRecreate(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) error {
 // Don't create a new RS if not already existed, so that we avoid scaling up before scaling down.
 newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
 if err != nil {
  return err
 }
 allRSs := append(oldRSs, newRS)
 activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)

 // scale down old replica sets.
 // 缩容 oldRS
 scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(ctx, activeOldRSs, d)
 if err != nil {
  return err
 }
 if scaledDown {
  // Update DeploymentStatus.
  return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
 }

 // Do not process a deployment when it has old pods running.
 if oldPodsRunning(newRS, oldRSs, podMap) {
  return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
 }

 // If we need to create a new RS, create it now.
 // 创建 `newRS`
 if newRS == nil {
  newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
  if err != nil {
   return err
  }
  allRSs = append(oldRSs, newRS)
 }

 // scale up new replica set.
 // 扩容`newRS`
 if _, err := dc.scaleUpNewReplicaSetForRecreate(ctx, newRS, d); err != nil {
  return err
 }

 // 清理过期的`RS`
 if util.DeploymentComplete(d, &d.Status) {
  if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
   return err
  }
 }

 // Sync deployment status.
 // 同步`Deployment`状态
 return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

滚动更新

rolloutRolling 负责负责滚动更新操作, 先尝试对新的 rs 进行扩容, 在扩容后更新状态后就跳出. 等再次 informer 触发事件, 再次进入方法内时, 滚动更新场景下这时当前 pods 数量超过预期值无法 scale up. 后面会尝试对旧的 rs 进行缩容, 缩容完成后需要更新 rollout 状态, 再次跳出.

滚动升级就是这样循环反复地对新 rs 进行扩容, 同时对老的 rs 进行缩容, 一边增一边减, 直到达到预期状态.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
 // 获取新旧RS
 newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
 if err != nil {
  return err
 }
 allRSs := append(oldRSs, newRS)

 // Scale up, if we can.
 // 进行扩容
 scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d)
 if err != nil {
  return err
 }
 if scaledUp {
  // Update DeploymentStatus
  // 更新 Deployment 状态
  return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
 }

 // Scale down, if we can.
 // 开始缩容
 scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
 if err != nil {
  return err
 }
 if scaledDown {
  // Update DeploymentStatus
  // 更新 Deployment 状态
  return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
 }

 // 如果滚动完毕, 需要清理
 if deploymentutil.DeploymentComplete(d, &d.Status) {
  if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
   return err
  }
 }

 // Sync deployment status
 // 同步状态
 return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

reconcileNewReplicaSetnewRs 进行扩容, 其中 NewRSNewReplicas 会根据 RollingUpdate.MaxSurge 和当前副本数计算得出所需要新增的 pods 数.

在扩容进行完成后, 通过 clientsetupdate 更新 rs 的 annotationsspec.replicas 字段, 这里就完事了, rs 的真正维护是依赖 replicaSet controller 实现的.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func (dc *DeploymentController) reconcileNewReplicaSet(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
 // 新的 `rs` 副本数是否达到了预期, 也就是跟 `deployment` 定义的一致, 如果一致则没必要再进行滚动.
 if *(newRS.Spec.Replicas) == *(deployment.Spec.Replicas) {
  // Scaling not required.
  return false, nil
 }
 // `newRS` 的副本数比 `deployment` 的预期多, 那么就需要缩容,减少副本数
 if *(newRS.Spec.Replicas) > *(deployment.Spec.Replicas) {
  // Scale down.
  scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, *(deployment.Spec.Replicas), deployment)
  return scaled, err
 }
 //  计算 `newRS` 的副本数
 newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)
 if err != nil {
  return false, err
 }
 // 更新 `RS` 的注解和 `RS` 的 `rs.Spec.Replicas` 字段
 scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, newReplicasCount, deployment)
 return scaled, err
}

调用 reconcileOldReplicaSetsoldRS 不断缩容,每次缩容的数量是根据 RollingUpdate.MaxUnavailable 和当前可用的pods数量来计算出来的.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
func (dc *DeploymentController) reconcileOldReplicaSets(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
 logger := klog.FromContext(ctx)
 // 计算 oldPodsCount
 oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
 if oldPodsCount == 0 {
  // Can't scale down further
  return false, nil
 }
 // 计算 allPodsCount
 allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
 logger.V(4).Info("New replica set", "replicaSet", klog.KObj(newRS), "availableReplicas", newRS.Status.AvailableReplicas)
 maxUnavailable := deploymentutil.MaxUnavailable(*deployment)

 // Check if we can scale down. We can scale down in the following 2 cases:
 // * Some old replica sets have unhealthy replicas, we could safely scale down those unhealthy replicas since that won't further
 //  increase unavailability.
 // * New replica set has scaled up and it's replicas becomes ready, then we can scale down old replica sets in a further step.
 //
 // maxScaledDown := allPodsCount - minAvailable - newReplicaSetPodsUnavailable
 // take into account not only maxUnavailable and any surge pods that have been created, but also unavailable pods from
 // the newRS, so that the unavailable pods from the newRS would not make us scale down old replica sets in a further
 // step(that will increase unavailability).
 //
 // Concrete example:
 //
 // * 10 replicas
 // * 2 maxUnavailable (absolute number, not percent)
 // * 3 maxSurge (absolute number, not percent)
 //
 // case 1:
 // * Deployment is updated, newRS is created with 3 replicas, oldRS is scaled down to 8, and newRS is scaled up to 5.
 // * The new replica set pods crashloop and never become available.
 // * allPodsCount is 13. minAvailable is 8. newRSPodsUnavailable is 5.
 // * A node fails and causes one of the oldRS pods to become unavailable. However, 13 - 8 - 5 = 0, so the oldRS won't be scaled down.
 // * The user notices the crashloop and does kubectl rollout undo to rollback.
 // * newRSPodsUnavailable is 1, since we rolled back to the good replica set, so maxScaledDown = 13 - 8 - 1 = 4. 4 of the crashlooping pods will be scaled down.
 // * The total number of pods will then be 9 and the newRS can be scaled up to 10.
 //
 // case 2:
 // Same example, but pushing a new pod template instead of rolling back (aka "roll over"):
 // * The new replica set created must start with 0 replicas because allPodsCount is already at 13.
 // * However, newRSPodsUnavailable would also be 0, so the 2 old replica sets could be scaled down by 5 (13 - 8 - 0), which would then
 // allow the new replica set to be scaled up by 5.
 // 计算 maxScaledDown
 minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
 newRSUnavailablePodCount := *(newRS.Spec.Replicas) - newRS.Status.AvailableReplicas
 maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount
 if maxScaledDown <= 0 {
  return false, nil
 }

 // Clean up unhealthy replicas first, otherwise unhealthy replicas will block deployment
 // and cause timeout. See https://github.com/kubernetes/kubernetes/issues/16737
 // 清理异常 RS
 oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(ctx, oldRSs, deployment, maxScaledDown)
 if err != nil {
  return false, nil
 }
 logger.V(4).Info("Cleaned up unhealthy replicas from old RSes", "count", cleanupCount)

 // Scale down old replica sets, need check maxUnavailable to ensure we can scale down
 allRSs = append(oldRSs, newRS)
 // 缩容旧的 RS
 scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(ctx, allRSs, oldRSs, deployment)
 if err != nil {
  return false, nil
 }
 logger.V(4).Info("Scaled down old RSes", "deployment", klog.KObj(deployment), "count", scaledDownCount)

 totalScaledDown := cleanupCount + scaledDownCount
 return totalScaledDown > 0, nil
}

syncDeployment

0%