0%

MoE 系列[九] - Envoy Go 流式处理

最近 Envoy Go 扩展有一个比较大的改动:支持全双工流式处理(原来只支持半双工流式处理),趁此做些简单的介绍。

先搞清几个基本概念

什么是流式处理

与流式相对应的就是全缓冲

比如下载一个大文件,全缓冲是完整收到整个文件之后,才会转发给客户端

而流式则是收到一部分,也立即转发多少给客户端

在 nginx 里,默认是全缓冲,proxy_buffering off 就是开启流式处理

envoy 里底层默认是流式处理,除非有 filter 插件不支持流式,必须全缓冲处理

什么是异步处理

流式处理确实也算网关标配了,不过流式加异步就不是了

还是下载一个大文件,我们可以在网关上实现 gzip 压缩,减少网络带宽。

这个 gzip 压缩也是流式处理,但是并不要求异步,因为 gzip 是纯 CPU 计算任务,且 gzip 也有流式的支持。

但是,如果想给 AI 大模型 的流式响应,加上实时的安全审查。

此时,则需要将流式的内容,实时发送给远程审查服务,审查服务返回通过之后,才发给客户端。

这个安全审查,则是需要异步处理了。

这时 Nginx 底层的流式处理机制就玩不转了,因为 nginx 不支持 body 处理阶段挂起请求。
(当然,利用 lua cososcket 来替换 proxy 还是可以玩的)

Envoy 这块的底层机制要灵活一些,body 也支持异步处理,可以随时挂起请求。

Envoy Go 提供了流式的异步处理机制,我们可以使用 golang 的网络库,调用远程的服务,从而轻松的实现上面的安全审查,这是本次修复之前,也能支持的。

什么是全双工

对于普通的 http 请求,可以当做为半双工,也就是,客户端发出请求之后,等待响应之后,才会继续发送请求。
(http pipeline 有点不一样,但是对于 web server 来说,还是一个个来处理的,本质上差异也不大)

但是,对于 websocket,HTTP connnect,GRPC bidirectional 等就不一样了,这类协议里有像 tcp 那样的全双工通信机制,随时都能双向发送数据。

这次修复的问题,就是全双工才会遇到的问题,也是之前没有考虑到位的点。

原因

之前在介绍 并发安全 的时候,提到我们有设计一个所有权机制来解决并发问题,这里的所有权依赖的是一个请求处理的状态机。

而之前没有考虑到全双工的场景,这里的状态机在 c 和 Go 的交互期间,只能有一个存在,也就是,要么在处理请求,要么在处理响应。

所以,在全双工场景,状态机就玩不转了。

修复方式,也比较简单,也就是 c 和 Go 之间,使用两个状态机,请求和响应分别一个,这样就不会冲突了。

只是,知易行难,改动量还是有点大的 …

加上最近内部业务压力有点大,断断续续搞了两个月才搞定 …

演示

这里我们以 websocket 为例,来体验一下 Envoy Golang 对于全双工流式的处理。

这里的后端服务是 gorilla/websocket 的 echo 服务,也就是简单的将客户端发送的数据,原样的返回,这是后端的演示效果:

我们在浏览器端,发送了 foobar,收到的也是 foobar

websocket

流式修改内容

我们在中间加入一个 Envoy 的代理,并且开启我们的 Golang 插件示例

我们在 Golang 插件里,演示一下动态修改 websocket 的数据内容,在客户端发给 server 的数据里,加上 Hello, 前缀,在 server 返回给 client 的数据,加上 , World 后缀

如下图所示,我们在浏览器里发送 foobar,收到的就是 Hello, foo, WorldHello, bar, World

streaming-update

我们概览下 DecodeData 的核心代码:

因为 DecodeData 收到的数据,是原始数据,需要自己解析 websocket frame 的封包协议;也就是代码里的 readFrame,我们参考 gorilla/websocket 做了些小调整,就实现了一个简单的 frame 协议处理

1
2
3
4
5
6
7
8
9
10
11
12
13
bytes := data.Bytes()
f.reqBuffer = append(f.reqBuffer, bytes...)
var fr *frame
f.reqBuffer, fr = readFrame(f.reqBuffer)
if fr == nil {
// already cache into Golang side
data.Reset()
return api.Continue
}

newData := append([]byte("Hello, "), fr.GetData()...)
fr.SetData(newData)
data.Set(fr.Bytes())

调用远程检查服务

这个示例里,我们只是针对上行数据,也就是客户端发给服务端的数据,将每个包先发给一个远程的服务,示例里是 httpbin.org,如果返回了 200,我们将内容修改为 Authorized,否则就修改为 Unauthorized

所以,也就有了如下的演示效果:

streaming-async

同样,概览下 DecodeData 的核心代码:

这里的主要区别是,我们启动了一个新的启程,来做异步的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
bytes := data.Bytes()
f.reqBuffer = append(f.reqBuffer, bytes...)
var fr *frame
f.reqBuffer, fr = readFrame(f.reqBuffer)
if fr == nil {
// already cache into Golang side
data.Reset()
return api.Continue
}
go func() {
bytes := fr.GetData()
ok := checkData(bytes)
if !ok {
bytes = []byte("Unauthorized")
} else {
bytes = []byte("Authorized")
}
fr.SetData(bytes)
data.Set(fr.Bytes())
f.callback.DecoderFilterCallbacks().Continue(api.Continue)
}()
return api.Running

再看看 checkData 的实现,因为 Envoy Go 支持全功能的 Golang,我们可以直接使用 Golang 的网络库来发请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Just a demo to request a remote server
func checkData(data []byte) bool {
req, err := http.NewRequest("POST", "https://httpbin.org/post", bytes.NewBuffer(data))
if err != nil {
api.LogDebugf("new request error: %v", err)
return false
}

httpc := &http.Client{}
resp, err := httpc.Do(req)
if err != nil {
api.LogDebugf("query request error: %v", err)
return false
}
resp.Body.Close()

return resp.StatusCode == 200
}

完整的示例代码可见:https://github.com/doujiang24/envoy-go-websocket-example

最后

网关的流式处理确实也不是什么新鲜的事了,大文件处理场景就已经是刚需了。

只是在 AI 大模型 的生成速度比较慢,又多了一个刚需的场景。

最后,支持全功能的 Golang 语言,优势还是很明显的,现成有的 Golang 库,都能直接使用,没有 tinygo 的限制,哈哈