author     Koichi Sasada <[email protected]>   2023-04-10 10:53:13 +0900
committer  Koichi Sasada <[email protected]>   2023-10-12 14:47:01 +0900
commit     be1bbd5b7d40ad863ab35097765d3754726bbd54 (patch)
tree       2995a0859bea1d6b2903dcd324f41869dbef14a1 /thread.c
parent     096ee0648e215915a3019c2cd68ba220d94eca12 (diff)
M:N thread scheduler for Ractors
This patch introduces an M:N thread scheduler for the Ractor system.

In general, an M:N thread scheduler employs N native threads (OS threads)
to manage M user-level threads (Ruby threads in this case).
In the Ruby interpreter, one native thread is provided per Ractor,
and all Ruby threads of that Ractor are managed by this native thread.
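For intuition only, the idea can be sketched in plain Ruby with Fibers
(an M:1 toy: M cooperative tasks multiplexed on one native thread).
`ToyScheduler` is a made-up name for this sketch; it is not part of this
patch or of Ruby's API:

    # Toy M:1 scheduler: M user-level tasks on 1 native thread.
    class ToyScheduler
      def initialize
        @ready = [] # run queue of user-level tasks (Fibers)
      end

      def spawn(&blk)
        @ready << Fiber.new(&blk)
      end

      def run
        # Round-robin: resume each task until it yields or finishes.
        until @ready.empty?
          fiber = @ready.shift
          fiber.resume
          @ready << fiber if fiber.alive?
        end
      end
    end

    sched = ToyScheduler.new
    3.times do |i|
      sched.spawn { 2.times { |n| puts "task #{i} step #{n}"; Fiber.yield } }
    end
    sched.run # all steps interleave on a single native thread

The real scheduler below does this in C, preemptively, and with N native
threads instead of one.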
Since Ruby 1.9, the interpreter has used a 1:1 thread scheduler, which
means each Ruby thread has its own native thread. The M:N scheduler
changes this strategy.

Because of compatibility issues (and stability issues in the current
implementation), the main Ractor does not use the M:N scheduler by
default. In other words, threads on the main Ractor are managed by the
1:1 thread scheduler.
There are additional settings via environment variables:

`RUBY_MN_THREADS=1` enables the M:N thread scheduler on the main
Ractor. Note that non-main Ractors use the M:N scheduler even without
this configuration. With this configuration, single-Ractor applications
run their threads on an M:1 thread scheduler (green threads, user-level
threads).

`RUBY_MAX_CPU=n` specifies the maximum number of native threads for the
M:N scheduler (default: 8).
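As a rough way to observe these settings, here is a small experiment
(not part of this patch; it assumes Linux, where `/proc/self/status`
reports the process's native thread count):

    # mn_demo.rb -- run as: RUBY_MN_THREADS=1 RUBY_MAX_CPU=4 ruby mn_demo.rb
    def native_thread_count
      # The "Threads:" line counts OS-level threads of this process (Linux).
      File.read("/proc/self/status")[/^Threads:\s+(\d+)/, 1].to_i
    end

    threads = 100.times.map { Thread.new { sleep 1 } }

    puts "Ruby threads:   #{Thread.list.size}"    # ~101, including main
    puts "native threads: #{native_thread_count}" # expected to stay small,
                                                  # bounded by RUBY_MAX_CPU
                                                  # plus helper threads
    threads.each(&:join)

Under the default 1:1 scheduler, the native thread count should instead
grow with the Ruby thread count.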
This patch will be reverted soon if hard-to-solve issues are found.
[Bug #19842]
Diffstat (limited to 'thread.c')
-rw-r--r--  thread.c  287
1 file changed, 118 insertions, 169 deletions
@@ -147,7 +147,6 @@ static const char *thread_status_name(rb_thread_t *th, int detail);
 static int hrtime_update_expire(rb_hrtime_t *, const rb_hrtime_t);
 NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
 static int consume_communication_pipe(int fd);
-static int check_signals_nogvl(rb_thread_t *, int sigwait_fd);
 
 static volatile int system_working = 1;
 
@@ -260,12 +259,6 @@ timeout_prepare(rb_hrtime_t **to, rb_hrtime_t *rel, rb_hrtime_t *end,
 MAYBE_UNUSED(NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start)));
 
-static void
-ubf_sigwait(void *ignore)
-{
-    rb_thread_wakeup_timer_thread(0);
-}
-
 #include THREAD_IMPL_SRC
 
 /*
@@ -646,20 +639,13 @@ static int
 thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
 {
     STACK_GROW_DIR_DETECTION;
-    enum ruby_tag_type state;
-    VALUE errinfo = Qnil;
-    size_t size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE);
-    rb_thread_t *ractor_main_th = th->ractor->threads.main;
-    VALUE * vm_stack = NULL;
 
-    VM_ASSERT(th != th->vm->ractor.main_thread);
     RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
+    VM_ASSERT(th != th->vm->ractor.main_thread);
 
-    // setup native thread
-    thread_sched_to_running(TH_SCHED(th), th);
-    ruby_thread_set_native(th);
-
-    RUBY_DEBUG_LOG("got lock. th:%u", rb_th_serial(th));
+    enum ruby_tag_type state;
+    VALUE errinfo = Qnil;
+    rb_thread_t *ractor_main_th = th->ractor->threads.main;
 
     // setup ractor
     if (rb_ractor_status_p(th->ractor, ractor_blocking)) {
@@ -674,17 +660,6 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
         RB_VM_UNLOCK();
     }
 
-    // This assertion is not passed on win32 env. Check it later.
-    // VM_ASSERT((size * sizeof(VALUE)) <= th->ec->machine.stack_maxsize);
-
-    // setup VM and machine stack
-    vm_stack = alloca(size * sizeof(VALUE));
-    VM_ASSERT(vm_stack);
-
-    rb_ec_initialize_vm_stack(th->ec, vm_stack, size);
-    th->ec->machine.stack_start = STACK_DIR_UPPER(vm_stack + size, vm_stack);
-    th->ec->machine.stack_maxsize -= size * sizeof(VALUE);
-
     // Ensure that we are not joinable.
     VM_ASSERT(UNDEF_P(th->value));
@@ -990,11 +965,11 @@ rb_thread_create(VALUE (*fn)(void *), void *arg)
 }
 
 VALUE
-rb_thread_create_ractor(rb_ractor_t *g, VALUE args, VALUE proc)
+rb_thread_create_ractor(rb_ractor_t *r, VALUE args, VALUE proc)
 {
     struct thread_create_params params = {
         .type = thread_invoke_type_ractor_proc,
-        .g = g,
+        .g = r,
         .args = args,
         .proc = proc,
     };
@@ -1375,14 +1350,14 @@ sleep_forever(rb_thread_t *th, unsigned int fl)
 void
 rb_thread_sleep_forever(void)
 {
-    RUBY_DEBUG_LOG("");
+    RUBY_DEBUG_LOG("forever");
     sleep_forever(GET_THREAD(), SLEEP_SPURIOUS_CHECK);
 }
 
 void
 rb_thread_sleep_deadly(void)
 {
-    RUBY_DEBUG_LOG("");
+    RUBY_DEBUG_LOG("deadly");
     sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE|SLEEP_SPURIOUS_CHECK);
 }
 
@@ -1394,7 +1369,7 @@ rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hr
         rb_fiber_scheduler_block(scheduler, blocker, timeout);
     }
     else {
-        RUBY_DEBUG_LOG("");
+        RUBY_DEBUG_LOG("...");
         if (end) {
             sleep_hrtime_until(GET_THREAD(), end, SLEEP_SPURIOUS_CHECK);
         }
@@ -1491,7 +1466,7 @@ blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
         th->status = THREAD_STOPPED;
         rb_ractor_blocking_threads_inc(th->ractor, __FILE__, __LINE__);
 
-        RUBY_DEBUG_LOG("");
+        RUBY_DEBUG_LOG("thread_id:%p", (void *)th->nt->thread_id);
 
         RB_VM_SAVE_MACHINE_CONTEXT(th);
         thread_sched_to_waiting(TH_SCHED(th), th);
@@ -1519,8 +1494,12 @@ blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
         th->status = region->prev_status;
     }
 
-    RUBY_DEBUG_LOG("");
+    RUBY_DEBUG_LOG("end");
+
+#ifndef _WIN32
+    // GET_THREAD() clears WSAGetLastError()
     VM_ASSERT(th == GET_THREAD());
+#endif
 }
 
 void *
@@ -1544,14 +1523,11 @@ rb_nogvl(void *(*func)(void *), void *data1,
         if (flags & RB_NOGVL_UBF_ASYNC_SAFE) {
             vm->ubf_async_safe = 1;
         }
-        else {
-            ubf_th = rb_thread_start_unblock_thread();
-        }
     }
 
     BLOCKING_REGION(th, {
         val = func(data1);
-        saved_errno = errno;
+        saved_errno = rb_errno();
     }, ubf, data2, flags & RB_NOGVL_INTR_FAIL);
 
     if (is_main_thread) vm->ubf_async_safe = 0;
@@ -1564,7 +1540,7 @@ rb_nogvl(void *(*func)(void *), void *data1,
         thread_value(rb_thread_kill(ubf_th));
     }
 
-    errno = saved_errno;
+    rb_errno_set(saved_errno);
 
     return val;
 }
@@ -1689,11 +1665,31 @@ rb_thread_io_wake_pending_closer(struct waiting_fd *wfd)
     }
 }
 
+static int
+waitfd_to_waiting_flag(int wfd_event)
+{
+    return wfd_event << 1;
+}
+
 VALUE
-rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
+rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, int events)
 {
-    volatile VALUE val = Qundef; /* shouldn't be used */
     rb_execution_context_t * volatile ec = GET_EC();
+    rb_thread_t *th = rb_ec_thread_ptr(ec);
+
+    RUBY_DEBUG_LOG("th:%u fd:%d ev:%d", rb_th_serial(th), fd, events);
+
+#ifdef RUBY_THREAD_PTHREAD_H
+    if (events && !th_has_dedicated_nt(th)) {
+        VM_ASSERT(events == RB_WAITFD_IN || events == RB_WAITFD_OUT);
+
+        // wait readable/writable
+        thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), NULL);
+        RUBY_VM_CHECK_INTS_BLOCKING(ec);
+    }
+#endif
+
+    volatile VALUE val = Qundef; /* shouldn't be used */
     volatile int saved_errno = 0;
     enum ruby_tag_type state;
@@ -1746,6 +1742,12 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
     return val;
 }
 
+VALUE
+rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
+{
+    return rb_thread_io_blocking_call(func, data1, fd, 0);
+}
+
 /*
  * rb_thread_call_with_gvl - re-enter the Ruby world after GVL release.
  *
@@ -2379,15 +2381,12 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
         /* signal handling */
         if (trap_interrupt && (th == th->vm->ractor.main_thread)) {
             enum rb_thread_status prev_status = th->status;
-            int sigwait_fd = rb_sigwait_fd_get(th);
-
-            if (sigwait_fd >= 0) {
-                (void)consume_communication_pipe(sigwait_fd);
-                rb_sigwait_fd_put(th, sigwait_fd);
-            }
 
             th->status = THREAD_RUNNABLE;
-            while ((sig = rb_get_next_signal()) != 0) {
-                ret |= rb_signal_exec(th, sig);
+            {
+                while ((sig = rb_get_next_signal()) != 0) {
+                    ret |= rb_signal_exec(th, sig);
+                }
             }
             th->status = prev_status;
         }
@@ -2432,7 +2431,7 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
             limits_us >>= -th->priority;
 
         if (th->status == THREAD_RUNNABLE)
-            th->running_time_us += TIME_QUANTUM_USEC;
+            th->running_time_us += 10 * 1000; // 10ms = 10_000us // TODO: use macro
 
         VM_ASSERT(th->ec->cfp);
         EXEC_EVENT_HOOK(th->ec, RUBY_INTERNAL_EVENT_SWITCH, th->ec->cfp->self,
@@ -3362,7 +3361,7 @@ rb_thread_setname(VALUE thread, VALUE name)
         name = rb_str_new_frozen(name);
     }
     target_th->name = name;
-    if (threadptr_initialized(target_th)) {
+    if (threadptr_initialized(target_th) && target_th->has_dedicated_nt) {
         native_set_another_thread_name(target_th->nt->thread_id, name);
     }
     return name;
@@ -4148,7 +4147,6 @@ wait_retryable(int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end)
 
 struct select_set {
     int max;
-    int sigwait_fd;
     rb_thread_t *th;
    rb_fdset_t *rset;
     rb_fdset_t *wset;
@@ -4164,10 +4162,6 @@ select_set_free(VALUE p)
 {
     struct select_set *set = (struct select_set *)p;
 
-    if (set->sigwait_fd >= 0) {
-        rb_sigwait_fd_put(set->th, set->sigwait_fd);
-    }
-
     rb_fd_term(&set->orig_rset);
     rb_fd_term(&set->orig_wset);
     rb_fd_term(&set->orig_eset);
@@ -4175,24 +4169,6 @@ select_set_free(VALUE p)
     return Qfalse;
 }
 
-static const rb_hrtime_t *
-sigwait_timeout(rb_thread_t *th, int sigwait_fd, const rb_hrtime_t *orig,
-                int *drained_p)
-{
-    static const rb_hrtime_t quantum = TIME_QUANTUM_USEC * 1000;
-
-    if (sigwait_fd >= 0 && (!ubf_threads_empty() || BUSY_WAIT_SIGNALS)) {
-        *drained_p = check_signals_nogvl(th, sigwait_fd);
-        if (!orig || *orig > quantum)
-            return &quantum;
-    }
-
-    return orig;
-}
-
-#define sigwait_signals_fd(result, cond, sigwait_fd) \
-    (result > 0 && (cond) ? (result--, (sigwait_fd)) : -1)
-
 static VALUE
 do_select(VALUE p)
 {
@@ -4211,28 +4187,18 @@ do_select(VALUE p)
                 TRUE)
 
     do {
-        int drained;
         lerrno = 0;
 
         BLOCKING_REGION(set->th, {
-            const rb_hrtime_t *sto;
             struct timeval tv;
 
-            sto = sigwait_timeout(set->th, set->sigwait_fd, to, &drained);
            if (!RUBY_VM_INTERRUPTED(set->th->ec)) {
-                result = native_fd_select(set->max, set->rset, set->wset,
-                                          set->eset,
-                                          rb_hrtime2timeval(&tv, sto), set->th);
+                result = native_fd_select(set->max,
+                                          set->rset, set->wset, set->eset,
+                                          rb_hrtime2timeval(&tv, to), set->th);
                 if (result < 0) lerrno = errno;
             }
-        }, set->sigwait_fd >= 0 ? ubf_sigwait : ubf_select, set->th, TRUE);
-
-        if (set->sigwait_fd >= 0) {
-            int fd = sigwait_signals_fd(result,
-                                        rb_fd_isset(set->sigwait_fd, set->rset),
-                                        set->sigwait_fd);
-            (void)check_signals_nogvl(set->th, fd);
-        }
+        }, ubf_select, set->th, TRUE);
 
         RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */
     } while (wait_retryable(&result, lerrno, to, end) && do_select_update());
@@ -4244,18 +4210,6 @@ do_select(VALUE p)
     return (VALUE)result;
 }
 
-static rb_fdset_t *
-init_set_fd(int fd, rb_fdset_t *fds)
-{
-    if (fd < 0) {
-        return 0;
-    }
-    rb_fd_init(fds);
-    rb_fd_set(fd, fds);
-
-    return fds;
-}
-
 int
 rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except,
                     struct timeval *timeout)
@@ -4279,16 +4233,6 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
         return 0;
     }
 
-    set.sigwait_fd = rb_sigwait_fd_get(set.th);
-    if (set.sigwait_fd >= 0) {
-        if (set.rset)
-            rb_fd_set(set.sigwait_fd, set.rset);
-        else
-            set.rset = init_set_fd(set.sigwait_fd, &set.orig_rset);
-        if (set.sigwait_fd >= set.max) {
-            set.max = set.sigwait_fd + 1;
-        }
-    }
 #define fd_init_copy(f) do { \
         if (set.f) { \
             rb_fd_resize(set.max - 1, set.f); \
@@ -4325,19 +4269,35 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
 int
 rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
 {
-    struct pollfd fds[2];
+    struct pollfd fds[1];
     int result = 0;
-    int drained;
     nfds_t nfds;
-    rb_unblock_function_t *ubf;
     struct waiting_fd wfd;
     int state;
     volatile int lerrno;
 
-    wfd.th = GET_THREAD();
+    rb_thread_t *th = wfd.th = GET_THREAD();
     wfd.fd = fd;
     wfd.busy = NULL;
 
+#ifdef RUBY_THREAD_PTHREAD_H
+    if (!th->nt->dedicated) {
+        rb_hrtime_t rel, *prel;
+
+        if (timeout) {
+            rel = rb_timeval2hrtime(timeout);
+            prel = &rel;
+        }
+        else {
+            prel = NULL;
+        }
+
+        if (thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel)) {
+            return 0; // timeout
+        }
+    }
+#endif
+
     RB_VM_LOCK_ENTER();
     {
         ccan_list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node);
@@ -4353,36 +4313,18 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
         fds[0].events = (short)events;
         fds[0].revents = 0;
         do {
-            fds[1].fd = rb_sigwait_fd_get(wfd.th);
-
-            if (fds[1].fd >= 0) {
-                fds[1].events = POLLIN;
-                fds[1].revents = 0;
-                nfds = 2;
-                ubf = ubf_sigwait;
-            }
-            else {
-                nfds = 1;
-                ubf = ubf_select;
-            }
-
+            nfds = 1;
             lerrno = 0;
+
             BLOCKING_REGION(wfd.th, {
-                const rb_hrtime_t *sto;
                 struct timespec ts;
 
-                sto = sigwait_timeout(wfd.th, fds[1].fd, to, &drained);
                 if (!RUBY_VM_INTERRUPTED(wfd.th->ec)) {
-                    result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, sto), 0);
+                    result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, to), 0);
                     if (result < 0) lerrno = errno;
                 }
-            }, ubf, wfd.th, TRUE);
+            }, ubf_select, wfd.th, TRUE);
 
-            if (fds[1].fd >= 0) {
-                int fd1 = sigwait_signals_fd(result, fds[1].revents, fds[1].fd);
-                (void)check_signals_nogvl(wfd.th, fd1);
-                rb_sigwait_fd_put(wfd.th, fds[1].fd);
-            }
             RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
         } while (wait_retryable(&result, lerrno, to, end));
    }
@@ -4470,6 +4412,18 @@ select_single_cleanup(VALUE ptr)
     return (VALUE)-1;
 }
 
+static rb_fdset_t *
+init_set_fd(int fd, rb_fdset_t *fds)
+{
+    if (fd < 0) {
+        return 0;
+    }
+    rb_fd_init(fds);
+    rb_fd_set(fd, fds);
+
+    return fds;
+}
+
 int
 rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
 {
@@ -4552,16 +4506,13 @@ consume_communication_pipe(int fd)
     ssize_t result;
     int ret = FALSE; /* for rb_sigwait_sleep */
 
-    /*
-     * disarm UBF_TIMER before we read, because it can become
-     * re-armed at any time via sighandler and the pipe will refill
-     * We can disarm it because this thread is now processing signals
-     * and we do not want unnecessary SIGVTALRM
-     */
-    ubf_timer_disarm();
-
     while (1) {
         result = read(fd, buff, sizeof(buff));
+#if USE_EVENTFD
+        RUBY_DEBUG_LOG("resultf:%d buff:%lu", (int)result, (unsigned long)buff[0]);
+#else
+        RUBY_DEBUG_LOG("result:%d", (int)result);
+#endif
        if (result > 0) {
            ret = TRUE;
            if (USE_EVENTFD || result < (ssize_t)sizeof(buff)) {
@@ -4588,24 +4539,6 @@ consume_communication_pipe(int fd)
     }
 }
 
-static int
-check_signals_nogvl(rb_thread_t *th, int sigwait_fd)
-{
-    rb_vm_t *vm = GET_VM(); /* th may be 0 */
-    int ret = sigwait_fd >= 0 ? consume_communication_pipe(sigwait_fd) : FALSE;
-    ubf_wakeup_all_threads();
-    if (rb_signal_buff_size()) {
-        if (th == vm->ractor.main_thread) {
-            /* no need to lock + wakeup if already in main thread */
-            RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
-        }
-        else {
-            threadptr_trap_interrupt(vm->ractor.main_thread);
-        }
-    }
-    return ret;
-}
-
 void
 rb_thread_stop_timer_thread(void)
 {
@@ -4702,6 +4635,10 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const r
     rb_ractor_sleeper_threads_clear(th->ractor);
     rb_clear_coverages();
 
+    // restart timer thread (timer threads access to `vm->waitpid_lock` and so on.
+    rb_thread_reset_timer_thread();
+    rb_thread_start_timer_thread();
+
     VM_ASSERT(vm->ractor.blocking_cnt == 0);
     VM_ASSERT(vm->ractor.cnt == 1);
 }
@@ -5467,8 +5404,16 @@ Init_Thread(void)
     /* main thread setting */
     {
         /* acquire global vm lock */
-        struct rb_thread_sched *sched = TH_SCHED(th);
-        thread_sched_to_running(sched, th);
+#ifdef HAVE_PTHREAD_NP_H
+        VM_ASSERT(TH_SCHED(th)->running == th);
+#endif
+        // thread_sched_to_running() should not be called because
+        // it assumes blocked by thread_sched_to_waiting().
+        // thread_sched_to_running(sched, th);
+
+#ifdef RB_INTERNAL_THREAD_HOOK
+        RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_RESUMED);
+#endif
 
         th->pending_interrupt_queue = rb_ary_hidden_new(0);
         th->pending_interrupt_queue_checked = 0;
@@ -5481,7 +5426,7 @@ Init_Thread(void)
     Init_thread_sync();
 
     // TODO: Suppress unused function warning for now
-    if (0) rb_thread_sched_destroy(NULL);
+    // if (0) rb_thread_sched_destroy(NULL);
 }
 
 int
@@ -5511,7 +5456,7 @@ debug_deadlock_check(rb_ractor_t *r, VALUE msg)
     ccan_list_for_each(&r->threads.set, th, lt_node) {
         rb_str_catf(msg, "* %+"PRIsVALUE"\n    rb_thread_t:%p "
                     "native:%p int:%u",
-                    th->self, (void *)th, thread_id_str(th), th->ec->interrupt_flag);
+                    th->self, (void *)th, th->nt ? thread_id_str(th) : "N/A", th->ec->interrupt_flag);
 
         if (th->locking_mutex) {
             rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
@@ -5537,14 +5482,18 @@ rb_check_deadlock(rb_ractor_t *r)
 {
     if (GET_THREAD()->vm->thread_ignore_deadlock) return;
 
-    int found = 0;
-    rb_thread_t *th = NULL;
+#ifdef RUBY_THREAD_PTHREAD_H
+    if (r->threads.sched.readyq_cnt > 0) return;
+#endif
+
     int sleeper_num = rb_ractor_sleeper_thread_num(r);
     int ltnum = rb_ractor_living_thread_num(r);
 
     if (ltnum > sleeper_num) return;
     if (ltnum < sleeper_num) rb_bug("sleeper must not be more than vm_living_thread_num(vm)");
-    if (patrol_thread && patrol_thread != GET_THREAD()) return;
+
+    int found = 0;
+    rb_thread_t *th = NULL;
 
     ccan_list_for_each(&r->threads.set, th, lt_node) {
         if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th->ec)) {
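For illustration, the user-visible path behind the new
rb_thread_io_blocking_call() can be exercised with a short script
(a sketch of expected behavior, not part of the patch): with
`RUBY_MN_THREADS=1`, a thread blocking on IO is expected to be parked
via thread_sched_wait_events() until the fd becomes readable, instead
of pinning a native thread; under the default 1:1 scheduler the same
script behaves as before.

    # io_demo.rb -- run as: RUBY_MN_THREADS=1 ruby io_demo.rb
    r, w = IO.pipe
    t = Thread.new { r.read(5) } # blocks until 5 bytes arrive
    sleep 0.1                    # let the thread park on fd readability
    w.write("hello")
    puts t.value                 # => "hello"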