@@ -96,7 +96,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg);
96
96
static void logicalrep_worker_onexit (int code , Datum arg );
97
97
static void logicalrep_worker_detach (void );
98
98
static void logicalrep_worker_cleanup (LogicalRepWorker * worker );
99
- static int logicalrep_pa_worker_count (Oid subid );
99
+ static void logicalrep_worker_count (Oid subid , int * nsync , int * nparallelapply );
100
100
static void logicalrep_launcher_attach_dshmem (void );
101
101
static void ApplyLauncherSetWorkerStartTime (Oid subid , TimestampTz start_time );
102
102
static TimestampTz ApplyLauncherGetWorkerStartTime (Oid subid );
@@ -336,7 +336,6 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
336
336
*/
337
337
LWLockAcquire (LogicalRepWorkerLock , LW_EXCLUSIVE );
338
338
339
- retry :
340
339
/* Find unused worker slot. */
341
340
for (i = 0 ; i < max_logical_replication_workers ; i ++ )
342
341
{
@@ -350,16 +349,21 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
350
349
}
351
350
}
352
351
353
- nsyncworkers = logicalrep_sync_worker_count (subid );
352
+ logicalrep_worker_count (subid , & nsyncworkers , & nparallelapplyworkers );
354
353
355
354
now = GetCurrentTimestamp ();
356
355
357
356
/*
358
- * If we didn't find a free slot, try to do garbage collection. The
359
- * reason we do this is because if some worker failed to start up and its
360
- * parent has crashed while waiting, the in_use state was never cleared.
357
+ * If we can't start a new logical replication background worker because
358
+ * no free slot is available, or because the number of sync workers or
359
+ * parallel apply workers has reached the limit per subscription, try
360
+ * running garbage collection. The reason we do this is because if some
361
+ * workers failed to start up and their parent has crashed while waiting,
362
+ * the in_use state was never cleared. By freeing up these stale worker
363
+ * slots, we may be able to start a new worker.
361
364
*/
362
- if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription )
365
+ if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription ||
366
+ nparallelapplyworkers >= max_parallel_apply_workers_per_subscription )
363
367
{
364
368
bool did_cleanup = false;
365
369
@@ -381,11 +385,21 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
381
385
382
386
logicalrep_worker_cleanup (w );
383
387
did_cleanup = true;
388
+
389
+ if (worker == NULL )
390
+ {
391
+ worker = w ;
392
+ slot = i ;
393
+ }
384
394
}
385
395
}
386
396
397
+ /*
398
+ * Count the current number of sync and parallel apply workers again,
399
+ * since garbage collection may have changed it.
400
+ */
387
401
if (did_cleanup )
388
- goto retry ;
402
+ logicalrep_worker_count ( subid , & nsyncworkers , & nparallelapplyworkers ) ;
389
403
}
390
404
391
405
/*
@@ -399,8 +413,6 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
399
413
return false;
400
414
}
401
415
402
- nparallelapplyworkers = logicalrep_pa_worker_count (subid );
403
-
404
416
/*
405
417
* Return false if the number of parallel apply workers reached the limit
406
418
* per subscription.
@@ -844,48 +856,42 @@ logicalrep_worker_onexit(int code, Datum arg)
844
856
int
845
857
logicalrep_sync_worker_count (Oid subid )
846
858
{
847
- int i ;
848
859
int res = 0 ;
849
860
850
- Assert (LWLockHeldByMe (LogicalRepWorkerLock ));
851
-
852
- /* Search for attached worker for a given subscription id. */
853
- for (i = 0 ; i < max_logical_replication_workers ; i ++ )
854
- {
855
- LogicalRepWorker * w = & LogicalRepCtx -> workers [i ];
856
-
857
- if (isTablesyncWorker (w ) && w -> subid == subid )
858
- res ++ ;
859
- }
860
-
861
+ logicalrep_worker_count (subid , & res , NULL );
861
862
return res ;
862
863
}
863
864
864
865
/*
865
- * Count the number of registered (but not necessarily running) parallel apply
866
- * workers for a subscription.
866
+ * Count the number of registered (but not necessarily running) sync workers
867
+ * and parallel apply workers for a subscription.
867
868
*/
868
- static int
869
- logicalrep_pa_worker_count (Oid subid )
869
+ static void
870
+ logicalrep_worker_count (Oid subid , int * nsync , int * nparallelapply )
870
871
{
871
- int i ;
872
- int res = 0 ;
873
-
874
872
Assert (LWLockHeldByMe (LogicalRepWorkerLock ));
875
873
874
+ if (nsync != NULL )
875
+ * nsync = 0 ;
876
+ if (nparallelapply != NULL )
877
+ * nparallelapply = 0 ;
878
+
876
879
/*
877
- * Scan all attached parallel apply workers, only counting those which
878
- * have the given subscription id.
880
+ * Scan all attached sync and parallel apply workers, only counting those
881
+ * which have the given subscription id.
879
882
*/
880
- for (i = 0 ; i < max_logical_replication_workers ; i ++ )
883
+ for (int i = 0 ; i < max_logical_replication_workers ; i ++ )
881
884
{
882
885
LogicalRepWorker * w = & LogicalRepCtx -> workers [i ];
883
886
884
- if (isParallelApplyWorker (w ) && w -> subid == subid )
885
- res ++ ;
887
+ if (w -> subid == subid )
888
+ {
889
+ if (nsync != NULL && isTablesyncWorker (w ))
890
+ (* nsync )++ ;
891
+ if (nparallelapply != NULL && isParallelApplyWorker (w ))
892
+ (* nparallelapply )++ ;
893
+ }
886
894
}
887
-
888
- return res ;
889
895
}
890
896
891
897
/*
0 commit comments