📢 注意,该文本非最终版本,正在更新中,版权所有,请勿转载!!

前言

在第二章我们会去看 k8s 中常用对象的源码,不过在看这些对象之前,我们需要聊一聊 informer 机制。这个机制可以说是 k8s 设计之中的一个重点了。这个机制的设计不仅仅让代码本身变得清晰,更让整个系统的结构更容易扩展。所以这个机制需要放到第二章的第一节来说。

前置知识

  • 控制循环
  • informer 的使用

心路历程

我第一接触 informer 是在使用 client-go 的时候。相信有很多同学和我一样,学习 k8s 的路径通常是,从基本的使用开始,然后慢慢的有一些自定义的需求需要使用 client-go 进行开发。使用 client-go 开发真的很方便,能力很强大。而在其中我第一次碰到了 informer。从了解了这个机制之后,才逐渐明白 k8s 本身是如何去控制里面的资源的。

还是一样的,本文不涉及具体这个机制的详细原理,更专注在源码本身。当然,我先通过两个小点帮助你回忆起来 informer 机制。

控制循环

首先是控制循环,这个我认为是 k8s 的精髓,它通过一个循环来让整个系统趋向与我们申明的一个期望状态。 https://github.com/kubernetes/community/blob/master/contributors/devel/sig-api-machinery/controllers.md#writing-controllers

1
2
3
4
5
6
7
8
9
for {
实际状态 := 获取集群中对象 X 的实际状态(Actual State)
期望状态 := 获取集群中对象 X 的期望状态(Expectation State)
if 实际状态 == 期望状态{
什么都不做
} else {
执行编排动作,将实际状态调整为期望状态
}
}

informer 的用法

下面的例子说明了 informer 的用法 https://github.com/kubernetes/community/blob/master/contributors/devel/sig-api-machinery/controllers.md#rough-structure

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func NewController(pods informers.PodInformer) *Controller {
c := &Controller{
pods: pods.Lister(),
podsSynced: pods.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "controller-name"),
}

pods.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// ...
},
UpdateFunc: func(old interface{}, new interface{}) {
// ...
},
DeleteFunc: func(obj interface{}) {
// ...
},
},)

return c
}

其他你都不需要看,关键在于 AddEventHandler ,看到它你就知道了,本质就是让你能监听一些变动的事件,当事件来的时候,你就会知道,具体你知道了之后干嘛,就是你的事情了。

informer 的流程图

下面这个图非常清晰的说明了 informer 的流程和关系,记住这个图片,后面还会用到

k8s-informer-flow.png

控制对象的思考

结合以上回忆,思考一下:如果我们希望去控制一个对象,那么我们需要知道这个对象现在的状态是什么,或者知道它发生了什么变化,变化能不能满足我们的期望,如果不满足应该怎么调整。那么,想要知道一个对象的状态有两种方式,一种是你主动去查询,对吧,而另一种就是让别人告诉你。而 informer 就是后一种。

码前提问

  1. informer 有那几个组件?
  2. informer 机制是怎么样的?
  3. 为什么需要 informer?

源码分析

这次寻码的原因和之前不太一样,之前我们都是为了看某一个东西的源码,就去搜索相关的代码。而这次是在 client-go 的使用过程中产生的好奇,所以从使用的角度,就很容易去寻码了。因为你需要使用这个方法去创建一个 informer ,那么你就会想知道里面究竟发生了什么对吧?那么这次我们就从这个方法 NewIndexerInformernewInformer 开始。

初始化

从初始化我们就可以知道 informer 里面究竟有什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
// staging/src/k8s.io/client-go/tools/cache/controller.go:380
func NewIndexerInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
indexers Indexers,
) (Indexer, Controller) {
// This will hold the client state, as we know it.
clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)

return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
}

这个方法可以帮助我们创建 indexerinformer 我们先不管什么是 indexer。直接 newInformer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// staging/src/k8s.io/client-go/tools/cache/controller.go:483
func newInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
clientState Store,
transformer TransformFunc,
) Controller {
// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
Transformer: transformer,
})

cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,

Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(h, clientState, deltas, isInInitialList)
}
return errors.New("object given as Process argument is not Deltas")
},
}
return New(cfg)
}

好,一个迷惑点来了,返回一个 ControllerController?傻傻分不清楚,这个 Controller 和我们常说的 Controller 组件完全不是一个东西,这个 Controller 是一个接口,其实它就是 Informer

关键来了,初始化的时候里面有一个 DeltaFIFO 的队列。有一个 Process 的处理方法,我们将 h 也就是我们外部的 handler 函数放进去了,这个 h 就是我们外部用户在使用 client-go 时申明的需要如何处理事件的方法。

事件去了哪里

源码阅读技巧:你不一定非要按事件的来龙去脉来看源码,有什么看什么,最后再串起来也是可以的

由于我们现在看到了 Process 方法,知道这里是处理事件的,于是我们先看这个 processDeltas

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// staging/src/k8s.io/client-go/tools/cache/controller.go:436
func processDeltas(
handler ResourceEventHandler,
clientState Store,
deltas Deltas,
isInInitialList bool,
) error {
for _, d := range deltas {
obj := d.Object

switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
clientState.Update(obj)
handler.OnUpdate(old, obj)
} else {
clientState.Add(obj)
handler.OnAdd(obj, isInInitialList)
}
case Deleted:
clientState.Delete(obj)
handler.OnDelete(obj)
}
}
return nil
}

删减了部分代码,不说一目了然,可以说是非常明确了。根据不同的事件类型,调用外部对于 handler 的方法处理事件就可以了。

事件从哪里来

那么问题就来了,这些要处理的事件是从哪里来的呢?于是我们看 processDeltas 方法的调用方 Process ,也就是从下往上找,是谁调用了 Process 方法呢?还好引用的地方不多,容易被找到关键在这里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// staging/src/k8s.io/client-go/tools/cache/controller.go:186
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}

不用多解释,这里 queue 就是我们之前在初始化看到的 DeltaFIFO,虽然我们不知道队列里面是做了什么,但无非这个循环的意思就是,从队列中不断 Pop 出事件,然后调用 Process 去处理这个事件。而我们此时可以顺变看一眼,processLoop 方法是在 controller(Informer) Run 也就是启动的时候被一起启动了(代码这里按下不表)。_当然这个循环中有一个重试的机制,如果遇到需要重试的任务,会重新放到队列里面去,一个小的不错设计_。

那么,只要我们知道是哪里在往这个队列里面塞数据,就知道事件从哪里来了。

好,下一个坑就出现了,由于我们是倒着看的,那么我想知道谁往队列里面塞数据,如果你想要看这个 queue 有多少地方在放数据,你会发现太多了,由于 DeltaFIFO 这个实现到处都在引用,所以这样看是很难找的。于是我们需要回到原理上来。看图说话,在最上面说 informer 的流程图的时候我们可以看到有一个 Reflector 的东西在放数据。于是乎我们应该去寻找的是这个东西,你好像在哪里看到过呢?没错 Run 的时候也就是在执行 processLoop 之前。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// staging/src/k8s.io/client-go/tools/cache/controller.go:129
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflectorWithOptions(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
ReflectorOptions{
ResyncPeriod: c.config.FullResyncPeriod,
TypeDescription: c.config.ObjectDescription,
Clock: c.clock,
},
)
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}

c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()

var wg wait.Group

wg.StartWithChannel(stopCh, r.Run)

wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}

你可以发现,初始化 Reflector 的时候将 c.config.Queue 放进去了作为它的 store,那么关键就在这里面了。之后的链路是: r.Run -> r.ListAndWatch -> r.watch -> watchHandler, 好家伙, 链路还有点长的。好在代码并不复杂。在 watchHandler 中有如下精髓:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// staging/src/k8s.io/client-go/tools/cache/reflector.go:743
resourceVersion := meta.GetResourceVersion()
switch event.Type {
case watch.Added:
err := store.Add(event.Object)
case watch.Modified:
err := store.Update(event.Object)
case watch.Deleted:
err := store.Delete(event.Object)
case watch.Bookmark:
if _, ok := meta.GetAnnotations()["k8s.io/initial-events-end"]; ok {
if exitOnInitialEventsEndBookmark != nil {
*exitOnInitialEventsEndBookmark = true
}
}
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
}

破案了,store.Add 明白了,原来你在这里。这里我们就可以串起来了,图也就非常明白了,代码也疏通了。

总结一下

  1. Reflector 监听资源变动,将变动放到队列 DeltaFIFO
  2. Informer 不停的从队列中拿取,并调用外部的 handler 进行处理
  3. 至于外部怎么处理的,那是外部的事情了

码后解答

  1. informer 有那几个组件
    1. 关键组件是 Reflector 和 DeltaFIFO
  2. informer 机制是怎么样的
    1. 监听、同步、事件处理(外部)、重试
  3. 为什么需要 informer
    1. 这是一个小设计了,我们在下面总结提升详细说

总结提升

设计上

就像我们一开始说的那样,想要控制一个对象,你需要先知道对象的状态。那么第一个设计的优点就来了:与其不停的去通过 API 查询对象的状态,不如你自己主动去监听状态的变化。这是一种事件机制的设计,在很多地方都会用到。而主要的原因是查询的无效次数过多,而且 API 的压力又大。

而在 k8s 中太多需要监控对象的地方了,如果无论是谁来都要写一遍监控的代码,并且还要处理各种事件的解析、队列、重试..太麻烦了,于是 k8s 将其抽象为 Informer 的机制。从外部你只需要关注如何 handle 事件就可以了,从代码看有一种函数闭包的思想在里面。而且哦,关键在于 DeltaFIFO 还有各种细节的优化。

编码上

DeltaFIFO 真的是一个很不错设计,是值得我们去学习,并且在其他项目中可以直接拿来抄的一种优化方案。

  1. go 里面利用 sync.cond 的不多见,这个队列算是一个经典案例。Pop 的时候没有元素的时候会 Wait 等着,有元素来了会 Broadcast 通知,节省资源
  2. dedupDeltas 里面会 This will combine the most recent two deltas if they are the same. 也就所谓的压缩事件,也就 “聚合”,这是在给消费端减负,相同的事件你只需处理一次就好了,相当于这里就帮你过滤了一次,好贴心。

总结

从原理上来说 informer 本身不复杂,而且真的是一个不错的设计,从我的感受上来总结可以用两个词 事件+解耦 ,一个事件通知机制加上一个抽象解耦的实现。希望你能体会到,之后我们会用到它。当然这里介绍的是 informer 本身,它的前后都还有好多小助手哦。