Skip to content

Commit 30d8f07

Browse files
committed
Make IO Buffer size configurable.
1 parent 6014154 commit 30d8f07

File tree

8 files changed

+72
-9
lines changed

8 files changed

+72
-9
lines changed

benchmark/benchmark.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,8 @@ func StartServer(info ServerInfo, opts ...grpc.ServerOption) (string, func()) {
163163
if nw != nil {
164164
lis = nw.Listener(lis)
165165
}
166+
opts = append(opts, grpc.WriteBufferSize(128*1024))
167+
opts = append(opts, grpc.ReadBufferSize(128*1024))
166168
s := grpc.NewServer(opts...)
167169
switch info.Type {
168170
case "protobuf":
@@ -236,6 +238,8 @@ func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallCli
236238

237239
// NewClientConn creates a gRPC client connection to addr.
238240
func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
241+
opts = append(opts, grpc.WithWriteBufferSize(128*1024))
242+
opts = append(opts, grpc.WithReadBufferSize(128*1024))
239243
conn, err := grpc.Dial(addr, opts...)
240244
if err != nil {
241245
grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err)

clientconn.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,22 @@ const (
100100
// DialOption configures how we set up the connection.
101101
type DialOption func(*dialOptions)
102102

103+
// WithWriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
104+
// before doing a write on the wire.
105+
func WithWriteBufferSize(s int) DialOption {
106+
return func(o *dialOptions) {
107+
o.copts.WriteBufferSize = s
108+
}
109+
}
110+
111+
// WithReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
112+
// for each read syscall.
113+
func WithReadBufferSize(s int) DialOption {
114+
return func(o *dialOptions) {
115+
o.copts.ReadBufferSize = s
116+
}
117+
}
118+
103119
// WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream.
104120
// The lower bound for window size is 64K and any value smaller than that will be ignored.
105121
func WithInitialWindowSize(s int32) DialOption {

server.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ type options struct {
116116
keepalivePolicy keepalive.EnforcementPolicy
117117
initialWindowSize int32
118118
initialConnWindowSize int32
119+
writeBufferSize int
120+
readBufferSize int
119121
}
120122

121123
var defaultServerOptions = options{
@@ -126,6 +128,22 @@ var defaultServerOptions = options{
126128
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
127129
type ServerOption func(*options)
128130

131+
// WriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
132+
// before doing a write on the wire.
133+
func WriteBufferSize(s int) ServerOption {
134+
return func(o *options) {
135+
o.writeBufferSize = s
136+
}
137+
}
138+
139+
// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
140+
// for one read syscall.
141+
func ReadBufferSize(s int) ServerOption {
142+
return func(o *options) {
143+
o.readBufferSize = s
144+
}
145+
}
146+
129147
// InitialWindowSize returns a ServerOption that sets window size for stream.
130148
// The lower bound for window size is 64K and any value smaller than that will be ignored.
131149
func InitialWindowSize(s int32) ServerOption {
@@ -524,6 +542,8 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo)
524542
KeepalivePolicy: s.opts.keepalivePolicy,
525543
InitialWindowSize: s.opts.initialWindowSize,
526544
InitialConnWindowSize: s.opts.initialConnWindowSize,
545+
WriteBufferSize: s.opts.writeBufferSize,
546+
ReadBufferSize: s.opts.readBufferSize,
527547
}
528548
st, err := transport.NewServerTransport("http2", c, config)
529549
if err != nil {

transport/http2_client.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,14 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
194194
dynamicWindow = false
195195
}
196196
var buf bytes.Buffer
197+
writeBufSize := defaultWriteBufSize
198+
if opts.WriteBufferSize > 0 {
199+
writeBufSize = opts.WriteBufferSize
200+
}
201+
readBufSize := defaultReadBufSize
202+
if opts.ReadBufferSize > 0 {
203+
readBufSize = opts.ReadBufferSize
204+
}
197205
t := &http2Client{
198206
ctx: ctx,
199207
target: addr.Addr,
@@ -209,9 +217,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
209217
errorChan: make(chan struct{}),
210218
goAway: make(chan struct{}),
211219
awakenKeepalive: make(chan struct{}, 1),
212-
framer: newFramer(conn),
213220
hBuf: &buf,
214221
hEnc: hpack.NewEncoder(&buf),
222+
framer: newFramer(conn, writeBufSize, readBufSize),
215223
controlBuf: newControlBuffer(),
216224
fc: &inFlow{limit: uint32(icwz)},
217225
sendQuotaPool: newQuotaPool(defaultWindowSize),

transport/http2_server.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,15 @@ type http2Server struct {
116116
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
117117
// returned if something goes wrong.
118118
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
119-
framer := newFramer(conn)
119+
writeBufSize := defaultWriteBufSize
120+
if config.WriteBufferSize > 0 {
121+
writeBufSize = config.WriteBufferSize
122+
}
123+
readBufSize := defaultReadBufSize
124+
if config.ReadBufferSize > 0 {
125+
readBufSize = config.ReadBufferSize
126+
}
127+
framer := newFramer(conn, writeBufSize, readBufSize)
120128
// Send initial settings as connection preface to client.
121129
var isettings []http2.Setting
122130
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is

transport/http_util.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ const (
4444
// http://http2.github.io/http2-spec/#SettingValues
4545
http2InitHeaderTableSize = 4096
4646
// http2IOBufSize specifies the buffer size for sending frames.
47-
http2IOBufSize = 32 * 1024
47+
defaultWriteBufSize = 32 * 1024
48+
defaultReadBufSize = 32 * 1024
4849
)
4950

5051
var (
@@ -474,10 +475,10 @@ type framer struct {
474475
fr *http2.Framer
475476
}
476477

477-
func newFramer(conn net.Conn) *framer {
478+
func newFramer(conn net.Conn, writeBufferSize, readBufferSize int) *framer {
478479
f := &framer{
479-
reader: bufio.NewReaderSize(conn, http2IOBufSize),
480-
writer: bufio.NewWriterSize(conn, http2IOBufSize),
480+
reader: bufio.NewReaderSize(conn, readBufferSize),
481+
writer: bufio.NewWriterSize(conn, writeBufferSize),
481482
}
482483
f.fr = http2.NewFramer(f.writer, f.reader)
483484
// Opt-in to Frame reuse API on framer to reduce garbage.

transport/transport.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,8 @@ type ServerConfig struct {
472472
KeepalivePolicy keepalive.EnforcementPolicy
473473
InitialWindowSize int32
474474
InitialConnWindowSize int32
475+
WriteBufferSize int
476+
ReadBufferSize int
475477
}
476478

477479
// NewServerTransport creates a ServerTransport with conn or non-nil error
@@ -503,6 +505,10 @@ type ConnectOptions struct {
503505
InitialWindowSize int32
504506
// InitialConnWindowSize sets the initial window size for a connection.
505507
InitialConnWindowSize int32
508+
// WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
509+
WriteBufferSize int
510+
// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
511+
ReadBufferSize int
506512
}
507513

508514
// TargetInfo contains the information of the target such as network address and metadata.
@@ -525,7 +531,7 @@ type Options struct {
525531
Last bool
526532

527533
// Delay is a hint to the transport implementation for whether
528-
// the data could be buffered for a batching write. The
534+
// the data could be buffered for adetermines how much data can be batched before it's written on the wire. batching write. The
529535
// Transport implementation may ignore the hint.
530536
Delay bool
531537
}

transport/transport_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1989,8 +1989,8 @@ func (s *httpServer) start(t *testing.T, lis net.Listener) {
19891989
t.Errorf("Error at server-side while reading preface from cleint. Err: %v", err)
19901990
return
19911991
}
1992-
reader := bufio.NewReaderSize(s.conn, http2IOBufSize)
1993-
writer := bufio.NewWriterSize(s.conn, http2IOBufSize)
1992+
reader := bufio.NewReaderSize(s.conn, defaultWriteBufSize)
1993+
writer := bufio.NewWriterSize(s.conn, defaultReadBufSize)
19941994
framer := http2.NewFramer(writer, reader)
19951995
if err = framer.WriteSettingsAck(); err != nil {
19961996
t.Errorf("Error at server-side while sending Settings ack. Err: %v", err)

0 commit comments

Comments
 (0)