📢 注意,该文本非最终版本,正在更新中,版权所有,请勿转载!!
前言 在 k8s 中 pod 之间的访问在所难免,多个系统之间往往有着调用和互动。那么他们之间的访问就离不开 DNS 的帮助,两个相同 namespace 下的 pod 可以直接通过名称访问,而不同 namespace 下也只需要加上 namespace 就可以了。而实现的关键就是我们的 “交通指挥员” Core-DNS。
前置知识
心路历程 第一次知道它是哪个时候还是 kube-dns,后来才是 Core-DNS。因为在整个系统安装好之后,就会有一个 pod 名字带 core-dns 的 pod。我当时就好奇这个 pod 是干什么的,于是就去研究,后来才知道它是负责 DNS 的。今天我们就来一起读读 Core-DNS 的源码,看看它是如何工作的。
码前提问 在看实际的源码之前,首先我们要明白 Core-DNS 的基本架构和工作原理。Core-DNS 本质是一个灵活的 DNS 服务器,我们知道 DNS 服务器的主要功能是解析域名,将域名转换为 IP 地址。而 Core-DNS 解析的域名不是普通的域名,而是 Kubernetes 集群中的服务和 Pod 的域名。而 IP 地址则是 Pod 的 IP 地址。从而让 pod 之间访问的时候不要用记不住的 IP 地址,而且 IP 地址还会变动。那么其实我们最关心的就是 Core-DNS 是如何知道 Pod 的 IP 地址的,以及它是如何处理 DNS 查询的。
Core-DNS
是如何知道 pod 的 IP 地址的?
Core-DNS
是如何处理 DNS 查询的?
Core-DNS
有什么优化措施来提高性能?
源码分析 首先 Core-DNS 在 https://github.com/coredns/coredns
由于 Core-DNS 其实是利用了老版本的 caddy 作为一个启动器,而所有必要的功能都是以插件的形式存在的,所以看源码的时候可能会跳出这个仓库本身。并且跳出去之后可能通过查看引用的方式是没有办法直接跳回来的,所以请注意跳转的时候记住来源。
入口 入口其实特别简单,就在最外面的文件中,引入插件,然后调用 coremain.Run()
启动 Core-DNS。
1 2 3 4 5 6 7 8 9 import ( _ "github.com/coredns/coredns/core/plugin" "github.com/coredns/coredns/coremain" ) func main () { coremain.Run() }
然后你点进去 coremain.Run()
就会发现懵了,因为其本质是在启动一个 caddy 实例。而并没有任何调用 Core-DNS 的代码。怎么回事呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 func Run () { caddy.TrapSignals() flag.Parse() if len (flag.Args()) > 0 { mustLogFatal(fmt.Errorf("extra command line arguments: %s" , flag.Args())) } log.SetOutput(os.Stdout) log.SetFlags(LogFlags) if version { showVersion() os.Exit(0 ) } if plugins { fmt.Println(caddy.DescribePlugins()) os.Exit(0 ) } _, err := maxprocs.Set(maxprocs.Logger(log.Printf)) if err != nil { log.Println("[WARNING] Failed to set GOMAXPROCS:" , err) } corefile, err := caddy.LoadCaddyfile(serverType) if err != nil { mustLogFatal(err) } instance, err := caddy.Start(corefile) if err != nil { mustLogFatal(err) } if !dnsserver.Quiet { showVersion() } instance.Wait() }
此时,不要慌,我们可以尝试去 caddy.Start
里面看看,就会看到其实调用关系是 caddy.Start
-> startWithListenerFds
-> inst.context.MakeServers
。而其中的 context 是一个接口
1 2 3 4 5 6 7 8 9 type Context interface { InspectServerBlocks(string , []caddyfile.ServerBlock) ([]caddyfile.ServerBlock, error ) MakeServers() ([]Server, error ) }
而在我们的 /core/dnsserver/register.go
有一个 dnsContext
实现了这个接口。
1 2 3 4 5 6 7 type dnsContext struct { keysToConfigs map [string ]*Config configs []*Config }
而且其中的 MakeServers
方法就是我们需要的。它会调用 makeServersForGroup
方法来创建服务器实例。如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func makeServersForGroup (addr string , group []*Config) ([]caddy.Server, error ) { var servers []caddy.Server for range numSockets { switch tr, _ := parse.Transport(addr); tr { case transport.DNS: s, err := NewServer(addr, group) case transport.TLS: s, err := NewServerTLS(addr, group) case transport.QUIC: s, err := NewServerQUIC(addr, group) case transport.GRPC: s, err := NewServergRPC(addr, group) case transport.HTTPS: s, err := NewServerHTTPS(addr, group) } } return servers, nil }
看到这里的 NewServer
方法了吗?它就是我们 Core-DNS 的核心服务器。我们可以继续深入看看它的实现。而 NewServer
创建的 Server 是实现了 caddy.TCPServer
和 caddy.UDPServer
接口的,即是有 Serve
和 ServePacket
方法的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func (s *Server) Serve(l net.Listener) error { s.m.Lock() s.server[tcp] = &dns.Server{Listener: l, Net: "tcp" , TsigSecret: s.tsigSecret, MaxTCPQueries: tcpMaxQueries, ReadTimeout: s.readTimeout, WriteTimeout: s.writeTimeout, IdleTimeout: func () time.Duration { return s.idleTimeout }, Handler: dns.HandlerFunc(func (w dns.ResponseWriter, r *dns.Msg) { ctx := context.WithValue(context.Background(), Key{}, s) ctx = context.WithValue(ctx, LoopKey{}, 0 ) s.ServeDNS(ctx, w, r) })} s.m.Unlock() return s.server[tcp].ActivateAndServe() }
到这里,其实对于整个启动过程我们有了一个大致的认识,这里非常绕的原因是因为它的启动依赖于 Caddy ,所以真正的运行是在那边里面,而这里仅仅只是实现了必要的接口而已。所以有时候看源码单独只是看本仓库,往往会完全不理解在干什么 ,这件事告诉我们,有的时候如果看不明或找不到一些入口的时候,可能是因为它依赖了其他仓库的代码。
插件系统核心 其实插件本身并不复杂,在这里插件的实现就仅仅是实现接口,然后注册,最后被调用而已。接口是下面这样:
1 2 3 4 5 Handler interface { ServeDNS(context.Context, dns.ResponseWriter, *dns.Msg) (int , error ) Name() string }
而注册就更简单了
1 func init () { plugin.Register("whoami" , setup) }
就是在每个插件的 init
函数中调用 plugin.Register
方法来注册插件。这样在 Core-DNS 启动时就会自动加载这些插件。当然,不要忘记本分,我们今天最重要的目的是看它在 k8s 中是如何配合工作的,所以我们需要关注的是 Kubernetes 插件的实现。
Kubernetes 插件分析 相比与其他插件,kubernetes 插件代码就要多的多了。Kubernetes 插件是 Core-DNS 的核心,负责与 K8s API 交互。首先让我们来看到注册的部分:
1 2 3 4 const pluginName = "kubernetes" func init () { plugin.Register(pluginName, setup) }
和其他插件一样,没什么好说的,继续,我们来看看 setup
函数,其中调用了 InitKubeCache
这个方法里面调用了 newdnsController
这是我们的关键:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 func newdnsController (ctx context.Context, kubeClient kubernetes.Interface, mcsClient mcsClientset.MulticlusterV1alpha1Interface, opts dnsControlOpts) *dnsControl { dns := dnsControl{ client: kubeClient, mcsClient: mcsClient, selector: opts.selector, namespaceSelector: opts.namespaceSelector, stopCh: make (chan struct {}), zones: opts.zones, endpointNameMode: opts.endpointNameMode, multiclusterZones: opts.multiclusterZones, } dns.svcLister, dns.svcController = object.NewIndexerInformer( &cache.ListWatch{ ListFunc: serviceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector), WatchFunc: serviceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector), }, &api.Service{}, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc, svcExtIPIndex: svcExtIPIndexFunc}, object.DefaultProcessor(object.ToService, nil ), ) podLister, podController := object.NewIndexerInformer( &cache.ListWatch{ ListFunc: podListFunc(ctx, dns.client, api.NamespaceAll, dns.selector), WatchFunc: podWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector), }, &api.Pod{}, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{podIPIndex: podIPIndexFunc}, object.DefaultProcessor(object.ToPod, nil ), ) dns.podLister = podLister if opts.initPodCache { dns.podController = podController } epLister, epController := object.NewIndexerInformer( &cache.ListWatch{ ListFunc: endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector), WatchFunc: endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector), }, &discovery.EndpointSlice{}, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}, object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder()), ) dns.epLister = epLister if opts.initEndpointsCache { dns.epController = epController } dns.nsLister, dns.nsController = object.NewIndexerInformer( &cache.ListWatch{ ListFunc: namespaceListFunc(ctx, dns.client, dns.namespaceSelector), WatchFunc: namespaceWatchFunc(ctx, dns.client, dns.namespaceSelector), }, &api.Namespace{}, cache.ResourceEventHandlerFuncs{}, cache.Indexers{}, object.DefaultProcessor(object.ToNamespace, nil ), ) return &dns }
看到熟悉的 Informer
了吗?它是 Kubernetes 的核心组件之一,用于监听和缓存 Kubernetes API 对象的变化。这里我们创建了多个 Informer
,分别用于监听 Service、Pod、Endpoints 和 Namespace 的变化。我们知道,不管是在同一个 namespace 还是不同 namespace 下,Pod 都可以通过 DNS 名称访问其他 Pod。所以这些不同的事件变动都要监听。而这几个资源的变化都会触发 注册的 eventHandler 也就是 Add
、Update
和 Delete
方法。
你一定以为 Add
、Update
和 Delete
这些方法会具体处理数据?但其实你实际去看看,他们其实本质都是去更新了一下时间戳而已。而真正的数据 cache 在 cache.Indexers 里面,在 dnsControl
中有几个 cache.Indexer
专门用了放他们。而本地缓存的意义就在于避免去频繁的访问 Kubernetes API Server,这是显而易见的。
其中有一个 podLister cache.Indexer
我们后面还会看到。
ServeDNS 最后我们来看看请求来的时候是如何处理的,而这里的关键则就在与 ServeDNS
方法了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 func (k Kubernetes) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int , error ) { switch state.QType() { case dns.TypeA: records, truncated, err = plugin.A(ctx, &k, zone, state, nil , plugin.Options{}) case dns.TypeAAAA: records, truncated, err = plugin.AAAA(ctx, &k, zone, state, nil , plugin.Options{}) case dns.TypeTXT: records, truncated, err = plugin.TXT(ctx, &k, zone, state, nil , plugin.Options{}) case dns.TypeCNAME: records, err = plugin.CNAME(ctx, &k, zone, state, plugin.Options{}) case dns.TypePTR: records, err = plugin.PTR(ctx, &k, zone, state, plugin.Options{}) case dns.TypeMX: records, extra, err = plugin.MX(ctx, &k, zone, state, plugin.Options{}) case dns.TypeSRV: records, extra, err = plugin.SRV(ctx, &k, zone, state, plugin.Options{}) case dns.TypeSOA: if qname == zone { records, err = plugin.SOA(ctx, &k, zone, state, plugin.Options{}) } case dns.TypeAXFR, dns.TypeIXFR: return dns.RcodeRefused, nil return dns.RcodeSuccess, nil }
可以看到,这里会根据不同的 DNS 查询类型来调用不同的处理方法。比如对于 A 记录查询,会调用 plugin.A
方法,而对于 AAAA 记录查询,则会调用 plugin.AAAA
方法。这些方法会根据当前的状态和查询条件来返回相应的 DNS 记录。
而最终不同的方法都会转回到 Services
方法中,通过 k.Records(ctx, state, false)
最后我们可以确认我们找到了,findPods
方法
1 2 3 4 5 6 7 func (k *Kubernetes) Services(ctx context.Context, state request.Request, exact bool , opt plugin.Options) (svcs []msg.Service, err error ) { s, e := k.Records(ctx, state, false ) return internal, e }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (k *Kubernetes) Records(ctx context.Context, state request.Request, exact bool ) ([]msg.Service, error ) { if r.podOrSvc == Pod { pods, err := k.findPods(r, state.Zone) return pods, err } var services []msg.Service var err error if !multicluster { services, err = k.findServices(r, state.Zone) } else { services, err = k.findMultiClusterServices(r, state.Zone) } return services, err }
而 findPods
当我看到这个名字的时候我就知道距离胜利不远了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func (k *Kubernetes) findPods(r recordRequest, zone string ) (pods []msg.Service, err error ) { zonePath := msg.Path(zone, coredns) var ip string if strings.Count(podname, "-" ) == 3 && !strings.Contains(podname, "--" ) { ip = strings.ReplaceAll(podname, "-" , "." ) } else { ip = strings.ReplaceAll(podname, "-" , ":" ) } for _, p := range k.APIConn.PodIndex(ip) { if ip == p.PodIP && match(namespace, p.Namespace) { s := msg.Service{Key: strings.Join([]string {zonePath, Pod, namespace, podname}, "/" ), Host: ip, TTL: k.ttl} pods = append (pods, s) err = nil } } return pods, err }
显然这里就是对于请求的 Pod 的 IP 地址进行查询。我们只需要去看 PodIndex
方法就可以了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func (dns *dnsControl) PodIndex(ip string ) (pods []*object.Pod) { os, err := dns.podLister.ByIndex(podIPIndex, ip) if err != nil { return nil } for _, o := range os { p, ok := o.(*object.Pod) if !ok { continue } pods = append (pods, p) } return pods }
而这个方法中的 podLister
就是我们之前看到的 podLister cache.Indexer
。它就是 Indexer 缓存。里面存放的就是 Pod 的信息。而 ByIndex
方法则是根据索引来查询 Pod 的信息。
至此整个链路就全部串起来了。
码后解答
Core-DNS
是如何知道 pod 的 IP 地址的?
还是老一套 Informer
机制而已,Core-DNS
通过 Kubernetes 的 API Server 获取 Pod 的信息,并将其缓存到本地的 cache.Indexer
中。每当 Pod 的信息发生变化时,相关的 Informer
会触发事件,更新缓存中的 Pod 信息。
Core-DNS
是如何处理 DNS 查询的?
其实本质就是启动一个 DNS 服务,只不过它对于 DNS 查询会处理 k8s 中的服务和 Pod 的域名解析而已。
Core-DNS
有什么优化措施来提高性能?
cache 机制,Core-DNS
使用了本地缓存来存储 Pod 和 Service 的信息,避免频繁访问 Kubernetes API Server。通过 Informer
机制监听资源变化,并更新本地缓存,从而提高查询性能。
总结提升 插件机制 其实 Core DNS 本质里面有两种模式在里面,一个是对于 caddy 的套壳,它完全是利用了 caddy 作为了一个启动器,虽然从代码层面来说减少了很多项目启动运行部分的代码,但是实际中我们也看到了,它已经删除了 caddy 许多功能,完全与主干已经脱节了。所以其实它完全可以把那部分直接合并过来作为一个项目里面。我们在看代码的时候会发现跳来跳去,非常的麻烦。对于新人确实不好理解。而另一个模式是 plugin 机制,利用 golang 中的 init 方法实现注册,只要 import 了就会自动注册。很多插件系统的设计也都是如此。
最后提一次它吧,一路看到这里,我相信你已经明白为什么很多人吹 Informer
了。它的设计确实非常的巧妙,利用了缓存和事件驱动的方式来处理 Kubernetes 中的资源变化,而且无论是内部组件还是外部组件都可以用它。通过 Informer
,我们可以轻松地监听和处理资源的增删改查,而不需要频繁地访问 API Server。这种设计大大提高了性能和效率。而最关键的是利用了它,所有其他想要扩展 Kubernetes 的功能都可以利用它一方面是简化了开发工作,一方面接入行为也统一。