📢 注意,该文本非最终版本,正在更新中,版权所有,请勿转载!!

前言

前面说完了 Service 和 kube-proxy,那就自然轮到了我们的 Ingress 了。Ingress 作为 Kubernetes 中负责外部流量路由的重要组件,其实现原理和设计思路值得我们深入研究。当然,这里也要说明的是,对于 Ingress 其实在这个版本里面已经有一点点 “落伍” ,官方更为推荐的是使用 Kubernetes Gateway API 来做同样的事情,并且能力更为强大。支持更丰富的协议,更精细化的流量控制,以及对于权限的控制等等。不过在这里不涉及这个部分,感兴趣同学可以自行去查看相关源码。

前置知识

  • Ingress
  • Ingress Controller

心路历程

之前我也写过对于 Ingress 的基础概念的说明,我称为 service 的 service。其实这个对象的出现是一种顺理成章的事情,对于一个多业务场景下的方案来说,不同域名访问不同服务,一定会需要一个网关这样的角色。而对于大的场景还是小的场景都需要不同的网关去做处理,但实际的能力大差不差。我们最熟悉的就是 https://github.com/kubernetes/ingress-nginx 。所以本文也将以它为源码分析的对象。

码前提问

同样的,在开始源码分析之前,让我们先思考几个问题:

  1. ingress-nginx 本身究竟是个什么东西?
  2. Ingress Controller 是如何监听到 Ingress 资源的变化的?
  3. Ingress Controller 是如何将规则转换为实际的负载均衡配置的?

源码分析

Ingress 数据结构

先看一下我们熟悉的 yaml 配置

1
2
3
4
5
6
7
8
9
10
11
rules:
- host: api.linkinstars.com
http:
paths:
- path: /user
pathType: Prefix
backend:
service:
name: user
port:
number: 8080

数据结构的部分,我相信你很容易可以找到对应的部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// staging/src/k8s.io/api/networking/v1beta1/types.go:35
type Ingress struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
Spec IngressSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
Status IngressStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}

// staging/src/k8s.io/api/networking/v1beta1/types.go:73
type IngressSpec struct {
IngressClassName *string `json:"ingressClassName,omitempty" protobuf:"bytes,4,opt,name=ingressClassName"`
Backend *IngressBackend `json:"backend,omitempty" protobuf:"bytes,1,opt,name=backend"`
TLS []IngressTLS `json:"tls,omitempty" protobuf:"bytes,2,rep,name=tls"`
Rules []IngressRule `json:"rules,omitempty" protobuf:"bytes,3,rep,name=rules"`
}

所以 Ingress 对象本身很简单,就是记录路由的规则,而这个规则则是 Ingress Controller 最为关注的东西。下面我们就来看看我们今天的重点,也就是 ingress-nginx 的具体实现。注意下面的源码都来自于它,而不是 k8s 主项目中。

ingress-nginx

如果你对于 ingress-nginx 不是特别熟悉,强烈建议使用一次,一次就懂。然后可以先看:ingress-nginx/deploy/static/provider/cloud/deploy.yaml 部署。看了这个文件其实对于它本身你就觉得不是什么很可怕的东西了。其中抛开角色、权限、配置相关对象之外就只有:

  • 一个名叫 ingress-nginx-controller 的 Service
  • 一个名叫 ingress-nginx-controller 的 Deployment
  • 一个名叫 nginx 的 IngressClass

其他可以忽略,重点其实就是他们几个。其实看到 Deployment 的时候,我就已经一下子明白了。其实本质它还是一个 Deployment 服务而已,而 Service 是一个 LoadBalancer 。相信到此你应该已经了解基本的结构了,在没有看源码的时候你就可以大胆去猜测了,可能它就是通过一个服务去获取路由规则,而通过 LB 把流量接进来,通过 nginx 的实例将流量按规则去路由而已。而得益于 nginx 本身的路由强大,性能通常也没有什么问题。

那么本文的两个关键问题就来了:

  • ingress-nginx 如何知道规则变动了?
  • ingress-nginx 如何把 ingress 的规则转换为 nginx 的规则?

Ingress Controller 实现原理

还是老套路,由于这是一个独立的服务,所以我们直接从入口着手:

入口到对象

入口特别好找,和大多数项目一样就在 cmd 下面,main 的函数开始。其中我对于入口函数精简了绝大多数代码,留下了与我们最相关的部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// cmd/nginx/main.go:53
func main() {
kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile)

_, err = kubeClient.NetworkingV1().IngressClasses().List(context.TODO(), metav1.ListOptions{})
if err != nil {
if !errors.IsNotFound(err) {
if errors.IsForbidden(err) {
klog.Warningf("No permissions to list and get Ingress Classes: %v, IngressClass feature will be disabled", err)
conf.IngressClassConfiguration.IgnoreIngressClass = true
}
}
}
conf.Client = kubeClient

err = k8s.GetIngressPod(kubeClient)
if err != nil {
klog.Fatalf("Unexpected error obtaining ingress-nginx pod: %v", err)
}

ngx := controller.NewNGINXController(conf, mc)

go ngx.Start()

process.HandleSigterm(ngx, conf.PostShutdownGracePeriod, func(code int) {
os.Exit(code)
})
}

其中我们可以看到:

  1. 通过 createApiserverClient 创建了一个 kubeClient ,然后尝试调用 API 拉取了一下 Ingress 的信息,如果拉取不到则说明没有权限。
  2. 然后获取了一下 IngressPod
  3. 最重要的就是通过 controller.NewNGINXController 创建我们的主角 controller 并通过 Start 启动起来。

啊哈,其实从这里的 ApiserverClient 你就已经大概能猜到如何拿到相关信息的了,当然,所有其实 k8s 相关的扩展能力组件都与这个 apiserver 有着关系,因为就是通过它来控制对象或者获取相关资源的信息。

最后 process.HandleSigterm 其实这部分也是可以抄的,我们放在最后说。

初始化

首先来看看 NewNGINXController 创建的时候究竟干了什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// internal/ingress/controller/nginx.go:76
func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXController {
// ....

n := &NGINXController{
// ....
}

// ....

n.store = store.New(
config.Namespace,
config.WatchNamespaceSelector,
config.ConfigMapName,
config.TCPConfigMapName,
config.UDPConfigMapName,
config.DefaultSSLCertificate,
config.ResyncPeriod,
config.Client,
n.updateCh,
config.DisableCatchAll,
config.DeepInspector,
config.IngressClassConfiguration,
config.DisableSyncEvents)

n.syncQueue = task.NewTaskQueue(n.syncIngress) // 注意一下这里的这个,后面有用
// ...

return n
}

有关 nginx 的部分以及模板的部分都不是我们的重点,因为我们关心的还是 k8s 上,于是在仔细寻找后发现有一个 store 的对象让我找到了关键点。

你别说,这个 store.New 有足足 600+ 行的代码,都看是不可能都看的。

PS:注释特别加了 nolint:gocyclo,可见复杂的别人都懒的动了,不然对于这样长的函数还是拆分为多个可读性会更好

为什么这么长呢?原因其实是因为写了一大堆的闭包在里面,而其实最重要的放在了最后

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// internal/ingress/controller/store/store.go:850
if _, err := store.informers.Ingress.AddEventHandler(ingEventHandler); err != nil {
klog.Errorf("Error adding ingress event handler: %v", err)
}
if !icConfig.IgnoreIngressClass {
if _, err := store.informers.IngressClass.AddEventHandler(ingressClassEventHandler); err != nil {
klog.Errorf("Error adding ingress class event handler: %v", err)
}
}
if _, err := store.informers.EndpointSlice.AddEventHandler(epsEventHandler); err != nil {
klog.Errorf("Error adding endpoint slice event handler: %v", err)
}
if _, err := store.informers.Secret.AddEventHandler(secrEventHandler); err != nil {
klog.Errorf("Error adding secret event handler: %v", err)
}
if _, err := store.informers.ConfigMap.AddEventHandler(cmEventHandler); err != nil {
klog.Errorf("Error adding configmap event handler: %v", err)
}
if _, err := store.informers.Service.AddEventHandler(serviceHandler); err != nil {
klog.Errorf("Error adding service event handler: %v", err)
}

看到这里其实就很清楚了,关键是就实现了 informer 的各种 event handler,然后对于各种事件做了不同的处理。其中就有 ingress 的事件。而在对应的 ingEventHandler 处理方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ing, _ := toIngress(obj)

//...

store.syncIngress(ing)
store.updateSecretIngressMap(ing)
store.syncSecrets(ing)

updateCh.In() <- Event{
Type: CreateEvent,
Obj: obj,
}
},

关键就是将各种事件放到 updateCh 里面去。这里的这个 updateCh 又是我们可以学习的一个小点,它其实是一个 RingChannel 并且其实这库已经不更新了,但这里依旧沿用了,说明一个 10 年前设计的库还是很好用的,特别是它对于暴露功能的设计还是有迹可循的,有兴趣可以看看。而 updateCh 是在哪里消费的呢?其实这一点你通过看之前的 k8s 源码应该有所熟悉了,消费的地方正式在启动的时候设置好了。

启动

Start 并不复杂,精简一下如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// internal/ingress/controller/nginx.go:270
func (n *NGINXController) Start() {
klog.InfoS("Starting NGINX Ingress controller")

n.store.Run(n.stopCh)
cmd := n.command.ExecCommand()

n.start(cmd)

go n.syncQueue.Run(time.Second, n.stopCh)
// force initial sync
n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync"))

for {
select {
case err := <-n.ngxErrCh:
if n.isShuttingDown {
return
}

// if the nginx master process dies, the workers continue to process requests
// until the failure of the configured livenessProbe and restart of the pod.
if process.IsRespawnIfRequired(err) {
return
}

case event := <-n.updateCh.Out():
if n.isShuttingDown {
break
}

if evt, ok := event.(store.Event); ok {
klog.V(3).InfoS("Event received", "type", evt.Type, "object", evt.Obj)
if evt.Type == store.ConfigurationEvent {
// TODO: is this necessary? Consider removing this special case
n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
continue
}

n.syncQueue.EnqueueSkippableTask(evt.Obj)
} else {
klog.Warningf("Unexpected event type received %T", event)
}
case <-n.stopCh:
return
}
}
}

我们刚才的关键 updateCh 就在这里,event := <-n.updateCh.Out(),从中获得对应的事件然后放到 syncQueue 里面,而 syncQueue 则是我们在 NewNGINXController 的时候初始化的

1
n.syncQueue = task.NewTaskQueue(n.syncIngress)

所以最终就落到了 syncIngress 方法上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// internal/ingress/controller/controller.go:175
func (n *NGINXController) syncIngress(interface{}) error {
n.syncRateLimiter.Accept()

ings := n.store.ListIngresses()
hosts, servers, pcfg := n.getConfiguration(ings)

if n.runningConfig.Equal(pcfg) {
klog.V(3).Infof("No configuration change detected, skipping backend reload")
return nil
}

n.metricCollector.SetHosts(hosts)

if !utilingress.IsDynamicConfigurationEnough(pcfg, n.runningConfig) {
klog.InfoS("Configuration changes detected, backend reload required")

hash, err := hashstructure.Hash(pcfg, hashstructure.FormatV1, &hashstructure.HashOptions{
TagName: "json",
})
if err != nil {
klog.Errorf("unexpected error hashing configuration: %v", err)
}

pcfg.ConfigurationChecksum = fmt.Sprintf("%v", hash)

err = n.OnUpdate(*pcfg)

klog.InfoS("Backend successfully reloaded")
n.metricCollector.ConfigSuccess(hash, true)
n.metricCollector.IncReloadCount()

n.recorder.Eventf(k8s.IngressPodDetails, apiv1.EventTypeNormal, "RELOAD", "NGINX reload triggered due to a change in configuration")
}

// ...

return nil
}

然后让我们把目光聚到 err = n.OnUpdate(*pcfg)(一开始我压根没注意这个部分,也是仔细寻找才发现了)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
cfg := n.store.GetBackendConfiguration()
cfg.Resolver = n.resolver

// ....

content, err := n.generateTemplate(cfg, ingressCfg)

err = n.createLuaConfig(&cfg)

err = createOpentelemetryCfg(&cfg)

err = n.testTemplate(content)

// ....

err = os.WriteFile(cfgPath, content, file.ReadWriteByUser)

o, err := n.command.ExecCommand("-s", "reload").CombinedOutput()

// Reload status checking runs in a separate goroutine to avoid blocking the sync queue
if workerSerialReloads {
go n.awaitWorkersReload()
}

return nil
}

看到这里其实就很清楚了,首先就是根据 ingress 信息和模板创建对应的配置文件。而生成配置文件所需要的模板是在 rootfs/etc/nginx/template/nginx.tmpl 的位置,而看了模板你也就大概明白了,这不仅是 nginx 的配置吗。

然后发现,这不就是我最常用的 nginx -s reload 命令吗?原来就这样简单。这下,从 k8s 的资源变动到 nginx 配置变化,整个链路就能串起来了。

码后解答

  1. ingress-nginx 本身究竟是个什么东西?回答:本质其实就是个 deployment 而已,而整个就是打包了一组 k8s 的各种资源在里面
  2. Ingress Controller 是如何监听到 Ingress 资源的变化的?回答:其实就是通过 informer 监听了对应资源变更的事件,并且注册了这些事件对应的处理方法,而 SharedInformer 其实是 client-go 里面的实现。
  3. Ingress Controller 是如何将规则转换为实际的负载均衡配置的?回答:关键就是通过这些规则的配置,通过模板来生成的。

总结提升

其实原理上对于 ingress 其实本身并不复杂,而更多的是,我觉得它可以作为我们学习如果制作一个 k8s 扩展组件的一个案例,如何使用 Apiserver,如何处理对应的事件,以及如何操作对应的资源等等,将它作为一个参考确实是不错的选择。

编码上

HandleSigterm

之前在 main 入口的最下面我们看到过下面的代码:

1
2
3
process.HandleSigterm(ngx, conf.PostShutdownGracePeriod, func(code int) {
os.Exit(code)
})

其中的 HandleSigterm 的实现其实非常简单,对于一个小型项目的优雅关闭是一个非常不错的案例,可以直接拿来用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// pkg/util/process/sigterm.go:32
func HandleSigterm(ngx Controller, delay int, exit exiter) {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGTERM)
<-signalChan
klog.InfoS("Received SIGTERM, shutting down")

exitCode := 0
if err := ngx.Stop(); err != nil {
klog.Warningf("Error during shutdown: %v", err)
exitCode = 1
}

klog.Infof("Handled quit, delaying controller exit for %d seconds", delay)
time.Sleep(time.Duration(delay) * time.Second)

klog.InfoS("Exiting", "code", exitCode)
exit(exitCode)
}

syncQueue

还有一个 syncQueue 其实也可以简单参考下,它是对于 client-go 里面的 workqueue 的封装,本来其实我觉得 workqueue 的实现已经非常好用了,特别是拿他来做泛型的使用案例特别好,所以比较值得一看,以后很大机会能用上。其中有一个取巧的封装是对于 EnqueueSkippableTask 方法,我一开始还在想什么是 skippable 可以跳过的任务,仔细一看发现其实就是任务的优先级,而当可以 skippable 也就是优先级低的时候,只不过将时间延迟了而已。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// internal/task/queue.go:74
func (t *Queue) enqueue(obj interface{}, skippable bool) {
if t.IsShuttingDown() {
klog.ErrorS(nil, "queue has been shutdown, failed to enqueue", "key", obj)
return
}

ts := time.Now().UnixNano()
if !skippable {
// make sure the timestamp is bigger than lastSync
ts = time.Now().Add(24 * time.Hour).UnixNano() // 就只是这样而已
}
klog.V(3).InfoS("queuing", "item", obj)
key, err := t.fn(obj)
if err != nil {
klog.ErrorS(err, "creating object key", "item", obj)
return
}
t.queue.Add(Element{
Key: key,
Timestamp: ts,
})
}

对应到实际使用上,其实就是对于外部的 event 的相应任务可以慢慢处理,而对于文件配置的变动需要立刻做出相应,优先级更高。