diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 7e25757b14a..a448b98b012 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -245,7 +245,7 @@ pq_configure(Port *port) { int compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL; int impl = -1; - char **server_compression_algorithms = zpq_get_supported_algorithms(); + char **server_compression_algorithms = zs_get_supported_algorithms(); int index = -1; char *protocol_extension = strchr(client_compression_algorithms, ';'); @@ -289,7 +289,8 @@ pq_configure(Port *port) if (index >= 0) /* Use compression */ { - PqStream = zpq_create(impl, compression_level, impl, write_compressed, read_compressed, MyProcPort, NULL, 0); + PqStream = zpq_create(impl, compression_level, impl, write_compressed, read_compressed, MyProcPort, + NULL, 0); if (!PqStream) { ereport(LOG, @@ -1080,7 +1081,7 @@ pq_recvbuf(bool nowait) if (r < 0) { - if (r == ZPQ_DECOMPRESS_ERROR) + if (r == ZS_DECOMPRESS_ERROR) { char const *msg = zpq_decompress_error(PqStream); diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 3057665bbec..1a1d447b487 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -6299,6 +6299,9 @@ threadRun(void *arg) int nsocks; /* number of sockets to be waited for */ int64 min_usec; int64 now_usec = 0; /* set this only if needed */ + bool buffered_rx = false; /* true if some of the clients has + * data left in SSL/ZPQ read + * buffers */ /* * identify which client sockets should be checked for input, and @@ -6339,6 +6342,9 @@ threadRun(void *arg) */ int sock = PQsocket(st->con); + /* check if conn has buffered SSL / ZPQ read data */ + buffered_rx = buffered_rx || PQreadPending(st->con); + if (sock < 0) { pg_log_error("invalid socket: %s", PQerrorMessage(st->con)); @@ -6389,7 +6395,7 @@ threadRun(void *arg) { if (nsocks > 0) { - rc = wait_on_socket_set(sockets, min_usec); + rc = buffered_rx ? 1 : wait_on_socket_set(sockets, min_usec); } else /* nothing active, simple sleep */ { @@ -6398,7 +6404,7 @@ threadRun(void *arg) } else /* no explicit delay, wait without timeout */ { - rc = wait_on_socket_set(sockets, 0); + rc = buffered_rx ? 1 : wait_on_socket_set(sockets, 0); } if (rc < 0) @@ -6437,8 +6443,11 @@ threadRun(void *arg) pg_log_error("invalid socket: %s", PQerrorMessage(st->con)); goto done; } - - if (!socket_has_input(sockets, sock, nsocks++)) + if (PQreadPending(st->con)) + { + nsocks++; + } + else if (!socket_has_input(sockets, sock, nsocks++)) continue; } else if (st->state == CSTATE_FINISHED || diff --git a/src/common/Makefile b/src/common/Makefile index bc6cba8ba9f..2808e65ab4a 100644 --- a/src/common/Makefile +++ b/src/common/Makefile @@ -78,6 +78,7 @@ OBJS_COMMON = \ username.o \ wait_error.o \ wchar.o \ + z_stream.o \ zpq_stream.o ifeq ($(with_openssl),yes) diff --git a/src/common/z_stream.c b/src/common/z_stream.c new file mode 100644 index 00000000000..c720578b7e9 --- /dev/null +++ b/src/common/z_stream.c @@ -0,0 +1,666 @@ +#include "c.h" +#include "pg_config.h" +#include "common/z_stream.h" + +/* + * Functions implementing streaming compression algorithm + */ +typedef struct +{ + /* + * Name of compression algorithm. + */ + char const *(*name) (void); + + /* + * Create new compression stream. level: compression level + */ + void *(*create_compressor) (int level); + + /* + * Create new decompression stream. + */ + void *(*create_decompressor) (); + + /* + * Decompress up to "src_size" compressed bytes from *src and write up to + * "dst_size" raw (decompressed) bytes to *dst. Number of decompressed + * bytes written to *dst is stored in *dst_processed. Number of compressed + * bytes read from *src is stored in *src_processed. + * + * Return codes: ZS_OK if no errors were encountered during decompression + * attempt. This return code does not guarantee that *src_processed > 0 or + * *dst_processed > 0. + * + * ZS_DATA_PENDING means that there might be some data left within + * decompressor internal buffers. + * + * ZS_STREAM_END if encountered end of compressed data stream. + * + * ZS_DECOMPRESS_ERROR if encountered an error during decompression + * attempt. + */ + ssize_t (*decompress) (void *ds, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed); + + /* + * Compress up to "src_size" raw (non-compressed) bytes from *src and + * write up to "dst_size" compressed bytes to *dst. Number of compressed + * bytes written to *dst is stored in *dst_processed. Number of + * non-compressed bytes read from *src is stored in *src_processed. + * + * Return codes: ZS_OK if no errors were encountered during compression + * attempt. This return code does not guarantee that *src_processed > 0 or + * *dst_processed > 0. + * + * ZS_DATA_PENDING means that there might be some data left within + * compressor internal buffers. + * + * ZS_COMPRESS_ERROR if encountered an error during compression attempt. + */ + ssize_t (*compress) (void *cs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed); + + /* + * Free compression stream created by create_compressor function. + */ + void (*free_compressor) (void *cs); + + /* + * Free decompression stream created by create_decompressor function. + */ + void (*free_decompressor) (void *ds); + + /* + * Get compressor error message. + */ + char const *(*compress_error) (void *cs); + + /* + * Get decompressor error message. + */ + char const *(*decompress_error) (void *ds); + + ssize_t (*end_compression) (void *cs, void *dst, size_t dst_size, size_t *dst_processed); +} ZAlgorithm; + +struct ZStream +{ + ZAlgorithm const *c_algorithm; + void *c_stream; + + ZAlgorithm const *d_algorithm; + void *d_stream; + + bool rx_not_flushed; + bool tx_not_flushed; +}; + +#if HAVE_LIBZSTD + +#include +#include + +/* + * Maximum allowed back-reference distance, expressed as power of 2. + * This setting controls max compressor/decompressor window size. + * More details https://github.com/facebook/zstd/blob/v1.4.7/lib/zstd.h#L536 + */ +#define ZSTD_WINDOWLOG_LIMIT 23 /* set max window size to 8MB */ + + +typedef struct ZPQ_ZSTD_CStream +{ + ZSTD_CStream *stream; + char const *error; /* error message */ +} ZPQ_ZSTD_CStream; + +typedef struct ZPQ_ZSTD_DStream +{ + ZSTD_DStream *stream; + char const *error; /* error message */ +} ZPQ_ZSTD_DStream; + +static void * +zstd_create_compressor(int level) +{ + ZPQ_ZSTD_CStream *c_stream = (ZPQ_ZSTD_CStream *) malloc(sizeof(ZPQ_ZSTD_CStream)); + + c_stream->stream = ZSTD_createCStream(); + ZSTD_initCStream(c_stream->stream, level); +#if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3 + ZSTD_CCtx_setParameter(c_stream->stream, ZSTD_c_windowLog, ZSTD_WINDOWLOG_LIMIT); +#endif + c_stream->error = NULL; + return c_stream; +} + +static void * +zstd_create_decompressor() +{ + ZPQ_ZSTD_DStream *d_stream = (ZPQ_ZSTD_DStream *) malloc(sizeof(ZPQ_ZSTD_DStream)); + + d_stream->stream = ZSTD_createDStream(); + ZSTD_initDStream(d_stream->stream); +#if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3 + ZSTD_DCtx_setParameter(d_stream->stream, ZSTD_d_windowLogMax, ZSTD_WINDOWLOG_LIMIT); +#endif + d_stream->error = NULL; + return d_stream; +} + +static ssize_t +zstd_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream; + ZSTD_inBuffer in; + ZSTD_outBuffer out; + size_t rc; + + in.src = src; + in.pos = 0; + in.size = src_size; + + out.dst = dst; + out.pos = 0; + out.size = dst_size; + + rc = ZSTD_decompressStream(ds->stream, &out, &in); + + *src_processed = in.pos; + *dst_processed = out.pos; + if (ZSTD_isError(rc)) + { + ds->error = ZSTD_getErrorName(rc); + return ZS_DECOMPRESS_ERROR; + } + + if (rc == 0) + { + return ZS_STREAM_END; + } + + if (out.pos == out.size) + { + /* + * if `output.pos == output.size`, there might be some data left + * within internal buffers + */ + return ZS_DATA_PENDING; + } + return ZS_OK; +} + +static ssize_t +zstd_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + ZSTD_inBuffer in; + ZSTD_outBuffer out; + + in.src = src; + in.pos = 0; + in.size = src_size; + + out.dst = dst; + out.pos = 0; + out.size = dst_size; + + if (in.pos < src_size) /* Has something to compress in input buffer */ + { + size_t rc = ZSTD_compressStream(cs->stream, &out, &in); + + *dst_processed = out.pos; + *src_processed = in.pos; + if (ZSTD_isError(rc)) + { + cs->error = ZSTD_getErrorName(rc); + return ZS_COMPRESS_ERROR; + } + } + + if (in.pos == src_size) /* All data is compressed: flush internal zstd + * buffer */ + { + size_t tx_not_flushed = ZSTD_flushStream(cs->stream, &out); + + *dst_processed = out.pos; + if (tx_not_flushed > 0) + { + return ZS_DATA_PENDING; + } + } + + return ZS_OK; +} + +static ssize_t +zstd_end(void *c_stream, void *dst, size_t dst_size, size_t *dst_processed) +{ + ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + ZSTD_outBuffer output; + + output.dst = dst; + output.pos = 0; + output.size = dst_size; + + size_t tx_not_flushed; + + do + { + tx_not_flushed = ZSTD_endStream(cs->stream, &output); + } while ((tx_not_flushed > 0) && (output.pos < output.size)); + + *dst_processed = output.pos; + + if (tx_not_flushed > 0) + { + return ZS_DATA_PENDING; + } + return ZS_OK; +} + +static void +zstd_free_compressor(void *c_stream) +{ + ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + + if (cs != NULL) + { + ZSTD_freeCStream(cs->stream); + free(cs); + } +} + +static void +zstd_free_decompressor(void *d_stream) +{ + ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream; + + if (ds != NULL) + { + ZSTD_freeDStream(ds->stream); + free(ds); + } +} + +static char const * +zstd_compress_error(void *c_stream) +{ + ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + + return cs->error; +} + +static char const * +zstd_decompress_error(void *d_stream) +{ + ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream; + + return ds->error; +} + +static char const * +zstd_name(void) +{ + return "zstd"; +} + +#endif + +#if HAVE_LIBZ + +#include +#include + + +static void * +zlib_create_compressor(int level) +{ + int rc; + z_stream *c_stream = (z_stream *) malloc(sizeof(z_stream)); + + memset(c_stream, 0, sizeof(*c_stream)); + rc = deflateInit(c_stream, level); + if (rc != Z_OK) + { + free(c_stream); + return NULL; + } + return c_stream; +} + +static void * +zlib_create_decompressor() +{ + int rc; + z_stream *d_stream = (z_stream *) malloc(sizeof(z_stream)); + + memset(d_stream, 0, sizeof(*d_stream)); + rc = inflateInit(d_stream); + if (rc != Z_OK) + { + free(d_stream); + return NULL; + } + return d_stream; +} + +static ssize_t +zlib_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + z_stream *ds = (z_stream *) d_stream; + int rc; + + ds->next_in = (Bytef *) src; + ds->avail_in = src_size; + ds->next_out = (Bytef *) dst; + ds->avail_out = dst_size; + + rc = inflate(ds, Z_SYNC_FLUSH); + *src_processed = src_size - ds->avail_in; + *dst_processed = dst_size - ds->avail_out; + + if (rc == Z_STREAM_END) + { + return ZS_STREAM_END; + } + if (rc != Z_OK && rc != Z_BUF_ERROR) + { + return ZS_DECOMPRESS_ERROR; + } + + return ZS_OK; +} + +static ssize_t +zlib_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + z_stream *cs = (z_stream *) c_stream; + int rc; + unsigned deflate_pending = 0; + + + cs->next_out = (Bytef *) dst; + cs->avail_out = dst_size; + cs->next_in = (Bytef *) src; + cs->avail_in = src_size; + + rc = deflate(cs, Z_SYNC_FLUSH); + Assert(rc == Z_OK); + *dst_processed = dst_size - cs->avail_out; + *src_processed = src_size - cs->avail_in; + + deflatePending(cs, &deflate_pending, Z_NULL); /* check if any data left + * in deflate buffer */ + if (deflate_pending > 0) + { + return ZS_DATA_PENDING; + } + return ZS_OK; +} + + +static ssize_t +zlib_end(void *c_stream, void *dst, size_t dst_size, size_t *dst_processed) +{ + z_stream *cs = (z_stream *) c_stream; + int rc; + + cs->next_out = (Bytef *) dst; + cs->avail_out = dst_size; + cs->next_in = NULL; + cs->avail_in = 0; + + rc = deflate(cs, Z_STREAM_END); + Assert(rc == Z_OK || rc == Z_STREAM_END); + *dst_processed = dst_size - cs->avail_out; + if (rc == Z_STREAM_END) + { + return ZS_OK; + } + + return ZS_DATA_PENDING; +} + +static void +zlib_free_compressor(void *c_stream) +{ + z_stream *cs = (z_stream *) c_stream; + + if (cs != NULL) + { + deflateEnd(cs); + free(cs); + } +} + +static void +zlib_free_decompressor(void *d_stream) +{ + z_stream *ds = (z_stream *) d_stream; + + if (ds != NULL) + { + inflateEnd(ds); + free(ds); + } +} + +static char const * +zlib_error(void *stream) +{ + z_stream *zs = (z_stream *) stream; + + return zs->msg; +} + +static char const * +zlib_name(void) +{ + return "zlib"; +} + +#endif + +static char const * +no_compression_name(void) +{ + return NULL; +} + +/* + * Array with all supported compression algorithms. + */ +static ZAlgorithm const zpq_algorithms[] = +{ +#if HAVE_LIBZSTD + {zstd_name, zstd_create_compressor, zstd_create_decompressor, zstd_decompress, zstd_compress, zstd_free_compressor, zstd_free_decompressor, zstd_compress_error, zstd_decompress_error, zstd_end}, +#endif +#if HAVE_LIBZ + {zlib_name, zlib_create_compressor, zlib_create_decompressor, zlib_decompress, zlib_compress, zlib_free_compressor, zlib_free_decompressor, zlib_error, zlib_error, zlib_end}, +#endif + {no_compression_name} +}; + +static ssize_t +zpq_init_compressor(ZStream * zs, int c_alg_impl, int c_level) +{ + zs->c_algorithm = &zpq_algorithms[c_alg_impl]; + zs->c_stream = zpq_algorithms[c_alg_impl].create_compressor(c_level); + if (zs->c_stream == NULL) + { + return -1; + } + return 0; +} + +static ssize_t +zpq_init_decompressor(ZStream * zs, int d_alg_impl) +{ + zs->d_algorithm = &zpq_algorithms[d_alg_impl]; + zs->d_stream = zpq_algorithms[d_alg_impl].create_decompressor(); + if (zs->d_stream == NULL) + { + return -1; + } + return 0; +} + +/* + * Index of used compression algorithm in zpq_algorithms array. + */ +ZStream * +zs_create(int c_alg_impl, int c_level, int d_alg_impl) +{ + ZStream *zs = (ZStream *) malloc(sizeof(ZStream)); + + zs->tx_not_flushed = false; + zs->rx_not_flushed = false; + + if (zpq_init_compressor(zs, c_alg_impl, c_level) || zpq_init_decompressor(zs, d_alg_impl)) + { + free(zs); + return NULL; + } + + return zs; +} + +ssize_t +zs_read(ZStream * zs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + *src_processed = 0; + *dst_processed = 0; + + ssize_t rc = zs->d_algorithm->decompress(zs->d_stream, + src, src_size, src_processed, + dst, dst_size, dst_processed); + + zs->rx_not_flushed = false; + if (rc == ZS_DATA_PENDING) + { + zs->rx_not_flushed = true; + return ZS_OK; + } + + if (rc != ZS_OK && rc != ZS_STREAM_END) + { + return ZS_DECOMPRESS_ERROR; + } + + return rc; +} + +ssize_t +zs_write(ZStream * zs, void const *buf, size_t size, size_t *processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + *processed = 0; + *dst_processed = 0; + + ssize_t rc = zs->c_algorithm->compress(zs->c_stream, + buf, size, processed, + dst, dst_size, dst_processed); + + zs->tx_not_flushed = false; + if (rc == ZS_DATA_PENDING) + { + zs->tx_not_flushed = true; + return ZS_OK; + } + if (rc != ZS_OK) + { + return ZS_COMPRESS_ERROR; + } + + return rc; +} + +void +zs_free(ZStream * zs) +{ + if (zs) + { + if (zs->c_stream) + { + zs->c_algorithm->free_compressor(zs->c_stream); + } + if (zs->d_stream) + { + zs->d_algorithm->free_decompressor(zs->d_stream); + } + free(zs); + } +} + +ssize_t +zs_end(ZStream * zs, void *dst, size_t dst_size, size_t *dst_processed) +{ + *dst_processed = 0; + + ssize_t rc = zs->c_algorithm->end_compression(zs->c_stream, dst, dst_size, dst_processed); + + zs->tx_not_flushed = false; + if (rc == ZS_DATA_PENDING) + { + zs->tx_not_flushed = true; + return ZS_OK; + } + if (rc != ZS_OK) + { + return ZS_COMPRESS_ERROR; + } + + return rc; +} + +char const * +zs_compress_error(ZStream * zs) +{ + return zs->c_algorithm->compress_error(zs->c_stream); +} + +char const * +zs_decompress_error(ZStream * zs) +{ + return zs->d_algorithm->decompress_error(zs->d_stream); +} + +bool +zs_buffered_rx(ZStream * zs) +{ + return zs ? zs->rx_not_flushed : 0; +} + +bool +zs_buffered_tx(ZStream * zs) +{ + return zs ? zs->tx_not_flushed : 0; +} + +/* + * Get list of the supported algorithms. + */ +char ** +zs_get_supported_algorithms(void) +{ + size_t n_algorithms = sizeof(zpq_algorithms) / sizeof(*zpq_algorithms); + char **algorithm_names = malloc(n_algorithms * sizeof(char *)); + + for (size_t i = 0; i < n_algorithms; i++) + { + algorithm_names[i] = (char *) zpq_algorithms[i].name(); + } + + return algorithm_names; +} + +char const * +zs_compress_algorithm_name(ZStream * zs) +{ + return zs ? zs->c_algorithm->name() : NULL; +} + +char const * +zs_decompress_algorithm_name(ZStream * zs) +{ + return zs ? zs->d_algorithm->name() : NULL; +} diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c index 0451d2d5021..48f19a2f0b8 100644 --- a/src/common/zpq_stream.c +++ b/src/common/zpq_stream.c @@ -2,522 +2,507 @@ #include "common/zpq_stream.h" #include "c.h" #include "pg_config.h" +#include "port/pg_bswap.h" +#include "common/z_stream.h" -/* - * Functions implementing streaming compression algorithm - */ -typedef struct -{ - /* - * Name of compression algorithm. - */ - char const *(*name) (void); +#define ZPQ_BUFFER_SIZE 8192 /* We have to flush stream after each + * protocol command and command is mostly + * limited by record length, which in turn + * is usually less than page size (except + * TOAST) */ +#define ZPQ_COMPRESSED_MSG_TYPE 'm' +#define ZPQ_COMPRESS_THRESHOLD 50 - /* - * Create new compression stream. - * level: compression level - */ - void *(*create_compressor) (int level); +typedef struct ZpqBuffer ZpqBuffer; - /* - * Create new decompression stream. - */ - void *(*create_decompressor) (); +struct ZpqBuffer +{ + char buf[ZPQ_BUFFER_SIZE]; + size_t size; + size_t pos; +}; - /* - * Decompress up to "src_size" compressed bytes from *src and write up to - * "dst_size" raw (decompressed) bytes to *dst. Number of decompressed - * bytes written to *dst is stored in *dst_processed. Number of compressed - * bytes read from *src is stored in *src_processed. - * - * Return codes: ZPQ_OK if no errors were encountered during decompression - * attempt. This return code does not guarantee that *src_processed > 0 or - * *dst_processed > 0. - * - * ZPQ_DATA_PENDING means that there might be some data left within - * decompressor internal buffers. - * - * ZPQ_STREAM_END if encountered end of compressed data stream. - * - * ZPQ_DECOMPRESS_ERROR if encountered an error during decompression - * attempt. - */ - ssize_t (*decompress) (void *ds, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed); +static inline void +zpq_buf_init(ZpqBuffer * zb) +{ + zb->size = 0; + zb->pos = 0; +} - /* - * Compress up to "src_size" raw (non-compressed) bytes from *src and - * write up to "dst_size" compressed bytes to *dst. Number of compressed - * bytes written to *dst is stored in *dst_processed. Number of - * non-compressed bytes read from *src is stored in *src_processed. - * - * Return codes: ZPQ_OK if no errors were encountered during compression - * attempt. This return code does not guarantee that *src_processed > 0 or - * *dst_processed > 0. - * - * ZPQ_DATA_PENDING means that there might be some data left within - * compressor internal buffers. - * - * ZPQ_COMPRESS_ERROR if encountered an error during compression attempt. - */ - ssize_t (*compress) (void *cs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed); +static inline size_t +zpq_buf_left(ZpqBuffer * zb) +{ + Assert(zb->buf); + return ZPQ_BUFFER_SIZE - zb->size; +} - /* - * Free compression stream created by create_compressor function. - */ - void (*free_compressor) (void *cs); +static inline size_t +zpq_buf_unread(ZpqBuffer * zb) +{ + return zb->size - zb->pos; +} - /* - * Free decompression stream created by create_decompressor function. - */ - void (*free_decompressor) (void *ds); +static inline char * +zpq_buf_size(ZpqBuffer * zb) +{ + return (char *) (zb->buf) + zb->size; +} - /* - * Get compressor error message. - */ - char const *(*compress_error) (void *cs); +static inline char * +zpq_buf_pos(ZpqBuffer * zb) +{ + return (char *) (zb->buf) + zb->pos; +} - /* - * Get decompressor error message. - */ - char const *(*decompress_error) (void *ds); -} ZpqAlgorithm; +static inline void * +zpq_buf_size_advance(ZpqBuffer * zb, size_t value) +{ + zb->size += value; +} +static inline void * +zpq_buf_pos_advance(ZpqBuffer * zb, size_t value) +{ + zb->pos += value; +} -#define ZPQ_BUFFER_SIZE 8192 /* We have to flush stream after each - * protocol command and command is mostly - * limited by record length, which in turn - * is usually less than page size (except - * TOAST) - */ +static inline void +zpq_buf_reuse(ZpqBuffer * zb) +{ + size_t unread = zpq_buf_unread(zb); + + if (unread > 5) /* can read message header, don't do anything */ + return; + if (unread == 0) + { + zb->size = 0; + zb->pos = 0; + return; + } + memmove(zb->buf, zb->buf + zb->pos, unread); + zb->size = unread; + zb->pos = 0; +} struct ZpqStream { - ZpqAlgorithm const *c_algorithm; - void *c_stream; + ZStream *z_stream; /* underlying compression stream */ - ZpqAlgorithm const *d_algorithm; - void *d_stream; + size_t tx_total; /* amount of bytes sent to tx_func */ - char tx_buf[ZPQ_BUFFER_SIZE]; - size_t tx_pos; - size_t tx_size; + size_t tx_total_raw; /* amount of bytes received by zpq_write */ + size_t rx_total; /* amount of bytes read by rx_func */ + size_t rx_total_raw; /* amount of bytes returned by zpq_write */ + bool is_compressing; /* current compression state */ - char rx_buf[ZPQ_BUFFER_SIZE]; - size_t rx_pos; - size_t rx_size; + bool is_decompressing; /* current decompression state */ + size_t rx_msg_bytes_left; /* number of bytes left to process without + * + * changing the decompression state */ + size_t tx_msg_bytes_left; /* number of bytes left to process without + * changing the compression state */ + + ZpqBuffer rx_in; /* buffer for unprocessed data read by rx_func */ + ZpqBuffer tx_in; /* buffer for unprocessed data consumed by + * zpq_write */ + ZpqBuffer tx_out; /* buffer for processed data waiting for send + * via tx_func */ - zpq_tx_func tx_func; zpq_rx_func rx_func; + zpq_tx_func tx_func; void *arg; - - size_t tx_total; - size_t tx_total_raw; - size_t rx_total; - size_t rx_total_raw; - - bool rx_not_flushed; - bool tx_not_flushed; }; -#if HAVE_LIBZSTD - -#include -#include - /* - * Maximum allowed back-reference distance, expressed as power of 2. - * This setting controls max compressor/decompressor window size. - * More details https://github.com/facebook/zstd/blob/v1.4.7/lib/zstd.h#L536 + * Check if should compress message of msg_type with msg_len. + * Return true if should, false if should not. */ -#define ZSTD_WINDOWLOG_LIMIT 23 /* set max window size to 8MB */ - -typedef struct ZPQ_ZSTD_CStream -{ - ZSTD_CStream *stream; - char const *error; /* error message */ -} ZPQ_ZSTD_CStream; - -typedef struct ZPQ_ZSTD_DStream -{ - ZSTD_DStream *stream; - char const *error; /* error message */ -} ZPQ_ZSTD_DStream; - -static void * -zstd_create_compressor(int level) +static inline bool +zpq_should_compress(char msg_type, uint32 msg_len) { - ZPQ_ZSTD_CStream *c_stream = (ZPQ_ZSTD_CStream *) malloc(sizeof(ZPQ_ZSTD_CStream)); - - c_stream->stream = ZSTD_createCStream(); - ZSTD_initCStream(c_stream->stream, level); -#if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3 - ZSTD_CCtx_setParameter(c_stream->stream, ZSTD_c_windowLog, ZSTD_WINDOWLOG_LIMIT); -#endif - c_stream->error = NULL; - return c_stream; + return msg_len >= ZPQ_COMPRESS_THRESHOLD; } -static void * -zstd_create_decompressor() +/* + * Check if message is a CompressedMessage. + * Return true if it is, otherwise false. + * */ +static inline bool +zpq_is_compressed_message(char msg_type) { - ZPQ_ZSTD_DStream *d_stream = (ZPQ_ZSTD_DStream *) malloc(sizeof(ZPQ_ZSTD_DStream)); - - d_stream->stream = ZSTD_createDStream(); - ZSTD_initDStream(d_stream->stream); -#if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3 - ZSTD_DCtx_setParameter(d_stream->stream, ZSTD_d_windowLogMax, ZSTD_WINDOWLOG_LIMIT); -#endif - d_stream->error = NULL; - return d_stream; + return msg_type == ZPQ_COMPRESSED_MSG_TYPE; } -static ssize_t -zstd_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +ZpqStream * +zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size) { - ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream; - ZSTD_inBuffer in; - ZSTD_outBuffer out; - size_t rc; + ZpqStream *zpq = (ZpqStream *) malloc(sizeof(ZpqStream)); + + zpq->is_compressing = false; + zpq->is_decompressing = false; + zpq->rx_msg_bytes_left = 0; + zpq->tx_msg_bytes_left = 0; + zpq_buf_init(&zpq->tx_in); - in.src = src; - in.pos = 0; - in.size = src_size; + zpq->tx_total = 0; + zpq->tx_total_raw = 0; + zpq->rx_total = 0; + zpq->rx_total_raw = 0; - out.dst = dst; - out.pos = 0; - out.size = dst_size; + zpq_buf_init(&zpq->rx_in); + zpq_buf_size_advance(&zpq->rx_in, rx_data_size); + Assert(rx_data_size < ZPQ_BUFFER_SIZE); + memcpy(zpq->rx_in.buf, rx_data, rx_data_size); - rc = ZSTD_decompressStream(ds->stream, &out, &in); + zpq_buf_init(&zpq->tx_out); - *src_processed = in.pos; - *dst_processed = out.pos; - if (ZSTD_isError(rc)) - { - ds->error = ZSTD_getErrorName(rc); - return ZPQ_DECOMPRESS_ERROR; - } + zpq->rx_func = rx_func; + zpq->tx_func = tx_func; + zpq->arg = arg; - if (out.pos == out.size) + zpq->z_stream = zs_create(c_alg_impl, c_level, d_alg_impl); + if (zpq->z_stream == NULL) { - /* - * if `output.pos == output.size`, there might be some data left - * within internal buffers - */ - return ZPQ_DATA_PENDING; + free(zpq); + return NULL; } - return ZPQ_OK; + return zpq; } -static ssize_t -zstd_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +/* Compress up to src_size bytes from *src into CompressedMessage and write it to the tx buffer. + * Returns ZS_OK on success, ZS_COMPRESS_ERROR if encountered a compression error. */ +static inline ssize_t +zpq_write_compressed_message(ZpqStream * zpq, char const *src, size_t src_size, size_t *src_processed) { - ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; - ZSTD_inBuffer in; - ZSTD_outBuffer out; - - in.src = src; - in.pos = 0; - in.size = src_size; - - out.dst = dst; - out.pos = 0; - out.size = dst_size; + size_t compressed_len; + ssize_t rc; + uint32 size; - if (in.pos < src_size) /* Has something to compress in input buffer */ + /* check if have enough space */ + if (zpq_buf_left(&zpq->tx_out) <= 5) { - size_t rc = ZSTD_compressStream(cs->stream, &out, &in); - - *dst_processed = out.pos; - *src_processed = in.pos; - if (ZSTD_isError(rc)) - { - cs->error = ZSTD_getErrorName(rc); - return ZPQ_COMPRESS_ERROR; - } + /* too little space for CompressedMessage, abort */ + *src_processed = 0; + return ZS_OK; } - if (in.pos == src_size) /* All data is compressed: flush internal zstd - * buffer */ + compressed_len = 0; + rc = zs_write(zpq->z_stream, src, src_size, src_processed, + zpq_buf_size(&zpq->tx_out) + 5, zpq_buf_left(&zpq->tx_out) - 5, &compressed_len); + + if (compressed_len > 0) { - size_t tx_not_flushed = ZSTD_flushStream(cs->stream, &out); + /* write CompressedMessage type */ + *zpq_buf_size(&zpq->tx_out) = ZPQ_COMPRESSED_MSG_TYPE; + size = pg_hton32(compressed_len + 4); - *dst_processed = out.pos; - if (tx_not_flushed > 0) - { - return ZPQ_DATA_PENDING; - } + memcpy(zpq_buf_size(&zpq->tx_out) + 1, &size, sizeof(uint32)); /* write msg length */ + compressed_len += 5; /* append header length to compressed data + * length */ } - return ZPQ_OK; + zpq->tx_total_raw += *src_processed; + zpq->tx_total += compressed_len; + + zpq_buf_size_advance(&zpq->tx_out, compressed_len); + return rc; } +/* Copy the data directly from *src to the tx buffer */ static void -zstd_free_compressor(void *c_stream) +zpq_write_uncompressed(ZpqStream * zpq, char const *src, size_t src_size, size_t *src_processed) { - ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + src_size = Min(zpq_buf_left(&zpq->tx_out), src_size); + memcpy(zpq_buf_size(&zpq->tx_out), src, src_size); - if (cs != NULL) - { - ZSTD_freeCStream(cs->stream); - free(cs); - } + zpq->tx_total_raw += src_size; + zpq->tx_total += src_size; + zpq_buf_size_advance(&zpq->tx_out, src_size); + *src_processed = src_size; } -static void -zstd_free_decompressor(void *d_stream) +/* Determine if should compress the next message and + * change the current compression state */ +static ssize_t +zpq_toggle_compression(ZpqStream * zpq, char msg_type, uint32 msg_len) { - ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream; - - if (ds != NULL) + if (zpq_should_compress(msg_type, msg_len)) { - ZSTD_freeDStream(ds->stream); - free(ds); + zpq->is_compressing = true; } + else if (zpq->is_compressing) + { + /* + * Won't compress the next message, should now finish the compression. + * Make sure there is no buffered data left in underlying compression + * stream + */ + while (zs_buffered_tx(zpq->z_stream)) + { + size_t flushed_len = 0; + ssize_t flush_rc = zpq_write_compressed_message(zpq, NULL, 0, &flushed_len); + + if (flush_rc != ZS_OK) + { + return flush_rc; + } + } + zpq->is_compressing = false; + } + zpq->tx_msg_bytes_left = msg_len + 1; + return 0; } -static char const * -zstd_compress_error(void *c_stream) +/* + * Internal write function. Reads the data from *src buffer, + * determines the postgres messages type and length. + * If message matches the compression criteria, it wraps the message into + * CompressedMessage. Otherwise, leaves the message unchanged. + * If *src data ends with incomplete message header, this function is not + * going to read this message header. + * Returns number of written raw bytes or error code. + * In the last case number of bytes written is stored in *processed. + */ +static ssize_t +zpq_write_internal(ZpqStream * zpq, void const *src, size_t src_size, size_t *processed) { - ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + size_t src_pos = 0; + ssize_t rc; - return cs->error; -} + do + { + /* + * try to read ahead the next message types and increase + * tx_msg_bytes_left, if possible + */ + while (zpq->tx_msg_bytes_left > 0 && src_size - src_pos >= zpq->tx_msg_bytes_left + 5) + { + char msg_type = *((char *) src + src_pos + zpq->tx_msg_bytes_left); + uint32 msg_len; -static char const * -zstd_decompress_error(void *d_stream) -{ - ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream; + memcpy(&msg_len, (char *) src + src_pos + zpq->tx_msg_bytes_left + 1, 4); + msg_len = pg_ntoh32(msg_len); + if (zpq_should_compress(msg_type, msg_len) != zpq->is_compressing) + { + /* + * cannot proceed further, encountered compression toggle + * point + */ + break; + } + zpq->tx_msg_bytes_left += msg_len + 1; + } - return ds->error; -} + /* + * Write CompressedMessage if currently is compressing or have some + * buffered data left in underlying compression stream + */ + if (zs_buffered_tx(zpq->z_stream) || (zpq->is_compressing && zpq->tx_msg_bytes_left > 0)) + { + size_t buf_processed = 0; + size_t to_compress = Min(zpq->tx_msg_bytes_left, src_size - src_pos); -static char const * -zstd_name(void) -{ - return "zstd"; -} + rc = zpq_write_compressed_message(zpq, (char *) src + src_pos, to_compress, &buf_processed); + src_pos += buf_processed; + zpq->tx_msg_bytes_left -= buf_processed; -#endif + if (rc != ZS_OK) + { + *processed = src_pos; + return rc; + } + } -#if HAVE_LIBZ + /* + * If not going to compress the data from *src, just write it + * uncompressed. + */ + else if (zpq->tx_msg_bytes_left > 0) + { /* determine next message type */ + size_t copy_len = Min(src_size - src_pos, zpq->tx_msg_bytes_left); + size_t copy_processed = 0; + + zpq_write_uncompressed(zpq, (char *) src + src_pos, copy_len, ©_processed); + src_pos += copy_processed; + zpq->tx_msg_bytes_left -= copy_processed; + } -#include -#include + /* + * Reached the compression toggle point, fetch next message header to + * determine compression state. + */ + else + { + char msg_type; + uint32 msg_len; + if (src_size - src_pos < 5) + { + /* + * must return here because we can't continue without full + * message header + */ + *processed = src_pos; + return ZPQ_INCOMPLETE_HEADER; + } -static void * -zlib_create_compressor(int level) -{ - int rc; - z_stream *c_stream = (z_stream *) malloc(sizeof(z_stream)); + msg_type = *((char *) src + src_pos); + memcpy(&msg_len, (char *) src + src_pos + 1, 4); + msg_len = pg_ntoh32(msg_len); + rc = zpq_toggle_compression(zpq, msg_type, msg_len); + if (rc) + { + return rc; + } + } - memset(c_stream, 0, sizeof(*c_stream)); - rc = deflateInit(c_stream, level); - if (rc != Z_OK) - { - free(c_stream); - return NULL; - } - return c_stream; + /* + * repeat sending while there is some data in input or internal + * compression buffer + */ + } while (src_pos < src_size && zpq_buf_left(&zpq->tx_out)); + + return src_pos; } -static void * -zlib_create_decompressor() +ssize_t +zpq_write(ZpqStream * zpq, void const *src, size_t src_size, size_t *src_processed) { - int rc; - z_stream *d_stream = (z_stream *) malloc(sizeof(z_stream)); + size_t src_pos = 0; + ssize_t rc; - memset(d_stream, 0, sizeof(*d_stream)); - rc = inflateInit(d_stream); - if (rc != Z_OK) + while (zpq_buf_left(&zpq->tx_out)) { - free(d_stream); - return NULL; - } - return d_stream; -} + size_t copy_len = Min(zpq_buf_left(&zpq->tx_in), src_size - src_pos); -static ssize_t -zlib_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) -{ - z_stream *ds = (z_stream *) d_stream; - int rc; + memcpy(zpq_buf_size(&zpq->tx_in), (char *) src + src_pos, copy_len); + zpq_buf_size_advance(&zpq->tx_in, copy_len); + src_pos += copy_len; - ds->next_in = (Bytef *) src; - ds->avail_in = src_size; - ds->next_out = (Bytef *) dst; - ds->avail_out = dst_size; + if (zpq_buf_unread(&zpq->tx_in) == 0 && !zs_buffered_tx(zpq->z_stream)) { + break; + } - rc = inflate(ds, Z_SYNC_FLUSH); - *src_processed = src_size - ds->avail_in; - *dst_processed = dst_size - ds->avail_out; + size_t processed = 0; - if (rc == Z_STREAM_END) - { - return ZPQ_STREAM_END; + rc = zpq_write_internal(zpq, zpq_buf_pos(&zpq->tx_in), zpq_buf_unread(&zpq->tx_in), &processed); + if (rc > 0) + { + zpq_buf_pos_advance(&zpq->tx_in, rc); + zpq_buf_reuse(&zpq->tx_in); + } + else + { + zpq_buf_pos_advance(&zpq->tx_in, processed); + zpq_buf_reuse(&zpq->tx_in); + if (rc == ZPQ_INCOMPLETE_HEADER) { + break; + } + *src_processed = src_pos; + return rc; + } } - if (rc != Z_OK && rc != Z_BUF_ERROR) + + /* + * call the tx_func if have any bytes to send + */ + while (zpq_buf_unread(&zpq->tx_out)) { - return ZPQ_DECOMPRESS_ERROR; + rc = zpq->tx_func(zpq->arg, zpq_buf_pos(&zpq->tx_out), zpq_buf_unread(&zpq->tx_out)); + if (rc > 0) + { + zpq_buf_pos_advance(&zpq->tx_out, rc); + } + else + { + *src_processed = src_pos; + zpq_buf_reuse(&zpq->tx_out); + return rc; + } } - return ZPQ_OK; + zpq_buf_reuse(&zpq->tx_out); + return src_pos; } -static ssize_t -zlib_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) -{ - z_stream *cs = (z_stream *) c_stream; - int rc; - unsigned deflate_pending = 0; - - cs->next_out = (Bytef *) dst; - cs->avail_out = dst_size; - cs->next_in = (Bytef *) src; - cs->avail_in = src_size; - - rc = deflate(cs, Z_SYNC_FLUSH); - Assert(rc == Z_OK); - *dst_processed = dst_size - cs->avail_out; - *src_processed = src_size - cs->avail_in; - - deflatePending(cs, &deflate_pending, Z_NULL); /* check if any data left - * in deflate buffer */ - if (deflate_pending > 0) - { - return ZPQ_DATA_PENDING; - } - return ZPQ_OK; -} - -static void -zlib_free_compressor(void *c_stream) +/* Decompress bytes from RX buffer and write up to dst_len of uncompressed data to *dst. + * Returns: + * ZS_OK on success, + * ZS_STREAM_END if reached end of compressed chunk + * ZS_DECOMPRESS_ERROR if encountered a decompression error */ +static inline ssize_t +zpq_read_compressed_message(ZpqStream * zpq, char *dst, size_t dst_len, size_t *dst_processed) { - z_stream *cs = (z_stream *) c_stream; - - if (cs != NULL) - { - deflateEnd(cs); - free(cs); - } -} + size_t rx_processed = 0; + ssize_t rc; + size_t read_len = Min(zpq->rx_msg_bytes_left, zpq_buf_unread(&zpq->rx_in)); -static void -zlib_free_decompressor(void *d_stream) -{ - z_stream *ds = (z_stream *) d_stream; + rc = zs_read(zpq->z_stream, zpq_buf_pos(&zpq->rx_in), read_len, &rx_processed, + dst, dst_len, dst_processed); - if (ds != NULL) - { - inflateEnd(ds); - free(ds); - } + zpq_buf_pos_advance(&zpq->rx_in, rx_processed); + zpq->rx_total_raw += *dst_processed; + zpq->rx_msg_bytes_left -= rx_processed; + return rc; } -static char const * -zlib_error(void *stream) +/* Copy up to dst_len bytes from rx buffer to *dst. + * Returns amount of bytes copied. */ +static inline size_t +zpq_read_uncompressed(ZpqStream * zpq, char *dst, size_t dst_len) { - z_stream *zs = (z_stream *) stream; + Assert(zpq_buf_unread(&zpq->rx_in) > 0); + size_t copy_len = Min(zpq->rx_msg_bytes_left, Min(zpq_buf_unread(&zpq->rx_in), dst_len)); - return zs->msg; -} + memcpy(dst, zpq_buf_pos(&zpq->rx_in), copy_len); -static char const * -zlib_name(void) -{ - return "zlib"; + zpq_buf_pos_advance(&zpq->rx_in, copy_len); + zpq->rx_total_raw += copy_len; + zpq->rx_msg_bytes_left -= copy_len; + return copy_len; } -#endif - -static char const * -no_compression_name(void) +/* Determine if should decompress the next message and + * change the current decompression state */ +static inline void +zpq_toggle_decompression(ZpqStream * zpq) { - return NULL; -} + uint32 msg_len; + char msg_type = *zpq_buf_pos(&zpq->rx_in); -/* - * Array with all supported compression algorithms. - */ -static ZpqAlgorithm const zpq_algorithms[] = -{ -#if HAVE_LIBZSTD - {zstd_name, zstd_create_compressor, zstd_create_decompressor, zstd_decompress, zstd_compress, zstd_free_compressor, zstd_free_decompressor, zstd_compress_error, zstd_decompress_error}, -#endif -#if HAVE_LIBZ - {zlib_name, zlib_create_compressor, zlib_create_decompressor, zlib_decompress, zlib_compress, zlib_free_compressor, zlib_free_decompressor, zlib_error, zlib_error}, -#endif - {no_compression_name} -}; + zpq->is_decompressing = zpq_is_compressed_message(msg_type); -/* - * Index of used compression algorithm in zpq_algorithms array. - */ -ZpqStream * -zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size) -{ - ZpqStream *zs = (ZpqStream *) malloc(sizeof(ZpqStream)); - - zs->tx_pos = 0; - zs->tx_size = 0; - zs->rx_pos = 0; - zs->rx_size = 0; - zs->tx_func = tx_func; - zs->rx_func = rx_func; - zs->arg = arg; - zs->tx_total = 0; - zs->tx_total_raw = 0; - zs->rx_total = 0; - zs->rx_total_raw = 0; - zs->tx_not_flushed = false; - zs->rx_not_flushed = false; - - zs->rx_size = rx_data_size; - Assert(rx_data_size < ZPQ_BUFFER_SIZE); - memcpy(zs->rx_buf, rx_data, rx_data_size); + memcpy(&msg_len, zpq_buf_pos(&zpq->rx_in) + 1, 4); + zpq->rx_msg_bytes_left = pg_ntoh32(msg_len) + 1; - zs->c_algorithm = &zpq_algorithms[c_alg_impl]; - zs->c_stream = zpq_algorithms[c_alg_impl].create_compressor(c_level); - if (zs->c_stream == NULL) - { - free(zs); - return NULL; - } - zs->d_algorithm = &zpq_algorithms[d_alg_impl]; - zs->d_stream = zpq_algorithms[d_alg_impl].create_decompressor(); - if (zs->d_stream == NULL) + if (zpq->is_decompressing) { - free(zs); - return NULL; + /* compressed message header is no longer needed, just skip it */ + zpq_buf_pos_advance(&zpq->rx_in, 5); + zpq->rx_msg_bytes_left -= 5; } - - return zs; } ssize_t -zpq_read(ZpqStream * zs, void *buf, size_t size) +zpq_read(ZpqStream * zpq, void *dst, size_t dst_size) { - size_t buf_pos = 0; - size_t rx_processed; - size_t buf_processed; + size_t dst_pos = 0; + size_t dst_processed = 0; ssize_t rc; - while (buf_pos == 0) - { /* Read until some data fetched */ - if (zs->rx_pos == zs->rx_size) - { - zs->rx_pos = zs->rx_size = 0; /* Reset rx buffer */ - } + /* Read until some data fetched */ + while (dst_pos == 0) + { + zpq_buf_reuse(&zpq->rx_in); - if (zs->rx_pos == zs->rx_size && !zs->rx_not_flushed) + if (!zpq_buffered_rx(zpq)) { - ssize_t rc = zs->rx_func(zs->arg, (char *) zs->rx_buf + zs->rx_size, ZPQ_BUFFER_SIZE - zs->rx_size); - + rc = zpq->rx_func(zpq->arg, zpq_buf_size(&zpq->rx_in), zpq_buf_left(&zpq->rx_in)); if (rc > 0) /* read fetches some data */ { - zs->rx_size += rc; - zs->rx_total += rc; + zpq->rx_total += rc; + zpq_buf_size_advance(&zpq->rx_in, rc); } else /* read failed */ { @@ -525,160 +510,97 @@ zpq_read(ZpqStream * zs, void *buf, size_t size) } } - Assert(zs->rx_pos <= zs->rx_size); - rx_processed = 0; - buf_processed = 0; - rc = zs->d_algorithm->decompress(zs->d_stream, - (char *) zs->rx_buf + zs->rx_pos, zs->rx_size - zs->rx_pos, &rx_processed, - buf, size, &buf_processed); - - zs->rx_pos += rx_processed; - zs->rx_total_raw += rx_processed; - buf_pos += buf_processed; - zs->rx_not_flushed = false; - if (rc == ZPQ_STREAM_END) - { - break; - } - if (rc == ZPQ_DATA_PENDING) - { - zs->rx_not_flushed = true; - continue; - } - if (rc != ZPQ_OK) - { - return ZPQ_DECOMPRESS_ERROR; - } - } - return buf_pos; -} - -ssize_t -zpq_write(ZpqStream * zs, void const *buf, size_t size, size_t *processed) -{ - size_t buf_pos = 0; - - do - { - if (zs->tx_pos == zs->tx_size) /* Have nothing to send */ + /* + * try to read ahead the next message types and increase + * rx_msg_bytes_left, if possible + */ + while (zpq->rx_msg_bytes_left > 0 && (zpq_buf_unread(&zpq->rx_in) >= zpq->rx_msg_bytes_left + 5)) { - size_t tx_processed = 0; - size_t buf_processed = 0; - ssize_t rc; - - zs->tx_pos = zs->tx_size = 0; /* Reset pointer to the beginning of buffer */ + char msg_type; - rc = zs->c_algorithm->compress(zs->c_stream, - (char *) buf + buf_pos, size - buf_pos, &buf_processed, - (char *) zs->tx_buf + zs->tx_size, ZPQ_BUFFER_SIZE - zs->tx_size, &tx_processed); - - zs->tx_size += tx_processed; - buf_pos += buf_processed; - zs->tx_total_raw += buf_processed; - zs->tx_not_flushed = false; - - if (rc == ZPQ_DATA_PENDING) - { - zs->tx_not_flushed = true; - continue; - } - if (rc != ZPQ_OK) + msg_type = *(zpq_buf_pos(&zpq->rx_in) + zpq->rx_msg_bytes_left); + if (zpq->is_decompressing || zpq_is_compressed_message(msg_type)) { - *processed = buf_pos; - return ZPQ_COMPRESS_ERROR; + /* + * cannot proceed further, encountered compression toggle + * point + */ + break; } - } - while (zs->tx_pos < zs->tx_size) - { - ssize_t rc = zs->tx_func(zs->arg, (char *) zs->tx_buf + zs->tx_pos, zs->tx_size - zs->tx_pos); + uint32 msg_len; - if (rc > 0) - { - zs->tx_pos += rc; - zs->tx_total += rc; - } - else - { - *processed = buf_pos; - return rc; - } + memcpy(&msg_len, zpq_buf_pos(&zpq->rx_in) + zpq->rx_msg_bytes_left + 1, 4); + zpq->rx_msg_bytes_left += pg_ntoh32(msg_len) + 1; } - /* - * repeat sending while there is some data in input or internal - * compression buffer - */ - } while (buf_pos < size || zs->tx_not_flushed); - return buf_pos; -} - -void -zpq_free(ZpqStream * zs) -{ - if (zs) - { - if (zs->c_stream) + if (zpq->rx_msg_bytes_left > 0 || zs_buffered_rx(zpq->z_stream)) { - zs->c_algorithm->free_compressor(zs->c_stream); - } - if (zs->d_stream) - { - zs->d_algorithm->free_decompressor(zs->d_stream); + dst_processed = 0; + if (zpq->is_decompressing || zs_buffered_rx(zpq->z_stream)) + { + rc = zpq_read_compressed_message(zpq, dst, dst_size - dst_pos, &dst_processed); + dst_pos += dst_processed; + if (rc == ZS_STREAM_END) + { + continue; + } + if (rc != ZS_OK) + { + return rc; + } + } + else + dst_pos += zpq_read_uncompressed(zpq, dst, dst_size - dst_pos); } - free(zs); + else if (zpq_buf_unread(&zpq->rx_in) >= 5) + zpq_toggle_decompression(zpq); } + return dst_pos; } -char const * -zpq_compress_error(ZpqStream * zs) +bool +zpq_buffered_rx(ZpqStream * zpq) { - return zs->c_algorithm->compress_error(zs->c_stream); + return zpq ? zpq_buf_unread(&zpq->rx_in) >= 5 || (zpq_buf_unread(&zpq->rx_in) > 0 && zpq->rx_msg_bytes_left > 0) || zs_buffered_rx(zpq->z_stream) : 0; } -char const * -zpq_decompress_error(ZpqStream * zs) +bool +zpq_buffered_tx(ZpqStream * zpq) { - return zs->d_algorithm->decompress_error(zs->d_stream); + return zpq ? zpq_buf_unread(&zpq->tx_in) >= 5 || (zpq_buf_unread(&zpq->tx_in) > 0 && zpq->tx_msg_bytes_left > 0) || zpq_buf_unread(&zpq->tx_out) > 0 || + zs_buffered_tx(zpq->z_stream) : 0; } -size_t -zpq_buffered_rx(ZpqStream * zs) -{ - return zs ? zs->rx_not_flushed || (zs->rx_size - zs->rx_pos) : 0; -} -size_t -zpq_buffered_tx(ZpqStream * zs) -{ - return zs ? zs->tx_not_flushed || (zs->tx_size - zs->tx_pos) : 0; -} -/* - * Get list of the supported algorithms. - */ -char ** -zpq_get_supported_algorithms(void) +void +zpq_free(ZpqStream * zpq) { - size_t n_algorithms = sizeof(zpq_algorithms) / sizeof(*zpq_algorithms); - char **algorithm_names = malloc(n_algorithms * sizeof(char *)); - - for (size_t i = 0; i < n_algorithms; i++) + if (zpq) { - algorithm_names[i] = (char *) zpq_algorithms[i].name(); + if (zpq->z_stream) + { + zs_free(zpq->z_stream); + } + free(zpq); } +} - return algorithm_names; +char const * +zpq_compress_error(ZpqStream * zpq) +{ + return zs_compress_error(zpq->z_stream); } char const * -zpq_compress_algorithm_name(ZpqStream * zs) +zpq_decompress_error(ZpqStream * zpq) { - return zs ? zs->c_algorithm->name() : NULL; + return zs_decompress_error(zpq->z_stream); } char const * -zpq_decompress_algorithm_name(ZpqStream * zs) +zpq_compress_algorithm_name(ZpqStream * zpq) { - return zs ? zs->d_algorithm->name() : NULL; + return zs_compress_algorithm_name(zpq->z_stream); } diff --git a/src/include/common/z_stream.h b/src/include/common/z_stream.h new file mode 100644 index 00000000000..a19ea90ca50 --- /dev/null +++ b/src/include/common/z_stream.h @@ -0,0 +1,90 @@ +/* + * z_stream.h + * Streaming compression + */ + + +#ifndef Z_STREAM_H +#define Z_STREAM_H + +#include + +#define ZS_OK (0) +#define ZS_IO_ERROR (-1) +#define ZS_DECOMPRESS_ERROR (-2) +#define ZS_COMPRESS_ERROR (-3) +#define ZS_STREAM_END (-4) +#define ZS_DATA_PENDING (-5) + +struct ZStream; +typedef struct ZStream ZStream; + +#endif + +/* + * Create compression stream with rx/tx function for reading/sending compressed data. + * c_alg_impl: index of chosen compression algorithm + * c_level: compression c_level + * d_alg_impl: index of chosen decompression algorithm + */ +extern ZStream * zs_create(int c_alg_impl, int c_level, int d_alg_impl); + +/* + * Read up to "size" raw (decompressed) bytes. + * Returns number of decompressed bytes or error code. + * Error code is either ZS_DECOMPRESS_ERROR or error code returned by the rx function. + */ +extern ssize_t zs_read(ZStream * zs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed); + +/* + * Write up to "size" raw (decompressed) bytes. + * Returns number of written raw bytes or error code. + * Error code is either ZS_COMPRESS_ERROR or error code returned by the tx function. + * In the last case number of bytes written is stored in *processed. + */ +extern ssize_t zs_write(ZStream * zs, void const *buf, size_t size, size_t *processed, void *dst, size_t dst_size, size_t *dst_processed); + +/* + * Get decompressor error message. + */ +extern char const *zs_decompress_error(ZStream * zs); + +/* + * Get compressor error message. + */ +extern char const *zs_compress_error(ZStream * zs); + +/* + * Return true if non-flushed data might left in internal rx decompression buffer. + */ +extern bool zs_buffered_rx(ZStream * zs); + +/* + * Return true if non-flushed data might left in internal tx compression buffer. + */ +extern bool zs_buffered_tx(ZStream * zs); + +/* + * End the compression stream. + */ +extern ssize_t zs_end(ZStream * zs, void *dst, size_t dst_size, size_t *dst_processed); + +/* + * Free stream created by zs_create function. + */ +extern void zs_free(ZStream * zs); + +/* + * Get the name of chosen compression algorithm. + */ +extern char const *zs_compress_algorithm_name(ZStream * zs); + +/* + * Get the name of chosen decompression algorithm. + */ +extern char const *zs_decompress_algorithm_name(ZStream * zs); + +/* + Returns zero terminated array with compression algorithms names +*/ +extern char **zs_get_supported_algorithms(void); diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h index e48b5ac9303..690ad16c0e0 100644 --- a/src/include/common/zpq_stream.h +++ b/src/include/common/zpq_stream.h @@ -3,26 +3,25 @@ * Streaming compression for libpq */ +#include "z_stream.h" + #ifndef ZPQ_STREAM_H #define ZPQ_STREAM_H #include -#define ZPQ_OK (0) -#define ZPQ_IO_ERROR (-1) -#define ZPQ_DECOMPRESS_ERROR (-2) -#define ZPQ_COMPRESS_ERROR (-3) -#define ZPQ_STREAM_END (-4) -#define ZPQ_DATA_PENDING (-5) #define ZPQ_DEFAULT_COMPRESSION_LEVEL (1) - +#define ZPQ_INCOMPLETE_HEADER (-6) struct ZpqStream; typedef struct ZpqStream ZpqStream; typedef ssize_t (*zpq_tx_func) (void *arg, void const *data, size_t size); typedef ssize_t (*zpq_rx_func) (void *arg, void *data, size_t size); + +#endif + /* * Create compression stream with rx/tx function for reading/sending compressed data. * c_alg_impl: index of chosen compression algorithm @@ -34,61 +33,49 @@ typedef ssize_t (*zpq_rx_func) (void *arg, void *data, size_t size); * rx_data: received data (compressed data already fetched from input stream) * rx_data_size: size of data fetched from input stream */ -extern ZpqStream *zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size); +extern ZpqStream * zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size); /* - * Read up to "size" raw (decompressed) bytes. - * Returns number of decompressed bytes or error code. - * Error code is either ZPQ_DECOMPRESS_ERROR or error code returned by the rx function. - */ -extern ssize_t zpq_read(ZpqStream * zs, void *buf, size_t size); - -/* - * Write up to "size" raw (decompressed) bytes. + * Write up to "src_size" raw (decompressed) bytes. * Returns number of written raw bytes or error code. * Error code is either ZPQ_COMPRESS_ERROR or error code returned by the tx function. - * In the last case number of bytes written is stored in *processed. + * In the last case number of bytes written is stored in *src_processed. */ -extern ssize_t zpq_write(ZpqStream * zs, void const *buf, size_t size, size_t *processed); +extern ssize_t zpq_write(ZpqStream * zpq, void const *src, size_t src_size, size_t *src_processed); /* - * Get decompressor error message. + * Read up to "dst_size" raw (decompressed) bytes. + * Returns number of decompressed bytes or error code. + * Error code is either ZPQ_DECOMPRESS_ERROR or error code returned by the rx function. */ -extern char const *zpq_decompress_error(ZpqStream * zs); +extern ssize_t zpq_read(ZpqStream * zpq, void *dst, size_t dst_size); /* - * Get compressor error message. + * Return true if non-flushed data left in internal rx decompression buffer. */ -extern char const *zpq_compress_error(ZpqStream * zs); +extern bool zpq_buffered_rx(ZpqStream * zpq); /* - * Return an estimated amount of data in internal rx decompression buffer. + * Return true if non-flushed data left in internal tx compression buffer. */ -extern size_t zpq_buffered_rx(ZpqStream * zs); +extern bool zpq_buffered_tx(ZpqStream * zpq); /* - * Return an estimated amount of data in internal tx compression buffer. + * Free stream created by zs_create function. */ -extern size_t zpq_buffered_tx(ZpqStream * zs); +extern void zpq_free(ZpqStream * zpq); /* - * Free stream created by zpq_create function. + * Get decompressor error message. */ -extern void zpq_free(ZpqStream * zs); +extern char const *zpq_decompress_error(ZpqStream * zpq); /* - * Get the name of chosen compression algorithm. + * Get compressor error message. */ -extern char const *zpq_compress_algorithm_name(ZpqStream * zs); +extern char const *zpq_compress_error(ZpqStream * zpq); /* - * Get the name of chosen decompression algorithm. + * Get the name of chosen compression algorithm. */ -extern char const *zpq_decompress_algorithm_name(ZpqStream * zs); - -/* - Returns zero terminated array with compression algorithms names -*/ -extern char **zpq_get_supported_algorithms(void); - -#endif +extern char const *zpq_compress_algorithm_name(ZpqStream * zpq); diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index bbc1f904816..79a4cfbaa74 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -179,3 +179,4 @@ PQgetgssctx 176 PQsetSSLKeyPassHook_OpenSSL 177 PQgetSSLKeyPassHook_OpenSSL 178 PQdefaultSSLKeyPassHook_OpenSSL 179 +PQreadPending 180 diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 3c704ba25b1..c2e33309757 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -464,8 +464,8 @@ void pqDropConnection(PGconn *conn, bool flushInput) { /* Release compression streams */ - zpq_free(conn->zstream); - conn->zstream = NULL; + zpq_free(conn->zpqStream); + conn->zpqStream = NULL; /* Drop any SSL state */ pqsecure_close(conn); @@ -2157,12 +2157,6 @@ connectDBComplete(PGconn *conn) return 1; /* success! */ case PGRES_POLLING_READING: - /* if there is some buffered RX data in ZpqStream - * then don't proceed to pqWaitTimed */ - if (zpq_buffered_rx(conn->zstream)) { - break; - } - ret = pqWaitTimed(1, 0, conn, finish_time); if (ret == -1) { @@ -3240,10 +3234,11 @@ PQconnectPoll(PGconn *conn) return PGRES_POLLING_READING; } - if (beresp == 'z') /* Switch on compression */ + if (beresp == 'z') /* Switch on compression */ { - int index; - char resp; + int index; + char resp; + /* Read message length word */ if (pqGetInt(&msgLength, 4, conn)) { @@ -3254,40 +3249,43 @@ PQconnectPoll(PGconn *conn) { appendPQExpBuffer(&conn->errorMessage, libpq_gettext( - "expected compression algorithm specification message length is 5 bytes, but %d is received\n"), + "expected compression algorithm specification message length is 5 bytes, but %d is received\n"), msgLength); goto error_return; } pqGetc(&resp, conn); index = resp; - if (index == (char)-1) + if (index == (char) -1) { appendPQExpBuffer(&conn->errorMessage, libpq_gettext( - "server does not support requested compression algorithms %s\n"), + "server does not support requested compression algorithms %s\n"), conn->compression); goto error_return; } - if ((unsigned)index >= conn->n_compressors) + if ((unsigned) index >= conn->n_compressors) { appendPQExpBuffer(&conn->errorMessage, libpq_gettext( - "server returns incorrect compression aslogirhm index: %d\n"), + "server returns incorrect compression aslogirhm index: %d\n"), index); goto error_return; } - Assert(!conn->zstream); - conn->zstream = zpq_create(conn->compressors[index].impl, - conn->compressors[index].level, - conn->compressors[index].impl, - (zpq_tx_func)pqsecure_write, (zpq_rx_func)pqsecure_read, conn, - &conn->inBuffer[conn->inCursor], conn->inEnd-conn->inCursor); - if (!conn->zstream) + Assert(!conn->zpqStream); + conn->zpqStream = zpq_create(conn->compressors[index].impl, + conn->compressors[index].level, + conn->compressors[index].impl, + (zpq_tx_func) pqsecure_write, (zpq_rx_func) pqsecure_read, + conn, + &conn->inBuffer[conn->inCursor], + conn->inEnd - conn->inCursor); + if (!conn->zpqStream) { - char** supported_algorithms = zpq_get_supported_algorithms(); + char **supported_algorithms = zs_get_supported_algorithms(); + appendPQExpBuffer(&conn->errorMessage, libpq_gettext( - "failed to initialize compressor %s\n"), + "failed to initialize compressor %s\n"), supported_algorithms[conn->compressors[index].impl]); free(supported_algorithms); goto error_return; @@ -3295,13 +3293,15 @@ PQconnectPoll(PGconn *conn) /* reset buffer */ conn->inStart = conn->inCursor = conn->inEnd = 0; } - else if (conn->n_compressors != 0 && beresp == 'v') /* negotiate protocol version */ + else if (conn->n_compressors != 0 && beresp == 'v') /* negotiate protocol + * version */ { appendPQExpBuffer(&conn->errorMessage, libpq_gettext( - "server is not supporting libpq compression\n")); + "server is not supporting libpq compression\n")); goto error_return; - } else + } + else break; } diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 00ccefbac0e..d053c1f302c 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -1810,9 +1810,7 @@ PQgetResult(PGconn *conn) * EOF indication. We expect therefore that this won't result in any * undue delay in reporting a previous write failure.) */ - if (flushResult || (zpq_buffered_rx(conn->zstream) == 0 && - pqWait(true, false, conn)) || - pqReadData(conn) < 0) + if (flushResult || pqWait(true, false, conn) || pqReadData(conn) < 0) { /* * conn->errorMessage has been set by pqWait or pqReadData. We @@ -3285,6 +3283,12 @@ PQflush(PGconn *conn) return pqFlush(conn); } +int +PQreadPending(PGconn *conn) +{ + return pqReadPending(conn); +} + /* * PQfreemem - safely frees memory allocated diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c index c9bae6531df..c93ede21218 100644 --- a/src/interfaces/libpq/fe-misc.c +++ b/src/interfaces/libpq/fe-misc.c @@ -65,8 +65,8 @@ static int pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time); * Use zpq_read if compression is switched on */ #define pq_read_conn(conn) \ - (conn->zstream \ - ? zpq_read(conn->zstream, conn->inBuffer + conn->inEnd, \ + (conn->zpqStream \ + ? zpq_read(conn->zpqStream, conn->inBuffer + conn->inEnd, \ conn->inBufSize - conn->inEnd) \ : pqsecure_read(conn, conn->inBuffer + conn->inEnd, \ conn->inBufSize - conn->inEnd)) @@ -679,11 +679,11 @@ pqReadData(PGconn *conn) nread = pq_read_conn(conn); if (nread < 0) { - if (nread == ZPQ_DECOMPRESS_ERROR) + if (nread == ZS_DECOMPRESS_ERROR) { printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("decompress error: %s\n"), - zpq_decompress_error(conn->zstream)); + libpq_gettext("decompress error: %s\n"), + zpq_decompress_error(conn->zpqStream)); return -1; } @@ -782,11 +782,11 @@ pqReadData(PGconn *conn) if (nread < 0) { - if (nread == ZPQ_DECOMPRESS_ERROR) + if (nread == ZS_DECOMPRESS_ERROR) { printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("decompress error: %s\n"), - zpq_decompress_error(conn->zstream)); + libpq_gettext("decompress error: %s\n"), + zpq_decompress_error(conn->zpqStream)); return -1; } @@ -902,15 +902,16 @@ pqSendSome(PGconn *conn, int len) } /* while there's still data to send */ - while (len > 0 || zpq_buffered_tx(conn->zstream)) + while (len > 0 || zpq_buffered_tx(conn->zpqStream)) { int sent; - size_t processed = 0; - /* + size_t processed = 0; + + /* * Use zpq_write if compression is switched on */ - sent = conn->zstream - ? zpq_write(conn->zstream, ptr, len, &processed) + sent = conn->zpqStream + ? zpq_write(conn->zpqStream, ptr, len, &processed) #ifndef WIN32 : pqsecure_write(conn, ptr, len); #else @@ -978,7 +979,7 @@ pqSendSome(PGconn *conn, int len) remaining -= sent; } - if (len > 0 || sent < 0 || zpq_buffered_tx(conn->zstream)) + if (len > 0 || sent < 0 || zpq_buffered_tx(conn->zpqStream)) { /* * We didn't send it all, wait till we can send more. @@ -1128,6 +1129,9 @@ pqWriteReady(PGconn *conn) * * If SSL is in use, the SSL buffer is checked prior to checking the socket * for read data directly. + * + * If ZPQ stream is in use, the ZPQ buffer is checked prior to checking + * the socket for read data directly. */ static int pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time) @@ -1143,14 +1147,10 @@ pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time) return -1; } -#ifdef USE_SSL - /* Check for SSL library buffering read bytes */ - if (forRead && conn->ssl_in_use && pgtls_read_pending(conn)) + if (forRead && (pqReadPending(conn) > 0)) { - /* short-circuit the select */ return 1; } -#endif /* We will retry as long as we get EINTR */ do @@ -1169,6 +1169,33 @@ pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time) return result; } +/* + * Check if there is some data pending in ZPQ / SSL read buffers. + * Returns -1 on failure, 0 if no, 1 if yes. + */ +int +pqReadPending(PGconn *conn) +{ + if (!conn) + return -1; + + /* check for ZPQ stream buffered read bytes */ + if (zpq_buffered_rx(conn->zpqStream)) + { + /* short-circuit the select */ + return 1; + } + +#ifdef USE_SSL + /* Check for SSL library buffering read bytes */ + if (conn->ssl_in_use && pgtls_read_pending(conn)) + { + /* short-circuit the select */ + return 1; + } +#endif + return 0; +} /* * Check a file descriptor for read and/or write data, possibly waiting. diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index fe4eaca032f..f85571a375d 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -1679,7 +1679,7 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async) if (async) return 0; /* Need to load more data */ - if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) || + if (pqWait(true, false, conn) || pqReadData(conn) < 0) return -2; continue; @@ -1737,7 +1737,7 @@ pqGetline3(PGconn *conn, char *s, int maxlen) while ((status = PQgetlineAsync(conn, s, maxlen - 1)) == 0) { /* need to load more data */ - if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) || + if (pqWait(true, false, conn) || pqReadData(conn) < 0) { *s = '\0'; @@ -1975,7 +1975,7 @@ pqFunctionCall3(PGconn *conn, Oid fnid, if (needInput) { /* Wait for some data to arrive (or for the channel to close) */ - if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) || + if (pqWait(true, false, conn) || pqReadData(conn) < 0) break; } @@ -2146,17 +2146,17 @@ pqBuildStartupPacket3(PGconn *conn, int *packetlen, * and is used during handshake when a compression acknowledgment response is received from the server. */ static bool -build_compressors_list(PGconn *conn, char** client_compressors, bool build_descriptors) +build_compressors_list(PGconn *conn, char **client_compressors, bool build_descriptors) { - char** supported_algorithms = zpq_get_supported_algorithms(); - char* value = conn->compression; - int n_supported_algorithms; - int total_len = 0; - int i; + char **supported_algorithms = zs_get_supported_algorithms(); + char *value = conn->compression; + int n_supported_algorithms; + int total_len = 0; + int i; for (n_supported_algorithms = 0; supported_algorithms[n_supported_algorithms] != NULL; n_supported_algorithms++) { - total_len += strlen(supported_algorithms[n_supported_algorithms])+1; + total_len += strlen(supported_algorithms[n_supported_algorithms]) + 1; } if (pg_strcasecmp(value, "true") == 0 || @@ -2166,7 +2166,7 @@ build_compressors_list(PGconn *conn, char** client_compressors, bool build_descr pg_strcasecmp(value, "1") == 0) { /* Compression is enabled: choose algorithm automatically */ - char* p; + char *p; if (n_supported_algorithms == 0) { @@ -2177,7 +2177,7 @@ build_compressors_list(PGconn *conn, char** client_compressors, bool build_descr } *client_compressors = p = malloc(total_len); if (build_descriptors) - conn->compressors = malloc(n_supported_algorithms*sizeof(pg_conn_compressor)); + conn->compressors = malloc(n_supported_algorithms * sizeof(pg_conn_compressor)); for (i = 0; i < n_supported_algorithms; i++) { strcpy(p, supported_algorithms[i]); @@ -2207,20 +2207,22 @@ build_compressors_list(PGconn *conn, char** client_compressors, bool build_descr else { /* List of compression algorithms separated by commas */ - char *src, *dst; - int n_suggested_algorithms = 0; - char* suggested_algorithms = strdup(value); + char *src, + *dst; + int n_suggested_algorithms = 0; + char *suggested_algorithms = strdup(value); + src = suggested_algorithms; *client_compressors = dst = strdup(value); if (build_descriptors) - conn->compressors = malloc(n_supported_algorithms*sizeof(pg_conn_compressor)); + conn->compressors = malloc(n_supported_algorithms * sizeof(pg_conn_compressor)); while (*src != '\0') { - char* sep = strchr(src, ','); - char* col; - int compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL; + char *sep = strchr(src, ','); + char *col; + int compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL; if (sep != NULL) *sep = '\0'; @@ -2231,11 +2233,11 @@ build_compressors_list(PGconn *conn, char** client_compressors, bool build_descr if (col != NULL) { *col = '\0'; - if (sscanf(col+1, "%d", &compression_level) != 1 && !build_descriptors) + if (sscanf(col + 1, "%d", &compression_level) != 1 && !build_descriptors) { fprintf(stderr, libpq_gettext("WARNING: invalid compression level %s in compression option '%s'\n"), - col+1, value); + col + 1, value); return false; } } @@ -2255,7 +2257,7 @@ build_compressors_list(PGconn *conn, char** client_compressors, bool build_descr } } if (sep) - src = sep+1; + src = sep + 1; else break; } @@ -2330,8 +2332,9 @@ build_startup_packet(const PGconn *conn, char *packet, ADD_STARTUP_OPTION("options", conn->pgoptions); if (conn->compression && conn->compression[0]) { - char* client_compression_algorithms; - if (build_compressors_list((PGconn*)conn, &client_compression_algorithms, packet == NULL)) + char *client_compression_algorithms; + + if (build_compressors_list((PGconn *) conn, &client_compression_algorithms, packet == NULL)) { if (client_compression_algorithms) { diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index 3b6a9fbce35..5047b1bccd4 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -461,6 +461,9 @@ extern PGPing PQpingParams(const char *const *keywords, /* Force the write buffer to be written (or at least try) */ extern int PQflush(PGconn *conn); +extern int + PQreadPending(PGconn *conn); + /* * "Fast path" interface --- not really recommended for application * use diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 7406857fa19..3e353c9dbaf 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -386,7 +386,7 @@ struct pg_conn * comma) */ pg_conn_compressor *compressors; /* descriptors of compression * algorithms chosen by client */ - unsigned n_compressors; /* size of compressors array */ + unsigned n_compressors; /* size of compressors array */ /* Type of connection to make. Possible values: any, read-write. */ char *target_session_attrs; @@ -547,7 +547,7 @@ struct pg_conn PQExpBufferData workBuffer; /* expansible string */ /* Compression stream */ - ZpqStream *zstream; + ZpqStream *zpqStream; }; /* PGcancel stores all data necessary to cancel a connection. A copy of this @@ -695,6 +695,7 @@ extern int pqWaitTimed(int forRead, int forWrite, PGconn *conn, time_t finish_time); extern int pqReadReady(PGconn *conn); extern int pqWriteReady(PGconn *conn); +extern int pqReadPending(PGconn *conn); /* === in fe-secure.c === */ diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm index daac58c662f..c52d4fd3c4a 100644 --- a/src/tools/msvc/Mkvcbuild.pm +++ b/src/tools/msvc/Mkvcbuild.pm @@ -123,7 +123,7 @@ sub mkvcbuild config_info.c controldata_utils.c d2s.c encnames.c exec.c f2s.c file_perm.c file_utils.c hashfn.c ip.c jsonapi.c keywords.c kwlookup.c link-canary.c md5.c - pg_get_line.c zpq_stream.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c + pg_get_line.c z_stream.c zpq_stream.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c saslprep.c scram-common.c string.c stringinfo.c unicode_norm.c username.c wait_error.c wchar.c);