Skip to content

Commit e64d816

Browse files
committed
grpc-js: Avoid buffering significantly more than max_receive_message_size per received message (1.10.x)
1 parent 45e5fe5 commit e64d816

9 files changed

+173
-148
lines changed

packages/grpc-js/src/compression-filter.ts

+51-16
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { WriteObject, WriteFlags } from './call-interface';
2121
import { Channel } from './channel';
2222
import { ChannelOptions } from './channel-options';
2323
import { CompressionAlgorithms } from './compression-algorithms';
24-
import { LogVerbosity } from './constants';
24+
import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, LogVerbosity, Status } from './constants';
2525
import { BaseFilter, Filter, FilterFactory } from './filter';
2626
import * as logging from './logging';
2727
import { Metadata, MetadataValue } from './metadata';
@@ -98,6 +98,10 @@ class IdentityHandler extends CompressionHandler {
9898
}
9999

100100
class DeflateHandler extends CompressionHandler {
101+
constructor(private maxRecvMessageLength: number) {
102+
super();
103+
}
104+
101105
compressMessage(message: Buffer) {
102106
return new Promise<Buffer>((resolve, reject) => {
103107
zlib.deflate(message, (err, output) => {
@@ -112,18 +116,34 @@ class DeflateHandler extends CompressionHandler {
112116

113117
decompressMessage(message: Buffer) {
114118
return new Promise<Buffer>((resolve, reject) => {
115-
zlib.inflate(message, (err, output) => {
116-
if (err) {
117-
reject(err);
118-
} else {
119-
resolve(output);
119+
let totalLength = 0;
120+
const messageParts: Buffer[] = [];
121+
const decompresser = zlib.createInflate();
122+
decompresser.on('data', (chunk: Buffer) => {
123+
messageParts.push(chunk);
124+
totalLength += chunk.byteLength;
125+
if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) {
126+
decompresser.destroy();
127+
reject({
128+
code: Status.RESOURCE_EXHAUSTED,
129+
details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}`
130+
});
120131
}
121132
});
133+
decompresser.on('end', () => {
134+
resolve(Buffer.concat(messageParts));
135+
});
136+
decompresser.write(message);
137+
decompresser.end();
122138
});
123139
}
124140
}
125141

126142
class GzipHandler extends CompressionHandler {
143+
constructor(private maxRecvMessageLength: number) {
144+
super();
145+
}
146+
127147
compressMessage(message: Buffer) {
128148
return new Promise<Buffer>((resolve, reject) => {
129149
zlib.gzip(message, (err, output) => {
@@ -138,13 +158,25 @@ class GzipHandler extends CompressionHandler {
138158

139159
decompressMessage(message: Buffer) {
140160
return new Promise<Buffer>((resolve, reject) => {
141-
zlib.unzip(message, (err, output) => {
142-
if (err) {
143-
reject(err);
144-
} else {
145-
resolve(output);
161+
let totalLength = 0;
162+
const messageParts: Buffer[] = [];
163+
const decompresser = zlib.createGunzip();
164+
decompresser.on('data', (chunk: Buffer) => {
165+
messageParts.push(chunk);
166+
totalLength += chunk.byteLength;
167+
if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) {
168+
decompresser.destroy();
169+
reject({
170+
code: Status.RESOURCE_EXHAUSTED,
171+
details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}`
172+
});
146173
}
147174
});
175+
decompresser.on('end', () => {
176+
resolve(Buffer.concat(messageParts));
177+
});
178+
decompresser.write(message);
179+
decompresser.end();
148180
});
149181
}
150182
}
@@ -169,14 +201,14 @@ class UnknownHandler extends CompressionHandler {
169201
}
170202
}
171203

172-
function getCompressionHandler(compressionName: string): CompressionHandler {
204+
function getCompressionHandler(compressionName: string, maxReceiveMessageSize: number): CompressionHandler {
173205
switch (compressionName) {
174206
case 'identity':
175207
return new IdentityHandler();
176208
case 'deflate':
177-
return new DeflateHandler();
209+
return new DeflateHandler(maxReceiveMessageSize);
178210
case 'gzip':
179-
return new GzipHandler();
211+
return new GzipHandler(maxReceiveMessageSize);
180212
default:
181213
return new UnknownHandler(compressionName);
182214
}
@@ -186,6 +218,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
186218
private sendCompression: CompressionHandler = new IdentityHandler();
187219
private receiveCompression: CompressionHandler = new IdentityHandler();
188220
private currentCompressionAlgorithm: CompressionAlgorithm = 'identity';
221+
private maxReceiveMessageLength: number;
189222

190223
constructor(
191224
channelOptions: ChannelOptions,
@@ -195,6 +228,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
195228

196229
const compressionAlgorithmKey =
197230
channelOptions['grpc.default_compression_algorithm'];
231+
this.maxReceiveMessageLength = channelOptions['grpc.max_receive_message_length'] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH
198232
if (compressionAlgorithmKey !== undefined) {
199233
if (isCompressionAlgorithmKey(compressionAlgorithmKey)) {
200234
const clientSelectedEncoding = CompressionAlgorithms[
@@ -215,7 +249,8 @@ export class CompressionFilter extends BaseFilter implements Filter {
215249
) {
216250
this.currentCompressionAlgorithm = clientSelectedEncoding;
217251
this.sendCompression = getCompressionHandler(
218-
this.currentCompressionAlgorithm
252+
this.currentCompressionAlgorithm,
253+
-1
219254
);
220255
}
221256
} else {
@@ -247,7 +282,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
247282
if (receiveEncoding.length > 0) {
248283
const encoding: MetadataValue = receiveEncoding[0];
249284
if (typeof encoding === 'string') {
250-
this.receiveCompression = getCompressionHandler(encoding);
285+
this.receiveCompression = getCompressionHandler(encoding, this.maxReceiveMessageLength);
251286
}
252287
}
253288
metadata.remove('grpc-encoding');

packages/grpc-js/src/internal-channel.ts

-2
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import {
3333
} from './resolver';
3434
import { trace } from './logging';
3535
import { SubchannelAddress } from './subchannel-address';
36-
import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
3736
import { mapProxyName } from './http_proxy';
3837
import { GrpcUri, parseUri, uriToString } from './uri-parser';
3938
import { ServerSurfaceCall } from './server-call';
@@ -402,7 +401,6 @@ export class InternalChannel {
402401
}
403402
);
404403
this.filterStackFactory = new FilterStackFactory([
405-
new MaxMessageSizeFilterFactory(this.options),
406404
new CompressionFilterFactory(this, this.options),
407405
]);
408406
this.trace(

packages/grpc-js/src/max-message-size-filter.ts

-88
This file was deleted.

packages/grpc-js/src/server-interceptors.ts

+54-34
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,10 @@ import {
3030
import * as http2 from 'http2';
3131
import { getErrorMessage } from './error';
3232
import * as zlib from 'zlib';
33-
import { promisify } from 'util';
3433
import { StreamDecoder } from './stream-decoder';
3534
import { CallEventTracker } from './transport';
3635
import * as logging from './logging';
3736

38-
const unzip = promisify(zlib.unzip);
39-
const inflate = promisify(zlib.inflate);
40-
4137
const TRACER_NAME = 'server_call';
4238

4339
function trace(text: string) {
@@ -496,7 +492,7 @@ export class BaseServerInterceptingCall
496492
private wantTrailers = false;
497493
private cancelNotified = false;
498494
private incomingEncoding = 'identity';
499-
private decoder = new StreamDecoder();
495+
private decoder: StreamDecoder;
500496
private readQueue: ReadQueueEntry[] = [];
501497
private isReadPending = false;
502498
private receivedHalfClose = false;
@@ -554,6 +550,8 @@ export class BaseServerInterceptingCall
554550
this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!;
555551
}
556552

553+
this.decoder = new StreamDecoder(this.maxReceiveMessageSize);
554+
557555
const metadata = Metadata.fromHttp2Headers(headers);
558556

559557
if (logging.isTracerEnabled(TRACER_NAME)) {
@@ -674,18 +672,41 @@ export class BaseServerInterceptingCall
674672
message: Buffer,
675673
encoding: string
676674
): Buffer | Promise<Buffer> {
677-
switch (encoding) {
678-
case 'deflate':
679-
return inflate(message.subarray(5));
680-
case 'gzip':
681-
return unzip(message.subarray(5));
682-
case 'identity':
683-
return message.subarray(5);
684-
default:
685-
return Promise.reject({
686-
code: Status.UNIMPLEMENTED,
687-
details: `Received message compressed with unsupported encoding "${encoding}"`,
675+
const messageContents = message.subarray(5);
676+
if (encoding === 'identity') {
677+
return messageContents;
678+
} else if (encoding === 'deflate' || encoding === 'gzip') {
679+
let decompresser: zlib.Gunzip | zlib.Deflate;
680+
if (encoding === 'deflate') {
681+
decompresser = zlib.createInflate();
682+
} else {
683+
decompresser = zlib.createGunzip();
684+
}
685+
return new Promise((resolve, reject) => {
686+
let totalLength = 0
687+
const messageParts: Buffer[] = [];
688+
decompresser.on('data', (chunk: Buffer) => {
689+
messageParts.push(chunk);
690+
totalLength += chunk.byteLength;
691+
if (this.maxReceiveMessageSize !== -1 && totalLength > this.maxReceiveMessageSize) {
692+
decompresser.destroy();
693+
reject({
694+
code: Status.RESOURCE_EXHAUSTED,
695+
details: `Received message that decompresses to a size larger than ${this.maxReceiveMessageSize}`
696+
});
697+
}
698+
});
699+
decompresser.on('end', () => {
700+
resolve(Buffer.concat(messageParts));
688701
});
702+
decompresser.write(messageContents);
703+
decompresser.end();
704+
});
705+
} else {
706+
return Promise.reject({
707+
code: Status.UNIMPLEMENTED,
708+
details: `Received message compressed with unsupported encoding "${encoding}"`,
709+
});
689710
}
690711
}
691712

@@ -698,10 +719,16 @@ export class BaseServerInterceptingCall
698719
const compressedMessageEncoding = compressed
699720
? this.incomingEncoding
700721
: 'identity';
701-
const decompressedMessage = await this.decompressMessage(
702-
queueEntry.compressedMessage!,
703-
compressedMessageEncoding
704-
);
722+
let decompressedMessage: Buffer;
723+
try {
724+
decompressedMessage = await this.decompressMessage(
725+
queueEntry.compressedMessage!,
726+
compressedMessageEncoding
727+
);
728+
} catch (err) {
729+
this.sendStatus(err as PartialStatusObject);
730+
return;
731+
}
705732
try {
706733
queueEntry.parsedMessage = this.handler.deserialize(decompressedMessage);
707734
} catch (err) {
@@ -743,23 +770,16 @@ export class BaseServerInterceptingCall
743770
' received data frame of size ' +
744771
data.length
745772
);
746-
const rawMessages = this.decoder.write(data);
773+
let rawMessages: Buffer[];
774+
try {
775+
rawMessages = this.decoder.write(data);
776+
} catch (e) {
777+
this.sendStatus({ code: Status.RESOURCE_EXHAUSTED, details: (e as Error).message });
778+
return;
779+
}
747780

748781
for (const messageBytes of rawMessages) {
749782
this.stream.pause();
750-
if (
751-
this.maxReceiveMessageSize !== -1 &&
752-
messageBytes.length - 5 > this.maxReceiveMessageSize
753-
) {
754-
this.sendStatus({
755-
code: Status.RESOURCE_EXHAUSTED,
756-
details: `Received message larger than max (${
757-
messageBytes.length - 5
758-
} vs. ${this.maxReceiveMessageSize})`,
759-
metadata: null,
760-
});
761-
return;
762-
}
763783
const queueEntry: ReadQueueEntry = {
764784
type: 'COMPRESSED',
765785
compressedMessage: messageBytes,

0 commit comments

Comments
 (0)