📢 注意,该文本非最终版本,正在更新中,版权所有,请勿转载!!
前言 前面说完了 Service 和 kube-proxy,那就自然轮到了我们的 Ingress 了。Ingress 作为 Kubernetes 中负责外部流量路由的重要组件,其实现原理和设计思路值得我们深入研究。当然,这里也要说明的是,对于 Ingress 其实在这个版本里面已经有一点点 “落伍” ,官方更为推荐的是使用 Kubernetes Gateway API
来做同样的事情,并且能力更为强大。支持更丰富的协议,更精细化的流量控制,以及对于权限的控制等等。不过在这里不涉及这个部分,感兴趣同学可以自行去查看相关源码。
前置知识
Ingress
Ingress Controller
心路历程 之前我也写过对于 Ingress 的基础概念的说明,我称为 service 的 service。其实这个对象的出现是一种顺理成章的事情,对于一个多业务场景下的方案来说,不同域名访问不同服务,一定会需要一个网关这样的角色。而对于大的场景还是小的场景都需要不同的网关去做处理,但实际的能力大差不差。我们最熟悉的就是 https://github.com/kubernetes/ingress-nginx 。所以本文也将以它为源码分析的对象。
码前提问 同样的,在开始源码分析之前,让我们先思考几个问题:
ingress-nginx 本身究竟是个什么东西?
Ingress Controller 是如何监听到 Ingress 资源的变化的?
Ingress Controller 是如何将规则转换为实际的负载均衡配置的?
源码分析 Ingress 数据结构 先看一下我们熟悉的 yaml 配置
1 2 3 4 5 6 7 8 9 10 11 rules: - host: api.linkinstars.com http: paths: - path: /user pathType: Prefix backend: service: name: user port: number: 8080
数据结构的部分,我相信你很容易可以找到对应的部分。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 type Ingress struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` Spec IngressSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"` Status IngressStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"` } type IngressSpec struct { IngressClassName *string `json:"ingressClassName,omitempty" protobuf:"bytes,4,opt,name=ingressClassName"` Backend *IngressBackend `json:"backend,omitempty" protobuf:"bytes,1,opt,name=backend"` TLS []IngressTLS `json:"tls,omitempty" protobuf:"bytes,2,rep,name=tls"` Rules []IngressRule `json:"rules,omitempty" protobuf:"bytes,3,rep,name=rules"` }
所以 Ingress 对象本身很简单,就是记录路由的规则,而这个规则则是 Ingress Controller
最为关注的东西。下面我们就来看看我们今天的重点,也就是 ingress-nginx
的具体实现。注意下面的源码都来自于它,而不是 k8s 主项目中。
ingress-nginx 如果你对于 ingress-nginx
不是特别熟悉,强烈建议使用一次,一次就懂。然后可以先看:ingress-nginx/deploy/static/provider/cloud/deploy.yaml
部署。看了这个文件其实对于它本身你就觉得不是什么很可怕的东西了。其中抛开角色、权限、配置相关对象之外就只有:
一个名叫 ingress-nginx-controller
的 Service
一个名叫 ingress-nginx-controller
的 Deployment
一个名叫 nginx
的 IngressClass
其他可以忽略,重点其实就是他们几个。其实看到 Deployment
的时候,我就已经一下子明白了。其实本质它还是一个 Deployment
服务而已 ,而 Service
是一个 LoadBalancer
。相信到此你应该已经了解基本的结构了,在没有看源码的时候你就可以大胆去猜测了,可能它就是通过一个服务去获取路由规则,而通过 LB 把流量接进来,通过 nginx 的实例将流量按规则去路由而已。而得益于 nginx 本身的路由强大,性能通常也没有什么问题。
那么本文的两个关键问题就来了:
ingress-nginx 如何知道规则变动了?
ingress-nginx 如何把 ingress 的规则转换为 nginx 的规则?
Ingress Controller 实现原理 还是老套路,由于这是一个独立的服务,所以我们直接从入口着手:
入口到对象 入口特别好找,和大多数项目一样就在 cmd 下面,main 的函数开始。其中我对于入口函数精简了绝大多数代码,留下了与我们最相关的部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 func main () { kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile) _, err = kubeClient.NetworkingV1().IngressClasses().List(context.TODO(), metav1.ListOptions{}) if err != nil { if !errors.IsNotFound(err) { if errors.IsForbidden(err) { klog.Warningf("No permissions to list and get Ingress Classes: %v, IngressClass feature will be disabled" , err) conf.IngressClassConfiguration.IgnoreIngressClass = true } } } conf.Client = kubeClient err = k8s.GetIngressPod(kubeClient) if err != nil { klog.Fatalf("Unexpected error obtaining ingress-nginx pod: %v" , err) } ngx := controller.NewNGINXController(conf, mc) go ngx.Start() process.HandleSigterm(ngx, conf.PostShutdownGracePeriod, func (code int ) { os.Exit(code) }) }
其中我们可以看到:
通过 createApiserverClient
创建了一个 kubeClient
,然后尝试调用 API 拉取了一下 Ingress
的信息,如果拉取不到则说明没有权限。
然后获取了一下 IngressPod
。
最重要的就是通过 controller.NewNGINXController
创建我们的主角 controller
并通过 Start
启动起来。
啊哈,其实从这里的 ApiserverClient 你就已经大概能猜到如何拿到相关信息的了,当然,所有其实 k8s 相关的扩展能力组件都与这个 apiserver 有着关系,因为就是通过它来控制对象或者获取相关资源的信息。
最后 process.HandleSigterm
其实这部分也是可以抄的,我们放在最后说。
初始化 首先来看看 NewNGINXController
创建的时候究竟干了什么。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 func NewNGINXController (config *Configuration, mc metric.Collector) *NGINXController { n := &NGINXController{ } n.store = store.New( config.Namespace, config.WatchNamespaceSelector, config.ConfigMapName, config.TCPConfigMapName, config.UDPConfigMapName, config.DefaultSSLCertificate, config.ResyncPeriod, config.Client, n.updateCh, config.DisableCatchAll, config.DeepInspector, config.IngressClassConfiguration, config.DisableSyncEvents) n.syncQueue = task.NewTaskQueue(n.syncIngress) return n }
有关 nginx 的部分以及模板的部分都不是我们的重点,因为我们关心的还是 k8s 上,于是在仔细寻找后发现有一个 store 的对象让我找到了关键点。
你别说,这个 store.New
有足足 600+ 行的代码,都看是不可能都看的。
PS:注释特别加了 nolint:gocyclo,可见复杂的别人都懒的动了,不然对于这样长的函数还是拆分为多个可读性会更好
为什么这么长呢?原因其实是因为写了一大堆的闭包在里面,而其实最重要的放在了最后
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 if _, err := store.informers.Ingress.AddEventHandler(ingEventHandler); err != nil { klog.Errorf("Error adding ingress event handler: %v" , err) } if !icConfig.IgnoreIngressClass { if _, err := store.informers.IngressClass.AddEventHandler(ingressClassEventHandler); err != nil { klog.Errorf("Error adding ingress class event handler: %v" , err) } } if _, err := store.informers.EndpointSlice.AddEventHandler(epsEventHandler); err != nil { klog.Errorf("Error adding endpoint slice event handler: %v" , err) } if _, err := store.informers.Secret.AddEventHandler(secrEventHandler); err != nil { klog.Errorf("Error adding secret event handler: %v" , err) } if _, err := store.informers.ConfigMap.AddEventHandler(cmEventHandler); err != nil { klog.Errorf("Error adding configmap event handler: %v" , err) } if _, err := store.informers.Service.AddEventHandler(serviceHandler); err != nil { klog.Errorf("Error adding service event handler: %v" , err) }
看到这里其实就很清楚了,关键是就实现了 informer 的各种 event handler,然后对于各种事件做了不同的处理。其中就有 ingress 的事件。而在对应的 ingEventHandler
处理方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ingEventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func (obj interface {}) { ing, _ := toIngress(obj) store.syncIngress(ing) store.updateSecretIngressMap(ing) store.syncSecrets(ing) updateCh.In() <- Event{ Type: CreateEvent, Obj: obj, } },
关键就是将各种事件放到 updateCh
里面去。这里的这个 updateCh
又是我们可以学习的一个小点,它其实是一个 RingChannel
并且其实这库已经不更新了,但这里依旧沿用了,说明一个 10 年前设计的库还是很好用的,特别是它对于暴露功能的设计还是有迹可循的,有兴趣可以看看。而 updateCh
是在哪里消费的呢?其实这一点你通过看之前的 k8s 源码应该有所熟悉了,消费的地方正式在启动的时候设置好了。
启动 Start
并不复杂,精简一下如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 func (n *NGINXController) Start() { klog.InfoS("Starting NGINX Ingress controller" ) n.store.Run(n.stopCh) cmd := n.command.ExecCommand() n.start(cmd) go n.syncQueue.Run(time.Second, n.stopCh) n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync" )) for { select { case err := <-n.ngxErrCh: if n.isShuttingDown { return } if process.IsRespawnIfRequired(err) { return } case event := <-n.updateCh.Out(): if n.isShuttingDown { break } if evt, ok := event.(store.Event); ok { klog.V(3 ).InfoS("Event received" , "type" , evt.Type, "object" , evt.Obj) if evt.Type == store.ConfigurationEvent { n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change" )) continue } n.syncQueue.EnqueueSkippableTask(evt.Obj) } else { klog.Warningf("Unexpected event type received %T" , event) } case <-n.stopCh: return } } }
我们刚才的关键 updateCh
就在这里,event := <-n.updateCh.Out()
,从中获得对应的事件然后放到 syncQueue
里面,而 syncQueue
则是我们在 NewNGINXController
的时候初始化的
1 n.syncQueue = task.NewTaskQueue(n.syncIngress)
所以最终就落到了 syncIngress
方法上:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 func (n *NGINXController) syncIngress(interface {}) error { n.syncRateLimiter.Accept() ings := n.store.ListIngresses() hosts, servers, pcfg := n.getConfiguration(ings) if n.runningConfig.Equal(pcfg) { klog.V(3 ).Infof("No configuration change detected, skipping backend reload" ) return nil } n.metricCollector.SetHosts(hosts) if !utilingress.IsDynamicConfigurationEnough(pcfg, n.runningConfig) { klog.InfoS("Configuration changes detected, backend reload required" ) hash, err := hashstructure.Hash(pcfg, hashstructure.FormatV1, &hashstructure.HashOptions{ TagName: "json" , }) if err != nil { klog.Errorf("unexpected error hashing configuration: %v" , err) } pcfg.ConfigurationChecksum = fmt.Sprintf("%v" , hash) err = n.OnUpdate(*pcfg) klog.InfoS("Backend successfully reloaded" ) n.metricCollector.ConfigSuccess(hash, true ) n.metricCollector.IncReloadCount() n.recorder.Eventf(k8s.IngressPodDetails, apiv1.EventTypeNormal, "RELOAD" , "NGINX reload triggered due to a change in configuration" ) } return nil }
然后让我们把目光聚到 err = n.OnUpdate(*pcfg)
(一开始我压根没注意这个部分,也是仔细寻找才发现了)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error { cfg := n.store.GetBackendConfiguration() cfg.Resolver = n.resolver content, err := n.generateTemplate(cfg, ingressCfg) err = n.createLuaConfig(&cfg) err = createOpentelemetryCfg(&cfg) err = n.testTemplate(content) err = os.WriteFile(cfgPath, content, file.ReadWriteByUser) o, err := n.command.ExecCommand("-s" , "reload" ).CombinedOutput() if workerSerialReloads { go n.awaitWorkersReload() } return nil }
看到这里其实就很清楚了,首先就是根据 ingress 信息和模板创建对应的配置文件。而生成配置文件所需要的模板是在 rootfs/etc/nginx/template/nginx.tmpl
的位置,而看了模板你也就大概明白了,这不仅是 nginx 的配置吗。
然后发现,这不就是我最常用的 nginx -s reload
命令吗?原来就这样简单。这下,从 k8s 的资源变动到 nginx 配置变化,整个链路就能串起来了。
码后解答
ingress-nginx 本身究竟是个什么东西?回答:本质其实就是个 deployment 而已,而整个就是打包了一组 k8s 的各种资源在里面
Ingress Controller 是如何监听到 Ingress 资源的变化的?回答:其实就是通过 informer 监听了对应资源变更的事件,并且注册了这些事件对应的处理方法,而 SharedInformer
其实是 client-go 里面的实现。
Ingress Controller 是如何将规则转换为实际的负载均衡配置的?回答:关键就是通过这些规则的配置,通过模板来生成的。
总结提升 其实原理上对于 ingress 其实本身并不复杂,而更多的是,我觉得它可以作为我们学习如果制作一个 k8s 扩展组件的一个案例,如何使用 Apiserver,如何处理对应的事件,以及如何操作对应的资源等等,将它作为一个参考确实是不错的选择。
编码上 HandleSigterm 之前在 main
入口的最下面我们看到过下面的代码:
1 2 3 process.HandleSigterm(ngx, conf.PostShutdownGracePeriod, func (code int ) { os.Exit(code) })
其中的 HandleSigterm
的实现其实非常简单,对于一个小型项目的优雅关闭是一个非常不错的案例,可以直接拿来用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func HandleSigterm (ngx Controller, delay int , exit exiter) { signalChan := make (chan os.Signal, 1 ) signal.Notify(signalChan, syscall.SIGTERM) <-signalChan klog.InfoS("Received SIGTERM, shutting down" ) exitCode := 0 if err := ngx.Stop(); err != nil { klog.Warningf("Error during shutdown: %v" , err) exitCode = 1 } klog.Infof("Handled quit, delaying controller exit for %d seconds" , delay) time.Sleep(time.Duration(delay) * time.Second) klog.InfoS("Exiting" , "code" , exitCode) exit(exitCode) }
syncQueue 还有一个 syncQueue
其实也可以简单参考下,它是对于 client-go
里面的 workqueue
的封装,本来其实我觉得 workqueue
的实现已经非常好用了,特别是拿他来做泛型的使用案例特别好,所以比较值得一看,以后很大机会能用上。其中有一个取巧的封装是对于 EnqueueSkippableTask
方法,我一开始还在想什么是 skippable
可以跳过的任务,仔细一看发现其实就是任务的优先级,而当可以 skippable
也就是优先级低的时候,只不过将时间延迟了而已。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func (t *Queue) enqueue(obj interface {}, skippable bool ) { if t.IsShuttingDown() { klog.ErrorS(nil , "queue has been shutdown, failed to enqueue" , "key" , obj) return } ts := time.Now().UnixNano() if !skippable { ts = time.Now().Add(24 * time.Hour).UnixNano() } klog.V(3 ).InfoS("queuing" , "item" , obj) key, err := t.fn(obj) if err != nil { klog.ErrorS(err, "creating object key" , "item" , obj) return } t.queue.Add(Element{ Key: key, Timestamp: ts, }) }
对应到实际使用上,其实就是对于外部的 event 的相应任务可以慢慢处理,而对于文件配置的变动需要立刻做出相应,优先级更高。