coredns-autopath

kubernetes 插件

kubernetes [ZONES...] {
    endpoint URL
    tls CERT KEY CACERT
    kubeconfig KUBECONFIG CONTEXT
    namespaces NAMESPACE...
    labels EXPRESSION
    pods POD-MODE
    endpoint_pod_names
    ttl TTL
    noendpoints
    transfer to ADDRESS...
    fallthrough [ZONES...]
    ignore empty_service
}

init 函数

一样的套路,调用 caddy.RegisterPlugin 函数注册 kubernetes 插件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func init() {
	// The Kubernetes client library logs through klog. Register klog's flags on a
	// private FlagSet and force "logtostderr" on, so klog does not log to the
	// filesystem, which can fill up and indirectly crash CoreDNS via os.Exit().
	// The setup function additionally assigns os.Stderr = os.Stdout so the output
	// goes to standard out like all other CoreDNS logging; that cannot be done
	// here because some messages are still written to stderr while starting.
	fs := flag.NewFlagSet("klog", flag.ExitOnError)
	klog.InitFlags(fs)
	fs.Lookup("logtostderr").Value.Set("true")

	// Register the plugin under the name "kubernetes" for the "dns" server type.
	plug := caddy.Plugin{ServerType: "dns", Action: setup}
	caddy.RegisterPlugin("kubernetes", plug)
}

setup 函数

kubernetesParse 函数中的 ParseStanza 函数
func setup(c *caddy.Controller) error {
	// See comment in the init function.
	os.Stderr = os.Stdout
	k, err := kubernetesParse(c)
if err != nil {
	return plugin.Error("kubernetes", err)
}

kubernetes 结构体

interfaceAddrsFunc 设置为 localPodIP

autoPathSearch 设置为 searchFromResolvConf,从 /etc/resolv.conf 配置中得到 search 列表(例如 search default.svc.cluster.local. svc.cluster.local. cluster.local.)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Kubernetes holds the state of the kubernetes plugin, which connects to a
// Kubernetes cluster.
type Kubernetes struct {
	// Next is the handler that queries matching none of Zones are passed to.
	Next plugin.Handler
	// Zones lists the zones a query name is matched against.
	Zones []string

	Upstream *upstream.Upstream

	// Kubernetes API connection settings.
	// NOTE(review): presumably filled from the endpoint, tls and kubeconfig
	// directives; confirm against the parser.
	APIServerList []string
	APICertAuth   string
	APIClientCert string
	APIClientKey  string
	ClientConfig  clientcmd.ClientConfig

	// APIConn is the controller that RegisterKubeCache runs on startup and
	// stops on shutdown.
	APIConn dnsController

	// Namespaces must be empty when a namespace label selector is configured;
	// the parser rejects setting both.
	Namespaces map[string]struct{}
	// podMode is the value of the pods directive (disabled, insecure or verified).
	podMode string
	// endpointNameMode is true when the endpoint_pod_names directive is present.
	endpointNameMode bool

	// NOTE(review): Fall and ttl presumably carry the fallthrough and ttl
	// directives; confirm against the parser.
	Fall fall.F
	ttl  uint32

	opts             dnsControlOpts
	primaryZoneIndex int

	// NOTE(review): the surrounding notes say this is set to localPodIP; confirm.
	interfaceAddrsFunc func() net.IP
	// autoPathSearch is the local search path from /etc/resolv.conf. Needed for autopath.
	autoPathSearch []string

	// NOTE(review): presumably the addresses of the "transfer to" directive; confirm.
	TransferTo []string
}

对于每一个配置块,读取配置

endpoint_pod_names:在 A 记录中使用 pod 名字,例如 endpoint-name.my-service.namespace.svc.cluster.local. in A 1.2.3.4

pods:POD-MODE (disabled / insecure / verified),例如 1-2-3-4.ns.pod.cluster.local. in A 1.2.3.4

namespaces: NAMESPACE [NAMESPACE…] ,暴露的 k8s namespaces 列表,如果省略则暴露所有 namespaces,这个可以用于 namespace 中的 DNS 隔离

kubeconfig: KUBECONFIG CONTEXT,连接 k8s 的证书配置文件

namespace_labels:EXPRESSION,用于匹配 namespace label,可以用于一组 namespace

fallthrough:[ZONES…],如果指定 in-addr.arpa ip6.arpa,只有这些 zone 的查询才会 fallthrough

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
kubernetes [ZONES...] {
    endpoint URL
    tls CERT KEY CACERT
    kubeconfig KUBECONFIG CONTEXT
    namespaces NAMESPACE...
    labels EXPRESSION
    pods POD-MODE
    endpoint_pod_names
    ttl TTL
    noendpoints
    transfer to ADDRESS...
    fallthrough [ZONES...]
    ignore empty_service
}

for c.NextBlock() {
	switch c.Val() {
	case "endpoint_pod_names":
		args := c.RemainingArgs()
		if len(args) > 0 {
			return nil, c.ArgErr()
		}
		k8s.endpointNameMode = true
		continue
	case "pods":
		args := c.RemainingArgs()
		if len(args) == 1 {
			switch args[0] {
			case podModeDisabled, podModeInsecure, podModeVerified:
				k8s.podMode = args[0]
			default:
				return nil, fmt.Errorf("wrong value for pods: %s,  must be one of: disabled, verified, insecure", args[0])
			}
			continue
		}
		return nil, c.ArgErr()
注意,namespaces 和 namespace_labels 不能同时设置

if len(k8s.Namespaces) != 0 && k8s.opts.namespaceLabelSelector != nil {
	return nil, c.Errf("namespaces and namespace_labels cannot both be set")
}

InitKubeCache

建立 k8s 客户端连接,namespace_labels 初始化,调用 newdnsController 实例化 dnsControl

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// InitKubeCache initializes a new Kubernetes cache.
func (k *Kubernetes) InitKubeCache() (err error) {
	if k.opts.namespaceLabelSelector != nil {
	var selector labels.Selector
	selector, err = meta.LabelSelectorAsSelector(k.opts.namespaceLabelSelector)
	if err != nil {
		return fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.namespaceLabelSelector, err)
	}
	k.opts.namespaceSelector = selector
}
newdnsController 函数实例化 dnsControl

包括 client、label 选择器(selector)、namespace label 选择器(namespaceSelector)等

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// newdnsController creates a controller for CoreDNS.
func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dnsControl {
	dns := dnsControl{
		client:            kubeClient,
		selector:          opts.selector,
		namespaceSelector: opts.namespaceSelector,
		stopCh:            make(chan struct{}),
		zones:             opts.zones,
		endpointNameMode:  opts.endpointNameMode,
	}

1.2.3.2 service、pod 设置 informer 机制

注意:个人建议,如果设置了 namespace_labels,已经进行了隔离,就没有必要去 list watch 所有的 service,只关注相关 namespace 下的即可,这样可以减少缓存的数量

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
dns.svcLister, dns.svcController = object.NewIndexerInformer(
	&cache.ListWatch{
		ListFunc:  serviceListFunc(dns.client, api.NamespaceAll, dns.selector),
		WatchFunc: serviceWatchFunc(dns.client, api.NamespaceAll, dns.selector),
	},
	&api.Service{},
	opts.resyncPeriod,
	cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
	cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc},
	object.ToService,
)

if opts.initPodCache {
	dns.podLister, dns.podController = object.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc:  podListFunc(dns.client, api.NamespaceAll, dns.selector),
			WatchFunc: podWatchFunc(dns.client, api.NamespaceAll, dns.selector),
		},
		&api.Pod{},
		opts.resyncPeriod,
		cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
		cache.Indexers{podIPIndex: podIPIndexFunc},
		object.ToPod,
	)
}
endpoint namespace 的 informer 机制

注意 namespace 的 informer 只关注匹配 namespace label 的 namespace

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
if opts.initEndpointsCache {
	dns.epLister, dns.epController = object.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc:  endpointsListFunc(dns.client, api.NamespaceAll, dns.selector),
			WatchFunc: endpointsWatchFunc(dns.client, api.NamespaceAll, dns.selector),
		},
		&api.Endpoints{},
		opts.resyncPeriod,
		cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
		cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
		object.ToEndpoints)
}

dns.nsLister, dns.nsController = cache.NewInformer(
	&cache.ListWatch{
		ListFunc:  namespaceListFunc(dns.client, dns.namespaceSelector),
		WatchFunc: namespaceWatchFunc(dns.client, dns.namespaceSelector),
	},
	&api.Namespace{},
	opts.resyncPeriod,
	cache.ResourceEventHandlerFuncs{})

RegisterKubeCache 向 caddy 服务框架注册 OnStartup 和 OnShutdown 函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// RegisterKubeCache registers the KubeCache start and stop functions with Caddy.
//
// On startup the API controller is launched in its own goroutine, and startup
// then polls every 100ms, for at most 5 seconds, until the controller reports
// that it has synced. If the 5 seconds elapse first, startup still returns nil.
// On shutdown the controller is stopped and its error, if any, is returned.
func (k *Kubernetes) RegisterKubeCache(c *caddy.Controller) {
	c.OnStartup(func() error {
		go k.APIConn.Run()

		timeout := time.After(5 * time.Second)
		ticker := time.NewTicker(100 * time.Millisecond)
		// Stop the ticker when the wait ends so it does not keep running after
		// startup; before Go 1.23 an unstopped ticker is never garbage collected.
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				if k.APIConn.HasSynced() {
					return nil
				}
			case <-timeout:
				// Deliberately not an error: startup proceeds even though the
				// controller has not reported a completed sync yet.
				return nil
			}
		}
	})

	c.OnShutdown(func() error {
		return k.APIConn.Stop()
	})
}
Run 函数运行 svc 这些 controller
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// Run launches every configured controller in its own goroutine and then
// blocks on the stop channel.
func (dns *dnsControl) Run() {
	stop := dns.stopCh

	go dns.svcController.Run(stop)
	// The endpoints and pod controllers are optional and only run when set.
	if ep := dns.epController; ep != nil {
		go ep.Run(stop)
	}
	if pod := dns.podController; pod != nil {
		go pod.Run(stop)
	}
	go dns.nsController.Run(stop)

	// Wait here until the stop channel delivers a value or is closed.
	<-stop
}
HasSynced 检查各个 controller 是否都已完成数据同步(RegisterKubeCache 启动时每 100ms 调用一次进行轮询)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// HasSynced reports whether all controllers have synced. The endpoints and pod
// controllers are optional; a nil one counts as synced.
func (dns *dnsControl) HasSynced() bool {
	// Every configured controller is queried (no short-circuiting), in the
	// order service, endpoints, pod, namespace.
	synced := dns.svcController.HasSynced()
	if dns.epController != nil {
		synced = dns.epController.HasSynced() && synced
	}
	if dns.podController != nil {
		synced = dns.podController.HasSynced() && synced
	}
	return dns.nsController.HasSynced() && synced
}

调用 AddPlugin 注册 kubernetes 插件

1
2
3
4
dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler {
	k.Next = next
	return k
})

ServiceBackend 接口

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// ServiceBackend is a (dynamic) backend from which slices of service
// definitions are retrieved.
type ServiceBackend interface {
	// Services asks the backend for the service definitions. exact indicates
	// that only an exact match should be returned.
	Services(ctx context.Context, state request.Request, exact bool, opt Options) ([]msg.Service, error)

	// Reverse asks the backend for service definitions based on an IP address
	// instead of a name, i.e. a reverse DNS lookup.
	Reverse(ctx context.Context, state request.Request, exact bool, opt Options) ([]msg.Service, error)

	// Lookup is used to find records elsewhere.
	Lookup(ctx context.Context, state request.Request, name string, typ uint16) (*dns.Msg, error)

	// Records returns _all_ services that match a certain name.
	// Note: it does not implement a specific service.
	Records(ctx context.Context, state request.Request, exact bool) ([]msg.Service, error)

	// IsNameError reports whether err indicates a record-not-found condition.
	IsNameError(err error) bool

	Transferer
}

kubernetes 的 ServeDNS 方法

// ServeDNS implements the plugin.Handler interface.
func (k Kubernetes) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
state := request.Request{W: w, Req: r}

qname := state.QName()
zone := plugin.Zones(k.Zones).Matches(qname)
if zone == "" {
	return plugin.NextOrFailure(k.Name(), k.Next, ctx, w, r)
}
zone = qname[len(qname)-len(zone):] // maintain case of original query
state.Zone = zone

A 记录

A(Address)记录是用来指定主机名(或域名)对应的 IP 地址记录,checkForApex 函数中调用 Services 方法找到对应的 svc

// A returns A records from Backend or an error.
func A(ctx context.Context, b ServiceBackend, zone string, state request.Request, previousRecords []dns.RR, opt Options) (records []dns.RR, err error) {
	services, err := checkForApex(ctx, b, zone, state, opt)
	if err != nil {
		return nil, err
	}

对所有符合的 service 遍历

for _, serv := range services {
what, ip := serv.HostType()

switch what {
case dns.TypeCNAME:
	if Name(state.Name()).Matches(dns.Fqdn(serv.Host)) {
		// x CNAME x is a direct loop, don't add those
		continue
	}

newRecord := serv.NewCNAME(state.QName(), serv.Host)
	if len(previousRecords) > 7 {
		// don't add it, and just continue
		continue
	}
	if dnsutil.DuplicateCNAME(newRecord, previousRecords) {
		continue
	}
	if dns.IsSubDomain(zone, dns.Fqdn(serv.Host)) {
		state1 := state.NewWithQuestion(serv.Host, state.QType())
		state1.Zone = zone
		nextRecords, err := A(ctx, b, zone, state1, append(previousRecords, newRecord), opt)

if err == nil {
			// Not only have we found something we should add the CNAME and the IP addresses.
			if len(nextRecords) > 0 {
				records = append(records, newRecord)
				records = append(records, nextRecords...)
			}
		}
		continue
	}
	// This means we can not complete the CNAME, try to look else where.
	target := newRecord.Target
	// Lookup
	m1, e1 := b.Lookup(ctx, state, target, state.QType())
	if e1 != nil {
		continue
	}
	// Len(m1.Answer) > 0 here is well?
	records = append(records, newRecord)
	records = append(records, m1.Answer...)
	continue

case dns.TypeA:
	if _, ok := dup[serv.Host]; !ok {
		dup[serv.Host] = struct{}{}
		records = append(records, serv.NewA(state.QName(), ip))
	}

case dns.TypeAAAA:
		// nada
	}
}

总结:

pod 内域名解析,如果 dnsPolicy: ClusterFirst,则会依次尝试解析 XXX.default.svc.cluster.local.、XXX.svc.cluster.local.、XXX.cluster.local.

namespaces 和 namespace_labels 不能同时设置。个人意见:对于 namespaces,可以只 list watch 设置的 namespace;既然只关注相应的 namespace,那么只 list watch 该 namespace 下的 service 即可,不需要 list watch 所有

A 记录: A(Address)记录是用来指定主机名(或域名)对应的 IP 地址记录

NS 记录: NS(Name Server)记录是域名服务器记录,用来指定该域名由哪个 DNS 服务器来进行解析

PTR:pointer 的简写,用于将一个 IP 地址映射到对应的域名,也可以看成是 A 记录的反向,IP 地址的反向解析