Motivation

你肯定已经会在 yaml 文件中添加如下的配置来为容器添加 cpu 、内存资源请求和限制

1
2
3
4
5
6
7
     resources:
          requests:
            cpu: "0.5"
            memory: "512Mi"
          limits:
            cpu: "1"
            memory: "1Gi"

当我们添加这些参数后 Kubernetes 到底为我们做了什么呢?"0.5""1"又是什么意思呢?

本文将基于kubernetes-1.28.4源码来探究下这背后的原理,彻底搞明白 "request""limit" 的原理和用法。

源码分析

容器资源管理的逻辑在 kubelet cm下边的cgroup_manager模块,

首先来看下cgroups操作的接口CgroupManager定义

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// CgroupManager allows for cgroup management.
// Supports Cgroup Creation ,Deletion and Updates.
type CgroupManager interface {
	// Create creates and applies the cgroup configurations on the cgroup.
	// It just creates the leaf cgroups.
	// It expects the parent cgroup to already exist.
	Create(*CgroupConfig) error
	// Destroy the cgroup.
	Destroy(*CgroupConfig) error
	// Update cgroup configuration.
	Update(*CgroupConfig) error
	// Validate checks if the cgroup is valid
	Validate(name CgroupName) error
	// Exists checks if the cgroup already exists
	Exists(name CgroupName) bool
	// Name returns the literal cgroupfs name on the host after any driver specific conversions.
	// We would expect systemd implementation to make appropriate name conversion.
	// For example, if we pass {"foo", "bar"}
	// then systemd should convert the name to something like
	// foo.slice/foo-bar.slice
	Name(name CgroupName) string
	// CgroupName converts the literal cgroupfs name on the host to an internal identifier.
	CgroupName(name string) CgroupName
	// Pids scans through all subsystems to find pids associated with specified cgroup.
	Pids(name CgroupName) []int
	// ReduceCPULimits reduces the CPU CFS values to the minimum amount of shares.
	ReduceCPULimits(cgroupName CgroupName) error
	// MemoryUsage returns current memory usage of the specified cgroup, as read from the cgroupfs.
	MemoryUsage(name CgroupName) (int64, error)
	// Get the resource config values applied to the cgroup for specified resource type
	GetCgroupConfig(name CgroupName, resource v1.ResourceName) (*ResourceConfig, error)
	// Set resource config for the specified resource type on the cgroup
	SetCgroupConfig(name CgroupName, resource v1.ResourceName, resourceConfig *ResourceConfig) error
}

定义了cgroups有关的全部操作,它的具体实现cgroupManagerImpl

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// cgroupManagerImpl implements the CgroupManager interface.
// Its a stateless object which can be used to
// update,create or delete any number of cgroups
// It relies on runc/libcontainer cgroup managers.
type cgroupManagerImpl struct {
	// subsystems holds information about all the
	// mounted cgroup subsystems on the node
	subsystems *CgroupSubsystems

	// useSystemd tells if systemd cgroup manager should be used.
	useSystemd bool
}

// Make sure that cgroupManagerImpl implements the CgroupManager interface
var _ CgroupManager = &cgroupManagerImpl{}

CgroupManager接口有很多函数,我们目的是快速串起上下流,所以这里简单只以 Create 为代表分析,对其它函数感兴趣的同学可以自行分析,不废话看代码。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Create creates the specified cgroup
func (m *cgroupManagerImpl) Create(cgroupConfig *CgroupConfig) error {
	// ...省略
  
	libcontainerCgroupConfig := m.libctCgroupConfig(cgroupConfig, true)
	manager, err := manager.New(libcontainerCgroupConfig)
	if err != nil {
		return err
	}

	// Apply(-1) is a hack to create the cgroup directories for each resource
	// subsystem. The function [cgroups.Manager.apply()] applies cgroup
	// configuration to the process with the specified pid.
	// It creates cgroup files for each subsystems and writes the pid
	// in the tasks file. We use the function to create all the required
	// cgroup files but not attach any "real" pid to the cgroup.
	if err := manager.Apply(-1); err != nil {
		return err
	}

	// it may confuse why we call set after we do apply, but the issue is that runc
	// follows a similar pattern.  it's needed to ensure cpu quota is set properly.
	if err := manager.Set(libcontainerCgroupConfig.Resources); err != nil {
		utilruntime.HandleError(fmt.Errorf("cgroup manager.Set failed: %w", err))
	}

	return nil
}

这里代码的注释很详细,好评,函数主要做了两件事

  • 调用 manager.Apply(-1) 创建 cgroups 目录结构。
  • 调用 manager.Set(libcontainerCgroupConfig.Resources) 设置对应的资源限制。

这里的 manager 是什么呢?

根据导入模块知道它是位于runc libcontainer下的 manager.New 函数,来看下函数内容

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// New returns the instance of a cgroup manager, which is chosen
// based on the local environment (whether cgroup v1 or v2 is used)
// and the config (whether config.Systemd is set or not).
func New(config *configs.Cgroup) (cgroups.Manager, error) {
	return NewWithPaths(config, nil)
}

// NewWithPaths is similar to New, and can be used in case cgroup paths
// are already well known, which can save some resources.
//
// For cgroup v1, the keys are controller/subsystem name, and the values
// are absolute filesystem paths to the appropriate cgroups.
//
// For cgroup v2, the only key allowed is "" (empty string), and the value
// is the unified cgroup path.
func NewWithPaths(config *configs.Cgroup, paths map[string]string) (cgroups.Manager, error) {
	if config == nil {
		return nil, errors.New("cgroups/manager.New: config must not be nil")
	}
	if config.Systemd && !systemd.IsRunningSystemd() {
		return nil, errors.New("systemd not running on this host, cannot use systemd cgroups manager")
	}

	// Cgroup v2 aka unified hierarchy.
	if cgroups.IsCgroup2UnifiedMode() {
		path, err := getUnifiedPath(paths)
		if err != nil {
			return nil, fmt.Errorf("manager.NewWithPaths: inconsistent paths: %w", err)
		}
		if config.Systemd {
			return systemd.NewUnifiedManager(config, path)
		}
		return fs2.NewManager(config, path)
	}

  // 只分析 systemd v1 情况
	// Cgroup v1.
	if config.Systemd {
		return systemd.NewLegacyManager(config, paths)
	}

	return fs.NewManager(config, paths)
}

主要作用是根据 kubelet 配置的 cgroups 驱动类型初始化相应的 Manager。

由于当前主流 kubernetes 集群仍然以 systemd v1 为主,因此本文仅以 systemd v1 为例,于是焦点来到systemd.NewLegacyManager函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type legacyManager struct {
	mu      sync.Mutex
	cgroups *configs.Cgroup
	paths   map[string]string
	dbus    *dbusConnManager
}

func NewLegacyManager(cg *configs.Cgroup, paths map[string]string) (cgroups.Manager, error) {
	if cg.Rootless {
		return nil, errors.New("cannot use rootless systemd cgroups manager on cgroup v1")
	}
	if cg.Resources != nil && cg.Resources.Unified != nil {
		return nil, cgroups.ErrV1NoUnified
	}
	if paths == nil {
		var err error
		paths, err = initPaths(cg)
		if err != nil {
			return nil, err
		}
	}
	return &legacyManager{
		cgroups: cg,
		paths:   paths,
		dbus:    newDbusConnManager(false), // @xnile dbus 接口
	}, nil
}

继续记下看, manager.Apply 函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func (m *legacyManager) Apply(pid int) error {
	var (
		c          = m.cgroups
		unitName   = getUnitName(c)
		slice      = "system.slice"
		properties []systemdDbus.Property
	)

	m.mu.Lock()
	defer m.mu.Unlock()

	if c.Parent != "" {
		slice = c.Parent
	}

	properties = append(properties, systemdDbus.PropDescription("libcontainer container "+c.Name))

	if strings.HasSuffix(unitName, ".slice") {
		// If we create a slice, the parent is defined via a Wants=.
		properties = append(properties, systemdDbus.PropWants(slice))
	} else {
		// Otherwise it's a scope, which we put into a Slice=.
		properties = append(properties, systemdDbus.PropSlice(slice))
		// Assume scopes always support delegation (supported since systemd v218).
		properties = append(properties, newProp("Delegate", true))
	}

	// only add pid if its valid, -1 is used w/ general slice creation.
	if pid != -1 {
		properties = append(properties, newProp("PIDs", []uint32{uint32(pid)}))
	}

	// Always enable accounting, this gets us the same behaviour as the fs implementation,
	// plus the kernel has some problems with joining the memory cgroup at a later time.
	properties = append(properties,
		newProp("MemoryAccounting", true),
		newProp("CPUAccounting", true),
		newProp("BlockIOAccounting", true),
		newProp("TasksAccounting", true),
	)

	// Assume DefaultDependencies= will always work (the check for it was previously broken.)
	properties = append(properties,
		newProp("DefaultDependencies", false))

	properties = append(properties, c.SystemdProps...)

	// @xnile 通过dbus与systemd通讯创建cgroup子目录
	if err := startUnit(m.dbus, unitName, properties, pid == -1); err != nil {
		return err
	}

	// @xnile 将 Proc PID 写入 procs
	if err := m.joinCgroups(pid); err != nil {
		return err
	}

	return nil
}

函数主要做了两件事:

  • 通过 dbus调用 systemd API 创建 cgroups 目录。

    注意这里只创建 pod 级的 cgroups 目录,容器级的 cgroups 目录通过 CRI 接口调用具体 CRI Runtime 创建和管理,如DockercontainerdRI-O

  • 通过m.joinCgroups(pid) 将Proc PID 写入相应子系统的 procs 文件,使资源限制生效。

来看下 m.joinCgroups 函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func (m *legacyManager) joinCgroups(pid int) error {
	for _, sys := range legacySubsystems {
		name := sys.Name()
		switch name {
		case "name=systemd":
			// let systemd handle this
		case "cpuset":
			if path, ok := m.paths[name]; ok {
				s := &fs.CpusetGroup{}
				if err := s.ApplyDir(path, m.cgroups.Resources, pid); err != nil {
					return err
				}
			}
		default:
			if path, ok := m.paths[name]; ok {
				if err := os.MkdirAll(path, 0o755); err != nil {
					return err
				}
				if err := cgroups.WriteCgroupProc(path, pid); err != nil {
					return err
				}
			}
		}
	}

	return nil
}

var legacySubsystems = []subsystem{
	&fs.CpusetGroup{},
	&fs.DevicesGroup{},
	&fs.MemoryGroup{},
	&fs.CpuGroup{},
	&fs.CpuacctGroup{},
	&fs.PidsGroup{},
	&fs.BlkioGroup{},
	&fs.HugetlbGroup{},
	&fs.PerfEventGroup{},
	&fs.FreezerGroup{},
	&fs.NetPrioGroup{},
	&fs.NetClsGroup{},
	&fs.NameGroup{GroupName: "name=systemd"},
	&fs.RdmaGroup{},
	&fs.NameGroup{GroupName: "misc"},
}

可以看到通过遍历 cgroups 子系统通过操作文件的方式修改 cgroups 资源限制。

这里我有个疑问?欢迎朋友们指点。

创建cgroups 目录通过dbus直接调用systemd的api,修改cgroups资源配置时为什么不使用同样的方式而要使用直接操作文件的方式呢?

接着看 manager.Set 函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (m *legacyManager) Set(r *configs.Resources) error {
		// ... 省略
  
	setErr := setUnitProperties(m.dbus, unitName, properties...)
	if needsThaw {
		if err := m.doFreeze(configs.Thawed); err != nil {
			logrus.Infof("thaw container after SetUnitProperties failed: %v", err)
		}
	}
	if setErr != nil {
		return setErr
	}

	for _, sys := range legacySubsystems {
		// Get the subsystem path, but don't error out for not found cgroups.
		path, ok := m.paths[sys.Name()]
		if !ok {
			continue
		}
		if err := sys.Set(path, r); err != nil {
			return err
		}
	}

	return nil
}

同样遍历子系统并调用子系统的 Set 函数,全部cgroups子系统都在这个目录下,来看下CpuGroup.Set()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func (s *CpuGroup) Set(path string, r *configs.Resources) error {
	if r.CpuShares != 0 {
		shares := r.CpuShares
		if err := cgroups.WriteFile(path, "cpu.shares", strconv.FormatUint(shares, 10)); err != nil {
			return err
		}
		// read it back
		sharesRead, err := fscommon.GetCgroupParamUint(path, "cpu.shares")
		if err != nil {
			return err
		}
		// ... and check
		if shares > sharesRead {
			return fmt.Errorf("the maximum allowed cpu-shares is %d", sharesRead)
		} else if shares < sharesRead {
			return fmt.Errorf("the minimum allowed cpu-shares is %d", sharesRead)
		}
	}

	var period string
	if r.CpuPeriod != 0 {
		period = strconv.FormatUint(r.CpuPeriod, 10)
		if err := cgroups.WriteFile(path, "cpu.cfs_period_us", period); err != nil {
			// Sometimes when the period to be set is smaller
			// than the current one, it is rejected by the kernel
			// (EINVAL) as old_quota/new_period exceeds the parent
			// cgroup quota limit. If this happens and the quota is
			// going to be set, ignore the error for now and retry
			// after setting the quota.
			if !errors.Is(err, unix.EINVAL) || r.CpuQuota == 0 {
				return err
			}
		} else {
			period = ""
		}
	}
	if r.CpuQuota != 0 {
		if err := cgroups.WriteFile(path, "cpu.cfs_quota_us", strconv.FormatInt(r.CpuQuota, 10)); err != nil {
			return err
		}
		if period != "" {
			if err := cgroups.WriteFile(path, "cpu.cfs_period_us", period); err != nil {
				return err
			}
		}
	}
	return s.SetRtSched(path, r)
}

翻云拨雾,原来我们在yaml中添加的spec.containers[].resources. 资源请求和限制相关的配置最终会写入到cgroup相关子系统的配置文件中,交由内核cgroup完成。

cpu.shares、cpu.cfs_period_us、cpu.cfs_quota_us

具体来看,与cpu资源控制的操作汲到cpu.sharescpu.cfs_quota_uscpu.cfs_period_us三个cgroup配置文件,那它们的作用分别是什么呢?

  • cpu.shares

    相对值,用于设置进程在全部 CPU 核心上可获得的相对 CPU 时间份额。

    例如,两个具有设置为100的 cpu.shares 的 cgroups 中的任务将获得相等的 CPU 时间,而一个具有设置为200的 cpu.shares 的 cgroup 中的任务将获得比设置为100的 cgroups 中的任务多一倍的 CPU 时间。在 cpu.shares 文件中指定的值必须为2或更高。

    它控制进程获取 CPU 时间的优先级,获取 CPU 时间的下限,当然如果节点当前 CPU 未超卖,利用率未达到100%,进程是可以突破原本比例完全利用剩余全部 CPU 时间的

  • cpu.cfs_period_us

    时间周期,以微妙为单位,告诉 CFS 调度器在该周期内进行CPU时间片分配。

  • cpu.cfs_quota_us

    设置 CFS 调度器调度周期内进程能使用 CPU 的最长时间,微秒为单位。

    cpu.cfs_period_uscpu.cfs_quota_us配合起来限制在CFS调度周期进程能获取多少 CPU 时间,众核平等,不关心节点有多少核。

概括起来就是cpu.shares控制进程能获取 CPU 时间的下限,cpu.cfs_period_uscpu.cfs_quota_us共同作用控制进程能获取 CPU 时间的下限,对应到 kubernetes 又分别对应什么参数呢呢?

  • cpu.shares 对应 yaml 文件中的 spec.containers[].resources.requests.cpu

  • cpu.cfs_period_us 由 kubelet 启动时通过配置传入。

    cpuCFSQuotaPeriod is the CPU CFS quota period value, cpu.cfs_period_us. The value must be between 1 ms and 1 second, inclusive. Requires the CustomCPUCFSQuotaPeriod feature gate to be enabled. Default: “100ms”

  • cpu.cfs_quota_us 对应 yaml 文件中的 spec.containers[].resources.limits.cpu

CPU 单位

再来看下 CPU 单位

Limits and requests for CPU resources are measured in cpu units. In Kubernetes, 1 CPU unit is equivalent to 1 physical CPU core, or 1 virtual core, depending on whether the node is a physical host or a virtual machine running inside a physical machine.

Fractional requests are allowed. When you define a container with spec.containers[].resources.requests.cpu set to 0.5, you are requesting half as much CPU time compared to if you asked for 1.0 CPU. For CPU resource units, the quantity expression 0.1 is equivalent to the expression 100m, which can be read as “one hundred millicpu”. Some people say “one hundred millicores”, and this is understood to mean the same thing.

cpu: "1"表示一个物理核或虚拟核,特别需要注意它是一个绝对值,跟节点的核数无关。

CPU resource is always specified as an absolute amount of resource, never as a relative amount. For example, 500m CPU represents the roughly same amount of computing power whether that container runs on a single-core, dual-core, or 48-core machine.

kubernetes 内部用 millicpu 存储配置,"1" 会转化为1000m 存储。

认真听讲的同学可能会有疑问了,上边在解释cpu.shares时特别强调了它是一个相对值且跟节点的核数是有关系的,那为什么到了 Kubernetes 中就成了绝对值了呢,这是又怎么会事呢?

我们来看下spec.containers[].resources.requests.cpucpu.shares的计算函数MilliCPUToShares

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
const (
	// These limits are defined in the kernel:
	// https://github.com/torvalds/linux/blob/0bddd227f3dc55975e2b8dfa7fc6f959b062a2c7/kernel/sched/sched.h#L427-L428
	MinShares = 2
	MaxShares = 262144

	SharesPerCPU  = 1024
	MilliCPUToCPU = 1000

	// 100000 microseconds is equivalent to 100ms
	QuotaPeriod = 100000
	// 1000 microseconds is equivalent to 1ms
	// defined here:
	// https://github.com/torvalds/linux/blob/cac03ac368fabff0122853de2422d4e17a32de08/kernel/sched/core.c#L10546
	MinQuotaPeriod = 1000
)

// MilliCPUToShares converts the milliCPU to CFS shares.
func MilliCPUToShares(milliCPU int64) uint64 {
	if milliCPU == 0 {
		// Docker converts zero milliCPU to unset, which maps to kernel default
		// for unset: 1024. Return 2 here to really match kernel default for
		// zero milliCPU.
		// 如果未设置requests.cpu 则 cpu.shares 为2
		return MinShares
	}
	// Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
	shares := (milliCPU * SharesPerCPU) / MilliCPUToCPU
	if shares < MinShares {
		return MinShares
	}
	if shares > MaxShares {
		return MaxShares
	}
	return uint64(shares)
}

很巧妙的将一个 CPU 核心先假定为1024,然后通过 shares := (milliCPU * 1024) / 1000 将计算得值写入cpu.shares 文件,虽然最终容器能获取到的 CPU 时间是个与节点核数及运行的容器总数有关的相对值,但只要保证下限能满足request的请求的,上限交个 limit 即可,否则如果不这样设计的话,调度时就要考虑到当调度到只一个核心的节点和多个核心节点的区别,增加了调度的复杂度。有个与此相关的 issue ,有兴趣的可以看下。

由于内存是不可压缩资源,spec.containers[].resources.requests.memorycgroups 中没有对应属性,其值只用于 scheduler 调度。

总结

  • 通过cpu.shares设置请求 CPU ,保证容器所需 CPU 的下限;

  • 通过 cpu.cfs_period_uscpu.cfs_quota_us 配置限制容器获取 CPU 的上限;

  • CPU 单位是一个绝对值,与节点有多少核无关;

  • 如未设置requests.cpu 则 cpu.shares 置为2;

  • requestlimit 小于1个cpu时为避免出错推荐millicpu的写法。

    Kubernetes doesn’t allow you to specify CPU resources with a precision finer than 1m or 0.001 CPU. To avoid accidentally using an invalid CPU quantity, it’s useful to specify CPU units using the milliCPU form instead of the decimal form when using less than 1 CPU unit.

    For example, you have a Pod that uses 5m or 0.005 CPU and would like to decrease its CPU resources. By using the decimal form, it’s harder to spot that 0.0005 CPU is an invalid value, while by using the milliCPU form, it’s easier to spot that 0.5m is an invalid value.

参考

https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/

https://kubernetes.io/docs/concepts/architecture/cgroups/

https://batey.info/cgroup-cpu-shares-for-docker.html

https://www.redhat.com/sysadmin/cgroups-part-two

https://forums.gentoo.org/viewtopic-t-1132431-start-0.html

https://medium.com/@jettycloud/making-sense-of-kubernetes-cpu-requests-and-limits-390bbb5b7c92

https://github.com/kubernetes/kubernetes/issues/24925

https://github.com/kubernetes/cri-api/blob/c75ef5b/pkg/apis/runtime/v1/api.proto