Pilot 作为 Istio 控制面流量管理的核心组件,为整个服务网格提供了标准的独立与各种平台的服务模型,支持从 Kubernetes、Consul 等多种平台获取服务发现功能,支持用户通过 VirtualServiceGateway等API定制服务间的流量管理规则,并将这些配置信息转化为统一的服务发现和流量控制模型,以 xDS 方式下发到数据面。

pilot-discovery 扮演服务注册中心、istio控制平面到 sidecar之间的桥梁作用,负责将服务信息和配置数据转换为xDS接口的标准数据结构,通过gRPC下发到数据面的Envoy。

目前Pilot的输入包括两部分数据来源:

  • 服务数据: 来源于各个服务注册表(Service Registry),例如Kubernetes中注册的Service,Consul Catalog中的服务等。Pilot 监控服务注册中心(如 Kubernetes)的服务注册情况。在 Kubernetes 环境下,会监控 serviceendpointpodnode 等资源信息。
  • 配置规则: 各种配置规则,包括路由规则及流量管理规则等,通过Kubernetes CRD(Custom Resources Definition)形式定义并存储在Kubernetes中。Pilot 会监控 istio控制面信息变化,在 Kubernetes 环境下,会监控包括 RouteRuleVirtualServiceGatewayEgressRuleServiceEntry 等以 Kubernetes CRD形式存在的 istio控制面配置信息。

Pilot的输出为符合xDS接口的数据面配置数据,并通过gRPC Streaming接口将配置数据推送到数据面的Envoy中。

获取配置和服务数据

底层平台 多种多样,istio 抽象一套自己的数据模型(pilot/pkg/model)及数据存取接口,以屏蔽底层平台。

服务数据部分

中间Abstract Model 层 实现如下

Service describes an Istio service (e.g., catalog.mystore.com:8080)Each service has a fully qualified domain name (FQDN) and one or more ports where the service is listening for connections. Service用于表示Istio服务网格中的一个服务(例如 catalog.mystore.com:8080)。每一个服务有一个全限定域名(FQDN)和一个或者多个接收客户端请求的监听端口。

SercieInstance中存放了服务实例相关的信息,一个Service可以对应到一到多个Service Instance,Istio在收到客户端请求时,会根据该Service配置的LB策略和路由规则从可用的Service Instance中选择一个来提供服务。

ServiceDiscovery抽象了一个服务发现的接口,所有接入istio 的平台应提供该接口实现。

Controller抽象了一个Service Registry变化通知的接口,该接口会将Service及Service Instance的增加,删除,变化等消息通知给ServiceHandler(也就是一个func)。调用Controller的Run方法后,Controller会一直执行,将监控Service Registry的变化,并将通知到注册到Controller中的ServiceHandler中

由上图可知,底层平台 接入时必须实现 ServiceDiscovery 和 Controller,提供Service 数据,并在Service 变动时 执行handler。 整个流程 由Controller.Run 触发,将平台数据 同步and 转换到 istio 内部数据模型(ServiceDiscovery实现),若数据有变化,则触发handler。

配置数据部分

ConfigControllerPilot实现配置信息监控与处理的核心,它关联的几个关键的结构体如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
//go 语言,源码摘自 pilot-discovery,pilot-discovery 实现配置监听的关键部分

// 用于存储 route rule、virtualservice 等流量配置信息
type ConfigStore interface {
    Schemas() collection.Schemas
    Get(typ resource.GroupVersionKind, name, namespace string) *Config
    List(typ resource.GroupVersionKind, namespace string) ([]Config, error)
    Create(config Config) (revision string, err error)
    Update(config Config) (newRevision string, err error)
    Delete(typ resource.GroupVersionKind, name, namespace string) error
    Version() string
    GetResourceAtVersion(version string, key string) (resourceVersion string, err error)
    GetLedger() ledger.Ledger
    SetLedger(ledger.Ledger) error
}

// 扩展了 ConfigStore 存储,并提供资源处理的注册函数,使用此函数注册后,资源变更会回调 handler 处理
type ConfigStoreCache interface {
    RegisterEventHandler(kind resource.GroupVersionKind, handler func(Config, Config, Event))
    Run(stop <-chan struct{})
    HasSynced() bool
}


//controller 实现了 ConfigStore 接口和 ConfigStoreCache 接口
type controller struct {
    client *Client
    queue  queue.Instance
    kinds  map[resource.GroupVersionKind]*cacheHandler
}

type Task func() error

// controller 的 queue 的类型,包装了 Task 任务
type Instance interface {
    Push(task Task)
    Run(<-chan struct{})
}

//initServiceControllers 下的 kubernets 下的 Controller ,由 initKubeRegistry 创建
func NewController(client kubernetes.Interface, options Options) *Controller {
    c := &Controller{
        client:                     client,
        queue:                      queue.NewQueue(1 * time.Second),
        ...
    }

    ...

    registerHandlers(c.services, c.queue, "Services", c.onServiceEvent)

ConfigController 用于处理 istio流控 CRD, 如 VirtualServiceDestinationRule 等。

  • ConfigStore 对象利用 client-go 库从 Kubernetes 获取 RouteRuleVirtualServiceCRD形式存在控制面信息,转换为 model 包下的 Config 对象,对外提供 GetListCreateUpdateDelete 等 CRUD 服务。
  • ConfigStoreCache 则主要扩展了:注册 Config 变更事件处理函数 RegisterEventHandler 、开始处理流程的 Run 方法。

Pilot中,目前实现了 ConfigStoreCachecontroller 主要有以下五种:

  • crd/controller/controller.go
  • serviceregistry/mcp/controller.go
  • kube/gateway/controller.go
  • kube/ingress/controller.go
  • memory/controller.go

其中比较关键的是 crd controllerCRDCustomResourceDefinition 的缩写 ,CRDContriller 利用 SharedIndexInformer 实现对 CRD资源的 list/watch。将 AddUpdateDelete 事件涉及到的 CRD资源对象封装为一个 Task ,并 push 到 ConfigControllerqueue 里,queue 队列始终处于监听状态,只要队列中有内容,就会回调 task 函数执行。关键代码的实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//go 语言,源码摘自 pilot-discovery,pilot-discovery 实现配置监听的关键部分,接上一段代码中的 registerHandlers

func registerHandlers(informer cache.SharedIndexInformer, q queue.Instance, otype string,
    handler func(interface{}, model.Event) error) {

    informer.AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                ...
                q.Push(...)
                ...
            },
            UpdateFunc: func(old, cur interface{}) {
                ...
                q.Push(...)
                ...
            },
            DeleteFunc: func(obj interface{}) {
                ...
                q.Push(...)
                ...
            },
        })
}

//queue 的实现,始终等待执行 task
func (q *queueImpl) Run(stop <-chan struct{}) {
    ...
    for {
        if len(q.tasks) == 0 {
            return
        }
        task, q.tasks = q.tasks[0], q.tasks[1:]
        task()
    }
}

ConfigStore describes a set of platform agnostic APIs that must be supported by the underlying platform to store and retrieve Istio configuration. ConfigStore定义一组平台无关的,但是底层平台(例如K8S)必须支持的API,通过这些API可以存取Istio配置信息每个配置信息的键,由type + name + namespace的组合构成,确保每个配置具有唯一的键。写操作是异步执行的,也就是说Update后立即Get可能无法获得最新结果。

ConfigStoreCache表示ConfigStore的本地完整复制的缓存,此缓存主动和远程存储保持同步,并且在获取更新时提供提供通知机制。为了获得通知,事件处理器必须在Run之前注册,缓存需要在Run之后有一个初始的同步延迟。

IstioConfigStore扩展ConfigStore,增加一些针对Istio资源的操控接口

由上图可知,底层平台 接入时必须实现 ConfigStoreCache,提供Config 数据,并在Config 变动时 执行handler。 整个流程 由ConfigStoreCache.Run 触发,将平台数据 同步and 转换到 istio 内部数据模型(ConfigStore实现),若数据有变化,则触发handler。

Environment 聚合

Environment provides an aggregate environmental API for Pilot. Environment为Pilot提供聚合的环境性的API

由上文可知,启动时,向 Controller 和 ConfigStoreCache 注册handler,执行 ConfigStoreCache.Run 和 Controller.Run,便可以同步 service 和config 数据,并在数据变动时 触发handler 执行。pilot数据输入的部分就解决了

img
img

启动

pilot-discovery 关键实现逻辑如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//go 语言,源码摘自 pilot-discovery,pilot-discovery 初始化及启动的关键部分,省去异常处理

// 创建 discoveryServer 对象并启动
discoveryServer, err := bootstrap.NewServer(serverArgs)
discoveryServer.Start(stop)

// discoveryServer 对象的具体创建方法
func NewServer(args *PilotArgs) (\*Server, error) {
    //环境变量
    e := &model.Environment{...}

    s := &Server{
        clusterID:      getClusterID(args),                                //集群id
        environment:    e,                                                //环境变量
        EnvoyXdsServer: envoyv2.NewDiscoveryServer(e, args.Plugins),     //Pilot 针对 Envoy v2 xds APIs 的 gRPC 实现,用于通知 envoy 配置更新
        ...
    }

    s.initKubeClient(args)
    s.initMeshConfiguration(args, fileWatcher)
    s.initConfigController(args)
    s.initServiceControllers(args)
    s.initDiscoveryService(args)
    ...
}
...

gRPC服务启动
func (s *Server) Start(stop <-chan struct{}) error {
    go func() {
        s.grpcServer.Serve(s.GRPCListener)
    }()
}

pilot-discovery 的初始化主要在 pilot-discovery 的 init 方法和在 discovery 命令处理流程中调用的 bootstrap.NewServer 完成,关键步骤如下:

  • 创建 Kubernetes apiserver client(initKubeClient),可以在 pilot-discovery 的 discovery 命令的 kubeconfig flag 中提供文件路径,默认为空。
  • 读取 mesh 配置(initMeshConfiguration),包含 MixerCheckServerMixerReportServerProxyListenPortRdsRefreshDelayMixerAddress 等一些列配置,默认 mesh 配置文件"/etc/istio/config/mesh"。
  • 初始化与配置存储中心的连接(initConfigController 方法)对 istio做出的各种配置,比如 route rulevirtualservice 等,需要保存在配置存储中心(config store)内。
  • 配置与服务注册中心(service registry)的连接(initServiceControllers 方法)
  • 初始化 discovery 服务(initDiscoveryService),将 discovery 服务注册为 Config Controller 和 Service Controller 的 Event Handler,监听配置和服务变化消息。
  • 启动 gRPC Server 并接收来自 Envoy端的连接请求。
  • 接收 sidecar端的 xDS 请求,从 Config Controller、Service Controller 中获取配置和服务信息,生成响应消息发送给 sidecar
  • 监听来自 Config Controller 、Service Controller 的变化消息,并将配置、服务变化内容通过 xDS 接口推送到 sidecar

启动命令示例:/usr/local/bin/pilot-discovery discovery --monitoringAddr=:15014 --log_output_level=default:info --domain cluster.local --secureGrpcAddr --keepaliveMaxServerConnectionAge 30m

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
package bootstrap
func NewServer(args *PilotArgs) (*Server, error) {
    s.initKubeClient(args)
    s.initMeshConfiguration(args, fileWatcher)
    s.initMeshNetworks(args, fileWatcher)
    s.initCertController(args)
    s.initConfigController(args)
    s.initServiceControllers(args)
    s.initDiscoveryService(args)
    s.initMonitor(args.DiscoveryOptions.MonitoringAddr)
    s.initClusterRegistries(args)
    s.initDNSListener(args)
    // Will run the sidecar injector in pilot.Only operates if /var/lib/istio/inject exists
    s.initSidecarInjector(args)
    s.initSDSCA(args)
}

启动的逻辑很多,但从config+service+grcServer 视角看 启动代码的核心如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func NewServer(args *PilotArgs) (*Server, error) {
    s.addStartFunc(func(stop <-chan struct{}) error {
        go s.configController.Run(stop)
        return nil
    })
    s.addStartFunc(func(stop <-chan struct{}) error {
        go serviceControllers.Run(stop)
        return nil
    })
    ## DiscoveryServer 注册config/service 事件handler
    s.initEventHandlers(){
        s.ServiceController().AppendServiceHandler(serviceHandler)
        s.ServiceController().AppendInstanceHandler(instanceHandler)
        s.configController.RegisterEventHandler(descriptor.Type, configHandler)
    }
    s.initGrpcServer(args.KeepaliveOptions)
}

处理xds请求

如果golang 里有类似 tomcat、springmvc 的组件,那源码看起来就很简单了。

img
img

envoy 通过grpc 协议与 pilot-discovery 交互,因此首先找 ads.proto 文件

ads.proto

基于ads.proto 生成 ads.pb.go 文件 github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2/ads.pb.go 其中定义了 服务接口 AggregatedDiscoveryServiceServer,其实现类 DiscoveryServer,DiscoveryServer 方法分散于多个go 文件中

img
img

DiscoveryServer 通过Environment 间接持有了 config和 service 数据。此外, pilot-discovery Server启动时便 为DiscoveryServer 注册了config service 变更处理函数,不管config/service 如何变更,都会触发 DiscoveryServer.ConfigUpdate

代码中 Server.EnvoyXdsServer 就是DiscoveryServer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (s *Server) initEventHandlers() error {
    // Flush cached discovery responses whenever services configuration change.
    serviceHandler := func(svc *model.Service, _ model.Event) {
        pushReq := &model.PushRequest{...}
        s.EnvoyXdsServer.ConfigUpdate(pushReq)
    }
    s.ServiceController().AppendServiceHandler(serviceHandler)
    instanceHandler := func(si *model.ServiceInstance, _ model.Event) {
        s.EnvoyXdsServer.ConfigUpdate(&model.PushRequest{...})
    }
    s.ServiceController().AppendInstanceHandler(instanceHandler)
    if s.configController != nil {
        configHandler := func(old, curr model.Config, _ model.Event) {
            ...
            s.EnvoyXdsServer.ConfigUpdate(pushReq)
        }
        for _, descriptor := range schemas.Istio {
            s.configController.RegisterEventHandler(descriptor.Type, configHandler)
        }
    }
    return nil
}

proxy

Proxy contains information about an specific instance of a proxy (envoy sidecar, gateway,etc). The Proxy is initialized when a sidecar connects to Pilot, and populated from ‘node’ info in the protocol as well as data extracted from registries. proxy struct是sidecar 在 pilot 内的一个表示。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type Proxy struct {
    ClusterID string
    // Type specifies the node type. First part of the ID.
    Type NodeType
    IPAddresses []string
    ID string
    Locality *core.Locality
    // DNSDomain defines the DNS domain suffix for short hostnames (e.g.
    // "default.svc.cluster.local")
    DNSDomain string
    ConfigNamespace string
    // Metadata key-value pairs extending the Node identifier
    Metadata *NodeMetadata
    // the sidecarScope associated with the proxy
    SidecarScope *SidecarScope
    // The merged gateways associated with the proxy if this is a Router
    MergedGateway *MergedGateway
    // service instances associated with the proxy
    ServiceInstances []*ServiceInstance
    // labels associated with the workload
    WorkloadLabels labels.Collection
    // Istio version associated with the Proxy
    IstioVersion *IstioVersion
}

envoy 向pilot 发送请求

grpc 请求通过 StreamAggregatedResources 来处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    peerInfo, ok := peer.FromContext(stream.Context())
    ...
    con := newXdsConnection(peerAddr, stream)
    ...
    // xds请求消息接收,接收后存放到reqChannel中
    reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
    go receiveThread(con, reqChannel, &receiveError)
    for {
        select {
        case discReq, ok := <-reqChannel:
            switch discReq.TypeUrl {
            case ClusterType:
                ...
                err := s.pushCds(con, s.globalPushContext(), versionInfo())
            case ListenerType:
                ...
            case RouteType:
                ...
            case EndpointType:
                ...
            }
        case pushEv := <-con.pushChannel:
            ...
        }
    }
}

StreamAggregatedResources 函数的for循环是无限循环流程,这里会监控两个channel 通道的消息,一个是reqChannel的新连接消息, 一个是pushChannel的配置变更消息。reqChannel 接收到新数据时,会从reqChannel 取出xds 请求消息discReq, 然后根据不同类型的xds请求,调用相应的xds下发逻辑。在v2版本的xds 协议实现中,为了保证多个xds数据下发的顺序,lds、rds、cds和eds 等所有的交互均在一个grpc 连接上完成,因此StreamAggregatedResources 接收到第一个请求时,会将连接保存起来,供后续配置变更时使用。

DiscoveryServer 收到 ClusterType 的请求要生成 cluster 数据响应

1
2
3
4
5
6
7
8
func (s *DiscoveryServer) pushCds(con *XdsConnection, push *model.PushContext, version string) error {
    rawClusters := s.generateRawClusters(con.node, push)
    ...
    response := con.clusters(rawClusters, push.Version)
    err := con.send(response)
    ...
    return nil
}

cluster 数据实际由ConfigGenerator 生成

1
2
3
4
5
func (s *DiscoveryServer) generateRawClusters(node *model.Proxy, push *model.PushContext) []*xdsapi.Cluster {
    rawClusters := s.ConfigGenerator.BuildClusters(node, push)
    ...
    return rawClusters
}

数据来自PushContext.Services 方法

1
2
3
4
5
6
7
8
func (configgen *ConfigGeneratorImpl) buildOutboundClusters(proxy *model.Proxy, push *model.PushContext) []*apiv2.Cluster {
    clusters := make([]*apiv2.Cluster, 0)
    networkView := model.GetNetworkView(proxy)
    for _, service := range push.Services(proxy) {
        ...
    }
    return clusters
}

cluster 数据来自 PushContext的privateServicesByNamespace 和 publicServices, 通过代码可以发现,它们都是初始化时从model.Environment 取Service 数据的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func (ps *PushContext) Services(proxy *Proxy) []*Service {
    ...
    out := make([]*Service, 0)
    if proxy == nil {
        for _, privateServices := range ps.privateServicesByNamespace {
            out = append(out, privateServices...)
        }
    } else {
        out = append(out, ps.privateServicesByNamespace[proxy.ConfigNamespace]...)
    }
    out = append(out, ps.publicServices...)
    return out
}

pilot 监控到配合变化 将数据推给envoy

istio 收到变更事件并没有立即处理,而是创建一个定时器事件,通过定时器事件延迟一段时间。这样做的初衷:

  1. 减少配置变更的下发频率(会对多次变更进行合并),进而减少pilot 和 envoy 的通信开销(毕竟是广播,每一个envoy 都要发)
  2. 延迟对配置变更消息的处理, 可以保证配置下发时变更的完整性

config 或 service 数据变更触发 DiscoveryServer.ConfigUpdate 发送请求到 pushChannel

1
2
3
4
func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) {
    inboundConfigUpdates.Increment()
    s.pushChannel <- req
}

DiscoveryServer 启动时 触发了handleUpdates 负责DiscoveryServer.pushChannel 的消费

1
2
3
4
5
func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
    go s.handleUpdates(stopCh)
    go s.periodicRefreshMetrics(stopCh)
    go s.sendPushes(stopCh)
}

handleUpdates 触发 debounce(防抖动)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 第一个参数ch实际是 pushChannel
func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, pushFn func(req *model.PushRequest)) {
    var req *model.PushRequest
    pushWorker := func() {
        ...
        // 符合一定条件 执行 pushFn
        go push(req)
        ...
    }
    for {
        select {
        case <-freeCh:
            ...
        case r := <-ch:
            ...
            req = req.Merge(r)
        case <-timeChan:
            if free {
                pushWorker()
            }
        case <-stopCh:
            return
        }
    }
}

pushFn 实际是DiscoveryServer.Push ==> AdsPushAll ==> startPush 将数据塞入 PushQueue中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func (s *DiscoveryServer) Push(req *model.PushRequest) {
    if !req.Full {
        req.Push = s.globalPushContext()
        go s.AdsPushAll(versionInfo(), req)
        return
    }
    ...
    req.Push = push
    go s.AdsPushAll(versionLocal, req)
}

DiscoveryServer 启动时 触发sendPushes ,负责消费PushQueue ==> doSendPushes 最终发给每一个envoy/conneciton 的pushChannel ,envoy/conneciton 的pushChannel 的消费逻辑在DiscoveryServer.StreamAggregatedResources的for 循环中

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    ...
    for {
        select {
        case discReq, ok := <-reqChannel:
            ...
        case pushEv := <-con.pushChannel:
            err := s.pushConnection(con, pushEv)
		    pushEv.done()
		    if err != nil {
			    return nil
		    }
        }
    }
}

其它

img
img

参考资料