author     Benoit Daloze <[email protected]>      2020-09-05 16:26:24 +1200
committer  Samuel Williams <[email protected]>  2020-09-14 16:44:09 +1200
commit     178c1b0922dc727897d81d7cfe9c97d5ffa97fd9 (patch)
tree       113600e7e6a196b779bcac7529535597858f78a7 /thread_sync.c
parent     9e0a48c7a31ecd39be0596d0517b9d521ae75282 (diff)
Make Mutex per-Fiber instead of per-Thread
* Enables Mutex to be used for synchronization between multiple Fibers
  of the same Thread (see the sketch below).
* With a Fiber scheduler, we can yield to another Fiber on a contended
  Mutex#lock instead of blocking the entire thread.
* This also makes the behavior of Mutex consistent across CRuby, JRuby and TruffleRuby.
* [Feature #16792]
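
A minimal Ruby sketch of the per-Fiber ownership this commit introduces
(behavior inferred from the diff below; note that without a scheduler, a
contended Mutex#lock still blocks the whole thread):

    mutex = Mutex.new
    mutex.lock

    Fiber.new do
      p mutex.owned?  # => false: ownership is now per-Fiber, not per-Thread
      begin
        mutex.unlock  # a different fiber may no longer unlock it
      rescue ThreadError => e
        p e.message   # => "Attempt to unlock a mutex which is locked by another thread/fiber"
      end
    end.resume

    p mutex.owned?    # => true only in the Fiber that took the lock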
Notes:
Merged: https://github.com/ruby/ruby/pull/3434
Diffstat (limited to 'thread_sync.c')
-rw-r--r--  thread_sync.c  106
1 file changed, 69 insertions(+), 37 deletions(-)
diff --git a/thread_sync.c b/thread_sync.c
index deb3858c31..cfdd62635a 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -7,6 +7,7 @@ static VALUE rb_eClosedQueueError;
 
 /* sync_waiter is always on-stack */
 struct sync_waiter {
     rb_thread_t *th;
+    rb_fiber_t *fiber;
     struct list_node node;
 };
@@ -42,7 +43,9 @@ wakeup_all(struct list_head *head)
 /* Mutex */
 
 typedef struct rb_mutex_struct {
-    rb_thread_t *th;
+    VALUE self;
+
+    rb_fiber_t *fiber;
     struct rb_mutex_struct *next_mutex;
     struct list_head waitq; /* protected by GVL */
 } rb_mutex_t;
@@ -52,7 +55,7 @@ static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
 static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
 static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
 #endif
-static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th);
+static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber);
 
 /*
  *  Document-class: Mutex
@@ -93,13 +96,15 @@ rb_mutex_num_waiting(rb_mutex_t *mutex)
     return n;
 }
 
+rb_thread_t* rb_fiber_threadptr(const rb_fiber_t *fiber);
+
 static void
 mutex_free(void *ptr)
 {
     rb_mutex_t *mutex = ptr;
-    if (mutex->th) {
+    if (mutex->fiber) {
         /* rb_warn("free locked mutex"); */
-        const char *err = rb_mutex_unlock_th(mutex, mutex->th);
+        const char *err = rb_mutex_unlock_th(mutex, rb_fiber_threadptr(mutex->fiber), mutex->fiber);
         if (err) rb_bug("%s", err);
     }
     ruby_xfree(ptr);
@@ -145,6 +150,8 @@ mutex_alloc(VALUE klass)
     rb_mutex_t *mutex;
 
     obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
+
+    mutex->self = obj;
     list_head_init(&mutex->waitq);
     return obj;
 }
@@ -178,7 +185,7 @@ rb_mutex_locked_p(VALUE self)
 {
     rb_mutex_t *mutex = mutex_ptr(self);
 
-    return mutex->th ? Qtrue : Qfalse;
+    return mutex->fiber ? Qtrue : Qfalse;
 }
 
 static void
@@ -191,7 +198,7 @@ mutex_locked(rb_thread_t *th, VALUE self)
     }
     th->keeping_mutexes = mutex;
 
-    th->blocking += 1;
+    // th->blocking += 1;
 }
 
 /*
@@ -207,9 +214,10 @@ rb_mutex_trylock(VALUE self)
     rb_mutex_t *mutex = mutex_ptr(self);
     VALUE locked = Qfalse;
 
-    if (mutex->th == 0) {
+    if (mutex->fiber == 0) {
+        rb_fiber_t *fiber = GET_EC()->fiber_ptr;
         rb_thread_t *th = GET_THREAD();
-        mutex->th = th;
+        mutex->fiber = fiber;
         locked = Qtrue;
 
         mutex_locked(th, self);
@@ -226,9 +234,9 @@ rb_mutex_trylock(VALUE self)
 static const rb_thread_t *patrol_thread = NULL;
 
 static VALUE
-mutex_owned_p(rb_thread_t *th, rb_mutex_t *mutex)
+mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
 {
-    if (mutex->th == th) {
+    if (mutex->fiber == fiber) {
         return Qtrue;
     }
     else {
@@ -240,6 +248,8 @@ static VALUE
 do_mutex_lock(VALUE self, int interruptible_p)
 {
     rb_thread_t *th = GET_THREAD();
+    rb_execution_context_t *ec = GET_EC();
+    rb_fiber_t *fiber = ec->fiber_ptr;
     rb_mutex_t *mutex = mutex_ptr(self);
 
     /* When running trap handler */
@@ -249,15 +259,33 @@ do_mutex_lock(VALUE self, int interruptible_p)
     }
 
     if (rb_mutex_trylock(self) == Qfalse) {
-        struct sync_waiter w;
+        struct sync_waiter w = {
+            .th = th,
+            .fiber = fiber
+        };
 
-        if (mutex->th == th) {
+        if (mutex->fiber == fiber) {
             rb_raise(rb_eThreadError, "deadlock; recursive locking");
         }
-        w.th = th;
 
+        VALUE scheduler = rb_thread_current_scheduler();
+        while (mutex->fiber != fiber) {
+            if (scheduler != Qnil) {
+                list_add_tail(&mutex->waitq, &w.node);
+
+                rb_scheduler_mutex_lock(scheduler, self);
 
-        while (mutex->th != th) {
+                list_del(&w.node);
+
+                if (!mutex->fiber) {
+                    mutex->fiber = fiber;
+                    break;
+                } else {
+                    // Try again...
+                    continue;
+                }
+            }
+
             enum rb_thread_status prev_status = th->status;
             rb_hrtime_t *timeout = 0;
             rb_hrtime_t rel = rb_msec2hrtime(100);
@@ -277,18 +305,20 @@
             }
 
             list_add_tail(&mutex->waitq, &w.node);
-            native_sleep(th, timeout); /* release GVL */
+
+            native_sleep(th, timeout); /* release GVL */
+
             list_del(&w.node);
 
-            if (!mutex->th) {
-                mutex->th = th;
+            if (!mutex->fiber) {
+                mutex->fiber = fiber;
             }
 
             if (patrol_thread == th)
                 patrol_thread = NULL;
 
             th->locking_mutex = Qfalse;
-            if (mutex->th && timeout && !RUBY_VM_INTERRUPTED(th->ec)) {
+            if (mutex->fiber && timeout && !RUBY_VM_INTERRUPTED(th->ec)) {
                 rb_check_deadlock(th->ractor);
             }
             if (th->status == THREAD_STOPPED_FOREVER) {
@@ -299,22 +329,19 @@
             if (interruptible_p) {
                 /* release mutex before checking for interrupts...as interrupt checking
                  * code might call rb_raise() */
-                if (mutex->th == th) mutex->th = 0;
-
+                if (mutex->fiber == fiber) mutex->fiber = 0;
                 RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may release mutex */
-                if (!mutex->th) {
-                    mutex->th = th;
-                    mutex_locked(th, self);
+                if (!mutex->fiber) {
+                    mutex->fiber = fiber;
                 }
             }
-            else {
-                if (mutex->th == th) mutex_locked(th, self);
-            }
         }
+
+        if (mutex->fiber == fiber) mutex_locked(th, self);
     }
 
     // assertion
-    if (mutex_owned_p(th, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");
+    if (mutex_owned_p(fiber, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");
 
     return self;
 }
@@ -347,32 +374,37 @@ rb_mutex_lock(VALUE self)
 VALUE
 rb_mutex_owned_p(VALUE self)
 {
-    rb_thread_t *th = GET_THREAD();
+    rb_fiber_t *fiber = GET_EC()->fiber_ptr;
     rb_mutex_t *mutex = mutex_ptr(self);
 
-    return mutex_owned_p(th, mutex);
+    return mutex_owned_p(fiber, mutex);
 }
 
 static const char *
-rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th)
+rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
 {
     const char *err = NULL;
 
-    if (mutex->th == 0) {
+    if (mutex->fiber == 0) {
         err = "Attempt to unlock a mutex which is not locked";
     }
-    else if (mutex->th != th) {
-        err = "Attempt to unlock a mutex which is locked by another thread";
+    else if (mutex->fiber != fiber) {
+        err = "Attempt to unlock a mutex which is locked by another thread/fiber";
     }
     else {
         struct sync_waiter *cur = 0, *next;
         rb_mutex_t **th_mutex = &th->keeping_mutexes;
 
-        th->blocking -= 1;
+        // th->blocking -= 1;
 
-        mutex->th = 0;
+        mutex->fiber = 0;
         list_for_each_safe(&mutex->waitq, cur, next, node) {
             list_del_init(&cur->node);
+
+            if (cur->th->scheduler != Qnil) {
+                rb_scheduler_mutex_unlock(cur->th->scheduler, mutex->self, rb_fiberptr_self(cur->fiber));
+            }
+
             switch (cur->th->status) {
               case THREAD_RUNNABLE: /* from someone else calling Thread#run */
               case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
@@ -411,7 +443,7 @@ rb_mutex_unlock(VALUE self)
     rb_mutex_t *mutex = mutex_ptr(self);
     rb_thread_t *th = GET_THREAD();
 
-    err = rb_mutex_unlock_th(mutex, th);
+    err = rb_mutex_unlock_th(mutex, th, GET_EC()->fiber_ptr);
     if (err) rb_raise(rb_eThreadError, "%s", err);
 
     return self;
@@ -444,7 +476,7 @@ rb_mutex_abandon_all(rb_mutex_t *mutexes)
     while (mutexes) {
         mutex = mutexes;
         mutexes = mutex->next_mutex;
-        mutex->th = 0;
+        mutex->fiber = 0;
         mutex->next_mutex = 0;
         list_head_init(&mutex->waitq);
     }
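
The scheduler path above calls rb_scheduler_mutex_lock(scheduler, self) on
contention and rb_scheduler_mutex_unlock(scheduler, mutex->self, fiber) for
each waiter on release. A rough sketch of what the Ruby-side counterpart of
these hooks could look like; the method names mutex_lock/mutex_unlock and
their signatures are assumptions inferred from the C calls above, and the run
loop is purely illustrative, not the actual scheduler interface:

    class ExampleScheduler
      def initialize
        @ready = []  # fibers woken by mutex_unlock, resumed later by the run loop
      end

      # Assumed hook behind rb_scheduler_mutex_lock: called on a contended
      # Mutex#lock; suspends the current fiber instead of blocking the thread.
      def mutex_lock(mutex)
        Fiber.yield
      end

      # Assumed hook behind rb_scheduler_mutex_unlock: called for each waiter
      # when the mutex is released; marks that waiter's fiber runnable again.
      def mutex_unlock(mutex, fiber)
        @ready << fiber
      end

      # Illustrative run loop: resume fibers that became runnable.
      def run
        while (fiber = @ready.shift)
          fiber.resume if fiber.alive?
        end
      end
    end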