0%

istio 代码浅读

年中折腾过 istio,写了一篇 《白话 Istio》 ,简单介绍了 Istio 是一个配置组装工厂,从 k8s 等上游拉取 CRD 等配置,向下游 Envoy 供应 xDS 配置资源。

最近因为要了解增量实现机制,又翻了代码,这篇尝试从代码层面,简单记录一下的。

两句话

  1. 每个客户端流,一个主循环,等待两个 channel,一个 deltaReqChan,处理来自客户端的请求,一个 pushChannel 处理上游配置变更事件
  2. 本文只是介绍基本的工作流,更多的复杂度在于,上游配置资源,到下游 xDS 资源的转换关系,这里并没有做具体介绍

入口

首先 istio 对 Envoy 提供了一组 GRPC service

比如这个 service/method

envoy.service.discovery.v3.AggregatedDiscoveryService/DeltaAggregatedResources

对应于 Envoy 中的 Incremental ADS,增量聚合模式,实现入口在 pilot/pkg/xds/delta.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (s *DiscoveryServer) StreamDeltas(stream DeltaDiscoveryStream) error {
// 鉴权
ids, err := s.authenticate(ctx)

// 新建 conn 对象
con := newDeltaConnection(peerAddr, stream)

// 单独协程读请求,读到一个请求,就写入 deltaReqChan
go s.receiveDelta(con, ids)

// 主循环,每个 stream
for {
select {
// 处理来自客户端 (envoy)的请求
case req, ok := <-con.deltaReqChan:
if ok {
if err := s.processDeltaRequest(req, con); err != nil {
return err
}
} else {
// Remote side closed connection or error processing the request.
return <-con.errorChan
}

// 处理来自上游配置变更的推送任务
case pushEv := <-con.pushChannel:
err := s.pushConnectionDelta(con, pushEv)
pushEv.done()
if err != nil {
return err
}

case <-con.stop:
return nil
}
}
}

每个 stream 一个主循环,就干两件事:

  1. 处理请求,来自于 read 协程
  2. 处理推送任务,来自上游配置变更事件

处理请求

一次处理一个请求,在主循环中执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (s *DiscoveryServer) processDeltaRequest(req *discovery.DeltaDiscoveryRequest, con *Connection) error {
// 客户端(envoy)回复了 ack,可以将 ack 状态上报
if s.StatusReporter != nil {
s.StatusReporter.RegisterEvent(con.conID, req.TypeUrl, req.ResponseNonce)
}

// 是否需要回复,比如只是 NACK,则没必要回复
shouldRespond := s.shouldRespondDelta(con, req)
if !shouldRespond {
return nil
}

request := &model.PushRequest{
Full: true,
Push: con.proxy.LastPushContext,
Reason: []model.TriggerReason{model.ProxyRequest},
Start: con.proxy.LastPushTime,
Delta: model.ResourceDelta{
Subscribed: sets.New(req.ResourceNamesSubscribe...),
Unsubscribed: sets.New(req.ResourceNamesUnsubscribe...),
},
}

// 响应请求
return s.pushDeltaXds(con, con.Watched(req.TypeUrl), request)
}

响应请求

根据 TypeURL 找对应的资源生成器,生成资源之后,封装成 DeltaDiscoveryResponse 后,发送给 Envoy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (s *DiscoveryServer) pushDeltaXds(con *Connection,
w *model.WatchedResource, req *model.PushRequest,
) error {
// 根据资源类型找到生成器,比如:CDS 是 CdsGenerator,EDS 是 EdsGenerator
gen := s.findGenerator(w.TypeUrl, con)

// 根据生成器类型生成资源
switch g := gen.(type) {
case model.XdsDeltaResourceGenerator:
res, deletedRes, logdata, usedDelta, err = g.GenerateDeltas(con.proxy, req, w)
// 除了 CDS 和 EDS,目前都还没有 delta 的实现
case model.XdsResourceGenerator:
res, logdata, err = g.Generate(con.proxy, w, req)
}

// 构造 DeltaDiscoveryResponse 发送给客户端
resp := &discovery.DeltaDiscoveryResponse{
ControlPlane: ControlPlane(),
TypeUrl: w.TypeUrl,
// TODO: send different version for incremental eds
SystemVersionInfo: req.Push.PushVersion,
Nonce: nonce(req.Push.LedgerVersion),
Resources: res,
}
// 这里还会记录 NonceSent
if err := con.sendDelta(resp); err != nil {
return err
}
return nil
}

生成资源

资源有多种,实现也分散了,具体可以看,这两个 interface 的具体实现:

XdsResourceGeneratorXdsDeltaResourceGenerator

主要逻辑就是,选取对应的 CRD 资源,生成 xDS 资源,以 LDS 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (configgen *ConfigGeneratorImpl) BuildListeners(node *model.Proxy,
push *model.PushContext,
) []*listener.Listener {
builder := NewListenerBuilder(node, push)

// Mesh sidecar 和 Gateway 有不同的生成逻辑
switch node.Type {
case model.SidecarProxy:
builder = configgen.buildSidecarListeners(builder)
case model.Router:
// 主要是从 Gateway CRD 生成 LDS 的主要配置
builder = configgen.buildGatewayListeners(builder)
}

// 再把对应的 envoyfilter patch 到 LDS
builder.patchListeners()
return builder.getListeners()
}

订阅资源

讲完请求处理这条链路,再来看推送链路。

这就得先从订阅资源开始了,istio 启动之后,会从 k8s 订阅 EndpointSlicePods 等资源

1
2
3
c.registerHandlers(filteredInformer, "EndpointSlice", out.onEvent, nil)

c.registerHandlers(c.pods.informer, "Pods", c.pods.onEvent, c.pods.labelFilter)

当有资源变更时,会触发 DiscoveryServer.ConfigUpdate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (s *DiscoveryServer) EDSUpdate(shard model.ShardKey, serviceName string, namespace string,
istioEndpoints []*model.IstioEndpoint,
) {
inboundEDSUpdates.Increment()
// Update the endpoint shards
pushType := s.edsCacheUpdate(shard, serviceName, namespace, istioEndpoints)
if pushType == IncrementalPush || pushType == FullPush {
// Trigger a push
s.ConfigUpdate(&model.PushRequest{
Full: pushType == FullPush,
ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.ServiceEntry, Name: serviceName, Namespace: namespace}),
Reason: []model.TriggerReason{model.EndpointUpdate},
})
}
}

debounce

ConfigUpdate 只是进入 pushChannel 队列,中间会经过 debounce 处理,才会进入真正的任务队列 pushQueue

debounce 的核心是 update 事件的合并:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
for {
select {
case <-freeCh:
free = true
pushWorker()
case r := <-ch:
lastConfigUpdateTime = time.Now()
if debouncedEvents == 0 {
timeChan = time.After(opts.debounceAfter)
startDebounce = lastConfigUpdateTime
}
debouncedEvents++

// 核心是 Merge
req = req.Merge(r)
case <-timeChan:
if free {
pushWorker()
}
case <-stopCh:
return
}
}

聚合后的事件,会为每个客户端连接,都生成一个任务,塞入全局的 pushQueue

1
2
3
for _, p := range s.AllClients() {
s.pushQueue.Enqueue(p, req)
}

推送

另外,再有一个单独的协程,从 pushQueue 消费,来完成推送

主要逻辑就是,喂给入口主循环等待的推送任务队列 pushChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {
for {
select {
case <-stopCh:
return
default:
// 从 pushQueue 获取任务
client, push, shuttingdown := queue.Dequeue()
if shuttingdown {
return
}

doneFunc := func() {
queue.MarkDone(client)
<-semaphore
}

go func() {
pushEv := &Event{
pushRequest: push,
done: doneFunc,
}

select {
// 这里的 pushChannel 就是入口主循环等待的推送任务队列
case client.pushChannel <- pushEv:
return
case <-closed: // grpc stream was closed
doneFunc()
log.Infof("Client closed connection %v", client.conID)
}
}()
}
}
}

增量机制

最后说下我了解到的增量实现:

  1. 目前的增量,是指单个资源粒度的单独更新
    相对于原来按照资源类型,把所有资源全量更新的方式,是增量
    单个资源的局部更新,还没有的

  2. 目前 Istio 实现的增量,还仅局限于单个连接内的增量
    如果断连后重连,还是要走一遍全量推送的
    虽然 Envoy 是支持了跨连接的增量支持

  3. 跨连接的增量是通过每个资源的版本号来的
    从 xDS 协议设计上,每个资源都有版本号,Envoy 也确实会在重连时,将现有资源的版本号传给 Istio
    只是 Istio 并没有处理版本号这块细节,发给 Envoy 的版本号只是默认的空字符串