|
41 | 41 | #include "storage/latch.h"
|
42 | 42 | #include "storage/lwlock.h"
|
43 | 43 | #include "storage/procsignal.h"
|
| 44 | +#include "storage/read_stream.h" |
44 | 45 | #include "storage/smgr.h"
|
45 | 46 | #include "tcop/tcopprot.h"
|
46 | 47 | #include "utils/guc.h"
|
@@ -75,6 +76,28 @@ typedef struct AutoPrewarmSharedState
|
75 | 76 | int prewarmed_blocks;
|
76 | 77 | } AutoPrewarmSharedState;
|
77 | 78 |
|
| 79 | +/* |
| 80 | + * Private data passed through the read stream API for our use in the |
| 81 | + * callback. |
| 82 | + */ |
| 83 | +typedef struct AutoPrewarmReadStreamData |
| 84 | +{ |
| 85 | + /* The array of records containing the blocks we should prewarm. */ |
| 86 | + BlockInfoRecord *block_info; |
| 87 | + |
| 88 | + /* |
| 89 | + * pos is the read stream callback's index into block_info. Because the |
| 90 | + * read stream may read ahead, pos is likely to be ahead of the index in |
| 91 | + * the main loop in autoprewarm_database_main(). |
| 92 | + */ |
| 93 | + int pos; |
| 94 | + Oid tablespace; |
| 95 | + RelFileNumber filenumber; |
| 96 | + ForkNumber forknum; |
| 97 | + BlockNumber nblocks; |
| 98 | +} AutoPrewarmReadStreamData; |
| 99 | + |
| 100 | + |
78 | 101 | PGDLLEXPORT void autoprewarm_main(Datum main_arg);
|
79 | 102 | PGDLLEXPORT void autoprewarm_database_main(Datum main_arg);
|
80 | 103 |
|
@@ -422,6 +445,54 @@ apw_load_buffers(void)
|
422 | 445 | apw_state->prewarmed_blocks, num_elements)));
|
423 | 446 | }
|
424 | 447 |
|
| 448 | +/* |
| 449 | + * Return the next block number of a specific relation and fork to read |
| 450 | + * according to the array of BlockInfoRecord. |
| 451 | + */ |
| 452 | +static BlockNumber |
| 453 | +apw_read_stream_next_block(ReadStream *stream, |
| 454 | + void *callback_private_data, |
| 455 | + void *per_buffer_data) |
| 456 | +{ |
| 457 | + AutoPrewarmReadStreamData *p = callback_private_data; |
| 458 | + |
| 459 | + CHECK_FOR_INTERRUPTS(); |
| 460 | + |
| 461 | + while (p->pos < apw_state->prewarm_stop_idx) |
| 462 | + { |
| 463 | + BlockInfoRecord blk = p->block_info[p->pos]; |
| 464 | + |
| 465 | + if (!have_free_buffer()) |
| 466 | + { |
| 467 | + p->pos = apw_state->prewarm_stop_idx; |
| 468 | + return InvalidBlockNumber; |
| 469 | + } |
| 470 | + |
| 471 | + if (blk.tablespace != p->tablespace) |
| 472 | + return InvalidBlockNumber; |
| 473 | + |
| 474 | + if (blk.filenumber != p->filenumber) |
| 475 | + return InvalidBlockNumber; |
| 476 | + |
| 477 | + if (blk.forknum != p->forknum) |
| 478 | + return InvalidBlockNumber; |
| 479 | + |
| 480 | + p->pos++; |
| 481 | + |
| 482 | + /* |
| 483 | + * Check whether blocknum is valid and within fork file size. |
| 484 | + * Fast-forward through any invalid blocks. We want p->pos to reflect |
| 485 | + * the location of the next relation or fork before ending the stream. |
| 486 | + */ |
| 487 | + if (blk.blocknum >= p->nblocks) |
| 488 | + continue; |
| 489 | + |
| 490 | + return blk.blocknum; |
| 491 | + } |
| 492 | + |
| 493 | + return InvalidBlockNumber; |
| 494 | +} |
| 495 | + |
425 | 496 | /*
|
426 | 497 | * Prewarm all blocks for one database (and possibly also global objects, if
|
427 | 498 | * those got grouped with this database).
|
@@ -462,8 +533,6 @@ autoprewarm_database_main(Datum main_arg)
|
462 | 533 | Oid reloid;
|
463 | 534 | Relation rel;
|
464 | 535 |
|
465 |
| - CHECK_FOR_INTERRUPTS(); |
466 |
| - |
467 | 536 | /*
|
468 | 537 | * All blocks between prewarm_start_idx and prewarm_stop_idx should
|
469 | 538 | * belong either to global objects or the same database.
|
@@ -510,6 +579,8 @@ autoprewarm_database_main(Datum main_arg)
|
510 | 579 | {
|
511 | 580 | ForkNumber forknum = blk.forknum;
|
512 | 581 | BlockNumber nblocks;
|
| 582 | + struct AutoPrewarmReadStreamData p; |
| 583 | + ReadStream *stream; |
513 | 584 | Buffer buf;
|
514 | 585 |
|
515 | 586 | /*
|
@@ -540,32 +611,40 @@ autoprewarm_database_main(Datum main_arg)
|
540 | 611 |
|
541 | 612 | nblocks = RelationGetNumberOfBlocksInFork(rel, blk.forknum);
|
542 | 613 |
|
543 |
| - /* Prewarm buffers. */ |
544 |
| - while (i < apw_state->prewarm_stop_idx && |
545 |
| - blk.tablespace == tablespace && |
546 |
| - blk.filenumber == filenumber && |
547 |
| - blk.forknum == forknum && |
548 |
| - have_free_buffer()) |
| 614 | + p = (struct AutoPrewarmReadStreamData) |
549 | 615 | {
|
550 |
| - CHECK_FOR_INTERRUPTS(); |
551 |
| - |
552 |
| - /* Check whether blocknum is valid and within fork file size. */ |
553 |
| - if (blk.blocknum >= nblocks) |
554 |
| - { |
555 |
| - blk = block_info[++i]; |
556 |
| - continue; |
557 |
| - } |
558 |
| - |
559 |
| - buf = ReadBufferExtended(rel, blk.forknum, blk.blocknum, RBM_NORMAL, |
560 |
| - NULL); |
561 |
| - |
562 |
| - blk = block_info[++i]; |
563 |
| - if (!BufferIsValid(buf)) |
564 |
| - break; |
| 616 | + .block_info = block_info, |
| 617 | + .pos = i, |
| 618 | + .tablespace = tablespace, |
| 619 | + .filenumber = filenumber, |
| 620 | + .forknum = forknum, |
| 621 | + .nblocks = nblocks, |
| 622 | + }; |
| 623 | + |
| 624 | + stream = read_stream_begin_relation(READ_STREAM_FULL, |
| 625 | + NULL, |
| 626 | + rel, |
| 627 | + p.forknum, |
| 628 | + apw_read_stream_next_block, |
| 629 | + &p, |
| 630 | + 0); |
565 | 631 |
|
| 632 | + /* |
| 633 | + * Loop until we've prewarmed all the blocks from this fork. The |
| 634 | + * read stream callback will check that we still have free buffers |
| 635 | + * before requesting each block from the read stream API. |
| 636 | + */ |
| 637 | + while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer) |
| 638 | + { |
566 | 639 | apw_state->prewarmed_blocks++;
|
567 | 640 | ReleaseBuffer(buf);
|
568 | 641 | }
|
| 642 | + |
| 643 | + read_stream_end(stream); |
| 644 | + |
| 645 | + /* Advance i past all the blocks just prewarmed. */ |
| 646 | + i = p.pos; |
| 647 | + blk = block_info[i]; |
569 | 648 | }
|
570 | 649 |
|
571 | 650 | relation_close(rel, AccessShareLock);
|
|
0 commit comments