年中折腾过 istio,写了一篇 《白话 Istio》 ,简单介绍了 Istio 是一个配置组装工厂,从 k8s 等上游拉取 CRD 等配置,向下游 Envoy 供应 xDS 配置资源。
最近因为要了解增量实现机制,又翻了代码,这篇尝试从代码层面,简单记录一下的。
两句话
- 每个客户端流,一个主循环,等待两个 channel,一个
deltaReqChan
,处理来自客户端的请求,一个 pushChannel
处理上游配置变更事件
- 本文只是介绍基本的工作流,更多的复杂度在于,上游配置资源,到下游 xDS 资源的转换关系,这里并没有做具体介绍
入口
首先 istio 对 Envoy 提供了一组 GRPC service
比如这个 service/method
:
envoy.service.discovery.v3.AggregatedDiscoveryService/DeltaAggregatedResources
对应于 Envoy 中的 Incremental ADS,增量聚合模式,实现入口在 pilot/pkg/xds/delta.go
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| func (s *DiscoveryServer) StreamDeltas(stream DeltaDiscoveryStream) error { // 鉴权 ids, err := s.authenticate(ctx)
// 新建 conn 对象 con := newDeltaConnection(peerAddr, stream)
// 单独协程读请求,读到一个请求,就写入 deltaReqChan go s.receiveDelta(con, ids)
// 主循环,每个 stream for { select { // 处理来自客户端 (envoy)的请求 case req, ok := <-con.deltaReqChan: if ok { if err := s.processDeltaRequest(req, con); err != nil { return err } } else { // Remote side closed connection or error processing the request. return <-con.errorChan }
// 处理来自上游配置变更的推送任务 case pushEv := <-con.pushChannel: err := s.pushConnectionDelta(con, pushEv) pushEv.done() if err != nil { return err }
case <-con.stop: return nil } } }
|
每个 stream 一个主循环,就干两件事:
- 处理请求,来自于 read 协程
- 处理推送任务,来自上游配置变更事件
处理请求
一次处理一个请求,在主循环中执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| func (s *DiscoveryServer) processDeltaRequest(req *discovery.DeltaDiscoveryRequest, con *Connection) error { // 客户端(envoy)回复了 ack,可以将 ack 状态上报 if s.StatusReporter != nil { s.StatusReporter.RegisterEvent(con.conID, req.TypeUrl, req.ResponseNonce) }
// 是否需要回复,比如只是 NACK,则没必要回复 shouldRespond := s.shouldRespondDelta(con, req) if !shouldRespond { return nil }
request := &model.PushRequest{ Full: true, Push: con.proxy.LastPushContext, Reason: []model.TriggerReason{model.ProxyRequest}, Start: con.proxy.LastPushTime, Delta: model.ResourceDelta{ Subscribed: sets.New(req.ResourceNamesSubscribe...), Unsubscribed: sets.New(req.ResourceNamesUnsubscribe...), }, }
// 响应请求 return s.pushDeltaXds(con, con.Watched(req.TypeUrl), request) }
|
响应请求
根据 TypeURL
找对应的资源生成器,生成资源之后,封装成 DeltaDiscoveryResponse
后,发送给 Envoy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| func (s *DiscoveryServer) pushDeltaXds(con *Connection, w *model.WatchedResource, req *model.PushRequest, ) error { // 根据资源类型找到生成器,比如:CDS 是 CdsGenerator,EDS 是 EdsGenerator gen := s.findGenerator(w.TypeUrl, con)
// 根据生成器类型生成资源 switch g := gen.(type) { case model.XdsDeltaResourceGenerator: res, deletedRes, logdata, usedDelta, err = g.GenerateDeltas(con.proxy, req, w) // 除了 CDS 和 EDS,目前都还没有 delta 的实现 case model.XdsResourceGenerator: res, logdata, err = g.Generate(con.proxy, w, req) }
// 构造 DeltaDiscoveryResponse 发送给客户端 resp := &discovery.DeltaDiscoveryResponse{ ControlPlane: ControlPlane(), TypeUrl: w.TypeUrl, // TODO: send different version for incremental eds SystemVersionInfo: req.Push.PushVersion, Nonce: nonce(req.Push.LedgerVersion), Resources: res, } // 这里还会记录 NonceSent if err := con.sendDelta(resp); err != nil { return err } return nil }
|
生成资源
资源有多种,实现也分散了,具体可以看,这两个 interface 的具体实现:
XdsResourceGenerator
和 XdsDeltaResourceGenerator
主要逻辑就是,选取对应的 CRD 资源,生成 xDS 资源,以 LDS
为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| func (configgen *ConfigGeneratorImpl) BuildListeners(node *model.Proxy, push *model.PushContext, ) []*listener.Listener { builder := NewListenerBuilder(node, push)
// Mesh sidecar 和 Gateway 有不同的生成逻辑 switch node.Type { case model.SidecarProxy: builder = configgen.buildSidecarListeners(builder) case model.Router: // 主要是从 Gateway CRD 生成 LDS 的主要配置 builder = configgen.buildGatewayListeners(builder) }
// 再把对应的 envoyfilter patch 到 LDS builder.patchListeners() return builder.getListeners() }
|
订阅资源
讲完请求处理这条链路,再来看推送链路。
这就得先从订阅资源开始了,istio 启动之后,会从 k8s 订阅 EndpointSlice
,Pods
等资源
1 2 3
| c.registerHandlers(filteredInformer, "EndpointSlice", out.onEvent, nil)
c.registerHandlers(c.pods.informer, "Pods", c.pods.onEvent, c.pods.labelFilter)
|
当有资源变更时,会触发 DiscoveryServer.ConfigUpdate
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| func (s *DiscoveryServer) EDSUpdate(shard model.ShardKey, serviceName string, namespace string, istioEndpoints []*model.IstioEndpoint, ) { inboundEDSUpdates.Increment() // Update the endpoint shards pushType := s.edsCacheUpdate(shard, serviceName, namespace, istioEndpoints) if pushType == IncrementalPush || pushType == FullPush { // Trigger a push s.ConfigUpdate(&model.PushRequest{ Full: pushType == FullPush, ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.ServiceEntry, Name: serviceName, Namespace: namespace}), Reason: []model.TriggerReason{model.EndpointUpdate}, }) } }
|
debounce
ConfigUpdate
只是进入 pushChannel
队列,中间会经过 debounce
处理,才会进入真正的任务队列 pushQueue
debounce
的核心是 update 事件的合并:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| for { select { case <-freeCh: free = true pushWorker() case r := <-ch: lastConfigUpdateTime = time.Now() if debouncedEvents == 0 { timeChan = time.After(opts.debounceAfter) startDebounce = lastConfigUpdateTime } debouncedEvents++
// 核心是 Merge req = req.Merge(r) case <-timeChan: if free { pushWorker() } case <-stopCh: return } }
|
聚合后的事件,会为每个客户端连接,都生成一个任务,塞入全局的 pushQueue
1 2 3
| for _, p := range s.AllClients() { s.pushQueue.Enqueue(p, req) }
|
推送
另外,再有一个单独的协程,从 pushQueue
消费,来完成推送
主要逻辑就是,喂给入口主循环等待的推送任务队列 pushChannel
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) { for { select { case <-stopCh: return default: // 从 pushQueue 获取任务 client, push, shuttingdown := queue.Dequeue() if shuttingdown { return }
doneFunc := func() { queue.MarkDone(client) <-semaphore }
go func() { pushEv := &Event{ pushRequest: push, done: doneFunc, }
select { // 这里的 pushChannel 就是入口主循环等待的推送任务队列 case client.pushChannel <- pushEv: return case <-closed: // grpc stream was closed doneFunc() log.Infof("Client closed connection %v", client.conID) } }() } } }
|
增量机制
最后说下我了解到的增量实现:
目前的增量,是指单个资源粒度的单独更新
相对于原来按照资源类型,把所有资源全量更新的方式,是增量
单个资源的局部更新,还没有的
目前 Istio 实现的增量,还仅局限于单个连接内的增量
如果断连后重连,还是要走一遍全量推送的
虽然 Envoy 是支持了跨连接的增量支持
跨连接的增量是通过每个资源的版本号来的
从 xDS 协议设计上,每个资源都有版本号,Envoy 也确实会在重连时,将现有资源的版本号传给 Istio
只是 Istio 并没有处理版本号这块细节,发给 Envoy 的版本号只是默认的空字符串