// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package blobs

import (
	"context"
	"io"

	"github.com/cockroachdb/cockroach/pkg/blobs/blobspb"
	"github.com/cockroachdb/cockroach/pkg/util/ioctx"
)

// Within the blob service, streaming is used in two functions:
// - GetStream, streaming from server to client
// - PutStream, streaming from client to server
// These functions are used to read or write files on a remote node.
//
// The io.ReadCloser implementations here are used on the _receiver's_
// side, to read from either Blob_GetStreamClient or Blob_PutStreamServer.
// The function streamContent() is used on the _sender's_ side to split
// the content and send it using Blob_GetStreamServer or Blob_PutStreamClient.
// Hypothetical usage sketches for both sides appear in comments further down
// in this file.

// chunkSize was chosen to be 128K after an experiment benchmarking ReadFile
// and WriteFile. The benefits of streaming do not appear until files reach
// 1 MB or larger, and for those files, 128K chunks are optimal. For ReadFile,
// larger chunks are more efficient, but the gains diminish past 128K. For
// WriteFile, 128K chunks perform best; past that, performance starts
// decreasing.
var chunkSize = 128 * 1 << 10
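
// In concrete terms, each chunk is 128 * 1<<10 = 131072 bytes (128 KiB), so
// a 1 MB (1<<20 byte) file streams as eight full chunks.
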
// blobStreamReader implements a ReadCloser which receives
// gRPC streaming messages.
var _ ioctx.ReadCloserCtx = &blobStreamReader{}

// streamReceiver is the subset of the generated gRPC stream interfaces that
// blobStreamReader needs. Blob_PutStreamServer satisfies it directly;
// Blob_GetStreamClient is adapted via nopSendAndClose below.
type streamReceiver interface {
	SendAndClose(*blobspb.StreamResponse) error
	Recv() (*blobspb.StreamChunk, error)
}

// nopSendAndClose wraps a Blob_GetStreamClient with a no-op SendAndClose
// method. This is needed because Blob_GetStreamClient does not provide
// SendAndClose, whereas the other stream we receive from,
// Blob_PutStreamServer, does.
type nopSendAndClose struct {
	blobspb.Blob_GetStreamClient
}

func (*nopSendAndClose) SendAndClose(*blobspb.StreamResponse) error {
	return nil
}

// newGetStreamReader creates an ioctx.ReadCloserCtx that uses gRPC's
// streaming API to read chunks of data sent by the server.
func newGetStreamReader(client blobspb.Blob_GetStreamClient) ioctx.ReadCloserCtx {
	return &blobStreamReader{
		stream: &nopSendAndClose{client},
	}
}

// newPutStreamReader creates an ioctx.ReadCloserCtx that uses gRPC's
// streaming API to read chunks of data sent by the client.
func newPutStreamReader(client blobspb.Blob_PutStreamServer) ioctx.ReadCloserCtx {
	return &blobStreamReader{stream: client}
}
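
// As a receiver-side usage sketch (hypothetical; readAll is not part of this
// package, and the real callers live in the blob client and service code), a
// caller could drain an entire get stream like this:
//
//	func readAll(ctx context.Context, client blobspb.Blob_GetStreamClient) ([]byte, error) {
//		r := newGetStreamReader(client)
//		defer func() { _ = r.Close(ctx) }()
//		var out []byte
//		buf := make([]byte, chunkSize)
//		for {
//			n, err := r.Read(ctx, buf)
//			out = append(out, buf[:n]...)
//			if err == io.EOF {
//				return out, nil
//			}
//			if err != nil {
//				return nil, err
//			}
//		}
//	}
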
type blobStreamReader struct {
	// lastPayload holds the remainder of a chunk that did not fit into the
	// caller's buffer on a previous Read; lastOffset tracks how much of it
	// has already been consumed.
	lastPayload []byte
	lastOffset  int
	// stream is the gRPC stream that chunks are received from.
	stream streamReceiver
	// EOFReached is set once the stream has been exhausted.
	EOFReached bool
}

func (r *blobStreamReader) Read(ctx context.Context, out []byte) (int, error) {
	if r.EOFReached {
		return 0, io.EOF
	}
	offset := 0
	// First, use any payload left over from the previous Read.
	if r.lastPayload != nil {
		offset = len(r.lastPayload) - r.lastOffset
		if len(out) < offset {
			// The leftover payload still does not fit; serve what we can.
			copy(out, r.lastPayload[r.lastOffset:])
			r.lastOffset += len(out)
			return len(out), nil
		}
		copy(out[:offset], r.lastPayload[r.lastOffset:])
		r.lastPayload = nil
	}
	// Receive chunks until the caller's buffer is full or the stream ends.
	for offset < len(out) {
		chunk, err := r.stream.Recv()
		if err == io.EOF {
			r.EOFReached = true
			break
		}
		if err != nil {
			return offset, err
		}
		var lenToWrite int
		if len(out)-offset >= len(chunk.Payload) {
			lenToWrite = len(chunk.Payload)
		} else {
			lenToWrite = len(out) - offset
			// The chunk does not fit; cache the rest for the next Read.
			r.lastPayload = chunk.Payload
			r.lastOffset = lenToWrite
		}
		copy(out[offset:offset+lenToWrite], chunk.Payload[:lenToWrite])
		offset += lenToWrite
	}
	return offset, nil
}
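
// As a concrete walkthrough of the caching above: if a 100-byte chunk arrives
// while the caller's buffer has 64 bytes of room, Read copies the first 64
// bytes, caches the chunk with lastOffset = 64, and returns; the next Read
// serves the remaining 36 bytes from lastPayload before calling Recv again.
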
// Close closes the stream by sending an empty StreamResponse. For a put
// stream this completes the RPC on the server side; for a get stream the
// wrapped SendAndClose is a no-op.
func (r *blobStreamReader) Close(ctx context.Context) error {
	return r.stream.SendAndClose(&blobspb.StreamResponse{})
}

// streamSender is the subset of the generated gRPC stream interfaces that
// streamContent needs; both Blob_GetStreamServer and Blob_PutStreamClient
// satisfy it.
type streamSender interface {
	Send(*blobspb.StreamChunk) error
}

// streamContent splits the content into chunks of size `chunkSize` and
// streams those chunks to the sender.
// Note: this does not close the stream.
func streamContent(ctx context.Context, sender streamSender, content ioctx.ReaderCtx) error {
	payload := make([]byte, chunkSize)
	var chunk blobspb.StreamChunk
	for {
		n, err := content.Read(ctx, payload)
		if n > 0 {
			// The same backing buffer is reused for every chunk; gRPC
			// marshals the chunk during Send, before the next iteration
			// overwrites the buffer.
			chunk.Payload = payload[:n]
			err = sender.Send(&chunk)
		}
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
	}
}
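
// As a sender-side sketch (hypothetical: the Service type, localStorage
// field, and ReadFile signature are illustrative assumptions; the actual
// handler lives in the blob service implementation), a GetStream handler
// would feed a local file through streamContent:
//
//	func (s *Service) GetStream(req *blobspb.GetRequest, stream blobspb.Blob_GetStreamServer) error {
//		content, _, err := s.localStorage.ReadFile(req.Filename, 0 /* offset */)
//		if err != nil {
//			return err
//		}
//		defer func() { _ = content.Close(stream.Context()) }()
//		return streamContent(stream.Context(), stream, content)
//	}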