diff options
author | Misaki Shioi <[email protected]> | 2024-11-12 10:06:48 +0900 |
---|---|---|
committer | GitHub <[email protected]> | 2024-11-12 10:06:48 +0900 |
commit | 4c270200dbc2a3a4511e8b793a033078afd6fb31 (patch) | |
tree | 75c802ceff5051e434701bba83dd965c1357a05d /ext/socket/ipsocket.c | |
parent | 821a5b966fbc2926dc3bf88b6ba09879fa35318e (diff) |
[Feature #120782] Introduction of Happy Eyeballs Version 2 (RFC8305) in TCPSocket.new (#11653)
* Introduction of Happy Eyeballs Version 2 (RFC8305) in TCPSocket.new
This is an implementation of Happy Eyeballs version 2 (RFC 8305) in `TCPSocket.new`.
See https://github.com/ruby/ruby/pull/11653
1. Background
Prior to this implementation, I implemented Happy Eyeballs Version 2 (HEv2) for `Socket.tcp` in https://github.com/ruby/ruby/pull/9374.
HEv2 is an algorithm defined in [RFC 8305](https://datatracker.ietf.org/doc/html/rfc8305), aimed at improving network connectivity.
For more details on the specific cases that HEv2 helps, please refer to https://bugs.ruby-lang.org/issues/20108.
2. Proposal & Outcome
This proposal implements the same HEv2 algorithm in `TCPSocket.new`.
Since `TCPSocket.new` is used more widely than `Socket.tcp`, this change is expected to broaden the impact of HEv2's benefits.
Like `Socket.tcp`, I have also added `fast_fallback` keyword argument to `TCPSocket.new`.
This option is set to true by default, enabling the HEv2 functionality.
However, users can explicitly set it to false to disable HEv2 and use the previous behavior of `TCPSocket.new`.
It should be noted that HEv2 is enabled only in environments where pthreads are available.
This specification follows the approach taken in https://bugs.ruby-lang.org/issues/19965 , where name resolution can be interrupted.
(In environments where pthreads are not available, the `fast_fallback` option is ignored.)
3. Performance
Below is the benchmark of 100 requests to `www.ruby-lang.org` with the fast_fallback option set to true and false, respectively.
While there is a slight performance degradation when HEv2 is enabled, the degradation is smaller compared to that seen in `Socket.tcp`.
```
~/s/build ❯❯❯ ../install/bin/ruby ../ruby/test.rb
Rehearsal --------------------------------------------------------
fast_fallback: true 0.017588 0.097045 0.114633 ( 1.460664)
fast_fallback: false 0.014033 0.078984 0.093017 ( 1.413951)
----------------------------------------------- total: 0.207650sec
user system total real
fast_fallback: true 0.020891 0.124054 0.144945 ( 1.473816)
fast_fallback: false 0.018392 0.110852 0.129244 ( 1.466014)
```
* Update debug prints
Co-authored-by: Nobuyoshi Nakada <[email protected]>
* Remove debug prints
* misc
* Disable HEv2 in Win
* Raise resolution error with hostname resolution
* Fix to handle errors
* Remove warnings
* Errors that do not need to be handled
* misc
* Improve doc
* Fix bug on cancellation
* Avoid EAI_ADDRFAMILY for resolving IPv6
* Follow upstream
* misc
* Refactor connection_attempt_fds management
- Introduced allocate_connection_attempt_fds and reallocate_connection_attempt_fds for improved memory allocation of connection_attempt_fds
- Added remove_connection_attempt_fd to resize connection_attempt_fds dynamically.
- Simplified the in_progress_fds function to only check the size of connection_attempt_fds.
* Rename do_pthread_create to raddrinfo_pthread_create to avoid conflicting
---------
Co-authored-by: Nobuyoshi Nakada <[email protected]>
Notes
Notes:
Merged-By: shioimm <[email protected]>
Diffstat (limited to 'ext/socket/ipsocket.c')
-rw-r--r-- | ext/socket/ipsocket.c | 1125 |
1 files changed, 1124 insertions, 1 deletions
diff --git a/ext/socket/ipsocket.c b/ext/socket/ipsocket.c index 3de8445435..f3c00b518d 100644 --- a/ext/socket/ipsocket.c +++ b/ext/socket/ipsocket.c @@ -169,9 +169,1129 @@ init_inetsock_internal(VALUE v) return io; } +#if FAST_FALLBACK_INIT_INETSOCK_IMPL == 0 + +VALUE +rsock_init_inetsock(VALUE self, VALUE remote_host, VALUE remote_serv, VALUE local_host, VALUE local_serv, int type, VALUE resolv_timeout, VALUE connect_timeout, VALUE _fast_fallback, VALUE _test_mode_settings) +{ + struct inetsock_arg arg; + arg.self = self; + arg.io = Qnil; + arg.remote.host = remote_host; + arg.remote.serv = remote_serv; + arg.remote.res = 0; + arg.local.host = local_host; + arg.local.serv = local_serv; + arg.local.res = 0; + arg.type = type; + arg.resolv_timeout = resolv_timeout; + arg.connect_timeout = connect_timeout; + return rb_ensure(init_inetsock_internal, (VALUE)&arg, + inetsock_cleanup, (VALUE)&arg); +} + +#elif FAST_FALLBACK_INIT_INETSOCK_IMPL == 1 + +#define IPV6_ENTRY_POS 0 +#define IPV4_ENTRY_POS 1 +#define RESOLUTION_ERROR 0 +#define SYSCALL_ERROR 1 + +static int +is_specified_ip_address(const char *hostname) +{ + if (!hostname) return false; + + struct in_addr ipv4addr; + struct in6_addr ipv6addr; + + return (inet_pton(AF_INET6, hostname, &ipv6addr) == 1 || + inet_pton(AF_INET, hostname, &ipv4addr) == 1); +} + +struct fast_fallback_inetsock_arg +{ + VALUE self; + VALUE io; + + struct { + VALUE host, serv; + struct rb_addrinfo *res; + } remote, local; + int type; + VALUE resolv_timeout; + VALUE connect_timeout; + + const char *hostp, *portp; + int *families; + int family_size; + int additional_flags; + int cancelled; + rb_nativethread_lock_t *lock; + struct fast_fallback_getaddrinfo_entry *getaddrinfo_entries[2]; + struct fast_fallback_getaddrinfo_shared *getaddrinfo_shared; + int connection_attempt_fds_size; + int *connection_attempt_fds; + VALUE test_mode_settings; +}; + +static struct fast_fallback_getaddrinfo_shared * +allocate_fast_fallback_getaddrinfo_shared(void) +{ + struct fast_fallback_getaddrinfo_shared *shared; + + shared = (struct fast_fallback_getaddrinfo_shared *)calloc( + 1, + sizeof(struct fast_fallback_getaddrinfo_shared) + ); + + return shared; +} + +static struct fast_fallback_getaddrinfo_entry * +allocate_fast_fallback_getaddrinfo_entry(void) +{ + struct fast_fallback_getaddrinfo_entry *entry; + + entry = (struct fast_fallback_getaddrinfo_entry *)calloc( + 1, + sizeof(struct fast_fallback_getaddrinfo_entry) + ); + + return entry; +} + +static void +allocate_fast_fallback_getaddrinfo_hints(struct addrinfo *hints, int family, int remote_addrinfo_hints, int additional_flags) +{ + MEMZERO(hints, struct addrinfo, 1); + hints->ai_family = family; + hints->ai_socktype = SOCK_STREAM; + hints->ai_protocol = IPPROTO_TCP; + hints->ai_flags = remote_addrinfo_hints; + hints->ai_flags |= additional_flags; +} + +static int* +allocate_connection_attempt_fds(int additional_capacity) +{ + int *fds = (int *)malloc(additional_capacity * sizeof(int)); + if (!fds) rb_syserr_fail(errno, "malloc(3)"); + for (int i = 0; i < additional_capacity; i++) fds[i] = -1; + return fds; +} + +static int +reallocate_connection_attempt_fds(int *fds, int current_capacity, int additional_capacity) +{ + int new_capacity = current_capacity + additional_capacity; + + if (realloc(fds, new_capacity * sizeof(int)) == NULL) { + rb_syserr_fail(errno, "realloc(3)"); + } + + for (int i = current_capacity; i < new_capacity; i++) fds[i] = -1; + return new_capacity; +} + +struct wait_fast_fallback_arg +{ + int status, nfds; + fd_set *readfds, *writefds; + struct timeval *delay; + int *cancelled; +}; + +static void * +wait_fast_fallback(void *ptr) +{ + struct wait_fast_fallback_arg *arg = (struct wait_fast_fallback_arg *)ptr; + int status; + status = select(arg->nfds, arg->readfds, arg->writefds, NULL, arg->delay); + arg->status = status; + if (errno == EINTR) *arg->cancelled = true; + return 0; +} + +static void +cancel_fast_fallback(void *ptr) +{ + if (!ptr) return; + + struct fast_fallback_getaddrinfo_shared *arg = (struct fast_fallback_getaddrinfo_shared *)ptr; + + rb_nativethread_lock_lock(arg->lock); + { + *arg->cancelled = true; + char notification = SELECT_CANCELLED; + if ((write(arg->notify, ¬ification, 1)) < 0) { + rb_syserr_fail(errno, "write(2)"); + } + } + rb_nativethread_lock_unlock(arg->lock); +} + +struct hostname_resolution_result +{ + struct addrinfo *ai; + int finished; + int has_error; +}; + +struct hostname_resolution_store +{ + struct hostname_resolution_result v6; + struct hostname_resolution_result v4; + int is_all_finised; +}; + +static int +any_addrinfos(struct hostname_resolution_store *resolution_store) +{ + return resolution_store->v6.ai || resolution_store->v4.ai; +} + +static struct timespec +current_clocktime_ts(void) +{ + struct timespec ts; + if ((clock_gettime(CLOCK_MONOTONIC, &ts)) < 0) { + rb_syserr_fail(errno, "clock_gettime(2)"); + } + return ts; +} + +static void +set_timeout_tv(struct timeval *tv, long ms, struct timespec from) +{ + long sec = ms / 1000; + long nsec = (ms % 1000) * 1000000; + long result_sec = from.tv_sec + sec; + long result_nsec = from.tv_nsec + nsec; + + result_sec += result_nsec / 1000000000; + result_nsec = result_nsec % 1000000000; + + tv->tv_sec = result_sec; + tv->tv_usec = (int)(result_nsec / 1000); +} + +static struct timeval +add_ts_to_tv(struct timeval tv, struct timespec ts) +{ + long ts_usec = ts.tv_nsec / 1000; + tv.tv_sec += ts.tv_sec; + tv.tv_usec += ts_usec; + + if (tv.tv_usec >= 1000000) { + tv.tv_sec += tv.tv_usec / 1000000; + tv.tv_usec = tv.tv_usec % 1000000; + } + + return tv; +} + +static VALUE +tv_to_seconds(struct timeval *timeout) { + if (timeout == NULL) return Qnil; + + double seconds = (double)timeout->tv_sec + (double)timeout->tv_usec / 1000000.0; + + return DBL2NUM(seconds); +} + +static int +is_infinity(struct timeval tv) +{ + // { -1, -1 } as infinity + return tv.tv_sec == -1 || tv.tv_usec == -1; +} + +static int +is_timeout_tv(struct timeval *timeout_tv, struct timespec now) { + if (!timeout_tv) return false; + if (timeout_tv->tv_sec == -1 && timeout_tv->tv_usec == -1) return false; + + struct timespec ts; + ts.tv_sec = timeout_tv->tv_sec; + ts.tv_nsec = timeout_tv->tv_usec * 1000; + + if (now.tv_sec > ts.tv_sec) return true; + if (now.tv_sec == ts.tv_sec && now.tv_nsec >= ts.tv_nsec) return true; + return false; +} + +static struct timeval * +select_expires_at( + struct hostname_resolution_store *resolution_store, + struct timeval *resolution_delay, + struct timeval *connection_attempt_delay, + struct timeval *user_specified_resolv_timeout_at, + struct timeval *user_specified_connect_timeout_at +) { + if (any_addrinfos(resolution_store)) { + return resolution_delay ? resolution_delay : connection_attempt_delay; + } + + struct timeval *timeout = NULL; + + if (user_specified_resolv_timeout_at) { + if (is_infinity(*user_specified_resolv_timeout_at)) return NULL; + timeout = user_specified_resolv_timeout_at; + } + + if (user_specified_connect_timeout_at) { + if (is_infinity(*user_specified_connect_timeout_at)) return NULL; + if (!timeout || timercmp(user_specified_connect_timeout_at, timeout, >)) { + return user_specified_connect_timeout_at; + } + } + + return timeout; +} + +static struct timeval +tv_to_timeout(struct timeval *ends_at, struct timespec now) +{ + struct timeval delay; + struct timespec expires_at; + expires_at.tv_sec = ends_at->tv_sec; + expires_at.tv_nsec = ends_at->tv_usec * 1000; + + struct timespec diff; + diff.tv_sec = expires_at.tv_sec - now.tv_sec; + + if (expires_at.tv_nsec >= now.tv_nsec) { + diff.tv_nsec = expires_at.tv_nsec - now.tv_nsec; + } else { + diff.tv_sec -= 1; + diff.tv_nsec = (1000000000 + expires_at.tv_nsec) - now.tv_nsec; + } + + delay.tv_sec = diff.tv_sec; + delay.tv_usec = (int)diff.tv_nsec / 1000; + + return delay; +} + +static struct addrinfo * +pick_addrinfo(struct hostname_resolution_store *resolution_store, int last_family) +{ + int priority_on_v6[2] = { AF_INET6, AF_INET }; + int priority_on_v4[2] = { AF_INET, AF_INET6 }; + int *precedences = last_family == AF_INET6 ? priority_on_v4 : priority_on_v6; + struct addrinfo *selected_ai = NULL; + + for (int i = 0; i < 2; i++) { + if (precedences[i] == AF_INET6) { + selected_ai = resolution_store->v6.ai; + if (selected_ai) { + resolution_store->v6.ai = selected_ai->ai_next; + break; + } + } else { + selected_ai = resolution_store->v4.ai; + if (selected_ai) { + resolution_store->v4.ai = selected_ai->ai_next; + break; + } + } + } + return selected_ai; +} + +static void +socket_nonblock_set(int fd) +{ + int flags = fcntl(fd, F_GETFL); + + if (flags < 0) rb_syserr_fail(errno, "fcntl(2)"); + if ((flags & O_NONBLOCK) != 0) return; + + flags |= O_NONBLOCK; + + if (fcntl(fd, F_SETFL, flags) < 0) rb_syserr_fail(errno, "fcntl(2)"); + return; +} + +static int +in_progress_fds(int fds_size) +{ + return fds_size > 0; +} + +static void +remove_connection_attempt_fd(int *fds, int *fds_size, int removing_fd) { + int i, j; + + for (i = 0; i < *fds_size; i++) { + if (fds[i] != removing_fd) continue; + + for (j = i; j < *fds_size - 1; j++) { + fds[j] = fds[j + 1]; + } + + (*fds_size)--; + fds[*fds_size] = -1; + break; + } +} + +struct fast_fallback_error +{ + int type; + int ecode; +}; + +static VALUE +init_fast_fallback_inetsock_internal(VALUE v) +{ + struct fast_fallback_inetsock_arg *arg = (void *)v; + VALUE io = arg->io; + VALUE resolv_timeout = arg->resolv_timeout; + VALUE connect_timeout = arg->connect_timeout; + VALUE test_mode_settings = arg->test_mode_settings; + struct addrinfo *remote_ai = NULL, *local_ai = NULL; + int connected_fd = -1, status = 0, local_status = 0; + int remote_addrinfo_hints = 0; + struct fast_fallback_error last_error = { 0, 0 }; + const char *syscall = 0; + VALUE host, serv; + + #ifdef HAVE_CONST_AI_ADDRCONFIG + remote_addrinfo_hints |= AI_ADDRCONFIG; + #endif + + pthread_t threads[arg->family_size]; + char resolved_type[2]; + ssize_t resolved_type_size; + int hostname_resolution_waiter = 0, hostname_resolution_notifier = 0; + int pipefd[2]; + fd_set readfds, writefds; + + struct wait_fast_fallback_arg wait_arg; + struct timeval *ends_at = NULL; + struct timeval delay = (struct timeval){ -1, -1 }; + wait_arg.nfds = 0; + wait_arg.writefds = NULL; + wait_arg.status = 0; + + struct hostname_resolution_store resolution_store; + resolution_store.is_all_finised = false; + resolution_store.v6.ai = NULL; + resolution_store.v6.finished = false; + resolution_store.v6.has_error = false; + resolution_store.v4.ai = NULL; + resolution_store.v4.finished = false; + resolution_store.v4.has_error = false; + + int last_family = 0; + + int additional_capacity = 10; + int current_capacity = additional_capacity; + arg->connection_attempt_fds = allocate_connection_attempt_fds(additional_capacity); + arg->connection_attempt_fds_size = 0; + + struct timeval resolution_delay_storage; + struct timeval *resolution_delay_expires_at = NULL; + struct timeval connection_attempt_delay_strage; + struct timeval *connection_attempt_delay_expires_at = NULL; + struct timeval user_specified_resolv_timeout_storage; + struct timeval *user_specified_resolv_timeout_at = NULL; + struct timeval user_specified_connect_timeout_storage; + struct timeval *user_specified_connect_timeout_at = NULL; + struct timespec now = current_clocktime_ts(); + + /* start of hostname resolution */ + if (arg->family_size == 1) { + int family = arg->families[0]; + arg->remote.res = rsock_addrinfo( + arg->remote.host, + arg->remote.serv, + family, + SOCK_STREAM, + 0 + ); + + if (family == AF_INET6) { + resolution_store.v6.ai = arg->remote.res->ai; + resolution_store.v6.finished = true; + resolution_store.v4.finished = true; + } else if (family == AF_INET) { + resolution_store.v4.ai = arg->remote.res->ai; + resolution_store.v4.finished = true; + resolution_store.v6.finished = true; + } + resolution_store.is_all_finised = true; + wait_arg.readfds = NULL; + } else { + if (pipe(pipefd) != 0) rb_syserr_fail(errno, "pipe(2)"); + hostname_resolution_waiter = pipefd[0]; + int waiter_flags = fcntl(hostname_resolution_waiter, F_GETFL, 0); + if (waiter_flags < 0) rb_syserr_fail(errno, "fcntl(2)"); + if ((fcntl(hostname_resolution_waiter, F_SETFL, waiter_flags | O_NONBLOCK)) < 0) { + rb_syserr_fail(errno, "fcntl(2)"); + } + + hostname_resolution_notifier = pipefd[1]; + wait_arg.readfds = &readfds; + + arg->getaddrinfo_shared = allocate_fast_fallback_getaddrinfo_shared(); + if (!arg->getaddrinfo_shared) rb_syserr_fail(errno, "calloc(3)"); + + arg->getaddrinfo_shared->lock = calloc(1, sizeof(rb_nativethread_lock_t)); + if (!arg->getaddrinfo_shared->lock) rb_syserr_fail(errno, "calloc(3)"); + rb_nativethread_lock_initialize(arg->getaddrinfo_shared->lock); + + arg->getaddrinfo_shared->node = arg->hostp ? strdup(arg->hostp) : NULL; + arg->getaddrinfo_shared->service = strdup(arg->portp); + arg->getaddrinfo_shared->refcount = arg->family_size + 1; + arg->getaddrinfo_shared->notify = hostname_resolution_notifier; + arg->getaddrinfo_shared->wait = hostname_resolution_waiter; + arg->getaddrinfo_shared->connection_attempt_fds = arg->connection_attempt_fds; + arg->getaddrinfo_shared->connection_attempt_fds_size = arg->connection_attempt_fds_size; + arg->getaddrinfo_shared->cancelled = &arg->cancelled; + wait_arg.cancelled = &arg->cancelled; + + for (int i = 0; i < arg->family_size; i++) { + arg->getaddrinfo_entries[i] = allocate_fast_fallback_getaddrinfo_entry(); + if (!(arg->getaddrinfo_entries[i])) rb_syserr_fail(errno, "calloc(3)"); + arg->getaddrinfo_entries[i]->shared = arg->getaddrinfo_shared; + + struct addrinfo getaddrinfo_hints[arg->family_size]; + + allocate_fast_fallback_getaddrinfo_hints( + &getaddrinfo_hints[i], + arg->families[i], + remote_addrinfo_hints, + arg->additional_flags + ); + + arg->getaddrinfo_entries[i]->hints = getaddrinfo_hints[i]; + arg->getaddrinfo_entries[i]->ai = NULL; + arg->getaddrinfo_entries[i]->family = arg->families[i]; + arg->getaddrinfo_entries[i]->refcount = 2; + arg->getaddrinfo_entries[i]->has_syserr = false; + arg->getaddrinfo_entries[i]->test_sleep_ms = 0; + arg->getaddrinfo_entries[i]->test_ecode = 0; + + /* for testing HEv2 */ + if (!NIL_P(test_mode_settings) && RB_TYPE_P(test_mode_settings, T_HASH)) { + const char *family_sym = arg->families[i] == AF_INET6 ? "ipv6" : "ipv4"; + + VALUE test_delay_setting = rb_hash_aref(test_mode_settings, ID2SYM(rb_intern("delay"))); + if (!NIL_P(test_delay_setting)) { + VALUE rb_test_delay_ms = rb_hash_aref(test_delay_setting, ID2SYM(rb_intern(family_sym))); + long test_delay_ms = NIL_P(rb_test_delay_ms) ? 0 : rb_test_delay_ms; + arg->getaddrinfo_entries[i]->test_sleep_ms = test_delay_ms; + } + + VALUE test_error_setting = rb_hash_aref(test_mode_settings, ID2SYM(rb_intern("error"))); + if (!NIL_P(test_error_setting)) { + VALUE rb_test_ecode = rb_hash_aref(test_error_setting, ID2SYM(rb_intern(family_sym))); + if (!NIL_P(rb_test_ecode)) { + arg->getaddrinfo_entries[i]->test_ecode = NUM2INT(rb_test_ecode); + } + } + } + + if (raddrinfo_pthread_create(&threads[i], do_fast_fallback_getaddrinfo, arg->getaddrinfo_entries[i]) != 0) { + rsock_raise_resolution_error("getaddrinfo(3)", EAI_AGAIN); + } + pthread_detach(threads[i]); + } + + if (NIL_P(resolv_timeout)) { + user_specified_resolv_timeout_storage = (struct timeval){ -1, -1 }; + } else { + struct timeval resolv_timeout_tv = rb_time_interval(resolv_timeout); + user_specified_resolv_timeout_storage = add_ts_to_tv(resolv_timeout_tv, now); + } + user_specified_resolv_timeout_at = &user_specified_resolv_timeout_storage; + } + + while (true) { + /* start of connection */ + if (any_addrinfos(&resolution_store) && + !resolution_delay_expires_at && + !connection_attempt_delay_expires_at) { + while ((remote_ai = pick_addrinfo(&resolution_store, last_family))) { + int fd = -1; + + #if !defined(INET6) && defined(AF_INET6) + if (remote_ai->ai_family == AF_INET6) { + if (any_addrinfos(&resolution_store)) continue; + if (!in_progress_fds(arg->connection_attempt_fds_size)) break; + if (resolution_store.is_all_finised) break; + + if (local_status < 0) { + host = arg->local.host; + serv = arg->local.serv; + } else { + host = arg->remote.host; + serv = arg->remote.serv; + } + if (last_error.type == RESOLUTION_ERROR) { + rsock_raise_resolution_error(syscall, last_error.ecode); + } else { + rsock_syserr_fail_host_port(last_error.ecode, syscall, host, serv); + } + } + #endif + + local_ai = NULL; + + if (arg->local.res) { + for (local_ai = arg->local.res->ai; local_ai; local_ai = local_ai->ai_next) { + if (local_ai->ai_family == remote_ai->ai_family) break; + } + if (!local_ai) { + if (any_addrinfos(&resolution_store)) continue; + if (in_progress_fds(arg->connection_attempt_fds_size)) break; + if (!resolution_store.is_all_finised) break; + + /* Use a different family local address if no choice, this + * will cause EAFNOSUPPORT. */ + rsock_syserr_fail_host_port(EAFNOSUPPORT, syscall, arg->local.host, arg->local.serv); + } + } + + status = rsock_socket(remote_ai->ai_family, remote_ai->ai_socktype, remote_ai->ai_protocol); + syscall = "socket(2)"; + + if (status < 0) { + last_error.type = SYSCALL_ERROR; + last_error.ecode = errno; + + if (any_addrinfos(&resolution_store)) continue; + if (in_progress_fds(arg->connection_attempt_fds_size)) break; + if (!resolution_store.is_all_finised) break; + + if (local_status < 0) { + host = arg->local.host; + serv = arg->local.serv; + } else { + host = arg->remote.host; + serv = arg->remote.serv; + } + if (last_error.type == RESOLUTION_ERROR) { + rsock_raise_resolution_error(syscall, last_error.ecode); + } else { + rsock_syserr_fail_host_port(last_error.ecode, syscall, host, serv); + } + } + + fd = status; + + if (local_ai) { + #if !defined(_WIN32) && !defined(__CYGWIN__) + status = 1; + if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&status, (socklen_t)sizeof(status))) < 0) { + rb_syserr_fail(errno, "setsockopt(2)"); + } + #endif + status = bind(fd, local_ai->ai_addr, local_ai->ai_addrlen); + local_status = status; + syscall = "bind(2)"; + + if (status < 0) { + last_error.type = SYSCALL_ERROR; + last_error.ecode = errno; + close(fd); + + if (any_addrinfos(&resolution_store)) continue; + if (in_progress_fds(arg->connection_attempt_fds_size)) break; + if (!resolution_store.is_all_finised) break; + + if (local_status < 0) { + host = arg->local.host; + serv = arg->local.serv; + } else { + host = arg->remote.host; + serv = arg->remote.serv; + } + if (last_error.type == RESOLUTION_ERROR) { + rsock_raise_resolution_error(syscall, last_error.ecode); + } else { + rsock_syserr_fail_host_port(last_error.ecode, syscall, host, serv); + } + } + } + + syscall = "connect(2)"; + + if (any_addrinfos(&resolution_store) || + in_progress_fds(arg->connection_attempt_fds_size) || + !resolution_store.is_all_finised) { + socket_nonblock_set(fd); + status = connect(fd, remote_ai->ai_addr, remote_ai->ai_addrlen); + last_family = remote_ai->ai_family; + } else { + if (!NIL_P(connect_timeout)) { + user_specified_connect_timeout_storage = rb_time_interval(connect_timeout); + user_specified_connect_timeout_at = &user_specified_connect_timeout_storage; + } + + VALUE timeout = + (user_specified_connect_timeout_at && is_infinity(*user_specified_connect_timeout_at)) ? + Qnil : tv_to_seconds(user_specified_connect_timeout_at); + io = arg->io = rsock_init_sock(arg->self, fd); + status = rsock_connect(io, remote_ai->ai_addr, remote_ai->ai_addrlen, 0, timeout); + } + + if (status == 0) { + connected_fd = fd; + break; + } + + if (errno == EINPROGRESS) { + if (current_capacity == arg->connection_attempt_fds_size) { + current_capacity = reallocate_connection_attempt_fds( + arg->connection_attempt_fds, + current_capacity, + additional_capacity + ); + } + arg->connection_attempt_fds[arg->connection_attempt_fds_size] = fd; + (arg->connection_attempt_fds_size)++; + wait_arg.writefds = &writefds; + + set_timeout_tv(&connection_attempt_delay_strage, 250, now); + connection_attempt_delay_expires_at = &connection_attempt_delay_strage; + + if (!any_addrinfos(&resolution_store)) { + if (NIL_P(connect_timeout)) { + user_specified_connect_timeout_storage = (struct timeval){ -1, -1 }; + } else { + struct timeval connect_timeout_tv = rb_time_interval(connect_timeout); + user_specified_connect_timeout_storage = add_ts_to_tv(connect_timeout_tv, now); + } + user_specified_connect_timeout_at = &user_specified_connect_timeout_storage; + } + + break; + } + + last_error.type = SYSCALL_ERROR; + last_error.ecode = errno; + + if (NIL_P(io)) { + close(fd); + } else { + rb_io_close(io); + } + + if (any_addrinfos(&resolution_store)) continue; + if (in_progress_fds(arg->connection_attempt_fds_size)) break; + if (!resolution_store.is_all_finised) break; + + if (local_status < 0) { + host = arg->local.host; + serv = arg->local.serv; + } else { + host = arg->remote.host; + serv = arg->remote.serv; + } + if (last_error.type == RESOLUTION_ERROR) { + rsock_raise_resolution_error(syscall, last_error.ecode); + } else { + rsock_syserr_fail_host_port(last_error.ecode, syscall, host, serv); + } + } + } + + if (connected_fd >= 0) break; + + ends_at = select_expires_at( + &resolution_store, + resolution_delay_expires_at, + connection_attempt_delay_expires_at, + user_specified_resolv_timeout_at, + user_specified_connect_timeout_at + ); + if (ends_at) { + delay = tv_to_timeout(ends_at, now); + wait_arg.delay = &delay; + } else { + wait_arg.delay = NULL; + } + + if (arg->connection_attempt_fds_size) { + FD_ZERO(wait_arg.writefds); + int n = 0; + for (int i = 0; i < arg->connection_attempt_fds_size; i++) { + int cfd = arg->connection_attempt_fds[i]; + if (cfd < 0) continue; + if (cfd > n) n = cfd; + FD_SET(cfd, wait_arg.writefds); + } + if (n > 0) n++; + wait_arg.nfds = n; + } else { + wait_arg.writefds = NULL; + } + + FD_ZERO(wait_arg.readfds); + FD_SET(hostname_resolution_waiter, wait_arg.readfds); + if ((hostname_resolution_waiter + 1) > wait_arg.nfds) { + wait_arg.nfds = hostname_resolution_waiter + 1; + } + + rb_thread_call_without_gvl2( + wait_fast_fallback, + &wait_arg, + cancel_fast_fallback, + arg->getaddrinfo_shared + ); + rb_thread_check_ints(); + if (errno == EINTR || arg->cancelled) break; + + status = wait_arg.status; + syscall = "select(2)"; + + now = current_clocktime_ts(); + if (is_timeout_tv(resolution_delay_expires_at, now)) { + resolution_delay_expires_at = NULL; + } + if (is_timeout_tv(connection_attempt_delay_expires_at, now)) { + connection_attempt_delay_expires_at = NULL; + } + + if (status < 0 && (errno && errno != EINTR)) rb_syserr_fail(errno, "select(2)"); + + if (status > 0) { + /* check for connection */ + for (int i = 0; i < arg->connection_attempt_fds_size; i++) { + int fd = arg->connection_attempt_fds[i]; + if (fd < 0 || !FD_ISSET(fd, wait_arg.writefds)) continue; + + int err; + socklen_t len = sizeof(err); + + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0) { + if (err == 0) { /* success */ + remove_connection_attempt_fd( + arg->connection_attempt_fds, + &arg->connection_attempt_fds_size, + fd + ); + connected_fd = fd; + break; + }; + + /* fail */ + errno = err; + close(fd); + remove_connection_attempt_fd( + arg->connection_attempt_fds, + &arg->connection_attempt_fds_size, + fd + ); + continue; + } + } + + if (connected_fd >= 0) break; + last_error.type = SYSCALL_ERROR; + last_error.ecode = errno; + + if (any_addrinfos(&resolution_store) || + in_progress_fds(arg->connection_attempt_fds_size) || + !resolution_store.is_all_finised) { + if (!in_progress_fds(arg->connection_attempt_fds_size)) { + user_specified_connect_timeout_at = NULL; + } + } else { + if (local_status < 0) { + host = arg->local.host; + serv = arg->local.serv; + } else { + host = arg->remote.host; + serv = arg->remote.serv; + } + if (last_error.type == RESOLUTION_ERROR) { + rsock_raise_resolution_error(syscall, last_error.ecode); + } else { + rsock_syserr_fail_host_port(last_error.ecode, syscall, host, serv); + } + } + + /* check for hostname resolution */ + if (!resolution_store.is_all_finised && FD_ISSET(hostname_resolution_waiter, wait_arg.readfds)) { + while (true) { + resolved_type_size = read( + hostname_resolution_waiter, + resolved_type, + sizeof(resolved_type) - 1 + ); + + if (resolved_type_size > 0) { + resolved_type[resolved_type_size] = '\0'; + + if (resolved_type[0] == IPV6_HOSTNAME_RESOLVED) { + resolution_store.v6.finished = true; + + if (arg->getaddrinfo_entries[IPV6_ENTRY_POS]->err && + arg->getaddrinfo_entries[IPV6_ENTRY_POS]->err != EAI_ADDRFAMILY) { + last_error.type = RESOLUTION_ERROR; + last_error.ecode = arg->getaddrinfo_entries[IPV6_ENTRY_POS]->err; + syscall = "getaddrinfo(3)"; + resolution_store.v6.has_error = true; + } else { + resolution_store.v6.ai = arg->getaddrinfo_entries[IPV6_ENTRY_POS]->ai; + } + if (resolution_store.v4.finished) { + resolution_store.is_all_finised = true; + resolution_delay_expires_at = NULL; + user_specified_resolv_timeout_at = NULL; + break; + } + } else if (resolved_type[0] == IPV4_HOSTNAME_RESOLVED) { + resolution_store.v4.finished = true; + + if (arg->getaddrinfo_entries[IPV4_ENTRY_POS]->err) { + last_error.type = RESOLUTION_ERROR; + last_error.ecode = arg->getaddrinfo_entries[IPV4_ENTRY_POS]->err; + syscall = "getaddrinfo(3)"; + resolution_store.v4.has_error = true; + } else { + resolution_store.v4.ai = arg->getaddrinfo_entries[IPV4_ENTRY_POS]->ai; + } + + if (resolution_store.v6.finished) { + resolution_store.is_all_finised = true; + resolution_delay_expires_at = NULL; + user_specified_resolv_timeout_at = NULL; + break; + } + } else { + /* Retry to read from hostname_resolution_waiter */ + } + } else if (resolved_type_size < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + errno = 0; + break; + } else { + /* Retry to read from hostname_resolution_waiter */ + } + + if (!resolution_store.v6.finished && + resolution_store.v4.finished && + !resolution_store.v4.has_error) { + set_timeout_tv(&resolution_delay_storage, 50, now); + resolution_delay_expires_at = &resolution_delay_storage; + } + } + } + + status = wait_arg.status = 0; + } + + if (!any_addrinfos(&resolution_store)) { + if (!in_progress_fds(arg->connection_attempt_fds_size) && + resolution_store.is_all_finised) { + if (local_status < 0) { + host = arg->local.host; + serv = arg->local.serv; + } else { + host = arg->remote.host; + serv = arg->remote.serv; + } + if (last_error.type == RESOLUTION_ERROR) { + rsock_raise_resolution_error(syscall, last_error.ecode); + } else { + rsock_syserr_fail_host_port(last_error.ecode, syscall, host, serv); + } + } + + if ((is_timeout_tv(user_specified_resolv_timeout_at, now) || + resolution_store.is_all_finised) && + (is_timeout_tv(user_specified_connect_timeout_at, now) || + !in_progress_fds(arg->connection_attempt_fds_size))) { + VALUE errno_module = rb_const_get(rb_cObject, rb_intern("Errno")); + VALUE etimedout_error = rb_const_get(errno_module, rb_intern("ETIMEDOUT")); + rb_raise(etimedout_error, "user specified timeout"); + } + } + + if (!resolution_store.is_all_finised) { + if (!resolution_store.v6.finished && arg->getaddrinfo_entries[IPV6_ENTRY_POS]->has_syserr) { + resolution_store.v6.ai = arg->getaddrinfo_entries[IPV6_ENTRY_POS]->ai; + resolution_store.v6.finished = true; + + if (resolution_store.v4.finished) { + resolution_store.is_all_finised = true; + wait_arg.readfds = NULL; + resolution_delay_expires_at = NULL; + user_specified_resolv_timeout_at = NULL; + } + } + if (!resolution_store.v4.finished && arg->getaddrinfo_entries[IPV4_ENTRY_POS]->has_syserr) { + resolution_store.v4.ai = arg->getaddrinfo_entries[IPV4_ENTRY_POS]->ai; + resolution_store.v4.finished = true; + + if (resolution_store.v6.finished) { + resolution_store.is_all_finised = true; + wait_arg.readfds = NULL; + resolution_delay_expires_at = NULL; + user_specified_resolv_timeout_at = NULL; + } else { + set_timeout_tv(&resolution_delay_storage, 50, now); + resolution_delay_expires_at = &resolution_delay_storage; + } + } + } + } + + rb_thread_check_ints(); + + if (NIL_P(arg->io)) { + /* create new instance */ + arg->io = rsock_init_sock(arg->self, connected_fd); + } + + return arg->io; +} + +static VALUE +fast_fallback_inetsock_cleanup(VALUE v) +{ + struct fast_fallback_inetsock_arg *arg = (void *)v; + struct fast_fallback_getaddrinfo_shared *getaddrinfo_shared = arg->getaddrinfo_shared; + + if (arg->remote.res) { + rb_freeaddrinfo(arg->remote.res); + arg->remote.res = 0; + } + if (arg->local.res) { + rb_freeaddrinfo(arg->local.res); + arg->local.res = 0; + } + + if (getaddrinfo_shared) { + int shared_need_free = 0; + int need_free[2] = { 0, 0 }; + + rb_nativethread_lock_lock(getaddrinfo_shared->lock); + { + for (int i = 0; i < arg->family_size; i++) { + if (arg->getaddrinfo_entries[i] && --(arg->getaddrinfo_entries[i]->refcount) == 0) { + need_free[i] = 1; + } + } + if (--(getaddrinfo_shared->refcount) == 0) { + shared_need_free = 1; + } + } + rb_nativethread_lock_unlock(getaddrinfo_shared->lock); + + for (int i = 0; i < arg->family_size; i++) { + if (need_free[i]) free_fast_fallback_getaddrinfo_entry(&arg->getaddrinfo_entries[i]); + } + if (shared_need_free) free_fast_fallback_getaddrinfo_shared(&getaddrinfo_shared); + } + + int connection_attempt_fd; + + for (int i = 0; i < arg->connection_attempt_fds_size; i++) { + connection_attempt_fd = arg->connection_attempt_fds[i]; + + if (connection_attempt_fd >= 0) { + int error = 0; + socklen_t len = sizeof(error); + getsockopt(connection_attempt_fd, SOL_SOCKET, SO_ERROR, &error, &len); + if (error == 0) shutdown(connection_attempt_fd, SHUT_RDWR); + close(connection_attempt_fd); + } + } + + if (arg->connection_attempt_fds) { + free(arg->connection_attempt_fds); + arg->connection_attempt_fds = NULL; + } + + return Qnil; +} + VALUE -rsock_init_inetsock(VALUE self, VALUE remote_host, VALUE remote_serv, VALUE local_host, VALUE local_serv, int type, VALUE resolv_timeout, VALUE connect_timeout) +rsock_init_inetsock(VALUE self, VALUE remote_host, VALUE remote_serv, VALUE local_host, VALUE local_serv, int type, VALUE resolv_timeout, VALUE connect_timeout, VALUE fast_fallback, VALUE test_mode_settings) { + if (type == INET_CLIENT && FAST_FALLBACK_INIT_INETSOCK_IMPL == 1 && RTEST(fast_fallback)) { + struct rb_addrinfo *local_res = NULL; + char *hostp, *portp; + char hbuf[NI_MAXHOST], pbuf[NI_MAXSERV]; + int additional_flags = 0; + hostp = host_str(remote_host, hbuf, sizeof(hbuf), &additional_flags); + portp = port_str(remote_serv, pbuf, sizeof(pbuf), &additional_flags); + + if (!is_specified_ip_address(hostp)) { + int target_families[2] = { 0, 0 }; + int resolving_family_size = 0; + + /* + * Maybe also accept a local address + */ + if (!NIL_P(local_host) || !NIL_P(local_serv)) { + local_res = rsock_addrinfo( + local_host, + local_serv, + AF_UNSPEC, + SOCK_STREAM, + 0 + ); + + struct addrinfo *tmp_p = local_res->ai; + for (tmp_p; tmp_p != NULL; tmp_p = tmp_p->ai_next) { + if (target_families[0] == 0 && tmp_p->ai_family == AF_INET6) { + target_families[0] = AF_INET6; + resolving_family_size++; + } + if (target_families[1] == 0 && tmp_p->ai_family == AF_INET) { + target_families[1] = AF_INET; + resolving_family_size++; + } + } + } else { + resolving_family_size = 2; + target_families[0] = AF_INET6; + target_families[1] = AF_INET; + } + + struct fast_fallback_inetsock_arg fast_fallback_arg; + memset(&fast_fallback_arg, 0, sizeof(fast_fallback_arg)); + + fast_fallback_arg.self = self; + fast_fallback_arg.io = Qnil; + fast_fallback_arg.remote.host = remote_host; + fast_fallback_arg.remote.serv = remote_serv; + fast_fallback_arg.remote.res = 0; + fast_fallback_arg.local.host = local_host; + fast_fallback_arg.local.serv = local_serv; + fast_fallback_arg.local.res = local_res; + fast_fallback_arg.type = type; + fast_fallback_arg.resolv_timeout = resolv_timeout; + fast_fallback_arg.connect_timeout = connect_timeout; + fast_fallback_arg.hostp = hostp; + fast_fallback_arg.portp = portp; + fast_fallback_arg.additional_flags = additional_flags; + fast_fallback_arg.cancelled = false; + + int resolving_families[resolving_family_size]; + int resolving_family_index = 0; + for (int i = 0; 2 > i; i++) { + if (target_families[i] != 0) { + resolving_families[resolving_family_index] = target_families[i]; + resolving_family_index++; + } + } + fast_fallback_arg.families = resolving_families; + fast_fallback_arg.family_size = resolving_family_size; + fast_fallback_arg.test_mode_settings = test_mode_settings; + + return rb_ensure(init_fast_fallback_inetsock_internal, (VALUE)&fast_fallback_arg, + fast_fallback_inetsock_cleanup, (VALUE)&fast_fallback_arg); + } + } + struct inetsock_arg arg; arg.self = self; arg.io = Qnil; @@ -184,10 +1304,13 @@ rsock_init_inetsock(VALUE self, VALUE remote_host, VALUE remote_serv, VALUE loca arg.type = type; arg.resolv_timeout = resolv_timeout; arg.connect_timeout = connect_timeout; + return rb_ensure(init_inetsock_internal, (VALUE)&arg, inetsock_cleanup, (VALUE)&arg); } +#endif + static ID id_numeric, id_hostname; int |