This analysis is based on the kubernetes-1.15.5 source code.

Deployment is the newer-generation object for managing Pods. Besides inheriting all of ReplicationController's capabilities, it adds more complete functionality on top, most notably rolling updates, which make smooth service upgrades much easier. The rolling update behavior is controlled by a few important parameters:

  • .spec.minReadySeconds
  • .spec.strategy.rollingUpdate.maxSurge
  • .spec.strategy.rollingUpdate.maxUnavailable

To better understand what these parameters do, it is worth digging into the Deployment Controller's processing logic.
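For orientation, here is where the three fields sit on an apps/v1 Deployment, built with the k8s.io/api Go types; the helper name and the values are only illustrative, not recommendations:

package example

import (
	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// setRollingUpdateKnobs is a hypothetical helper that only shows where the
// three rolling-update parameters live on the Deployment spec.
func setRollingUpdateKnobs(d *appsv1.Deployment) {
	maxSurge := intstr.FromString("25%")       // .spec.strategy.rollingUpdate.maxSurge
	maxUnavailable := intstr.FromString("25%") // .spec.strategy.rollingUpdate.maxUnavailable

	d.Spec.MinReadySeconds = 10 // .spec.minReadySeconds
	d.Spec.Strategy = appsv1.DeploymentStrategy{
		Type: appsv1.RollingUpdateDeploymentStrategyType,
		RollingUpdate: &appsv1.RollingUpdateDeployment{
			MaxSurge:       &maxSurge,
			MaxUnavailable: &maxUnavailable,
		},
	}
}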

Controller-related configuration flags

  • --concurrent-deployment-syncs int32 Default: 5 The number of deployment objects that are allowed to sync concurrently. Larger number = more responsive deployments, but more CPU (and network) load
  • --deployment-controller-sync-period duration Default: 30s Period for syncing the deployments.

Watch GVK

func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
	// ...
	dc, err := deployment.NewDeploymentController(
		ctx.InformerFactory.Apps().V1().Deployments(),
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("deployment-controller"),
	)
	// ...
}
  • apps/v1 Deployments
  • apps/v1 ReplicaSets
  • core/v1 Pods

Event Handler

// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
    //...

    dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addDeployment,
        UpdateFunc: dc.updateDeployment,
        // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
        DeleteFunc: dc.deleteDeployment,
    })
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addReplicaSet,
        UpdateFunc: dc.updateReplicaSet,
        DeleteFunc: dc.deleteReplicaSet,
    })
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        DeleteFunc: dc.deletePod,
    })
    
    // ...
}
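All of these handlers end up enqueueing a Deployment key onto the controller's work queue; the ReplicaSet and Pod handlers first resolve the owning Deployment through its ownerReference. The deployment add handler, for instance, looks roughly like this (abridged from the controller source):

// addDeployment turns an informer "add" event into a work-queue item;
// the real reconciliation happens later in syncDeployment.
func (dc *DeploymentController) addDeployment(obj interface{}) {
	d := obj.(*apps.Deployment)
	klog.V(4).Infof("Adding deployment %s", d.Name)
	dc.enqueueDeployment(d) // derives the namespace/name key and adds it to dc.queue
}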

The Run function

// Run begins watching and syncing.
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {

	// ...

	// @xnile wait for the informer caches to sync
	if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(dc.worker, time.Second, stopCh)
	}

	<-stopCh
}
  • Wait for the local informer caches to sync.
  • Start workers goroutines (the count comes from --concurrent-deployment-syncs, default 5) that consume keys from the work queue and hand them to syncHandler; syncHandler is implemented by syncDeployment, the key function of the Deployment Controller. The worker loop itself is sketched below.
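Each worker is just a thin loop over the rate-limited work queue, dequeuing one key at a time and handing it to syncHandler; abridged, it looks roughly like this (paraphrasing the controller source):

func (dc *DeploymentController) worker() {
	for dc.processNextWorkItem() {
	}
}

func (dc *DeploymentController) processNextWorkItem() bool {
	// Get blocks until a key is available (or the queue is shutting down).
	key, quit := dc.queue.Get()
	if quit {
		return false
	}
	defer dc.queue.Done(key)

	// Hand the namespace/name key to syncHandler (i.e. syncDeployment) and
	// requeue with backoff on error.
	err := dc.syncHandler(key.(string))
	dc.handleErr(err, key)

	return true
}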

Core logic: syncDeployment

The syncDeployment function implements the Deployment Controller's core logic:

func (dc *DeploymentController) syncDeployment(key string) error {

    // ... omitted

    // @xnile fetch the deployment object from the informer cache
    deployment, err := dc.dLister.Deployments(namespace).Get(name)

    // ... omitted

    // Deep-copy otherwise we are mutating our cache.
    // TODO: Deep-copy only when needed.
    // @xnile the deep copy keeps each sync loop (and each goroutine) independent
    d := deployment.DeepCopy()

    // @xnile get the ReplicaSets owned by this deployment
    rsList, err := dc.getReplicaSetsForDeployment(d)
    if err != nil {
        return err
    }

    // ... omitted

    // @xnile check whether the deployment is being deleted; DeletionTimestamp is only seen here when the delete propagation policy is "Foreground"
    if d.DeletionTimestamp != nil {
        return dc.syncStatusOnly(d, rsList)
        // @xnile the GC controller takes care of cleaning up the ReplicaSets and Pods afterwards
    }

    // @xnile if the deployment is paused, update its Conditions so that the paused period is not counted against spec.progressDeadlineSeconds
    if err = dc.checkPausedConditions(d); err != nil {
        return err
    }
    if d.Spec.Paused {
        return dc.sync(d, rsList)
    }

    // @xnile check .spec.rollbackTo to decide whether a rollback is requested
    // @xnile set via the manifest or with kubectl rollout undo
    if getRollbackTo(d) != nil {
        return dc.rollback(d, rsList)
    }

    // @xnile check whether this is a pure scaling event
    // @xnile if replicas was changed together with the template, scale first and only then do the rolling update
    scalingEvent, err := dc.isScalingEvent(d, rsList)
    if err != nil {
        return err
    }
    if scalingEvent {
        return dc.sync(d, rsList)
    }

    // @xnile roll out the update
    switch d.Spec.Strategy.Type {
    case apps.RecreateDeploymentStrategyType:
        return dc.rolloutRecreate(d, rsList, podMap)
    case apps.RollingUpdateDeploymentStrategyType:
        // @xnile rolling update
        return dc.rolloutRolling(d, rsList)
    }
    return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
  • Fetch the deployment object from the informer cache.
  • Get the ReplicaSets owned by the deployment.
  • Check whether the deployment is being deleted; DeletionTimestamp is only seen here when the delete propagation policy is Foreground.
  • Check whether the deployment is paused and update its Conditions, so that the paused period is not counted against spec.progressDeadlineSeconds.
  • Check .spec.rollbackTo to decide whether a rollback is requested.
  • Check whether this is a pure scaling event; if replicas was changed together with the template, scale first and only then roll out the update.
  • Roll out the update, choosing the handling logic according to the strategy set in .spec.strategy.type.

Rolling Update

A Deployment rolling update works by handing the baton from the old ReplicaSet to the new one, and it consists of two kinds of steps: Scale up and Scale down.

  • Scale up moves the new ReplicaSet's replicas up toward the count specified by deployment.Spec.Replicas.

  • Scale down moves the old ReplicaSets' replicas down toward 0.

A complete rolling update takes many rounds of Scale up and Scale down; keeping this in mind matters for understanding pause and resume.
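For example, with replicas=2, maxSurge=1, maxUnavailable=0, and assuming every new pod becomes ready promptly, the hand-off goes: new RS 0→1 (three pods in total, the one surge pod), old RS 2→1 once the new pod is available, new RS 1→2, old RS 1→0. That is four reconciliation rounds, each performing either a single Scale up or a single Scale down.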

// @xnile a complete rolling update requires rolloutRolling to run multiple times
func (dc *DeploymentController) rolloutRolling(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	// @xnile get the new and old ReplicaSets; on the first rollout the new RS does not exist yet, so it is created here
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)

	// Scale up, if we can.
	scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d)
	if err != nil {
		return err
	}
	if scaledUp {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	// Scale down, if we can.
	scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	// @xnile the rollout has completed
	// @xnile TODO why check for completion here? In which case is there neither a scale up nor a scale down?
	if deploymentutil.DeploymentComplete(d, &d.Status) {
		// @xnile clean up old ReplicaSets, keeping at most d.Spec.RevisionHistoryLimit historical revisions
		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
			return err
		}
	}

	// Sync deployment status
	return dc.syncRolloutStatus(allRSs, newRS, d)
}
  • Get the new and old ReplicaSets; on the first rolling update the new RS does not exist yet, so it is created.

  • Call dc.reconcileNewReplicaSet to decide whether a Scale up is possible.

  • Call dc.reconcileOldReplicaSets to decide whether a Scale down is possible.

  • If the rollout has completed, clean up the old ReplicaSets, keeping at most .spec.revisionHistoryLimit historical revisions.

  • Update the deployment status; kubectl describe then shows events like these:

    Events:
    Type    Reason             Age                  From                   Message
    ----    ------             ----                 ----                   -------
    Normal  ScalingReplicaSet  61s (x2 over 8h)     deployment-controller  Scaled up replica set my-nginx-79cb8c4647 to 1
    Normal  ScalingReplicaSet  49s (x2 over 8h)     deployment-controller  Scaled up replica set my-nginx-79cb8c4647 to 2
    Normal  ScalingReplicaSet  49s                  deployment-controller  Scaled down replica set my-nginx-9f4d8c9d5 to 3
    Normal  ScalingReplicaSet  37s (x2 over 8h)     deployment-controller  Scaled up 

Scale up logic

func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	if *(newRS.Spec.Replicas) == *(deployment.Spec.Replicas) {
		// Scaling not required.
		return false, nil
	}
	// @xnile TODO when can this happen?
	if *(newRS.Spec.Replicas) > *(deployment.Spec.Replicas) {
		// Scale down.
		scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, *(deployment.Spec.Replicas), deployment)
		return scaled, err
	}
	// @xnile compute how many replicas the new RS can be scaled up to
	newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)
	if err != nil {
		return false, err
	}
	// @xnile call the API to update the RS's replicas; the ReplicaSet controller then takes care of creating the pods
	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, newReplicasCount, deployment)
	return scaled, err
}
  • Check whether the new RS needs to be scaled at all.
  • Check whether the new RS needs to be scaled down instead (when would that happen?).
  • Call deploymentutil.NewRSNewReplicas to compute how many replicas the new RS can be scaled up to.
  • Call the API to update the RS's replicas; the ReplicaSet controller then takes care of creating the pods.

Let's look at deploymentutil.NewRSNewReplicas next; this is where .spec.strategy.rollingUpdate.maxSurge comes into play.

func NewRSNewReplicas(deployment *apps.Deployment, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet) (int32, error) {
	switch deployment.Spec.Strategy.Type {
	case apps.RollingUpdateDeploymentStrategyType:
		// Check if we can scale up.
		maxSurge, err := intstrutil.GetValueFromIntOrPercent(deployment.Spec.Strategy.RollingUpdate.MaxSurge, int(*(deployment.Spec.Replicas)), true)
		if err != nil {
			return 0, err
		}
		// Find the total number of pods
		currentPodCount := GetReplicaCountForReplicaSets(allRSs)
		maxTotalPods := *(deployment.Spec.Replicas) + int32(maxSurge)
		if currentPodCount >= maxTotalPods {
			// Cannot scale up.
			return *(newRS.Spec.Replicas), nil
		}
		// Scale up.
		scaleUpCount := maxTotalPods - currentPodCount
		// Do not exceed the number of desired replicas.
		scaleUpCount = int32(integer.IntMin(int(scaleUpCount), int(*(deployment.Spec.Replicas)-*(newRS.Spec.Replicas))))
		return *(newRS.Spec.Replicas) + scaleUpCount, nil
	case apps.RecreateDeploymentStrategyType:
		return *(deployment.Spec.Replicas), nil
	default:
		return 0, fmt.Errorf("deployment type %v isn't supported", deployment.Spec.Strategy.Type)
	}
}

The function is straightforward, and it shows what .spec.strategy.rollingUpdate.maxSurge does:

During a Scale up, the total number of pods must not exceed deployment.Spec.Replicas plus .spec.strategy.rollingUpdate.maxSurge. maxSurge can be an integer or a percentage; a percentage is rounded up (for example, 0.1 becomes 1).
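As a toy illustration of the rounding (my own sketch of the arithmetic, not the real intstrutil helper): maxSurge percentages round up, while maxUnavailable percentages round down.

package example

import "math"

// percentOf resolves a percentage against the desired replica count; roundUp
// mirrors the roundUp flag passed to GetValueFromIntOrPercent above.
func percentOf(percent, replicas int, roundUp bool) int {
	v := float64(percent) * float64(replicas) / 100.0
	if roundUp {
		return int(math.Ceil(v))
	}
	return int(math.Floor(v))
}

// With 10 replicas and the default 25%:
//   percentOf(25, 10, true)  == 3 -> maxSurge: at most 13 pods during the rollout
//   percentOf(25, 10, false) == 2 -> maxUnavailable: at least 8 pods must stay available

Plugging similar numbers into NewRSNewReplicas: with replicas=10 and maxSurge=2, maxTotalPods is 12; if the old RS still has 10 pods and the new RS has 0, currentPodCount is 10 and the new RS gets scaled to min(12-10, 10-0) = 2.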

Scale down logic

func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
    oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
    if oldPodsCount == 0 {
        // Can't scale down further
        return false, nil
    }

    allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
    klog.V(4).Infof("New replica set %s/%s has %d available pods.", newRS.Namespace, newRS.Name, newRS.Status.AvailableReplicas)
    maxUnavailable := deploymentutil.MaxUnavailable(*deployment)

    minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
    newRSUnavailablePodCount := *(newRS.Spec.Replicas) - newRS.Status.AvailableReplicas
    // @xnile why not just use readyPodCount - minAvailable here?
    // @xnile allPodsCount and minAvailable are both static, but newRSUnavailablePodCount changes over time.
    // @xnile consider the case where the pods created by a rollout never become ready: scale down must stop. If we then fix the problem and roll out again (or roll back), readyPodCount - minAvailable would be 0 and the rollout would be stuck.
    maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount
    if maxScaledDown <= 0 {
        return false, nil
    }

    // Clean up unhealthy replicas first, otherwise unhealthy replicas will block deployment
    // and cause timeout. See https://github.com/kubernetes/kubernetes/issues/16737
    oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(oldRSs, deployment, maxScaledDown)
    if err != nil {
        return false, nil
    }
    klog.V(4).Infof("Cleaned up unhealthy replicas from old RSes by %d", cleanupCount)

    // Scale down old replica sets, need check maxUnavailable to ensure we can scale down
    allRSs = append(oldRSs, newRS)
    scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, deployment)
    if err != nil {
        return false, nil
    }
    klog.V(4).Infof("Scaled down old RSes of deployment %s by %d", deployment.Name, scaledDownCount)

    totalScaledDown := cleanupCount + scaledDownCount
    return totalScaledDown > 0, nil
}

The key line is minAvailable := *(deployment.Spec.Replicas) - maxUnavailable:

During a Scale down, the number of currently available pods must never drop below deployment.Spec.Replicas - .spec.strategy.rollingUpdate.maxUnavailable. maxUnavailable can be an integer or a percentage; a percentage is rounded down (for example, 1.7 becomes 1).

PS: when I first read this code I wondered why maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount is used rather than simply computing the number of pods that can be scaled down as readyPodCount - minAvailable. It turns out an earlier version of the code did use totalScaleDownCount := readyPodCount - minAvailable, and that introduced a bug: if the newly created pods never became ready for some reason, subsequent updates, including rollbacks, would get stuck. See the issue and PR linked in the references for the details.
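A concrete example of the difference: replicas=3, maxSurge=1, maxUnavailable=1 (so minAvailable=2). A broken rollout leaves its new RS with 2 never-ready pods and the old, healthy RS scaled down to 2 ready pods, and then we run kubectl rollout undo, which makes the healthy RS the "new" one. With the current formula, maxScaledDown = allPodsCount(4) - minAvailable(2) - newRSUnavailablePodCount(0) = 2, so the two never-ready pods can be cleaned up and the rollback proceeds; with readyPodCount - minAvailable it would be 2 - 2 = 0 and the rollback would be stuck.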

Deleting a Deployment

As we can see from the source, the Deployment Controller itself contains no deletion logic for deployments; deleting a deployment and its associated ReplicaSets and Pods is handled by the GC Controller. I may analyze the GC Controller's logic separately another time.

What happens when the same Deployment is updated again mid-rollout?

If another rolling update is triggered on the same deployment before the previous one has finished, what does the controller do? On the Scale up side it follows the flow analyzed above and creates a new RS, but what about Scale down: does it first scale down the RS created by the previous, interrupted rollout, or the even older RS?

func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (int32, error) {
	maxUnavailable := deploymentutil.MaxUnavailable(*deployment)

	//...

	// @xnile sort by creation timestamp, so the oldest RS is scaled down first
	sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))

	totalScaledDown := int32(0)
	totalScaleDownCount := availablePodCount - minAvailable
	for _, targetRS := range oldRSs {
		//...
	}

	return totalScaledDown, nil
}

The answer: the oldest RS is scaled down first, and only after that the RS created by the previous rollout.
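For example, if a deployment has rs-A (the oldest), rs-B (created by the interrupted rollout) and rs-C (created by the latest update), the Scale down removes pods from rs-A first and only then from rs-B, because the old ReplicaSets are sorted by creation timestamp before being scaled down (the names here are purely illustrative).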

pause and resume

If we run kubectl rollout pause while a deployment is still in the middle of a rolling update, how does the controller react?

Rolling Update章节我们已经提到过一次完成的滚动更新需要经过多轮Scale upScale down的过程,当执行暂停操作只会影响下一轮的Scale upScale down而不会影响本轮的操作。是不是也侧面说明了kubernetes操作都是声明式的而非命令式的。

What minReadySeconds does

.spec.minReadySeconds means that during a Scale up a newly created pod, on top of being ready, must stay ready for another minReadySeconds before it is considered available; only then does the controller move on to the Scale down. It works as an observation window that guards against freshly started pods crashing right away, protecting service availability and keeping the cluster stable throughout the update.
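For example, with minReadySeconds: 30, a new pod whose readiness probe starts passing at t=0 only counts as available at t=30s, so the corresponding Scale down of the old RS waits at least those 30 seconds.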

When testing, you can increase this value to deliberately slow the rollout down, which makes it easy to watch the process with kubectl get rs -w.

Which Pods are killed first during Scale down

During the Scale down phase of a rolling update old pods have to be killed. How are the pods to be killed selected?

func getPodsToDelete(filteredPods []*v1.Pod, diff int) []*v1.Pod {
	// No need to sort pods if we are about to delete all of them.
	// diff will always be <= len(filteredPods), so not need to handle > case.
	if diff < len(filteredPods) {
		// Sort the pods in the order such that not-ready < ready, unscheduled
		// < scheduled, and pending < running. This ensures that we delete pods
		// in the earlier stages whenever possible.
		// @xnile prefer deleting pods that are in earlier lifecycle stages
		sort.Sort(controller.ActivePods(filteredPods))
	}
	return filteredPods[:diff]
}
func (s ActivePods) Less(i, j int) bool {
	// 1. Unassigned < assigned
	// If only one of the pods is unassigned, the unassigned one is smaller
	if s[i].Spec.NodeName != s[j].Spec.NodeName && (len(s[i].Spec.NodeName) == 0 || len(s[j].Spec.NodeName) == 0) {
		return len(s[i].Spec.NodeName) == 0
	}
	// 2. PodPending < PodUnknown < PodRunning
	m := map[v1.PodPhase]int{v1.PodPending: 0, v1.PodUnknown: 1, v1.PodRunning: 2}
	if m[s[i].Status.Phase] != m[s[j].Status.Phase] {
		return m[s[i].Status.Phase] < m[s[j].Status.Phase]
	}
	// 3. Not ready < ready
	// If only one of the pods is not ready, the not ready one is smaller
	if podutil.IsPodReady(s[i]) != podutil.IsPodReady(s[j]) {
		return !podutil.IsPodReady(s[i])
	}
	// TODO: take availability into account when we push minReadySeconds information from deployment into pods,
	//       see https://github.com/kubernetes/kubernetes/issues/22065
	// 4. Been ready for empty time < less time < more time
	// If both pods are ready, the latest ready one is smaller
	if podutil.IsPodReady(s[i]) && podutil.IsPodReady(s[j]) && !podReadyTime(s[i]).Equal(podReadyTime(s[j])) {
		return afterOrZero(podReadyTime(s[i]), podReadyTime(s[j]))
	}
	// 5. Pods with containers with higher restart counts < lower restart counts
	if maxContainerRestarts(s[i]) != maxContainerRestarts(s[j]) {
		return maxContainerRestarts(s[i]) > maxContainerRestarts(s[j])
	}
	// 6. Empty creation time pods < newer pods < older pods
	if !s[i].CreationTimestamp.Equal(&s[j].CreationTimestamp) {
		return afterOrZero(&s[i].CreationTimestamp, &s[j].CreationTimestamp)
	}
	return false
}

The comments explain the ordering clearly enough, so I won't repeat them here.

Debugging

  • kubectl get rs -w to watch the ReplicaSets change
  • kubectl describe deploy <name> to check the deployment's rollout status

Summary

A Kubernetes Deployment rolling update is carried out by handing the baton from the old RS to the new RS: the new RS scales up, the old RS scales down, the new RS scales up again, and so on, until the new RS's replicas reach the desired count. Throughout the rollout the controller guarantees that:

  • the total number of pods never exceeds deployment.Spec.Replicas + .spec.strategy.rollingUpdate.maxSurge
  • the number of available pods never drops below deployment.Spec.Replicas - .spec.strategy.rollingUpdate.maxUnavailable

In real production use, the default of 25% is not appropriate for services with a large replica count: at the moment the rollout kicks off, maxSurge may push you past your cluster's resource limits, and maxUnavailable may push the service below the capacity it needs to stay healthy. Always tune both to fit your service.
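For example, with replicas=200 the default 25% allows up to 50 extra pods to be created at once (maxSurge) and up to 50 pods to be unavailable at once (maxUnavailable); whether your cluster and your service can absorb that is worth checking before relying on the defaults.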

References

https://github.com/kubernetes/kubernetes/issues/22065

https://github.com/kubernetes/kubernetes/pull/20368/commits/86aea1d59c42de15afbff5e2388e4b764bd134fc

https://github.com/kubernetes/kubernetes/pull/20368