@@ -116,6 +116,7 @@ struct ReadStream
116
116
int16 pinned_buffers ;
117
117
int16 distance ;
118
118
bool advice_enabled ;
119
+ bool temporary ;
119
120
120
121
/*
121
122
* One-block buffer to support 'ungetting' a block number, to resolve flow
@@ -225,7 +226,17 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
225
226
stream -> buffered_blocknum = blocknum ;
226
227
}
227
228
228
- static void
229
+ /*
230
+ * Start as much of the current pending read as we can. If we have to split it
231
+ * because of the per-backend buffer limit, or the buffer manager decides to
232
+ * split it, then the pending read is adjusted to hold the remaining portion.
233
+ *
234
+ * We can always start a read of at least size one if we have no progress yet.
235
+ * Otherwise it's possible that we can't start a read at all because of a lack
236
+ * of buffers, and then false is returned. Buffer shortages also reduce the
237
+ * distance to a level that prevents look-ahead until buffers are released.
238
+ */
239
+ static bool
229
240
read_stream_start_pending_read (ReadStream * stream , bool suppress_advice )
230
241
{
231
242
bool need_wait ;
@@ -234,12 +245,13 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
234
245
int16 io_index ;
235
246
int16 overflow ;
236
247
int16 buffer_index ;
248
+ int16 buffer_limit ;
237
249
238
250
/* This should only be called with a pending read. */
239
251
Assert (stream -> pending_read_nblocks > 0 );
240
252
Assert (stream -> pending_read_nblocks <= stream -> io_combine_limit );
241
253
242
- /* We had better not exceed the pin limit by starting this read. */
254
+ /* We had better not exceed the per-stream buffer limit with this read. */
243
255
Assert (stream -> pinned_buffers + stream -> pending_read_nblocks <=
244
256
stream -> max_pinned_buffers );
245
257
@@ -260,10 +272,39 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
260
272
else
261
273
flags = 0 ;
262
274
263
- /* We say how many blocks we want to read, but may be smaller on return. */
275
+ /* Compute the remaining portion of the per-backend buffer limit. */
276
+ if (stream -> temporary )
277
+ buffer_limit = Min (GetAdditionalLocalPinLimit (), PG_INT16_MAX );
278
+ else
279
+ buffer_limit = Min (GetAdditionalPinLimit (), PG_INT16_MAX );
280
+ if (buffer_limit == 0 && stream -> pinned_buffers == 0 )
281
+ buffer_limit = 1 ; /* guarantee progress */
282
+
283
+ /* Does the per-backend buffer limit affect this read? */
284
+ nblocks = stream -> pending_read_nblocks ;
285
+ if (buffer_limit < nblocks )
286
+ {
287
+ int16 new_distance ;
288
+
289
+ /* Shrink distance: no more look-ahead until buffers are released. */
290
+ new_distance = stream -> pinned_buffers + buffer_limit ;
291
+ if (stream -> distance > new_distance )
292
+ stream -> distance = new_distance ;
293
+
294
+ /* If we've already made progress, just give up and wait for buffers. */
295
+ if (stream -> pinned_buffers > 0 )
296
+ return false;
297
+
298
+ /* A short read is required to make progress. */
299
+ nblocks = buffer_limit ;
300
+ }
301
+
302
+ /*
303
+ * We say how many blocks we want to read, but it may be smaller on return
304
+ * if the buffer manager decides it needs a short read at its level.
305
+ */
264
306
buffer_index = stream -> next_buffer_index ;
265
307
io_index = stream -> next_io_index ;
266
- nblocks = stream -> pending_read_nblocks ;
267
308
need_wait = StartReadBuffers (& stream -> ios [io_index ].op ,
268
309
& stream -> buffers [buffer_index ],
269
310
stream -> pending_read_blocknum ,
@@ -313,6 +354,8 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
313
354
/* Adjust the pending read to cover the remaining portion, if any. */
314
355
stream -> pending_read_blocknum += nblocks ;
315
356
stream -> pending_read_nblocks -= nblocks ;
357
+
358
+ return true;
316
359
}
317
360
318
361
static void
@@ -361,14 +404,15 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
361
404
/* We have to start the pending read before we can build another. */
362
405
while (stream -> pending_read_nblocks > 0 )
363
406
{
364
- read_stream_start_pending_read (stream , suppress_advice );
365
- suppress_advice = false;
366
- if (stream -> ios_in_progress == stream -> max_ios )
407
+ if (!read_stream_start_pending_read (stream , suppress_advice ) ||
408
+ stream -> ios_in_progress == stream -> max_ios )
367
409
{
368
- /* And we've hit the limit. Rewind, and stop here. */
410
+ /* And we've hit a buffer or I/O limit. Rewind and wait. */
369
411
read_stream_unget_block (stream , blocknum );
370
412
return ;
371
413
}
414
+
415
+ suppress_advice = false;
372
416
}
373
417
374
418
/* This is the start of a new pending read. */
@@ -382,15 +426,25 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
382
426
* io_combine_limit size once more buffers have been consumed. However,
383
427
* if we've already reached io_combine_limit, or we've reached the
384
428
* distance limit and there isn't anything pinned yet, or the callback has
385
- * signaled end-of-stream, we start the read immediately.
429
+ * signaled end-of-stream, we start the read immediately. Note that the
430
+ * pending read could even exceed the distance goal, if the latter was
431
+ * reduced on buffer limit exhaustion.
386
432
*/
387
433
if (stream -> pending_read_nblocks > 0 &&
388
434
(stream -> pending_read_nblocks == stream -> io_combine_limit ||
389
- (stream -> pending_read_nblocks = = stream -> distance &&
435
+ (stream -> pending_read_nblocks > = stream -> distance &&
390
436
stream -> pinned_buffers == 0 ) ||
391
437
stream -> distance == 0 ) &&
392
438
stream -> ios_in_progress < stream -> max_ios )
393
439
read_stream_start_pending_read (stream , suppress_advice );
440
+
441
+ /*
442
+ * There should always be something pinned when we leave this function,
443
+ * whether started by this call or not, unless we've hit the end of the
444
+ * stream. In the worst case we can always make progress one buffer at a
445
+ * time.
446
+ */
447
+ Assert (stream -> pinned_buffers > 0 || stream -> distance == 0 );
394
448
}
395
449
396
450
/*
@@ -420,6 +474,7 @@ read_stream_begin_impl(int flags,
420
474
int max_ios ;
421
475
int strategy_pin_limit ;
422
476
uint32 max_pinned_buffers ;
477
+ uint32 max_possible_buffer_limit ;
423
478
Oid tablespace_id ;
424
479
425
480
/*
@@ -475,12 +530,23 @@ read_stream_begin_impl(int flags,
475
530
strategy_pin_limit = GetAccessStrategyPinLimit (strategy );
476
531
max_pinned_buffers = Min (strategy_pin_limit , max_pinned_buffers );
477
532
478
- /* Don't allow this backend to pin more than its share of buffers. */
533
+ /*
534
+ * Also limit our queue to the maximum number of pins we could possibly
535
+ * ever be allowed to acquire according to the buffer manager. We may not
536
+ * really be able to use them all due to other pins held by this backend,
537
+ * but we'll check that later in read_stream_start_pending_read().
538
+ */
479
539
if (SmgrIsTemp (smgr ))
480
- LimitAdditionalLocalPins ( & max_pinned_buffers );
540
+ max_possible_buffer_limit = GetSoftLocalPinLimit ( );
481
541
else
482
- LimitAdditionalPins (& max_pinned_buffers );
483
- Assert (max_pinned_buffers > 0 );
542
+ max_possible_buffer_limit = GetSoftPinLimit ();
543
+ max_pinned_buffers = Min (max_pinned_buffers , max_possible_buffer_limit );
544
+
545
+ /*
546
+ * The soft limit might be zero on a system configured with more
547
+ * connections than buffers. We need at least one to make progress.
548
+ */
549
+ max_pinned_buffers = Max (1 , max_pinned_buffers );
484
550
485
551
/*
486
552
* We need one extra entry for buffers and per-buffer data, because users
@@ -546,6 +612,7 @@ read_stream_begin_impl(int flags,
546
612
stream -> callback = callback ;
547
613
stream -> callback_private_data = callback_private_data ;
548
614
stream -> buffered_blocknum = InvalidBlockNumber ;
615
+ stream -> temporary = SmgrIsTemp (smgr );
549
616
550
617
/*
551
618
* Skip the initial ramp-up phase if the caller says we're going to be
@@ -674,6 +741,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
674
741
* arbitrary I/O entry (they're all free). We don't have to
675
742
* adjust pinned_buffers because we're transferring one to caller
676
743
* but pinning one more.
744
+ *
745
+ * In the fast path we don't need to check the pin limit. We're
746
+ * always allowed at least one pin so that progress can be made,
747
+ * and that's all we need here. Although two pins are momentarily
748
+ * held at the same time, the model used here is that the stream
749
+ * holds only one, and the other now belongs to the caller.
677
750
*/
678
751
if (likely (!StartReadBuffer (& stream -> ios [0 ].op ,
679
752
& stream -> buffers [oldest_buffer_index ],
@@ -874,6 +947,9 @@ read_stream_reset(ReadStream *stream)
874
947
stream -> buffered_blocknum = InvalidBlockNumber ;
875
948
stream -> fast_path = false;
876
949
950
+ /* There is no point in reading whatever was pending. */
951
+ stream -> pending_read_nblocks = 0 ;
952
+
877
953
/* Unpin anything that wasn't consumed. */
878
954
while ((buffer = read_stream_next_buffer (stream , NULL )) != InvalidBuffer )
879
955
ReleaseBuffer (buffer );
0 commit comments