分析基于kubernetes-1.15.5源码。

Deployment是新一代用于Pod管理的资源对象,除了继承了Replication的全部功能外,还在此基础上提供了更加完善的功能,特别是提供了滚动更新的功能,这对服务平滑升级简直太友好了。关于Rolling Update它有几个重要的参数用来控制滚动更新的动作:

  • .spec.minReadySeconds
  • .spec.strategy.rollingUpdate.maxSurge
  • .spec.strategy.rollingUpdate.maxUnavailable

为了更好的理解这几个参数的作用,有必要深入分析一下Deployment Controller的处理逻辑。

控制器相关的配置项

  • --concurrent-deployment-syncs int32 Default: 5 The number of deployment objects that are allowed to sync concurrently. Larger number = more responsive deployments, but more CPU (and network) load
  • --deployment-controller-sync-period duration Default: 30s Period for syncing the deployments.

Watch GVK

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
// ...
	dc, err := deployment.NewDeploymentController(
		ctx.InformerFactory.Apps().V1().Deployments(),
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("deployment-controller"),
	)
// ...
}
  • Apps/V1/Deployments
  • Apps.V1.ReplicaSets
  • Core.V1.Pods。

Event Handler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
    //...

    dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addDeployment,
        UpdateFunc: dc.updateDeployment,
        // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
        DeleteFunc: dc.deleteDeployment,
    })
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addReplicaSet,
        UpdateFunc: dc.updateReplicaSet,
        DeleteFunc: dc.deleteReplicaSet,
    })
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        DeleteFunc: dc.deletePod,
    })
    
    // ...
}

Run函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Run begins watching and syncing.
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {

	// ...

	// @xnile 等待informer cache同步完成
	if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(dc.worker, time.Second, stopCh)
	}

	<-stopCh
}
  • 等待本地Informer cache同步完成
  • 开启workers(workers由--concurrent-deployment-syncs参数指定,默认为5)个协程从任务队列中消费任务然后交给syncHandler处理,syncHandler的逻辑在syncDeployment函数,Deployment Controller的关键函数。

核心逻辑syncDeployment

Deployment Controller核心逻辑syncDeployment函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
func (dc *DeploymentController) syncDeployment(key string) error {
    
    // ... 省略

    //@xnile 从informer cache中获取deployment对象
    deployment, err := dc.dLister.Deployments(namespace).Get(name)

    // ...省略
  
    // @xnile 获取deployment管理的rs
    rsList, err := dc.getReplicaSetsForDeployment(d)
    if err != nil {
        return err
    }

    // ... 省略

    // @xnile 判断deployment是否已经被删除,只有当删除策略为“Foreground”时才会出现
    if d.DeletionTimestamp != nil {
        return dc.syncStatusOnly(d, rsList)
        // @xnile 后续GC Controller会负责清理rs、pods
    }

    // @xnile 是否处于暂停状态,更新Conditions,目的是在暂停的这段时间内不记时,防止触发spec.progressDeadlineSeconds,
    if err = dc.checkPausedConditions(d); err != nil {
        return err
    }
    if d.Spec.Paused {
        return dc.sync(d, rsList)
    }

    // @xnile 通过检测 .spec.rollbackTo 信息判断是否需要回退
    // @xnile 通过yaml文件指定或使用kubectl rollout undo命令
    if getRollbackTo(d) != nil {
        return dc.rollback(d, rsList)
    }

    // @xnile 是否需要scale
    scalingEvent, err := dc.isScalingEvent(d, rsList)
    if err != nil {
        return err
    }
    if scalingEvent {
        return dc.sync(d, rsList)
    }

    // @xnile 更新
    switch d.Spec.Strategy.Type {
    case apps.RecreateDeploymentStrategyType:
        return dc.rolloutRecreate(d, rsList, podMap)
    case apps.RollingUpdateDeploymentStrategyType:
        // @xnile 滚动更新
        return dc.rolloutRolling(d, rsList)
    }
    return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
  • Informer cache中获取deployment对象信息
  • 获取deployment管理的rs
  • 判断deployment是否已经被删除,只有当删除策略为Foreground时才会出现
  • 判断是否处于暂停状态,如果是则更新Conditions,目的是在暂停的这段时间内不记时,防止触发spec.progressDeadlineSeconds
  • 通过检测 .spec.rollbackTo 判断是否需要回退到指定Revision
  • 判断是否需要scale,这里也能看出扩缩容的操作的优先级要高于更新操作,下边会详细
  • 更新,根据.spec.strategy.type指定的更新策略选择不同的处理逻辑,这里我们只分析RollingUpdate类型

scale 扩缩容

先来看下是如何判断是否需要scale的,逻辑在isScalingEvent函数

isScalingEvent

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func (dc *DeploymentController) isScalingEvent(d *apps.Deployment, rsList []*apps.ReplicaSet) (bool, error) {
  // 获取新旧rs,注意最后一个参数为false
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
	if err != nil {
		return false, err
	}
	allRSs := append(oldRSs, newRS)
  // FilterActiveReplicaSets 通过spec.replicas >0 过滤出活跃的rs
	for _, rs := range controller.FilterActiveReplicaSets(allRSs) {
    // 获取annotations["deployment.kubernetes.io/desired-replicas"]值
		desired, ok := deploymentutil.GetDesiredReplicasAnnotation(rs)
		if !ok {
			continue
		}
    // 不一致则代表需要scale
		if desired != *(d.Spec.Replicas) {
			return true, nil
		}
	}
	return false, nil
}

此函数就不再往下展开了,关键逻辑已经备注出来了。

下面来看下scale的逻辑,主要有sync函数完成

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
	if err != nil {
		return err
	}
	if err := dc.scale(d, newRS, oldRSs); err != nil {
		// If we get an error while trying to scale, the deployment will be requeued
		// so we can abort this resync
		return err
	}

	// Clean up the deployment when it's paused and no rollback is in flight.
	if d.Spec.Paused && getRollbackTo(d) == nil {
		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
			return err
		}
	}

	allRSs := append(oldRSs, newRS)
	return dc.syncDeploymentStatus(allRSs, newRS, d)
}

重点来看下scale函数

scale

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
func (dc *DeploymentController) scale(deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error {
  // 对应仅修改了replicas的情况
	if activeOrLatest := deploymentutil.FindActiveOrLatest(newRS, oldRSs); activeOrLatest != nil {
		if *(activeOrLatest.Spec.Replicas) == *(deployment.Spec.Replicas) {
			return nil
		}
		_, _, err := dc.scaleReplicaSetAndRecordEvent(activeOrLatest, *(deployment.Spec.Replicas), deployment)
		return err
	}

	// 已经饱和
	if deploymentutil.IsSaturated(deployment, newRS) {
		for _, old := range controller.FilterActiveReplicaSets(oldRSs) {
			if _, _, err := dc.scaleReplicaSetAndRecordEvent(old, 0, deployment); err != nil {
				return err
			}
		}
		return nil
	}

	// 有多个活跃的rs,spec.replicas >0 且更新策略为滚动更新
	if deploymentutil.IsRollingUpdate(deployment) {
		allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
    // 所有活跃rs.replicas相加
		allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)

		allowedSize := int32(0)
		if *(deployment.Spec.Replicas) > 0 {
			allowedSize = *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
		}

    // 待scale的总数
		deploymentReplicasToAdd := allowedSize - allRSsReplicas

		switch {
		case deploymentReplicasToAdd > 0:
			// 大在前小在后,相同时新在前旧在后
			sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))
			scalingOperation = "up"

		case deploymentReplicasToAdd < 0:
			// 大在前小在后,相同时旧在前新在后
			sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
			scalingOperation = "down"
		}

    // 计数,已经派下去的任务
		deploymentReplicasAdded := int32(0)
		nameToSize := make(map[string]int32)
		for i := range allRSs {
			rs := allRSs[i]

			if deploymentReplicasToAdd != 0 {
        // 分任务,根据rs原来的占比情况计算出现在应该分到的任务数量
				proportion := deploymentutil.GetProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)

				nameToSize[rs.Name] = *(rs.Spec.Replicas) + proportion
				deploymentReplicasAdded += proportion
			} else {
				nameToSize[rs.Name] = *(rs.Spec.Replicas)
			}
		}

		for i := range allRSs {
			rs := allRSs[i]

			// Add/remove any leftovers to the largest replica set.
			if i == 0 && deploymentReplicasToAdd != 0 {
        // 还没派完剩下的任务全给最能干的
				leftover := deploymentReplicasToAdd - deploymentReplicasAdded
				nameToSize[rs.Name] = nameToSize[rs.Name] + leftover
				if nameToSize[rs.Name] < 0 {
					nameToSize[rs.Name] = 0
				}
			}

      // 调用api更改rs的Replicas
			if _, _, err := dc.scaleReplicaSet(rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil {
				return err
			}
		}
	}
	return nil
}

根据FindActiveOrLatest函数返回结果分这么几种情况:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func FindActiveOrLatest(newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) *apps.ReplicaSet {
	if newRS == nil && len(oldRSs) == 0 {
		return nil
	}

	sort.Sort(sort.Reverse(controller.ReplicaSetsByCreationTimestamp(oldRSs)))
	allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))

	switch len(allRSs) {
	case 0:
		// If there is no active replica set then we should return the newest.
		if newRS != nil {
			return newRS
		}
		return oldRSs[0]
	case 1:
		return allRSs[0]
	default:
		return nil
	}
}
  1. 返回不为nil,这里又分两种情况

    • 只有一个活跃rs,这种情况比较好理解,deployment.Spec.Template内容不变仅修改了deployment.spec.replicas,简单的扩缩容操作
    • 没有活跃rs,我没想出来什么情况下为会现,有知道的朋友请告知下。
  2. 返回为nil,说明有两个及以上个活跃rs,这种情况出现在上次滚动更新还未完成又应用了一次修改(至少有扩容)时,这时需要根据rs的占比情况计算出待scale的数量。

RollingUpdate

Deployment滚动更新实际是依靠新旧rs交接棒完成的,更新过程分成两步:Scale upScale down

  • Scale up负责将新rsreplicas朝着deployment.Spec.Replicas指定的数据递加。

  • Scale down负责将旧的replicas朝着0的目标递减。

一次完整的滚动更新需要经过很多轮Scale upScale down的过程,这对理解pauseresume很重要。

PS:新旧rs,这里的新旧rs不是通过创建时间来区分的而是通过将rs.Spec.Template和deployment.Spec.Template对比,如果找到一致的就将此rs置为新rs,如果找不到就新建rs并置为新rs,不是简单的通过创建时间区分,这也解释了Deployment并不是每次更新都新建rs,关于此点后边会单独再详细分析。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//@ xnile 一次完整的滚动更新需要执行多次 rolloutRolling
func (dc *DeploymentController) rolloutRolling(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	// @xnile 这个函数包含重要信息,不是每次滚动更新都创建新的replicas,而是先遍历老的rs,通过对比src.templates判断是否有相同的,如果没有找到再新建rs
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)

	// Scale up, if we can.
	scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d)
	if err != nil {
		return err
	}
	if scaledUp {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	// Scale down, if we can.
	scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	// @xnile 如果滚动更新已完成
	if deploymentutil.DeploymentComplete(d, &d.Status) {
		// @xnile 清理历史rs, 最多只保留最 d.Spec.RevisionHistoryLimit 个历史版本
		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
			return err
		}
	}

	// Sync deployment status
  // 更新状态
	return dc.syncRolloutStatus(allRSs, newRS, d)
}
  • 获取deployment管理的全部RS,并通过比对Spec.Template信息找出新旧RS,新RS如果不存在则新创建

  • 调用dc.reconcileNewReplicaSet方法判断是否需要Scale up

  • 调用dc.reconcileOldReplicaSets方法判断是否需要Scale down

  • 如果更新已经完成则清理历史rs,最多只保留.spec.RevisionHistoryLimit个历史版本

  • 更新deployment状态,通过kubectl describe可以看到下面信息

    1
    2
    3
    4
    5
    6
    7
    
    Events:
    Type    Reason             Age                  From                   Message
    ----    ------             ----                 ----                   -------
    Normal  ScalingReplicaSet  61s (x2 over 8h)     deployment-controller  Scaled up replica set my-nginx-79cb8c4647 to 1
    Normal  ScalingReplicaSet  49s (x2 over 8h)     deployment-controller  Scaled up replica set my-nginx-79cb8c4647 to 2
    Normal  ScalingReplicaSet  49s                  deployment-controller  Scaled down replica set my-nginx-9f4d8c9d5 to 3
    Normal  ScalingReplicaSet  37s (x2 over 8h)     deployment-controller  Scaled up 

Scale up的逻辑

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	if *(newRS.Spec.Replicas) == *(deployment.Spec.Replicas) {
		// Scaling not required.
		return false, nil
	}
	// @xnile 需要scale down
	if *(newRS.Spec.Replicas) > *(deployment.Spec.Replicas) {
		// Scale down.
		scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, *(deployment.Spec.Replicas), deployment)
		return scaled, err
	}
	// @xnile 获取能scale up的数量
	newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)
	if err != nil {
		return false, err
	}
	// @xnile 调用api更新rs的replicas,然后rs controller会负责pod的创建
	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, newReplicasCount, deployment)
	return scaled, err
}
  • 判断是否需要Scale up
  • 判断是否需要先Scale down
  • 调用deploymentutil.NewRSNewReplicas获取能Scale up的数量
  • 调用api更新rsreplicas,然后rs controller会负责pod的创建

继续来看下deploymentutil.NewRSNewReplicas方法,.spec.strategy.rollingUpdate.maxSurge参数的作用也就在于此。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func NewRSNewReplicas(deployment *apps.Deployment, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet) (int32, error) {
	switch deployment.Spec.Strategy.Type {
	case apps.RollingUpdateDeploymentStrategyType:
		// Check if we can scale up.
		maxSurge, err := intstrutil.GetValueFromIntOrPercent(deployment.Spec.Strategy.RollingUpdate.MaxSurge, int(*(deployment.Spec.Replicas)), true)
		if err != nil {
			return 0, err
		}
		// Find the total number of pods
		currentPodCount := GetReplicaCountForReplicaSets(allRSs)
		maxTotalPods := *(deployment.Spec.Replicas) + int32(maxSurge)
		if currentPodCount >= maxTotalPods {
			// Cannot scale up.
			return *(newRS.Spec.Replicas), nil
		}
		// Scale up.
		scaleUpCount := maxTotalPods - currentPodCount
		// Do not exceed the number of desired replicas.
		scaleUpCount = int32(integer.IntMin(int(scaleUpCount), int(*(deployment.Spec.Replicas)-*(newRS.Spec.Replicas))))
		return *(newRS.Spec.Replicas) + scaleUpCount, nil
	case apps.RecreateDeploymentStrategyType:
		return *(deployment.Spec.Replicas), nil
	default:
		return 0, fmt.Errorf("deployment type %v isn't supported", deployment.Spec.Strategy.Type)
	}
}

函数逻辑比较简单,从中我们能看出的.spec.strategy.rollingUpdate.maxSurge的作用:

在scale up的时候所有pod不能超过deployment.Spec.Replicas+.spec.strategy.rollingUpdate.maxSurge相加之和,.spec.strategy.rollingUpdate.maxSurge可以是整数或百分比,是百分比时需要向上取整(如0.1就限1)。

Scale down的逻辑

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
    oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
    if oldPodsCount == 0 {
        // Can't scale down further
        return false, nil
    }

    allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
    klog.V(4).Infof("New replica set %s/%s has %d available pods.", newRS.Namespace, newRS.Name, newRS.Status.AvailableReplicas)
    maxUnavailable := deploymentutil.MaxUnavailable(*deployment)

    minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
    newRSUnavailablePodCount := *(newRS.Spec.Replicas) - newRS.Status.AvailableReplicas
    // @xnile 这里为什么不用readyPodCount - minAvailable
    // @xnile allPodsCount、minAvailable 两值都是静态的,但newRSUnavailablePodCount是动态的。
    // @xnile 考虑一种情况,滚动更新后新创建的Pod因为某种原因一直不能ready。这时不能再scale down,这时我查找到原因了修复了,再滚动一次,发现readyPodCount - minAvailable=0 会卡住
    maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount
    if maxScaledDown <= 0 {
        return false, nil
    }

    // Clean up unhealthy replicas first, otherwise unhealthy replicas will block deployment
    // and cause timeout. See https://github.com/kubernetes/kubernetes/issues/16737
    oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(oldRSs, deployment, maxScaledDown)
    if err != nil {
        return false, nil
    }
    klog.V(4).Infof("Cleaned up unhealthy replicas from old RSes by %d", cleanupCount)

    // Scale down old replica sets, need check maxUnavailable to ensure we can scale down
    allRSs = append(oldRSs, newRS)
    scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, deployment)
    if err != nil {
        return false, nil
    }
    klog.V(4).Infof("Scaled down old RSes of deployment %s by %d", deployment.Name, scaledDownCount)

    totalScaledDown := cleanupCount + scaledDownCount
    return totalScaledDown > 0, nil
}

其中minAvailable := *(deployment.Spec.Replicas) - maxUnavailable 一句比较关键,

在scale down的过程中必须保证当前可用pod不能少于deployment.Spec.Replicas - .spec.strategy.rollingUpdate.maxUnavailable.spec.strategy.rollingUpdate.maxUnavailable可以是整数或百分比,如果百分比就向下取整(如1.7就是1)

PS:这里一开始我在看源码时对maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount这行代码有个疑问,为什么不直接用readyPodCount - minAvailable计算出能scale down的pod数量呢?后来根据这个Issue,发现在之前确实使用过totalScaleDownCount := readyPodCount - minAvailable,也因此引入了一个Bug,如果新起的pod因为某种原因一直不能ready,会卡住后续的更新,想回滚也不行,想了解细节的同学可以看下那个Issue。

Deployment 删除

从源码中我们可以看到Deployment Controller中没有deployment的删除逻辑,其实deployment的删除及关联的rsPod的删除是在GC Controller中处理的,以后有机会再分析下GC Controller的逻辑。

同一个Deployment先后触发滚动更新会如何处理?

如果上一次滚动更新还未完成马上接着又对此deployment执行了一次滚动更新,控制器又会如何处理呢?Scale up的流程参加上边分析的过程会创建New rs,但Scale down会如何处理呢,是Scale down上一次滚动更新刚创建的rs还是更老的rs的呢?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (int32, error) {
	maxUnavailable := deploymentutil.MaxUnavailable(*deployment)

	//...

	// @xnile 根据创建时间排序,即先Scale down最早创建的rs
	sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))

	totalScaledDown := int32(0)
	totalScaleDownCount := availablePodCount - minAvailable
	for _, targetRS := range oldRSs {
		//...
	}

	return totalScaledDown, nil
}

答案是先scale down最老的rs,然后再Scale down上次更新时创建的rs.

pause和resume

如果deployment还在滚动更新中我们执行了kubectl rollout pause 命令,控制器又会如何处理?

Rolling Update章节我们已经提到过一次完成的滚动更新需要经过多轮Scale upScale down的过程,当执行暂停操作只会影响下一轮的Scale upScale down而不会影响本轮的操作。是不是也侧面说明了kubernetes操作都是声明式的而非命令式的。

minReadySeconds的作用

.spec.minReadySeconds的作用是在Scale up的过程中新创建的pod在本身ready的基础上会再等上minReadySeconds才会认为pod已经是可用状态,然后才会接着开始scale down,相当于一个观察期的作用,防止新起的pod发生crash,进而影响服务的可用性,保证集群在更新过程的稳定性。

在测试过程中可以适当增加这个值,人为减慢滚动更新的进度,方便我们使用kubectl get rs -w观察滚动更新的过程。

Scale down过程中被kill Pod的优先级

在滚动更新Scale down阶段需要杀掉老的pod,这些需要被杀掉的pod是如何被筛选出来的呢?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func getPodsToDelete(filteredPods []*v1.Pod, diff int) []*v1.Pod {
	// No need to sort pods if we are about to delete all of them.
	// diff will always be <= len(filteredPods), so not need to handle > case.
	if diff < len(filteredPods) {
		// Sort the pods in the order such that not-ready < ready, unscheduled
		// < scheduled, and pending < running. This ensures that we delete pods
		// in the earlier stages whenever possible.
		// @xnile 尽可能删除较早的pod
		sort.Sort(controller.ActivePods(filteredPods))
	}
	return filteredPods[:diff]
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (s ActivePods) Less(i, j int) bool {
	// 1. Unassigned < assigned
	// If only one of the pods is unassigned, the unassigned one is smaller
	if s[i].Spec.NodeName != s[j].Spec.NodeName && (len(s[i].Spec.NodeName) == 0 || len(s[j].Spec.NodeName) == 0) {
		return len(s[i].Spec.NodeName) == 0
	}
	// 2. PodPending < PodUnknown < PodRunning
	m := map[v1.PodPhase]int{v1.PodPending: 0, v1.PodUnknown: 1, v1.PodRunning: 2}
	if m[s[i].Status.Phase] != m[s[j].Status.Phase] {
		return m[s[i].Status.Phase] < m[s[j].Status.Phase]
	}
	// 3. Not ready < ready
	// If only one of the pods is not ready, the not ready one is smaller
	if podutil.IsPodReady(s[i]) != podutil.IsPodReady(s[j]) {
		return !podutil.IsPodReady(s[i])
	}
	// TODO: take availability into account when we push minReadySeconds information from deployment into pods,
	//       see https://github.com/kubernetes/kubernetes/issues/22065
	// 4. Been ready for empty time < less time < more time
	// If both pods are ready, the latest ready one is smaller
	if podutil.IsPodReady(s[i]) && podutil.IsPodReady(s[j]) && !podReadyTime(s[i]).Equal(podReadyTime(s[j])) {
		return afterOrZero(podReadyTime(s[i]), podReadyTime(s[j]))
	}
	// 5. Pods with containers with higher restart counts < lower restart counts
	if maxContainerRestarts(s[i]) != maxContainerRestarts(s[j]) {
		return maxContainerRestarts(s[i]) > maxContainerRestarts(s[j])
	}
	// 6. Empty creation time pods < newer pods < older pods
	if !s[i].CreationTimestamp.Equal(&s[j].CreationTimestamp) {
		return afterOrZero(&s[i].CreationTimestamp, &s[j].CreationTimestamp)
	}
	return false
}

注释已经解释的比较清楚了,就不赘述了。

Deployment每次更新都会创建新Replicasets吗?

前边在分析rolloutRolling函数时已经提到过,答案是并不会每次都创建新的rs,详细逻辑我们来看下

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(d *apps.Deployment, rsList []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, []*apps.ReplicaSet, error) {
	_, allOldRSs := deploymentutil.FindOldReplicaSets(d, rsList)

	// Get new replica set with the updated revision number
	newRS, err := dc.getNewReplicaSet(d, rsList, allOldRSs, createIfNotExisted)
	if err != nil {
		return nil, nil, err
	}

	return newRS, allOldRSs, nil
}

func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, oldRSs []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, error) {
	existingNewRS := deploymentutil.FindNewReplicaSet(d, rsList)

	// Calculate the max revision number among all old RSes
	maxOldRevision := deploymentutil.MaxRevision(oldRSs)
	// Calculate revision number for this new replica set
	newRevision := strconv.FormatInt(maxOldRevision+1, 10)

	// Latest replica set exists. We need to sync its annotations (includes copying all but
	// annotationsToSkip from the parent deployment, and update revision, desiredReplicas,
	// and maxReplicas) and also update the revision annotation in the deployment with the
	// latest revision.
	if existingNewRS != nil {
		rsCopy := existingNewRS.DeepCopy()

		// Set existing new replica set's annotation
		annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, newRevision, true, maxRevHistoryLengthInChars)
		minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds
		if annotationsUpdated || minReadySecondsNeedsUpdate {
			rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds
			return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(context.TODO(), rsCopy, metav1.UpdateOptions{})
		}

		// Should use the revision in existingNewRS's annotation, since it set by before
		needsUpdate := deploymentutil.SetDeploymentRevision(d, rsCopy.Annotations[deploymentutil.RevisionAnnotation])
		// If no other Progressing condition has been recorded and we need to estimate the progress
		// of this deployment then it is likely that old users started caring about progress. In that
		// case we need to take into account the first time we noticed their new replica set.
		cond := deploymentutil.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
		if deploymentutil.HasProgressDeadline(d) && cond == nil {
			msg := fmt.Sprintf("Found new replica set %q", rsCopy.Name)
			condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, deploymentutil.FoundNewRSReason, msg)
			deploymentutil.SetDeploymentCondition(&d.Status, *condition)
			needsUpdate = true
		}

		if needsUpdate {
			var err error
			if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{}); err != nil {
				return nil, err
			}
		}
		return rsCopy, nil
	}

	if !createIfNotExisted {
		return nil, nil
	}

	// new ReplicaSet does not exist, create one.
	newRSTemplate := *d.Spec.Template.DeepCopy()
	podTemplateSpecHash := controller.ComputeHash(&newRSTemplate, d.Status.CollisionCount)
	newRSTemplate.Labels = labelsutil.CloneAndAddLabel(d.Spec.Template.Labels, apps.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)
	// Add podTemplateHash label to selector.
	newRSSelector := labelsutil.CloneSelectorAndAddLabel(d.Spec.Selector, apps.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)

	// Create new ReplicaSet
	newRS := apps.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{
			// Make the name deterministic, to ensure idempotence
			Name:            d.Name + "-" + podTemplateSpecHash,
			Namespace:       d.Namespace,
			OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(d, controllerKind)},
			Labels:          newRSTemplate.Labels,
		},
		Spec: apps.ReplicaSetSpec{
			Replicas:        new(int32),
			MinReadySeconds: d.Spec.MinReadySeconds,
			Selector:        newRSSelector,
			Template:        newRSTemplate,
		},
	}
	allRSs := append(oldRSs, &newRS)
	newReplicasCount, err := deploymentutil.NewRSNewReplicas(d, allRSs, &newRS)
	if err != nil {
		return nil, err
	}

	*(newRS.Spec.Replicas) = newReplicasCount
	// Set new replica set's annotation
	deploymentutil.SetNewReplicaSetAnnotations(d, &newRS, newRevision, false, maxRevHistoryLengthInChars)
	// Create the new ReplicaSet. If it already exists, then we need to check for possible
	// hash collisions. If there is any other error, we need to report it in the status of
	// the Deployment.
	alreadyExists := false
	createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(context.TODO(), &newRS, metav1.CreateOptions{})
	switch {
	// We may end up hitting this due to a slow cache or a fast resync of the Deployment.
	case errors.IsAlreadyExists(err):
		alreadyExists = true

		// Fetch a copy of the ReplicaSet.
		rs, rsErr := dc.rsLister.ReplicaSets(newRS.Namespace).Get(newRS.Name)
		if rsErr != nil {
			return nil, rsErr
		}

		// If the Deployment owns the ReplicaSet and the ReplicaSet's PodTemplateSpec is semantically
		// deep equal to the PodTemplateSpec of the Deployment, it's the Deployment's new ReplicaSet.
		// Otherwise, this is a hash collision and we need to increment the collisionCount field in
		// the status of the Deployment and requeue to try the creation in the next sync.
		controllerRef := metav1.GetControllerOf(rs)
		if controllerRef != nil && controllerRef.UID == d.UID && deploymentutil.EqualIgnoreHash(&d.Spec.Template, &rs.Spec.Template) {
			createdRS = rs
			err = nil
			break
		}

		// Matching ReplicaSet is not equal - increment the collisionCount in the DeploymentStatus
		// and requeue the Deployment.
		if d.Status.CollisionCount == nil {
			d.Status.CollisionCount = new(int32)
		}
		preCollisionCount := *d.Status.CollisionCount
		*d.Status.CollisionCount++
		// Update the collisionCount for the Deployment and let it requeue by returning the original
		// error.
		_, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
		if dErr == nil {
			klog.V(2).Infof("Found a hash collision for deployment %q - bumping collisionCount (%d->%d) to resolve it", d.Name, preCollisionCount, *d.Status.CollisionCount)
		}
		return nil, err
	case errors.HasStatusCause(err, v1.NamespaceTerminatingCause):
		// if the namespace is terminating, all subsequent creates will fail and we can safely do nothing
		return nil, err
	case err != nil:
		msg := fmt.Sprintf("Failed to create new replica set %q: %v", newRS.Name, err)
		if deploymentutil.HasProgressDeadline(d) {
			cond := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionFalse, deploymentutil.FailedRSCreateReason, msg)
			deploymentutil.SetDeploymentCondition(&d.Status, *cond)
			// We don't really care about this error at this point, since we have a bigger issue to report.
			// TODO: Identify which errors are permanent and switch DeploymentIsFailed to take into account
			// these reasons as well. Related issue: https://github.com/kubernetes/kubernetes/issues/18568
			_, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
		}
		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, deploymentutil.FailedRSCreateReason, msg)
		return nil, err
	}
	if !alreadyExists && newReplicasCount > 0 {
		dc.eventRecorder.Eventf(d, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled up replica set %s to %d", createdRS.Name, newReplicasCount)
	}

	needsUpdate := deploymentutil.SetDeploymentRevision(d, newRevision)
	if !alreadyExists && deploymentutil.HasProgressDeadline(d) {
		msg := fmt.Sprintf("Created new replica set %q", createdRS.Name)
		condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, deploymentutil.NewReplicaSetReason, msg)
		deploymentutil.SetDeploymentCondition(&d.Status, *condition)
		needsUpdate = true
	}
	if needsUpdate {
		_, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
	}
	return createdRS, err
}

func FindNewReplicaSet(deployment *apps.Deployment, rsList []*apps.ReplicaSet) *apps.ReplicaSet {
	sort.Sort(controller.ReplicaSetsByCreationTimestamp(rsList))
	for i := range rsList {
    // 对比Spec.Template信息
		if EqualIgnoreHash(&rsList[i].Spec.Template, &deployment.Spec.Template) {
			// In rare cases, such as after cluster upgrades, Deployment may end up with
			// having more than one new ReplicaSets that have the same template as its template,
			// see https://github.com/kubernetes/kubernetes/issues/40415
			// We deterministically choose the oldest new ReplicaSet.
			return rsList[i]
		}
	}
	// new ReplicaSet does not exist.
	return nil
}

func FindOldReplicaSets(deployment *apps.Deployment, rsList []*apps.ReplicaSet) ([]*apps.ReplicaSet, []*apps.ReplicaSet) {
	var requiredRSs []*apps.ReplicaSet
	var allRSs []*apps.ReplicaSet
	newRS := FindNewReplicaSet(deployment, rsList)
	for _, rs := range rsList {
		// Filter out new replica set
		if newRS != nil && rs.UID == newRS.UID {
			continue
		}
		allRSs = append(allRSs, rs)
		if *(rs.Spec.Replicas) != 0 {
			requiredRSs = append(requiredRSs, rs)
		}
	}
	return requiredRSs, allRSs
}

可以看到Deployment每次在更新时会遍历RS通过对比Spec.Template内容判断是否需要新建RS,如果有相同的就在此RS的基础上更新scale up/down操作。细心的朋友日常使用中可能已经发现了此现象,原理就在此。

调试

  • kubectl get rs -w watch rs 的变化
  • kubectl describe deploy <name> 查看deployment的更新状态

总结

Kubernetes Deployment滚动更新是靠新老RS交接棒完成的,新的RS scale up->老的RS scale down->新的RS scale up的…… 一直循环直到新的RS repliacs的数量达到期望值。在滚动更新的过程中会遵循:

  • 总pod数不能超过deployment.Spec.Replicas+.spec.strategy.rollingUpdate.maxSurge
  • 保证当前ready的pod不能少于deployment.Spec.Replicas - .spec.strategy.rollingUpdate.maxUnavailable

在生产环境实际操作中默认25%replicas基数很大的服务是不合适的,因为在滚动更新的一瞬间maxSurge可能突破你集群资源的上限,maxUnavailable也可能会击穿你服务性能水平的下限,因此一定要根据自己服务的情况做相应调整。

参考

https://github.com/kubernetes/kubernetes/issues/22065

https://github.com/kubernetes/kubernetes/pull/20368/commits/86aea1d59c42de15afbff5e2388e4b764bd134fc

https://github.com/kubernetes/kubernetes/pull/20368