Client

client-go 里面有许许多多的库,先从 RESTClient 说起,client-go一共支持 4 种客户端逻辑:

  • DiscoveryClient:发现版本,请求访问 k8s 集群的 API 信息,如kubectl api-versions
  • ClientSet:客户端集合,仅限使用 k8s 内置资源,如 PodsService 等。
  • DynamicClient:动态客户端,用于无类型资源,如 CRD
  • RESTClient: 实现rest.Interface接口,自由度高,有需要时可以进行封装。

其中前三者都是基于 RESTClient 实现,可以在下图或源码中看到 restClient 踪影。

Client关系图
Client关系图

DiscoveryClient

实现方法:

  • ServerGroups: 返回metav1.ApiGroup列表。
  • ServerResourcesForGroupVersion: 根据 GV 参数获取metav1.APIResource列表。
  • ServerResources: 获取metav1.APIResource列表。
  • ServerGroupsAndResources: 返回metav1.APIGroup数组和metav1.APIResource列表。
  • ServerPreferredResources: 返回首选的资源列表。
  • ServerPreferredNamespacedResources: 返回首选的命名空间层级资源列表。
  • ServerVersion: 返回服务器版本。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func main() {
    var kubeconfig *string
    if home := homedir.HomeDir(); home != "" {
        kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
    } else {
        kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
    }
    flag.Parse()

    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        panic(err)
    }

    // DiscoveryClient示例
    discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
    if err != nil {
        panic(err)
    }

    version, err := discoveryClient.ServerVersion()
    if err != nil {
        panic(err)
    }

    pretty.Println(version)
}

ClientSet

使用ClientSet比较简单,只需要配置好RESTClient就可以操作系统资源。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func main() {
    var kubeconfig *string
    if home := homeDir(); home != "" {
        kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
    } else {
        kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
    }
    flag.Parse()

    // use the current context in kubeconfig
    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        panic(err.Error())
    }

    // ClientSet示例
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err.Error())
    }
    for {
        // 调用链:客户端->GV->Resources->OP
        pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
        if err != nil {
            panic(err.Error())
        }
        fmt.Printf("There are %d pods in the cluster\n", len(pods.Items))

        namespace := "default"
        pod := "example-xxxxx"
        _, err = clientset.CoreV1().Pods(namespace).Get(context.TODO(), pod, metav1.GetOptions{})
        if errors.IsNotFound(err) {
            fmt.Printf("Pod %s in namespace %s not found\n", pod, namespace)
        } else if statusError, isStatus := err.(*errors.StatusError); isStatus {
            fmt.Printf("Error getting pod %s in namespace %s: %v\n",
                pod, namespace, statusError.ErrStatus.Message)
        } else if err != nil {
            panic(err.Error())
        } else {
            fmt.Printf("Found pod %s in namespace %s\n", pod, namespace)
        }

        time.Sleep(10 * time.Second)
    }
}

DynamicClient

对自定义资源类型(CRD),采用DynamicClient进行 CRUD 操作,创建或返回数据的是Unstructured类型,本质是map[string]interface。目前有很多关于 CRD 库已经从自定义资源生成代码,关联相关类型,所以基本上没有这种接近原生数据的开发。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

func main() {
    var kubeconfig *string
    if home := homedir.HomeDir(); home != "" {
        kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
    } else {
        kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
    }
    flag.Parse()

    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        panic(err)
    }

    // DynamicClient示例
    dynamicClient, err := dynamic.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    // 记得先创建CRD
    gvr := schema.GroupVersionResource{
        Group:    "stable.example.com",
        Version:  "v1",
        Resource: "crontabs",
    }

    crontabList, err := dynamicClient.Resource(gvr).List(context.TODO(), v1.ListOptions{})
    if err != nil {
        panic(err)
    }

    var crontab *unstructured.Unstructured
    if len(crontabList.Items) < 1 {
        crontab, err = dynamicClient.Resource(gvr).
            Namespace("default").
            Create(context.TODO(),
                // Unstructed类型
                &unstructured.Unstructured{
                    Object: map[string]interface{}{
                        "apiVersion": "stable.example.com/v1",
                        "kind":       "CronTab",
                        "metadata": map[string]interface{}{
                            "name": "demo-crontab",
                        },
                        "spec": map[string]interface{}{
                            "cronSpec": "* * * * 1",
                            "replicas": 5,
                            "image":    "nginx/nginx",
                        },
                    },
                },
                v1.CreateOptions{})

        if err != nil {
            panic(err)
        }
    } else {
        crontab = &crontabList.Items[0]
    }

    pretty.Println(crontab)
}

RESTClient

如果用RESTClientCRUD 某个资源,之前先要配置好rest.Config

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

func main() {
    var kubeconfig *string
    if home := homeDir(); home != "" {
        kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
    } else {
        kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
    }
    flag.Parse()

    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        panic(err)
    }

    // 这里开始配置
    config.APIPath = "/api"
    // 资源配置,就像ClientSet配置每个资源一样
    config.GroupVersion = &corev1.SchemeGroupVersion
    // 序列化器(编解码器)
    config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
    // 限速器
    config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(rest.DefaultQPS, rest.DefaultBurst)
    // RESTClient示例
    restClient, err := rest.RESTClientFor(config)
    if err != nil {
        panic(err)
    }

    var (
        options = &metav1.ListOptions{}
        pods = &corev1.PodList{}
        ctx = context.Background()
    )

    if err := restClient.Get().
        Resource("pods").
        VersionedParams(options, scheme.ParameterCodec).
        Namespace("").
        Do(ctx).
        Into(pods); err != nil {
        panic(err)
    }

    for _, item := range pods.Items {
        fmt.Println(item.Namespace, "/", item.Name)
    }
}

主要是以Resquest请求处理为主,封装了http.Client对象,但是http.Client对象是存放在RESTClient,当调用RESTClient时就返回Resquest对象,这是就可以调用Resources() VersionedParams() Do()等方法。最后在Do()方法返回Result类型,是用来进行对返回数据的处理,如Into(&pods)就是对数据转换成目录类型。

KubeConfig

kubeconfig

下图是其中一个解析kubeconfig过程,主要涉及三个对象分别是DeferredLoadingClientConfig,DeferredLoadingClientConfig,DirectClientConfig

img
img

DeferredLoadingClientConfig

对象里有个loader成员,用来调用加载配置的规则,对象里每个可导出的方法里首先执行createClientConfig()方法(除ConfigAccess()外),通过这个方调用config.loader.Load()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
type DeferredLoadingClientConfig struct {
    // 加载配置规则
    loader         ClientConfigLoader
    ...
}

func (config *DeferredLoadingClientConfig) createClientConfig() (ClientConfig, error) {
    if config.clientConfig == nil {
        ...
        if config.clientConfig == nil {
            mergedConfig, err := config.loader.Load()
            ....
        }
    }

    return config.clientConfig, nil
}

ClientCofnigLoadingRules

作用是当有多个配置的时候会合成一个clientcmdapi.Config对象,返回给DeferredLoadingClientConfig对象,如上述代码。

  • LoadFromFile()加载文件,读取字节码。
  • Load() 调用Codec.Decoder.Decode()编解码器解析文件内容。
1
2
3
4
5
6
7
type Config struct {
    ...
    Clusters map[string]*Cluster `json:"clusters"`
    AuthInfos map[string]*AuthInfo `json:"users"`
    Contexts map[string]*Context `json:"contexts"`
    ...
}

DirectClientConfig

最后通过DirectClientConfig.ClientConfig()获取rest.Config对象返回。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (config *DeferredLoadingClientConfig) createClientConfig() (ClientConfig, error) {
    if config.clientConfig == nil {
        if config.clientConfig == nil {
            mergedConfig, err := config.loader.Load()

            if config.fallbackReader != nil {
                mergedClientConfig = NewInteractiveClientConfig(*mergedConfig, config.overrides.CurrentContext, config.overrides, config.fallbackReader, config.loader)
            } else {
                // 运行client-go的example/out-of-cluster-client-configuration会执行这段
                mergedClientConfig = NewNonInteractiveClientConfig(*mergedConfig, config.overrides.CurrentContext, config.overrides, config.loader)
            }

            config.clientConfig = mergedClientConfig
        }
    }

    return config.clientConfig, nil
}

func (config *DeferredLoadingClientConfig) ClientConfig() (*restclient.Config, error) {
    mergedClientConfig, err := config.createClientConfig()
    if err != nil {
        return nil, err
    }
    ...
    // 最后调用
    mergedConfig, err := mergedClientConfig.ClientConfig()
    ...
}

最后附上对象图:

img
img

Informer

k8s-informer

Client-go Controller Interaction
Client-go Controller Interaction

WorkQueue

workqueue

EventBroadcaster

代码生成器

client-gen 代码生成器

lister-gen 代码生成器

informer-gen 代码生成器