53
53
#include "storage/bufmgr.h"
54
54
#include "storage/fd.h"
55
55
#include "utils/resowner.h"
56
+ #include "utils/memutils.h"
57
+
58
+ #include "common/pg_lzcompress.h"
59
+ #ifdef USE_LZ4
60
+ #include <lz4.h>
61
+ #endif
62
+
63
+ #define NO_LZ4_SUPPORT () \
64
+ ereport(ERROR, \
65
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), \
66
+ errmsg("compression method lz4 not supported"), \
67
+ errdetail("This functionality requires the server to be built with lz4 support.")))
56
68
57
69
/*
58
70
* We break BufFiles into gigabyte-sized segments, regardless of RELSEG_SIZE.
62
74
#define MAX_PHYSICAL_FILESIZE 0x40000000
63
75
#define BUFFILE_SEG_SIZE (MAX_PHYSICAL_FILESIZE / BLCKSZ)
64
76
77
+ int temp_file_compression = TEMP_NONE_COMPRESSION ;
78
+
65
79
/*
66
80
* This data structure represents a buffered file that consists of one or
67
81
* more physical files (each accessed through a virtual file descriptor
@@ -95,7 +109,8 @@ struct BufFile
95
109
off_t curOffset ; /* offset part of current pos */
96
110
int pos ; /* next read/write position in buffer */
97
111
int nbytes ; /* total # of valid bytes in buffer */
98
-
112
+ bool compress ; /* State of usege file compression */
113
+ char * cBuffer ;
99
114
/*
100
115
* XXX Should ideally use PGIOAlignedBlock, but might need a way to avoid
101
116
* wasting per-file alignment padding when some users create many files.
@@ -127,6 +142,8 @@ makeBufFileCommon(int nfiles)
127
142
file -> curOffset = 0 ;
128
143
file -> pos = 0 ;
129
144
file -> nbytes = 0 ;
145
+ file -> compress = false;
146
+ file -> cBuffer = NULL ;
130
147
131
148
return file ;
132
149
}
@@ -188,9 +205,17 @@ extendBufFile(BufFile *file)
188
205
* Note: if interXact is true, the caller had better be calling us in a
189
206
* memory context, and with a resource owner, that will survive across
190
207
* transaction boundaries.
208
+ *
209
+ * If compress is true the temporary files will be compressed before
210
+ * writing on disk.
211
+ *
212
+ * Note: The compression does not support random access. Only the hash joins
213
+ * use it for now. The seek operation other than seek to the beginning of the
214
+ * buffile will corrupt temporary data offsets.
215
+ *
191
216
*/
192
217
BufFile *
193
- BufFileCreateTemp (bool interXact )
218
+ BufFileCreateTemp (bool interXact , bool compress )
194
219
{
195
220
BufFile * file ;
196
221
File pfile ;
@@ -212,9 +237,47 @@ BufFileCreateTemp(bool interXact)
212
237
file = makeBufFile (pfile );
213
238
file -> isInterXact = interXact ;
214
239
240
+ if (temp_file_compression != TEMP_NONE_COMPRESSION )
241
+ {
242
+ file -> compress = compress ;
243
+ }
244
+
215
245
return file ;
246
+
216
247
}
248
+ /*
249
+ * Wrapper for BuffileCreateTemp
250
+ * We want to limit the number of memory allocations for the compression buffer,
251
+ * only one buffer for all compression operations is enough
252
+ */
253
+ BufFile *
254
+ BufFileCreateCompressTemp (bool interXact ){
255
+ static char * buff = NULL ;
256
+ BufFile * tmpBufFile = BufFileCreateTemp (interXact , true);
217
257
258
+ if (buff == NULL && temp_file_compression != TEMP_NONE_COMPRESSION )
259
+ {
260
+ int size = 0 ;
261
+
262
+ switch (temp_file_compression )
263
+ {
264
+ case TEMP_LZ4_COMPRESSION :
265
+ #ifdef USE_LZ4
266
+ size = LZ4_compressBound (BLCKSZ )+ sizeof (int );
267
+ #endif
268
+ break ;
269
+ case TEMP_PGLZ_COMPRESSION :
270
+ size = pglz_maximum_compressed_size (BLCKSZ , BLCKSZ )+ sizeof (int );
271
+ break ;
272
+ }
273
+ /*
274
+ * Persistent buffer for all temporary file compressions
275
+ */
276
+ buff = MemoryContextAlloc (TopMemoryContext , size );
277
+ }
278
+ tmpBufFile -> cBuffer = buff ;
279
+ return tmpBufFile ;
280
+ }
218
281
/*
219
282
* Build the name for a given segment of a given BufFile.
220
283
*/
@@ -275,6 +338,7 @@ BufFileCreateFileSet(FileSet *fileset, const char *name)
275
338
file -> files [0 ] = MakeNewFileSetSegment (file , 0 );
276
339
file -> readOnly = false;
277
340
341
+
278
342
return file ;
279
343
}
280
344
@@ -457,11 +521,75 @@ BufFileLoadBuffer(BufFile *file)
457
521
/*
458
522
* Read whatever we can get, up to a full bufferload.
459
523
*/
460
- file -> nbytes = FileRead (thisfile ,
524
+ if (!file -> compress )
525
+ {
526
+
527
+ /*
528
+ * Read whatever we can get, up to a full bufferload.
529
+ */
530
+ file -> nbytes = FileRead (thisfile ,
461
531
file -> buffer .data ,
462
- sizeof (file -> buffer .data ),
532
+ sizeof (file -> buffer ),
533
+ file -> curOffset ,
534
+ WAIT_EVENT_BUFFILE_READ );
535
+ /*
536
+ * Read and decompress data from the temporary file
537
+ * The first reading loads size of the compressed block
538
+ * Second reading loads compressed data
539
+ */
540
+ } else {
541
+ int nread ;
542
+ int nbytes ;
543
+
544
+ nread = FileRead (thisfile ,
545
+ & nbytes ,
546
+ sizeof (nbytes ),
547
+ file -> curOffset ,
548
+ WAIT_EVENT_BUFFILE_READ );
549
+ /* if not EOF let's continue */
550
+ if (nread > 0 )
551
+ {
552
+ /* A long life buffer limits number of memory allocations */
553
+ char * buff = file -> cBuffer ;
554
+
555
+ Assert (file -> cBuffer != NULL );
556
+ /*
557
+ * Read compressed data, curOffset differs with pos
558
+ * It reads less data than it returns to caller
559
+ * So the curOffset must be advanced here based on compressed size
560
+ */
561
+ file -> curOffset += sizeof (nbytes );
562
+
563
+ nread = FileRead (thisfile ,
564
+ buff ,
565
+ nbytes ,
463
566
file -> curOffset ,
464
567
WAIT_EVENT_BUFFILE_READ );
568
+
569
+ switch (temp_file_compression )
570
+ {
571
+ case TEMP_LZ4_COMPRESSION :
572
+ #ifdef USE_LZ4
573
+ file -> nbytes = LZ4_decompress_safe (buff ,
574
+ file -> buffer .data ,nbytes ,sizeof (file -> buffer ));
575
+ #endif
576
+ break ;
577
+
578
+ case TEMP_PGLZ_COMPRESSION :
579
+ file -> nbytes = pglz_decompress (buff ,nbytes ,
580
+ file -> buffer .data ,sizeof (file -> buffer ),false);
581
+ break ;
582
+ }
583
+ file -> curOffset += nread ;
584
+
585
+ if (file -> nbytes < 0 )
586
+ ereport (ERROR ,
587
+ (errcode (ERRCODE_DATA_CORRUPTED ),
588
+ errmsg_internal ("compressed lz4 data is corrupt" )));
589
+ }
590
+
591
+ }
592
+
465
593
if (file -> nbytes < 0 )
466
594
{
467
595
file -> nbytes = 0 ;
@@ -494,9 +622,61 @@ static void
494
622
BufFileDumpBuffer (BufFile * file )
495
623
{
496
624
int wpos = 0 ;
497
- int bytestowrite ;
625
+ int bytestowrite = 0 ;
498
626
File thisfile ;
499
627
628
+
629
+ /* Save nbytes value because the size changes due to compression */
630
+ int nbytesOriginal = file -> nbytes ;
631
+
632
+ char * DataToWrite ;
633
+ DataToWrite = file -> buffer .data ;
634
+
635
+ /*
636
+ * Prepare compressed data to write
637
+ * size of compressed block needs to be added at the beggining of the
638
+ * compressed data
639
+ */
640
+
641
+
642
+ if (file -> compress ) {
643
+ char * cData ;
644
+ int cSize = 0 ;
645
+
646
+ Assert (file -> cBuffer != NULL );
647
+ cData = file -> cBuffer ;
648
+
649
+ switch (temp_file_compression )
650
+ {
651
+ case TEMP_LZ4_COMPRESSION :
652
+ {
653
+ #ifdef USE_LZ4
654
+ int cBufferSize = LZ4_compressBound (file -> nbytes );
655
+ /*
656
+ * Using stream compression would lead to the slight improvement in
657
+ * compression ratio
658
+ */
659
+ cSize = LZ4_compress_default (file -> buffer .data ,
660
+ cData + sizeof (int ),file -> nbytes , cBufferSize );
661
+ #endif
662
+ break ;
663
+ }
664
+ case TEMP_PGLZ_COMPRESSION :
665
+ cSize = pglz_compress (file -> buffer .data ,file -> nbytes ,
666
+ cData + sizeof (int ),PGLZ_strategy_always );
667
+ break ;
668
+ }
669
+
670
+
671
+ /* Write size of compressed block in front of compressed data
672
+ * It's used to determine amount of data to read within
673
+ * decompression process
674
+ */
675
+ memcpy (cData ,& cSize ,sizeof (int ));
676
+ file -> nbytes = cSize + sizeof (int );
677
+ DataToWrite = cData ;
678
+ }
679
+
500
680
/*
501
681
* Unlike BufFileLoadBuffer, we must dump the whole buffer even if it
502
682
* crosses a component-file boundary; so we need a loop.
@@ -535,7 +715,7 @@ BufFileDumpBuffer(BufFile *file)
535
715
INSTR_TIME_SET_ZERO (io_start );
536
716
537
717
bytestowrite = FileWrite (thisfile ,
538
- file -> buffer . data + wpos ,
718
+ DataToWrite + wpos ,
539
719
bytestowrite ,
540
720
file -> curOffset ,
541
721
WAIT_EVENT_BUFFILE_WRITE );
@@ -564,7 +744,19 @@ BufFileDumpBuffer(BufFile *file)
564
744
* logical file position, ie, original value + pos, in case that is less
565
745
* (as could happen due to a small backwards seek in a dirty buffer!)
566
746
*/
567
- file -> curOffset -= (file -> nbytes - file -> pos );
747
+
748
+
749
+ if (!file -> compress )
750
+ file -> curOffset -= (file -> nbytes - file -> pos );
751
+ else
752
+ if (nbytesOriginal - file -> pos != 0 )
753
+ /* curOffset must be corrected also if compression is
754
+ * enabled, nbytes was changed by compression but we
755
+ * have to use the original value of nbytes
756
+ */
757
+ file -> curOffset -= bytestowrite ;
758
+
759
+
568
760
if (file -> curOffset < 0 ) /* handle possible segment crossing */
569
761
{
570
762
file -> curFile -- ;
@@ -577,6 +769,7 @@ BufFileDumpBuffer(BufFile *file)
577
769
*/
578
770
file -> pos = 0 ;
579
771
file -> nbytes = 0 ;
772
+
580
773
}
581
774
582
775
/*
@@ -602,8 +795,14 @@ BufFileReadCommon(BufFile *file, void *ptr, size_t size, bool exact, bool eofOK)
602
795
{
603
796
if (file -> pos >= file -> nbytes )
604
797
{
605
- /* Try to load more data into buffer. */
606
- file -> curOffset += file -> pos ;
798
+ /* Try to load more data into buffer.
799
+ *
800
+ * curOffset is moved within BufFileLoadBuffer
801
+ * because stored data size differs from loaded/
802
+ * decompressed size
803
+ * */
804
+ if (!file -> compress )
805
+ file -> curOffset += file -> pos ;
607
806
file -> pos = 0 ;
608
807
file -> nbytes = 0 ;
609
808
BufFileLoadBuffer (file );
0 commit comments