diff options
author | Koichi Sasada <[email protected]> | 2023-04-10 10:53:13 +0900 |
---|---|---|
committer | Koichi Sasada <[email protected]> | 2023-10-12 14:47:01 +0900 |
commit | be1bbd5b7d40ad863ab35097765d3754726bbd54 (patch) | |
tree | 2995a0859bea1d6b2903dcd324f41869dbef14a1 /thread_win32.c | |
parent | 096ee0648e215915a3019c2cd68ba220d94eca12 (diff) |
M:N thread scheduler for Ractors
This patch introduces an M:N thread scheduler for the Ractor system.
In general, M:N thread scheduler employs N native threads (OS threads)
to manage M user-level threads (Ruby threads in this case).
On the Ruby interpreter, 1 native thread is provided for 1 Ractor
and all Ruby threads are managed by the native thread.
Since Ruby 1.9, the interpreter has used a 1:1 thread scheduler, which means
1 Ruby thread has 1 native thread. The M:N scheduler changes this strategy.
Because of compatibility issues (and stability issues in the implementation),
the main Ractor doesn't use the M:N scheduler by default. In other words,
threads on the main Ractor will be managed with the 1:1 thread scheduler.
There are additional settings by environment variables:
`RUBY_MN_THREADS=1` enables M:N thread scheduler on the main ractor.
Note that non-main ractors use the M:N scheduler without this
configuration. With this configuration, single ractor applications
run threads on M:1 thread scheduler (green threads, user-level threads).
`RUBY_MAX_CPU=n` specifies maximum number of native threads for
M:N scheduler (default: 8).
This patch will be reverted soon if non-trivial issues are found.
[Bug #19842]
Diffstat (limited to 'thread_win32.c')
-rw-r--r-- | thread_win32.c | 178 |
1 files changed, 150 insertions, 28 deletions
diff --git a/thread_win32.c b/thread_win32.c index 543a045bd8..4190d03abe 100644 --- a/thread_win32.c +++ b/thread_win32.c @@ -148,12 +148,13 @@ thread_sched_yield(struct rb_thread_sched *sched, rb_thread_t *th) } void -rb_thread_sched_init(struct rb_thread_sched *sched) +rb_thread_sched_init(struct rb_thread_sched *sched, bool atfork) { if (GVL_DEBUG) fprintf(stderr, "sched init\n"); sched->lock = w32_mutex_create(); } +// per-ractor void rb_thread_sched_destroy(struct rb_thread_sched *sched) { @@ -202,6 +203,11 @@ Init_native_thread(rb_thread_t *main_th) main_th->nt->interrupt_event); } +void +ruby_mn_threads_params(void) +{ +} + static int w32_wait_events(HANDLE *events, int count, DWORD timeout, rb_thread_t *th) { @@ -637,20 +643,32 @@ thread_start_func_1(void *th_ptr) RUBY_DEBUG_LOG("thread created th:%u, thid: %p, event: %p", rb_th_serial(th), th->nt->thread_id, th->nt->interrupt_event); + thread_sched_to_running(TH_SCHED(th), th); + ruby_thread_set_native(th); + + // kick threads thread_start_func_2(th, th->ec->machine.stack_start); w32_close_handle(thread_id); RUBY_DEBUG_LOG("thread deleted th:%u", rb_th_serial(th)); + return 0; } static int native_thread_create(rb_thread_t *th) { - const size_t stack_size = th->vm->default_params.thread_machine_stack_size + th->vm->default_params.thread_vm_stack_size; + // setup nt + const size_t stack_size = th->vm->default_params.thread_machine_stack_size; th->nt = ZALLOC(struct rb_native_thread); th->nt->thread_id = w32_create_thread(stack_size, thread_start_func_1, th); + // setup vm stack + size_t vm_stack_word_size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE); + void *vm_stack = ruby_xmalloc(vm_stack_word_size * sizeof(VALUE)); + th->sched.vm_stack = vm_stack; + rb_ec_initialize_vm_stack(th->ec, vm_stack, vm_stack_word_size); + if ((th->nt->thread_id) == 0) { return thread_errno; } @@ -763,12 +781,6 @@ rb_thread_wakeup_timer_thread(int sig) /* do nothing */ } -static VALUE 
-rb_thread_start_unblock_thread(void) -{ - return Qfalse; /* no-op */ -} - static void rb_thread_create_timer_thread(void) { @@ -841,26 +853,6 @@ rb_reserved_fd_p(int fd) return 0; } -int -rb_sigwait_fd_get(rb_thread_t *th) -{ - return -1; /* TODO */ -} - -NORETURN(void rb_sigwait_fd_put(rb_thread_t *, int)); -void -rb_sigwait_fd_put(rb_thread_t *th, int fd) -{ - rb_bug("not implemented, should not be called"); -} - -NORETURN(void rb_sigwait_sleep(const rb_thread_t *, int, const rb_hrtime_t *)); -void -rb_sigwait_sleep(const rb_thread_t *th, int fd, const rb_hrtime_t *rel) -{ - rb_bug("not implemented, should not be called"); -} - rb_nativethread_id_t rb_nativethread_self(void) { @@ -881,4 +873,134 @@ native_thread_native_thread_id(rb_thread_t *th) } #define USE_NATIVE_THREAD_NATIVE_THREAD_ID 1 +void +rb_add_running_thread(rb_thread_t *th){ + // do nothing +} + +void +rb_del_running_thread(rb_thread_t *th) +{ + // do nothing +} + +static bool +th_has_dedicated_nt(const rb_thread_t *th) +{ + return true; +} + +void +rb_threadptr_sched_free(rb_thread_t *th) +{ + ruby_xfree(th->nt); + ruby_xfree(th->sched.vm_stack); +} + +void +rb_threadptr_remove(rb_thread_t *th) +{ + // do nothing +} + +void +rb_thread_sched_mark_zombies(rb_vm_t *vm) +{ + // do nothing +} + +static bool +vm_barrier_finish_p(rb_vm_t *vm) +{ + RUBY_DEBUG_LOG("cnt:%u living:%u blocking:%u", + vm->ractor.blocking_cnt == vm->ractor.cnt, + vm->ractor.sync.barrier_cnt, + vm->ractor.cnt, + vm->ractor.blocking_cnt); + + VM_ASSERT(vm->ractor.blocking_cnt <= vm->ractor.cnt); + return vm->ractor.blocking_cnt == vm->ractor.cnt; +} + +void +rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr) +{ + vm->ractor.sync.barrier_waiting = true; + + RUBY_DEBUG_LOG("barrier start. 
cnt:%u living:%u blocking:%u", + vm->ractor.sync.barrier_cnt, + vm->ractor.cnt, + vm->ractor.blocking_cnt); + + rb_vm_ractor_blocking_cnt_inc(vm, cr, __FILE__, __LINE__); + + // send signal + rb_ractor_t *r = 0; + ccan_list_for_each(&vm->ractor.set, r, vmlr_node) { + if (r != cr) { + rb_ractor_vm_barrier_interrupt_running_thread(r); + } + } + + // wait + while (!vm_barrier_finish_p(vm)) { + rb_vm_cond_wait(vm, &vm->ractor.sync.barrier_cond); + } + + RUBY_DEBUG_LOG("cnt:%u barrier success", vm->ractor.sync.barrier_cnt); + + rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__); + + vm->ractor.sync.barrier_waiting = false; + vm->ractor.sync.barrier_cnt++; + + ccan_list_for_each(&vm->ractor.set, r, vmlr_node) { + rb_native_cond_signal(&r->barrier_wait_cond); + } +} + +void +rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr) +{ + vm->ractor.sync.lock_owner = cr; + unsigned int barrier_cnt = vm->ractor.sync.barrier_cnt; + rb_thread_t *th = GET_THREAD(); + bool running; + + RB_VM_SAVE_MACHINE_CONTEXT(th); + + if (rb_ractor_status_p(cr, ractor_running)) { + rb_vm_ractor_blocking_cnt_inc(vm, cr, __FILE__, __LINE__); + running = true; + } + else { + running = false; + } + VM_ASSERT(rb_ractor_status_p(cr, ractor_blocking)); + + if (vm_barrier_finish_p(vm)) { + RUBY_DEBUG_LOG("wakeup barrier owner"); + rb_native_cond_signal(&vm->ractor.sync.barrier_cond); + } + else { + RUBY_DEBUG_LOG("wait for barrier finish"); + } + + // wait for restart + while (barrier_cnt == vm->ractor.sync.barrier_cnt) { + vm->ractor.sync.lock_owner = NULL; + rb_native_cond_wait(&cr->barrier_wait_cond, &vm->ractor.sync.lock); + VM_ASSERT(vm->ractor.sync.lock_owner == NULL); + vm->ractor.sync.lock_owner = cr; + } + + RUBY_DEBUG_LOG("barrier is released. Acquire vm_lock"); + + if (running) { + rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__); + } + + vm->ractor.sync.lock_owner = NULL; +} + #endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */ |