gRPC的大消息传输

概述

使用gRPC的一个问题是,它的默认最大消息大小默认被设置为4MB,那么当数据量太大时该怎么办?

options

可以通过在创建Server的时候,配置相关的参数来扩大限制最小消息大小的值。

1
s := grpc.NewServer(grpc.MaxRecvMsgSize(size), grpc.MaxSendMsgSize(size))

MaxRecvMsgSizeMaxSendMsgSize分别设置服务器可以接收的最大消息大小和可以发送的最大消息大小(以字节为单位)。不设置的话默认都是4MB

虽然可以配置,但这种行为是一种滑坡谬误,可能会导致不断修改增加服务端客户端最大消息大小,而且每次请求不一定都需要全部的数据,会导致很多性能和资源上的浪费。

chunk

自然地将数据分成更小的块并使用gRPC流方法(stream)对其进行流式传输是一个不错的选择。

首先一个proto,是一个返回流式消息类型的rpc service

1
2
3
4
5
6
7
8
9
10
11
12
13
syntax = "proto3";

package pb;

service Chunker {
rpc Chunker(Empty) returns (stream Chunk) {}
}

message Empty{}

message Chunk {
bytes chunk = 1;
}

实现serverChunker逻辑。流式消息的大小设置为64KB

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
const chunkSize = 64 * 1024

type chunkerSrv []byte

func (c chunkerSrv) Chunker(_ *pb.Empty, srv pb.Chunker_ChunkerServer) error {
chunk := &pb.Chunk{}
n := len(c)
for cur := 0; cur < n; cur += chunkSize {
if cur+chunkSize > n {
chunk.Chunk = c[cur:n]
} else {
chunk.Chunk = c[cur : cur+chunkSize]
}
if err := srv.Send(chunk); err != nil {
return err
}
}
return nil
}

然后把gRPC服务端运行起来,使用随机填充128M的数据来方便测试。

1
2
3
4
5
6
7
8
9
10
11
12
func main() {
listen, err := net.Listen("tcp", ":8888")
if err != nil {
log.Fatal(err)
}
s := grpc.NewServer()
blob := make([]byte, 128*1024*1024) // 128M
rand.Read(blob)
pb.RegisterChunkerServer(s, chunkerSrv(blob))
log.Println("serving on localhost:8888")
log.Fatal(s.Serve(listen))
}

编写个客户端请求一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func main() {
conn, err := grpc.Dial("localhost:8888", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
client := pb.NewChunkerClient(conn)
stream, err := client.Chunker(context.Background(), &pb.Empty{})
if err != nil {
log.Fatal(err)
}

var blob []byte
for {
c, err := stream.Recv()
if err != nil {
if err == io.EOF {
log.Printf("Transfer of %d bytes successful", len(blob))
// Transfer of 134217728 bytes successful
return
}
log.Fatal(err)
}
blob = append(blob, c.Chunk...)
}

使用分块传输编码,数据分解成一系列数据块,并以一个或多个块发送,这样客户端自己再拼接成完整的数据,无论多少数据都可以不用修改配置。

完整代码

range

在数据量大的情况,不是每次都需要请求全量的数据。基于之上可以借鉴httprange协议来分片的取获取资源。同样的在Chunkerproto基础上修改,在请求的时候能传入零个(代表全部获取)或多个Range来分片获取资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
syntax = "proto3";

package pb;

service RangeChunker {
rpc Range(Res) returns (stream Chunk) {}
}

message Res {
repeated Range r = 1;
}

message Range {
int32 start = 1;
int32 stop = 2;
}

message Chunk {
bytes chunk = 1;
}

服务端的实现主要是Range的解析,这里实现和httprange类似,使用0-99代表前100字节而不是0-100,并简化了很多,比如只保留了stop设置-1时代表最后一个字节,其他的负数操作都没有实现。需要的话可以自行修改rangeLimit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
const chunkSize = 64 * 1024

type chunkerSrv []byte

func (c chunkerSrv) Range(r *pb.Res, srv pb.RangeChunker_RangeServer) error {
chunk := &pb.Chunk{}
ranges := c.parseRanges(r)

for _, rr := range ranges {
start, stop := rr[0], rr[1]
for cur := start; cur < stop; cur += chunkSize {
if cur+chunkSize > stop {
chunk.Chunk = c[cur:stop]
} else {
chunk.Chunk = c[cur : cur+chunkSize]
}
if err := srv.Send(chunk); err != nil {
return err
}
}
}
return nil
}

func (c chunkerSrv) parseRanges(r *pb.Res) [][2]int {
n := len(c)
ranges := [][2]int{}
rs := r.GetR()
if len(rs) == 0 {
return [][2]int{[2]int{0, n}}
}
for _, rr := range rs {
start, stop := rangeLimit(rr, n)
if start == -1 {
return nil
}
ranges = append(ranges, [2]int{start, stop})
}
return ranges
}

func rangeLimit(r *pb.Range, llen int) (int, int) {
start, stop := int(r.Start), int(r.Stop)+1
if stop > llen || stop == 0 {
stop = llen
}
if start < 0 || stop < 0 || start >= stop {
return -1, -1
}
return start, stop
}

客户端请求也很简单。

1
2
3
4
5
6
7
stream, err := client.Range(context.Background(), &pb.Res{
R: []*pb.Range{
{0, 99},
{100, 199},
{200, -1},
},
})

完整代码

这样我们就可以只请求数据的某个部分,基于此之上还可以并行请求,断点续传等。