本文基于client-go-0.20.14

我们都知道kubernetes采用声明式api,因此需要一种机制能让各种控制器能随时感知它所管理的各种资源对象的状态并采取相应的动作以达到声明所需要的状态,正是在此背景下此机制被抽象出来成为公共开发库,通常我们把这种机制称为Informer机制,在《client-go概览》中我们对Informer机制各组件做了简单介绍,并借助官方的示例让我们对整个架构的运行机制有了一初步了解,本篇将继续通过对源码的分析来深入学习下它的原理。

在正式开始源码分析前先来铺垫下SharedIndexInformer,在《client-go概览》一篇最后有提到过它,通过名字也能看出它是对Indexer和Informer的封装,看到Shared然后再联想到kubernetes的pod、deployment等各种资源是不是会给你第一眼觉得是用来让他们共享使用的,这样不用每个资源都创建一遍,再结合网上的一些资料:

Informer也被称为Shared Informer,它是可以共享使用的。若同一资源的Informer被实例化了多次,每个Informer使用一个Reflector,那么会运行过多的相同ListAndWatch ,太多重复的序列化和反序列化操作会导致api-server负载过重。

Shared Informer可以使同一个资源对象共享一个Reflector,这样可以节约很多资源;Shared Infor定义了一个map数据结构,通过map数据结构实现共享Informer机制。

我一开始接触时真以为它是"共享"的,一个Informer解决所有,不用每个Informer都从apiserver获取一遍,减少apiserver压力多合理,便当我在去查阅各种Controller源码时发现并不是这样,比如从ingress-nginx-controller我们可以看到这样的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Informer defines the required SharedIndexInformers that interact with the API server.
type Informer struct {
	Ingress   cache.SharedIndexInformer
	Endpoint  cache.SharedIndexInformer
	Service   cache.SharedIndexInformer
	Secret    cache.SharedIndexInformer
	ConfigMap cache.SharedIndexInformer
	Pod       cache.SharedIndexInformer
}
	// create informers factory, enable and assign required informers
	infFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
		informers.WithNamespace(namespace),
		informers.WithTweakListOptions(func(*metav1.ListOptions) {}))

	if k8s.IsNetworkingIngressAvailable {
		store.informers.Ingress = infFactory.Networking().V1beta1().Ingresses().Informer()
	} else {
		store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer()
	}

	store.listers.Ingress.Store = store.informers.Ingress.GetStore()

	store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer()
	store.listers.Endpoint.Store = store.informers.Endpoint.GetStore()

	store.informers.Secret = infFactory.Core().V1().Secrets().Informer()
	store.listers.Secret.Store = store.informers.Secret.GetStore()

	store.informers.ConfigMap = infFactory.Core().V1().ConfigMaps().Informer()
	store.listers.ConfigMap.Store = store.informers.ConfigMap.GetStore()

	store.informers.Service = infFactory.Core().V1().Services().Informer()
	store.listers.Service.Store = store.informers.Service.GetStore()

好家伙,这也没"共享"啊,各资源使用自己的Informer,看来这里Share仅理解成共享的作用是不全面的。

SharedIndexInformer

先看下接口定义

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type SharedInformer interface {

	AddEventHandler(handler ResourceEventHandler)

	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)

	GetStore() Store

	GetController() Controller

	Run(stopCh <-chan struct{})

	HasSynced() bool

	LastSyncResourceVersion() string

	SetWatchErrorHandler(handler WatchErrorHandler) error
}

// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers add indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	GetIndexer() Indexer
}

然后看下SharedIndexInformer具体实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
type sharedIndexInformer struct {
	indexer    Indexer
	controller Controller

	processor             *sharedProcessor
	cacheMutationDetector MutationDetector

	listerWatcher ListerWatcher

	// objectType is an example object of the type this informer is
	// expected to handle.  Only the type needs to be right, except
	// that when that is `unstructured.Unstructured` the object's
	// `"apiVersion"` and `"kind"` must also be right.
	objectType runtime.Object

	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
	// shouldResync to check if any of our listeners need a resync.
	resyncCheckPeriod time.Duration
	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
	// value).
	defaultEventHandlerResyncPeriod time.Duration
	// clock allows for testability
	clock clock.Clock

	started, stopped bool
	startedLock      sync.Mutex

	// blockDeltas gives a way to stop all event distribution so that a late event handler
	// can safely join the shared informer.
	blockDeltas sync.Mutex

	// Called whenever the ListAndWatch drops the connection with an error.
	watchErrorHandler WatchErrorHandler
}

sharedIndexInformer核心Run函数

sharedIndexInformer.Run

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
  
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

    Process:           s.HandleDeltas, // 供controller.processLoop()用
		WatchErrorHandler: s.watchErrorHandler,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)
}

主要三件事:

  1. 传入Config构造controller
  2. 启动一协程运行启动协程运行s.cacheMutationDetector.Run
  3. 启动协程运行s.processor.run
  4. 调用s.controller.Run(stopCh)启动控制器

接下来我们继续深入

Controller

先看下定义

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// client-go/tools/cache/controller.go
type Controller interface {
	// Run does two things.  One is to construct and run a Reflector
	// to pump objects/notifications from the Config's ListerWatcher
	// to the Config's Queue and possibly invoke the occasional Resync
	// on that Queue.  The other is to repeatedly Pop from the Queue
	// and process with the Config's ProcessFunc.  Both of these
	// continue until `stopCh` is closed.
	Run(stopCh <-chan struct{})

	// HasSynced delegates to the Config's Queue
	HasSynced() bool

	// LastSyncResourceVersion delegates to the Reflector when there
	// is one, otherwise returns the empty string
	LastSyncResourceVersion() string
}

具体实现:

1
2
3
4
5
6
type controller struct {
	config         Config
	reflector      *Reflector
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}
  • Config 配置,控制器需要一些可供配置的选项,这个很好理解
  • Reflector 反射器,暂时先只需要知道它是用来从apiserver获取数据的,稍等会单独分析它

来看下Config都有什么

controller.config

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
type Config struct {
	Queue
	ListerWatcher	// List-Watch 接口,用来创建Reflector
	Process ProcessFunc	// 用来处理从DeltaFIFO队列中弹出的Deltas
	ObjectType runtime.Object // 希望List-Watch资源对象的类型,供Reflector使用
	FullResyncPeriod time.Duration // 全量同步的周期
	ShouldResync ShouldResyncFunc // 每次执行合同同步时会调用此函数判断是否需要同步
	RetryOnError bool
	WatchErrorHandler WatchErrorHandler
	WatchListPageSize int64
}

type Queue interface {
	Store
	Pop(PopProcessFunc) (interface{}, error)
	AddIfNotPresent(interface{}) error
	HasSynced() bool
	Close()
}

type ShouldResyncFunc func() bool
type ProcessFunc func(obj interface{}) error

可以看到Config很多属性是用来配置Reflector的,

继续来看下controller.Run函数,来看下控制器运行起来都做了什么。

controller.Run

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	// 启动协程接收关闭信号
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	// 1)创建Reflector
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.WatchListPageSize = c.config.WatchListPageSize
	r.clock = c.clock
	if c.config.WatchErrorHandler != nil {
		r.watchErrorHandler = c.config.WatchErrorHandler
	}

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group

	// 2)创建协程执行Reflector.Run()
	wg.StartWithChannel(stopCh, r.Run)

	// 3)每秒执行c.processLoop()
	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}

函数主要做了三件事:

  1. 创建Reflector
  2. 创建协程执行Reflector.Run()
  3. 每秒执行controller.processLoop()

接下来我们来下看下controller.processLoop()

controller.processLoop()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
func (c *controller) processLoop() {
	for {
    // Queue接口是在sharedIndexInformer.Run()中赋值的,是DeltaFIFO类型 
    // 这里c.config.Process也是在sharedIndexInformer.Run()中传入的,为sharedIndexInformer.HandleDeltas
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.closed {
				return nil, ErrFIFOClosed
			}

			f.cond.Wait()
		}
		id := f.queue[0]
		f.queue = f.queue[1:]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			// This should never happen
			klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
			continue
		}
		delete(f.items, id)
    // 调用PopProcessFunc处理item
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}

processLoop()主要是Queue中弹出item并调用PopProcessFunc处理它,这里的Queue接口的具体实例化对象是在上边sharedindexinformer.run()函数中传入的。至于元素具体是怎么被处理的,我们要看下这里PopProcessFunc是怎么来的,也是在sharedindexinformer.run()函数赋值的,为sharedIndexInformer.HandleDeltas,所以继续往下看

sharedIndexInformer.HandleDeltas

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	// 注意,这里是Deltas
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
			// 更新本地cache,在《Kubernetes client-go Indexer解析》中介绍过,cache是indexer的具体实现
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}

				isSync := false
				switch {
				case d.Type == Sync:
					// Sync events are only propagated to listeners that requested resync
					isSync = true
				case d.Type == Replaced:
					if accessor, err := meta.Accessor(d.Object); err == nil {
						if oldAccessor, err := meta.Accessor(old); err == nil {
							// Replaced events that didn't change resourceVersion are treated as resync events
							// and only propagated to listeners that requested resync
							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
						}
					}
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

函数逻辑很好理解,遍历Deltas然后根据Delta的类型(Sync/Replaced/Added/Updated/Deleted)做两件事:

  • 处理本地缓存(Add/Update/Delete)
  • 派发事件(addNotification/updateNotification/deleteNotification)

关于本地缓存可以参考下Kubernetes client-go Indexer解析,就不再延展下去了,这里重点来看下事年派发的细节。

sharedIndexInformer.processor

Informer一个很重要的功能就是当感知资源变化后要执行对应的事件处理回调函数,processor的作用就在此。先抛开代码如果是我们自己要实现此功能是不是会首先想到消息队列,Producer->Message Broker->Consumer,基于此思想我们再来看下源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
type sharedProcessor struct {
	listenersStarted bool
	listenersLock    sync.RWMutex
	listeners        []*processorListener
	syncingListeners []*processorListener
	clock            clock.Clock
	wg               wait.Group
}

type processorListener struct {
	nextCh chan interface{}
	addCh  chan interface{}
	handler ResourceEventHandler
	pendingNotifications buffer.RingGrowing
	requestedResyncPeriod time.Duration
	resyncPeriod time.Duration
	nextResync time.Time
	resyncLock sync.Mutex
}

续接上节我们先来看下事件是什么派发下去的

distribute
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	// 遍历订阅者扩播消息(通知)
	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}  

逻辑比较清楚,就是遍历订阅者把消息广播出,广播的内容也即消息协议是什么呢

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
type updateNotification struct {
	oldObj interface{}
	newObj interface{}
}

type addNotification struct {
	newObj interface{}
}

type deleteNotification struct {
	oldObj interface{}
}

三种类型,分别对应对象的新增、更新、删除事件。

接下来分析下消息在Producer->Message Broker->Consumer之间是什么流转的,也是sharedProcessor的核心逻辑

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:  // 有通知
			// Notification dispatched
			var ok bool
			// 缓存中还有没有未处理的通知,有就读出来等待下次处理
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			// channel关闭
			if !ok {
				return
			}
			// 待通知为空
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				// 还有未处理的通知,就先加到缓存里
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

func (p *processorListener) run() {
	stopCh := make(chan struct{})
	wait.Until(func() {
		for next := range p.nextCh {
			switch notification := next.(type) {
			case updateNotification:
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
				p.handler.OnAdd(notification.newObj)
			case deleteNotification:
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		// the only way to get here is if the p.nextCh is empty and closed
		close(stopCh)
	}, 1*time.Second, stopCh)
}
  • run() 函数不断的channel的取出消息(通知)执行我们配置事件处理函数
  • pop()函数设计比较巧妙, 利用两个channel结合golang select 多个case同时满足时会随机触发其实中一个特性实现了缓冲功能,是整个sharedProcessor的核心,因为条件判断比较多代码读起有点费劲,为了方便理解我画了一张图来辅助理解。

总结下pop()

  1. 第一次会走步骤3,其它随机步骤2或3
  2. 把待通知发送到通道里,然后判断缓存中是否还有未处理的通知,没有的话就将通道置为nil下次你循环不会进入此步骤,缓存中如果还有未处理的通知的话就取出一条放到待通知进入下次循环
  3. 从p.addCh通道中取出一条消息,待通知如果为空则放到待通知并初始化nextCh(因为另一协程已在通道那头随时准备读取数据,所以此时通道已就绪可以发数据了),待通知如果不为空就先放到缓存中,进入下次循环

ps:初次看这里时会有个疑惑,为什么需要两个channel?一进一出感觉一个channel就能解决,看到这里大概知道用意了,设计很巧妙,不过为啥不用限流的方法呢?

参考

https://blog.csdn.net/weixin_42663840/article/details/81699303