diff options
author | Misaki Shioi <[email protected]> | 2024-11-12 10:06:48 +0900 |
---|---|---|
committer | GitHub <[email protected]> | 2024-11-12 10:06:48 +0900 |
commit | 4c270200dbc2a3a4511e8b793a033078afd6fb31 (patch) | |
tree | 75c802ceff5051e434701bba83dd965c1357a05d /ext/socket | |
parent | 821a5b966fbc2926dc3bf88b6ba09879fa35318e (diff) |
[Feature #20782] Introduction of Happy Eyeballs Version 2 (RFC8305) in TCPSocket.new (#11653)
* Introduction of Happy Eyeballs Version 2 (RFC8305) in TCPSocket.new
This is an implementation of Happy Eyeballs version 2 (RFC 8305) in `TCPSocket.new`.
See https://github.com/ruby/ruby/pull/11653
1. Background
Prior to this implementation, I implemented Happy Eyeballs Version 2 (HEv2) for `Socket.tcp` in https://github.com/ruby/ruby/pull/9374.
HEv2 is an algorithm defined in [RFC 8305](https://datatracker.ietf.org/doc/html/rfc8305), aimed at improving network connectivity.
For more details on the specific cases that HEv2 helps, please refer to https://bugs.ruby-lang.org/issues/20108.
2. Proposal & Outcome
This proposal implements the same HEv2 algorithm in `TCPSocket.new`.
Since `TCPSocket.new` is used more widely than `Socket.tcp`, this change is expected to broaden the impact of HEv2's benefits.
Like `Socket.tcp`, I have also added a `fast_fallback` keyword argument to `TCPSocket.new`.
This option is set to true by default, enabling the HEv2 functionality.
However, users can explicitly set it to false to disable HEv2 and use the previous behavior of `TCPSocket.new`.
It should be noted that HEv2 is enabled only in environments where pthreads are available.
This specification follows the approach taken in https://bugs.ruby-lang.org/issues/19965 , where name resolution can be interrupted.
(In environments where pthreads are not available, the `fast_fallback` option is ignored.)
3. Performance
Below is the benchmark of 100 requests to `www.ruby-lang.org` with the fast_fallback option set to true and false, respectively.
While there is a slight performance degradation when HEv2 is enabled, the degradation is smaller compared to that seen in `Socket.tcp`.
```
~/s/build ❯❯❯ ../install/bin/ruby ../ruby/test.rb
Rehearsal --------------------------------------------------------
fast_fallback: true 0.017588 0.097045 0.114633 ( 1.460664)
fast_fallback: false 0.014033 0.078984 0.093017 ( 1.413951)
----------------------------------------------- total: 0.207650sec
user system total real
fast_fallback: true 0.020891 0.124054 0.144945 ( 1.473816)
fast_fallback: false 0.018392 0.110852 0.129244 ( 1.466014)
```
* Update debug prints
Co-authored-by: Nobuyoshi Nakada <[email protected]>
* Remove debug prints
* misc
* Disable HEv2 in Win
* Raise resolution error with hostname resolution
* Fix to handle errors
* Remove warnings
* Errors that do not need to be handled
* misc
* Improve doc
* Fix bug on cancellation
* Avoid EAI_ADDRFAMILY for resolving IPv6
* Follow upstream
* misc
* Refactor connection_attempt_fds management
- Introduced allocate_connection_attempt_fds and reallocate_connection_attempt_fds for improved memory allocation of connection_attempt_fds
- Added remove_connection_attempt_fd to resize connection_attempt_fds dynamically.
- Simplified the in_progress_fds function to only check the size of connection_attempt_fds.
* Rename do_pthread_create to raddrinfo_pthread_create to avoid a name conflict
---------
Co-authored-by: Nobuyoshi Nakada <[email protected]>
Notes
Notes:
Merged-By: shioimm <[email protected]>
Diffstat (limited to 'ext/socket')
-rw-r--r-- | ext/socket/ipsocket.c | 1125 | ||||
-rw-r--r-- | ext/socket/raddrinfo.c | 112 | ||||
-rw-r--r-- | ext/socket/rubysocket.h | 41 | ||||
-rw-r--r-- | ext/socket/sockssocket.c | 2 | ||||
-rw-r--r-- | ext/socket/tcpserver.c | 2 | ||||
-rw-r--r-- | ext/socket/tcpsocket.c | 49 |
6 files changed, 1315 insertions, 16 deletions
diff --git a/ext/socket/ipsocket.c b/ext/socket/ipsocket.c index 3de8445435..f3c00b518d 100644 --- a/ext/socket/ipsocket.c +++ b/ext/socket/ipsocket.c @@ -169,9 +169,1129 @@ init_inetsock_internal(VALUE v) return io; } +#if FAST_FALLBACK_INIT_INETSOCK_IMPL == 0 + +VALUE +rsock_init_inetsock(VALUE self, VALUE remote_host, VALUE remote_serv, VALUE local_host, VALUE local_serv, int type, VALUE resolv_timeout, VALUE connect_timeout, VALUE _fast_fallback, VALUE _test_mode_settings) +{ + struct inetsock_arg arg; + arg.self = self; + arg.io = Qnil; + arg.remote.host = remote_host; + arg.remote.serv = remote_serv; + arg.remote.res = 0; + arg.local.host = local_host; + arg.local.serv = local_serv; + arg.local.res = 0; + arg.type = type; + arg.resolv_timeout = resolv_timeout; + arg.connect_timeout = connect_timeout; + return rb_ensure(init_inetsock_internal, (VALUE)&arg, + inetsock_cleanup, (VALUE)&arg); +} + +#elif FAST_FALLBACK_INIT_INETSOCK_IMPL == 1 + +#define IPV6_ENTRY_POS 0 +#define IPV4_ENTRY_POS 1 +#define RESOLUTION_ERROR 0 +#define SYSCALL_ERROR 1 + +static int +is_specified_ip_address(const char *hostname) +{ + if (!hostname) return false; + + struct in_addr ipv4addr; + struct in6_addr ipv6addr; + + return (inet_pton(AF_INET6, hostname, &ipv6addr) == 1 || + inet_pton(AF_INET, hostname, &ipv4addr) == 1); +} + +struct fast_fallback_inetsock_arg +{ + VALUE self; + VALUE io; + + struct { + VALUE host, serv; + struct rb_addrinfo *res; + } remote, local; + int type; + VALUE resolv_timeout; + VALUE connect_timeout; + + const char *hostp, *portp; + int *families; + int family_size; + int additional_flags; + int cancelled; + rb_nativethread_lock_t *lock; + struct fast_fallback_getaddrinfo_entry *getaddrinfo_entries[2]; + struct fast_fallback_getaddrinfo_shared *getaddrinfo_shared; + int connection_attempt_fds_size; + int *connection_attempt_fds; + VALUE test_mode_settings; +}; + +static struct fast_fallback_getaddrinfo_shared * 
+allocate_fast_fallback_getaddrinfo_shared(void) +{ + struct fast_fallback_getaddrinfo_shared *shared; + + shared = (struct fast_fallback_getaddrinfo_shared *)calloc( + 1, + sizeof(struct fast_fallback_getaddrinfo_shared) + ); + + return shared; +} + +static struct fast_fallback_getaddrinfo_entry * +allocate_fast_fallback_getaddrinfo_entry(void) +{ + struct fast_fallback_getaddrinfo_entry *entry; + + entry = (struct fast_fallback_getaddrinfo_entry *)calloc( + 1, + sizeof(struct fast_fallback_getaddrinfo_entry) + ); + + return entry; +} + +static void +allocate_fast_fallback_getaddrinfo_hints(struct addrinfo *hints, int family, int remote_addrinfo_hints, int additional_flags) +{ + MEMZERO(hints, struct addrinfo, 1); + hints->ai_family = family; + hints->ai_socktype = SOCK_STREAM; + hints->ai_protocol = IPPROTO_TCP; + hints->ai_flags = remote_addrinfo_hints; + hints->ai_flags |= additional_flags; +} + +static int* +allocate_connection_attempt_fds(int additional_capacity) +{ + int *fds = (int *)malloc(additional_capacity * sizeof(int)); + if (!fds) rb_syserr_fail(errno, "malloc(3)"); + for (int i = 0; i < additional_capacity; i++) fds[i] = -1; + return fds; +} + +static int +reallocate_connection_attempt_fds(int *fds, int current_capacity, int additional_capacity) +{ + int new_capacity = current_capacity + additional_capacity; + + if (realloc(fds, new_capacity * sizeof(int)) == NULL) { + rb_syserr_fail(errno, "realloc(3)"); + } + + for (int i = current_capacity; i < new_capacity; i++) fds[i] = -1; + return new_capacity; +} + +struct wait_fast_fallback_arg +{ + int status, nfds; + fd_set *readfds, *writefds; + struct timeval *delay; + int *cancelled; +}; + +static void * +wait_fast_fallback(void *ptr) +{ + struct wait_fast_fallback_arg *arg = (struct wait_fast_fallback_arg *)ptr; + int status; + status = select(arg->nfds, arg->readfds, arg->writefds, NULL, arg->delay); + arg->status = status; + if (errno == EINTR) *arg->cancelled = true; + return 0; +} + +static void 
+cancel_fast_fallback(void *ptr) +{ + if (!ptr) return; + + struct fast_fallback_getaddrinfo_shared *arg = (struct fast_fallback_getaddrinfo_shared *)ptr; + + rb_nativethread_lock_lock(arg->lock); + { + *arg->cancelled = true; + char notification = SELECT_CANCELLED; + if ((write(arg->notify, &notification, 1)) < 0) { + rb_syserr_fail(errno, "write(2)"); + } + } + rb_nativethread_lock_unlock(arg->lock); +} + +struct hostname_resolution_result +{ + struct addrinfo *ai; + int finished; + int has_error; +}; + +struct hostname_resolution_store +{ + struct hostname_resolution_result v6; + struct hostname_resolution_result v4; + int is_all_finised; +}; + +static int +any_addrinfos(struct hostname_resolution_store *resolution_store) +{ + return resolution_store->v6.ai || resolution_store->v4.ai; +} + +static struct timespec +current_clocktime_ts(void) +{ + struct timespec ts; + if ((clock_gettime(CLOCK_MONOTONIC, &ts)) < 0) { + rb_syserr_fail(errno, "clock_gettime(2)"); + } + return ts; +} + +static void +set_timeout_tv(struct timeval *tv, long ms, struct timespec from) +{ + long sec = ms / 1000; + long nsec = (ms % 1000) * 1000000; + long result_sec = from.tv_sec + sec; + long result_nsec = from.tv_nsec + nsec; + + result_sec += result_nsec / 1000000000; + result_nsec = result_nsec % 1000000000; + + tv->tv_sec = result_sec; + tv->tv_usec = (int)(result_nsec / 1000); +} + +static struct timeval +add_ts_to_tv(struct timeval tv, struct timespec ts) +{ + long ts_usec = ts.tv_nsec / 1000; + tv.tv_sec += ts.tv_sec; + tv.tv_usec += ts_usec; + + if (tv.tv_usec >= 1000000) { + tv.tv_sec += tv.tv_usec / 1000000; + tv.tv_usec = tv.tv_usec % 1000000; + } + + return tv; +} + +static VALUE +tv_to_seconds(struct timeval *timeout) { + if (timeout == NULL) return Qnil; + + double seconds = (double)timeout->tv_sec + (double)timeout->tv_usec / 1000000.0; + + return DBL2NUM(seconds); +} + +static int +is_infinity(struct timeval tv) +{ + // { -1, -1 } as infinity + return tv.tv_sec == -1 || 
tv.tv_usec == -1; +} + +static int +is_timeout_tv(struct timeval *timeout_tv, struct timespec now) { + if (!timeout_tv) return false; + if (timeout_tv->tv_sec == -1 && timeout_tv->tv_usec == -1) return false; + + struct timespec ts; + ts.tv_sec = timeout_tv->tv_sec; + ts.tv_nsec = timeout_tv->tv_usec * 1000; + + if (now.tv_sec > ts.tv_sec) return true; + if (now.tv_sec == ts.tv_sec && now.tv_nsec >= ts.tv_nsec) return true; + return false; +} + +static struct timeval * +select_expires_at( + struct hostname_resolution_store *resolution_store, + struct timeval *resolution_delay, + struct timeval *connection_attempt_delay, + struct timeval *user_specified_resolv_timeout_at, + struct timeval *user_specified_connect_timeout_at +) { + if (any_addrinfos(resolution_store)) { + return resolution_delay ? resolution_delay : connection_attempt_delay; + } + + struct timeval *timeout = NULL; + + if (user_specified_resolv_timeout_at) { + if (is_infinity(*user_specified_resolv_timeout_at)) return NULL; + timeout = user_specified_resolv_timeout_at; + } + + if (user_specified_connect_timeout_at) { + if (is_infinity(*user_specified_connect_timeout_at)) return NULL; + if (!timeout || timercmp(user_specified_connect_timeout_at, timeout, >)) { + return user_specified_connect_timeout_at; + } + } + + return timeout; +} + +static struct timeval +tv_to_timeout(struct timeval *ends_at, struct timespec now) +{ + struct timeval delay; + struct timespec expires_at; + expires_at.tv_sec = ends_at->tv_sec; + expires_at.tv_nsec = ends_at->tv_usec * 1000; + + struct timespec diff; + diff.tv_sec = expires_at.tv_sec - now.tv_sec; + + if (expires_at.tv_nsec >= now.tv_nsec) { + diff.tv_nsec = expires_at.tv_nsec - now.tv_nsec; + } else { + diff.tv_sec -= 1; + diff.tv_nsec = (1000000000 + expires_at.tv_nsec) - now.tv_nsec; + } + + delay.tv_sec = diff.tv_sec; + delay.tv_usec = (int)diff.tv_nsec / 1000; + + return delay; +} + +static struct addrinfo * +pick_addrinfo(struct hostname_resolution_store 
*resolution_store, int last_family) +{ + int priority_on_v6[2] = { AF_INET6, AF_INET }; + int priority_on_v4[2] = { AF_INET, AF_INET6 }; + int *precedences = last_family == AF_INET6 ? priority_on_v4 : priority_on_v6; + struct addrinfo *selected_ai = NULL; + + for (int i = 0; i < 2; i++) { + if (precedences[i] == AF_INET6) { + selected_ai = resolution_store->v6.ai; + if (selected_ai) { + resolution_store->v6.ai = selected_ai->ai_next; + break; + } + } else { + selected_ai = resolution_store->v4.ai; + if (selected_ai) { + resolution_store->v4.ai = selected_ai->ai_next; + break; + } + } + } + return selected_ai; +} + +static void +socket_nonblock_set(int fd) +{ + int flags = fcntl(fd, F_GETFL); + + if (flags < 0) rb_syserr_fail(errno, "fcntl(2)"); + if ((flags & O_NONBLOCK) != 0) return; + + flags |= O_NONBLOCK; + + if (fcntl(fd, F_SETFL, flags) < 0) rb_syserr_fail(errno, "fcntl(2)"); + return; +} + +static int +in_progress_fds(int fds_size) +{ + return fds_size > 0; +} + +static void +remove_connection_attempt_fd(int *fds, int *fds_size, int removing_fd) { + int i, j; + + for (i = 0; i < *fds_size; i++) { + if (fds[i] != removing_fd) continue; + + for (j = i; j < *fds_size - 1; j++) { + fds[j] = fds[j + 1]; + } + + (*fds_size)--; + fds[*fds_size] = -1; + break; + } +} + +struct fast_fallback_error +{ + int type; + int ecode; +}; + +static VALUE +init_fast_fallback_inetsock_internal(VALUE v) +{ + struct fast_fallback_inetsock_arg *arg = (void *)v; + VALUE io = arg->io; + VALUE resolv_timeout = arg->resolv_timeout; + VALUE connect_timeout = arg->connect_timeout; + VALUE test_mode_settings = arg->test_mode_settings; + struct addrinfo *remote_ai = NULL, *local_ai = NULL; + int connected_fd = -1, status = 0, local_status = 0; + int remote_addrinfo_hints = 0; + struct fast_fallback_error last_error = { 0, 0 }; + const char *syscall = 0; + VALUE host, serv; + + #ifdef HAVE_CONST_AI_ADDRCONFIG + remote_addrinfo_hints |= AI_ADDRCONFIG; + #endif + + pthread_t 
threads[arg->family_size]; + char resolved_type[2]; + ssize_t resolved_type_size; + int hostname_resolution_waiter = 0, hostname_resolution_notifier = 0; + int pipefd[2]; + fd_set readfds, writefds; + + struct wait_fast_fallback_arg wait_arg; + struct timeval *ends_at = NULL; + struct timeval delay = (struct timeval){ -1, -1 }; + wait_arg.nfds = 0; + wait_arg.writefds = NULL; + wait_arg.status = 0; + + struct hostname_resolution_store resolution_store; + resolution_store.is_all_finised = false; + resolution_store.v6.ai = NULL; + resolution_store.v6.finished = false; + resolution_store.v6.has_error = false; + resolution_store.v4.ai = NULL; + resolution_store.v4.finished = false; + resolution_store.v4.has_error = false; + + int last_family = 0; + + int additional_capacity = 10; + int current_capacity = additional_capacity; + arg->connection_attempt_fds = allocate_connection_attempt_fds(additional_capacity); + arg->connection_attempt_fds_size = 0; + + struct timeval resolution_delay_storage; + struct timeval *resolution_delay_expires_at = NULL; + struct timeval connection_attempt_delay_strage; + struct timeval *connection_attempt_delay_expires_at = NULL; + struct timeval user_specified_resolv_timeout_storage; + struct timeval *user_specified_resolv_timeout_at = NULL; + struct timeval user_specified_connect_timeout_storage; + struct timeval *user_specified_connect_timeout_at = NULL; + struct timespec now = current_clocktime_ts(); + + /* start of hostname resolution */ + if (arg->family_size == 1) { + int family = arg->families[0]; + arg->remote.res = rsock_addrinfo( + arg->remote.host, + arg->remote.serv, + family, + SOCK_STREAM, + 0 + ); + + if (family == AF_INET6) { + resolution_store.v6.ai = arg->remote.res->ai; + resolution_store.v6.finished = true; + resolution_store.v4.finished = true; + } else if (family == AF_INET) { + resolution_store.v4.ai = arg->remote.res->ai; + resolution_store.v4.finished = true; + resolution_store.v6.finished = true; + } + 
resolution_store.is_all_finised = true; + wait_arg.readfds = NULL; + } else { + if (pipe(pipefd) != 0) rb_syserr_fail(errno, "pipe(2)"); + hostname_resolution_waiter = pipefd[0]; + int waiter_flags = fcntl(hostname_resolution_waiter, F_GETFL, 0); + if (waiter_flags < 0) rb_syserr_fail(errno, "fcntl(2)"); + if ((fcntl(hostname_resolution_waiter, F_SETFL, waiter_flags | O_NONBLOCK)) < 0) { + rb_syserr_fail(errno, "fcntl(2)"); + } + + hostname_resolution_notifier = pipefd[1]; + wait_arg.readfds = &readfds; + + arg->getaddrinfo_shared = allocate_fast_fallback_getaddrinfo_shared(); + if (!arg->getaddrinfo_shared) rb_syserr_fail(errno, "calloc(3)"); + + arg->getaddrinfo_shared->lock = calloc(1, sizeof(rb_nativethread_lock_t)); + if (!arg->getaddrinfo_shared->lock) rb_syserr_fail(errno, "calloc(3)"); + rb_nativethread_lock_initialize(arg->getaddrinfo_shared->lock); + + arg->getaddrinfo_shared->node = arg->hostp ? strdup(arg->hostp) : NULL; + arg->getaddrinfo_shared->service = strdup(arg->portp); + arg->getaddrinfo_shared->refcount = arg->family_size + 1; + arg->getaddrinfo_shared->notify = hostname_resolution_notifier; + arg->getaddrinfo_shared->wait = hostname_resolution_waiter; + arg->getaddrinfo_shared->connection_attempt_fds = arg->connection_attempt_fds; + arg->getaddrinfo_shared->connection_attempt_fds_size = arg->connection_attempt_fds_size; + arg->getaddrinfo_shared->cancelled = &arg->cancelled; + wait_arg.cancelled = &arg->cancelled; + + for (int i = 0; i < arg->family_size; i++) { + arg->getaddrinfo_entries[i] = allocate_fast_fallback_getaddrinfo_entry(); + if (!(arg->getaddrinfo_entries[i])) rb_syserr_fail(errno, "calloc(3)"); + arg->getaddrinfo_entries[i]->shared = arg->getaddrinfo_shared; + + struct addrinfo getaddrinfo_hints[arg->family_size]; + + allocate_fast_fallback_getaddrinfo_hints( + &getaddrinfo_hints[i], + arg->families[i], + remote_addrinfo_hints, + arg->additional_flags + ); + + arg->getaddrinfo_entries[i]->hints = getaddrinfo_hints[i]; + 
arg->getaddrinfo_entries[i]->ai = NULL; + arg->getaddrinfo_entries[i]->family = arg->families[i]; + arg->getaddrinfo_entries[i]->refcount = 2; + arg->getaddrinfo_entries[i]->has_syserr = false; + arg->getaddrinfo_entries[i]->test_sleep_ms = 0; + arg->getaddrinfo_entries[i]->test_ecode = 0; + + /* for testing HEv2 */ + if (!NIL_P(test_mode_settings) && RB_TYPE_P(test_mode_settings, T_HASH)) { + const char *family_sym = arg->families[i] == AF_INET6 ? "ipv6" : "ipv4"; + + VALUE test_delay_setting = rb_hash_aref(test_mode_settings, ID2SYM(rb_intern("delay"))); + if (!NIL_P(test_delay_setting)) { + VALUE rb_test_delay_ms = rb_hash_aref(test_delay_setting, ID2SYM(rb_intern(family_sym))); + long test_delay_ms = NIL_P(rb_test_delay_ms) ? 0 : rb_test_delay_ms; + arg->getaddrinfo_entries[i]->test_sleep_ms = test_delay_ms; + } + + VALUE test_error_setting = rb_hash_aref(test_mode_settings, ID2SYM(rb_intern("error"))); + if (!NIL_P(test_error_setting)) { + VALUE rb_test_ecode = rb_hash_aref(test_error_setting, ID2SYM(rb_intern(family_sym))); + if (!NIL_P(rb_test_ecode)) { + arg->getaddrinfo_entries[i]->test_ecode = NUM2INT(rb_test_ecode); + } + } + } + + if (raddrinfo_pthread_create(&threads[i], do_fast_fallback_getaddrinfo, arg->getaddrinfo_entries[i]) != 0) { + rsock_raise_resolution_error("getaddrinfo(3)", EAI_AGAIN); + } + pthread_detach(threads[i]); + } + + if (NIL_P(resolv_timeout)) { + user_specified_resolv_timeout_storage = (struct timeval){ -1, -1 }; + } else { + struct timeval resolv_timeout_tv = rb_time_interval(resolv_timeout); + user_specified_resolv_timeout_storage = add_ts_to_tv(resolv_timeout_tv, now); + } + user_specified_resolv_timeout_at = &user_specified_resolv_timeout_storage; + } + + while (true) { + /* start of connection */ + if (any_addrinfos(&resolution_store) && + !resolution_delay_expires_at && + !connection_attempt_delay_expires_at) { + while ((remote_ai = pick_addrinfo(&resolution_store, last_family))) { + int fd = -1; + + #if !defined(INET6) && 
defined(AF_INET6) + if (remote_ai->ai_family == AF_INET6) { + if (any_addrinfos(&resolution_store)) continue; + if (!in_progress_fds(arg->connection_attempt_fds_size)) break; + if (resolution_store.is_all_finised) break; + + if (local_status < 0) { + host = arg->local.host; + serv = arg->local.serv; + } else { + host = arg->remote.host; + serv = arg->remote.serv; + } + if (last_error.type == RESOLUTION_ERROR) { + rsock_raise_resolution_error(syscall, last_error.ecode); + } else { + rsock_syserr_fail_host_port(last_error.ecode, syscall, host, serv); + } + } + #endif + + local_ai = NULL; + + if (arg->local.res) { + for (local_ai = arg->local.res->ai; local_ai; local_ai = local_ai->ai_next) { + if (local_ai->ai_family == remote_ai->ai_family) break; + } + if (!local_ai) { + if (any_addrinfos(&resolution_store)) continue; + if (in_progress_fds(arg->connection_attempt_fds_size)) break; + if (!resolution_store.is_all_finised) break; + + /* Use a different family local address if no choice, this + * will cause EAFNOSUPPORT. 
*/ + rsock_syserr_fail_host_port(EAFNOSUPPORT, syscall, arg->local.host, arg->local.serv); + } + } + + status = rsock_socket(remote_ai->ai_family, remote_ai->ai_socktype, remote_ai->ai_protocol); + syscall = "socket(2)"; + + if (status < 0) { + last_error.type = SYSCALL_ERROR; + last_error.ecode = errno; + + if (any_addrinfos(&resolution_store)) continue; + if (in_progress_fds(arg->connection_attempt_fds_size)) break; + if (!resolution_store.is_all_finised) break; + + if (local_status < 0) { + host = arg->local.host; + serv = arg->local.serv; + } else { + host = arg->remote.host; + serv = arg->remote.serv; + } + if (last_error.type == RESOLUTION_ERROR) { + rsock_raise_resolution_error(syscall, last_error.ecode); + } else { + rsock_syserr_fail_host_port(last_error.ecode, syscall, host, serv); + } + } + + fd = status; + + if (local_ai) { + #if !defined(_WIN32) && !defined(__CYGWIN__) + status = 1; + if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&status, (socklen_t)sizeof(status))) < 0) { + rb_syserr_fail(errno, "setsockopt(2)"); + } + #endif + status = bind(fd, local_ai->ai_addr, local_ai->ai_addrlen); + local_status = status; + syscall = "bind(2)"; + + if (status < 0) { + last_error.type = SYSCALL_ERROR; + last_error.ecode = errno; + close(fd); + + if (any_addrinfos(&resolution_store)) continue; + if (in_progress_fds(arg->connection_attempt_fds_size)) break; + if (!resolution_store.is_all_finised) break; + + if (local_status < 0) { + host = arg->local.host; + serv = arg->local.serv; + } else { + host = arg->remote.host; + serv = arg->remote.serv; + } + if (last_error.type == RESOLUTION_ERROR) { + rsock_raise_resolution_error(syscall, last_error.ecode); + } else { + rsock_syserr_fail_host_port(last_error.ecode, syscall, host, serv); + } + } + } + + syscall = "connect(2)"; + + if (any_addrinfos(&resolution_store) || + in_progress_fds(arg->connection_attempt_fds_size) || + !resolution_store.is_all_finised) { + socket_nonblock_set(fd); + status = connect(fd, 
remote_ai->ai_addr, remote_ai->ai_addrlen); + last_family = remote_ai->ai_family; + } else { + if (!NIL_P(connect_timeout)) { + user_specified_connect_timeout_storage = rb_time_interval(connect_timeout); + user_specified_connect_timeout_at = &user_specified_connect_timeout_storage; + } + + VALUE timeout = + (user_specified_connect_timeout_at && is_infinity(*user_specified_connect_timeout_at)) ? + Qnil : tv_to_seconds(user_specified_connect_timeout_at); + io = arg->io = rsock_init_sock(arg->self, fd); + status = rsock_connect(io, remote_ai->ai_addr, remote_ai->ai_addrlen, 0, timeout); + } + + if (status == 0) { + connected_fd = fd; + break; + } + + if (errno == EINPROGRESS) { + if (current_capacity == arg->connection_attempt_fds_size) { + current_capacity = reallocate_connection_attempt_fds( + arg->connection_attempt_fds, + current_capacity, + additional_capacity + ); + } + arg->connection_attempt_fds[arg->connection_attempt_fds_size] = fd; + (arg->connection_attempt_fds_size)++; + wait_arg.writefds = &writefds; + + set_timeout_tv(&connection_attempt_delay_strage, 250, now); + connection_attempt_delay_expires_at = &connection_attempt_delay_strage; + + if (!any_addrinfos(&resolution_store)) { + if (NIL_P(connect_timeout)) { + user_specified_connect_timeout_storage = (struct timeval){ -1, -1 }; + } else { + struct timeval connect_timeout_tv = rb_time_interval(connect_timeout); + user_specified_connect_timeout_storage = add_ts_to_tv(connect_timeout_tv, now); + } + user_specified_connect_timeout_at = &user_specified_connect_timeout_storage; + } + + break; + } + + last_error.type = SYSCALL_ERROR; + last_error.ecode = errno; + + if (NIL_P(io)) { + close(fd); + } else { + rb_io_close(io); + } + + if (any_addrinfos(&resolution_store)) continue; + if (in_progress_fds(arg->connection_attempt_fds_size)) break; + if (!resolution_store.is_all_finised) break; + + if (local_status < 0) { + host = arg->local.host; + serv = arg->local.serv; + } else { + host = arg->remote.host; + 
serv = arg->remote.serv; + } + if (last_error.type == RESOLUTION_ERROR) { + rsock_raise_resolution_error(syscall, last_error.ecode); + } else { + rsock_syserr_fail_host_port(last_error.ecode, syscall, host, serv); + } + } + } + + if (connected_fd >= 0) break; + + ends_at = select_expires_at( + &resolution_store, + resolution_delay_expires_at, + connection_attempt_delay_expires_at, + user_specified_resolv_timeout_at, + user_specified_connect_timeout_at + ); + if (ends_at) { + delay = tv_to_timeout(ends_at, now); + wait_arg.delay = &delay; + } else { + wait_arg.delay = NULL; + } + + if (arg->connection_attempt_fds_size) { + FD_ZERO(wait_arg.writefds); + int n = 0; + for (int i = 0; i < arg->connection_attempt_fds_size; i++) { + int cfd = arg->connection_attempt_fds[i]; + if (cfd < 0) continue; + if (cfd > n) n = cfd; + FD_SET(cfd, wait_arg.writefds); + } + if (n > 0) n++; + wait_arg.nfds = n; + } else { + wait_arg.writefds = NULL; + } + + FD_ZERO(wait_arg.readfds); + FD_SET(hostname_resolution_waiter, wait_arg.readfds); + if ((hostname_resolution_waiter + 1) > wait_arg.nfds) { + wait_arg.nfds = hostname_resolution_waiter + 1; + } + + rb_thread_call_without_gvl2( + wait_fast_fallback, + &wait_arg, + cancel_fast_fallback, + arg->getaddrinfo_shared + ); + rb_thread_check_ints(); + if (errno == EINTR || arg->cancelled) break; + + status = wait_arg.status; + syscall = "select(2)"; + + now = current_clocktime_ts(); + if (is_timeout_tv(resolution_delay_expires_at, now)) { + resolution_delay_expires_at = NULL; + } + if (is_timeout_tv(connection_attempt_delay_expires_at, now)) { + connection_attempt_delay_expires_at = NULL; + } + + if (status < 0 && (errno && errno != EINTR)) rb_syserr_fail(errno, "select(2)"); + + if (status > 0) { + /* check for connection */ + for (int i = 0; i < arg->connection_attempt_fds_size; i++) { + int fd = arg->connection_attempt_fds[i]; + if (fd < 0 || !FD_ISSET(fd, wait_arg.writefds)) continue; + + int err; + socklen_t len = sizeof(err); + + if 
(getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0) { + if (err == 0) { /* success */ + remove_connection_attempt_fd( + arg->connection_attempt_fds, + &arg->connection_attempt_fds_size, + fd + ); + connected_fd = fd; + break; + }; + + /* fail */ + errno = err; + close(fd); + remove_connection_attempt_fd( + arg->connection_attempt_fds, + &arg->connection_attempt_fds_size, + fd + ); + continue; + } + } + + if (connected_fd >= 0) break; + last_error.type = SYSCALL_ERROR; + last_error.ecode = errno; + + if (any_addrinfos(&resolution_store) || + in_progress_fds(arg->connection_attempt_fds_size) || + !resolution_store.is_all_finised) { + if (!in_progress_fds(arg->connection_attempt_fds_size)) { + user_specified_connect_timeout_at = NULL; + } + } else { + if (local_status < 0) { + host = arg->local.host; + serv = arg->local.serv; + } else { + host = arg->remote.host; + serv = arg->remote.serv; + } + if (last_error.type == RESOLUTION_ERROR) { + rsock_raise_resolution_error(syscall, last_error.ecode); + } else { + rsock_syserr_fail_host_port(last_error.ecode, syscall, host, serv); + } + } + + /* check for hostname resolution */ + if (!resolution_store.is_all_finised && FD_ISSET(hostname_resolution_waiter, wait_arg.readfds)) { + while (true) { + resolved_type_size = read( + hostname_resolution_waiter, + resolved_type, + sizeof(resolved_type) - 1 + ); + + if (resolved_type_size > 0) { + resolved_type[resolved_type_size] = '\0'; + + if (resolved_type[0] == IPV6_HOSTNAME_RESOLVED) { + resolution_store.v6.finished = true; + + if (arg->getaddrinfo_entries[IPV6_ENTRY_POS]->err && + arg->getaddrinfo_entries[IPV6_ENTRY_POS]->err != EAI_ADDRFAMILY) { + last_error.type = RESOLUTION_ERROR; + last_error.ecode = arg->getaddrinfo_entries[IPV6_ENTRY_POS]->err; + syscall = "getaddrinfo(3)"; + resolution_store.v6.has_error = true; + } else { + resolution_store.v6.ai = arg->getaddrinfo_entries[IPV6_ENTRY_POS]->ai; + } + if (resolution_store.v4.finished) { + resolution_store.is_all_finised 
= true; + resolution_delay_expires_at = NULL; + user_specified_resolv_timeout_at = NULL; + break; + } + } else if (resolved_type[0] == IPV4_HOSTNAME_RESOLVED) { + resolution_store.v4.finished = true; + + if (arg->getaddrinfo_entries[IPV4_ENTRY_POS]->err) { + last_error.type = RESOLUTION_ERROR; + last_error.ecode = arg->getaddrinfo_entries[IPV4_ENTRY_POS]->err; + syscall = "getaddrinfo(3)"; + resolution_store.v4.has_error = true; + } else { + resolution_store.v4.ai = arg->getaddrinfo_entries[IPV4_ENTRY_POS]->ai; + } + + if (resolution_store.v6.finished) { + resolution_store.is_all_finised = true; + resolution_delay_expires_at = NULL; + user_specified_resolv_timeout_at = NULL; + break; + } + } else { + /* Retry to read from hostname_resolution_waiter */ + } + } else if (resolved_type_size < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + errno = 0; + break; + } else { + /* Retry to read from hostname_resolution_waiter */ + } + + if (!resolution_store.v6.finished && + resolution_store.v4.finished && + !resolution_store.v4.has_error) { + set_timeout_tv(&resolution_delay_storage, 50, now); + resolution_delay_expires_at = &resolution_delay_storage; + } + } + } + + status = wait_arg.status = 0; + } + + if (!any_addrinfos(&resolution_store)) { + if (!in_progress_fds(arg->connection_attempt_fds_size) && + resolution_store.is_all_finised) { + if (local_status < 0) { + host = arg->local.host; + serv = arg->local.serv; + } else { + host = arg->remote.host; + serv = arg->remote.serv; + } + if (last_error.type == RESOLUTION_ERROR) { + rsock_raise_resolution_error(syscall, last_error.ecode); + } else { + rsock_syserr_fail_host_port(last_error.ecode, syscall, host, serv); + } + } + + if ((is_timeout_tv(user_specified_resolv_timeout_at, now) || + resolution_store.is_all_finised) && + (is_timeout_tv(user_specified_connect_timeout_at, now) || + !in_progress_fds(arg->connection_attempt_fds_size))) { + VALUE errno_module = rb_const_get(rb_cObject, rb_intern("Errno")); + VALUE 
etimedout_error = rb_const_get(errno_module, rb_intern("ETIMEDOUT")); + rb_raise(etimedout_error, "user specified timeout"); + } + } + + if (!resolution_store.is_all_finised) { + if (!resolution_store.v6.finished && arg->getaddrinfo_entries[IPV6_ENTRY_POS]->has_syserr) { + resolution_store.v6.ai = arg->getaddrinfo_entries[IPV6_ENTRY_POS]->ai; + resolution_store.v6.finished = true; + + if (resolution_store.v4.finished) { + resolution_store.is_all_finised = true; + wait_arg.readfds = NULL; + resolution_delay_expires_at = NULL; + user_specified_resolv_timeout_at = NULL; + } + } + if (!resolution_store.v4.finished && arg->getaddrinfo_entries[IPV4_ENTRY_POS]->has_syserr) { + resolution_store.v4.ai = arg->getaddrinfo_entries[IPV4_ENTRY_POS]->ai; + resolution_store.v4.finished = true; + + if (resolution_store.v6.finished) { + resolution_store.is_all_finised = true; + wait_arg.readfds = NULL; + resolution_delay_expires_at = NULL; + user_specified_resolv_timeout_at = NULL; + } else { + set_timeout_tv(&resolution_delay_storage, 50, now); + resolution_delay_expires_at = &resolution_delay_storage; + } + } + } + } + + rb_thread_check_ints(); + + if (NIL_P(arg->io)) { + /* create new instance */ + arg->io = rsock_init_sock(arg->self, connected_fd); + } + + return arg->io; +} + +static VALUE +fast_fallback_inetsock_cleanup(VALUE v) +{ + struct fast_fallback_inetsock_arg *arg = (void *)v; + struct fast_fallback_getaddrinfo_shared *getaddrinfo_shared = arg->getaddrinfo_shared; + + if (arg->remote.res) { + rb_freeaddrinfo(arg->remote.res); + arg->remote.res = 0; + } + if (arg->local.res) { + rb_freeaddrinfo(arg->local.res); + arg->local.res = 0; + } + + if (getaddrinfo_shared) { + int shared_need_free = 0; + int need_free[2] = { 0, 0 }; + + rb_nativethread_lock_lock(getaddrinfo_shared->lock); + { + for (int i = 0; i < arg->family_size; i++) { + if (arg->getaddrinfo_entries[i] && --(arg->getaddrinfo_entries[i]->refcount) == 0) { + need_free[i] = 1; + } + } + if 
(--(getaddrinfo_shared->refcount) == 0) { + shared_need_free = 1; + } + } + rb_nativethread_lock_unlock(getaddrinfo_shared->lock); + + for (int i = 0; i < arg->family_size; i++) { + if (need_free[i]) free_fast_fallback_getaddrinfo_entry(&arg->getaddrinfo_entries[i]); + } + if (shared_need_free) free_fast_fallback_getaddrinfo_shared(&getaddrinfo_shared); + } + + int connection_attempt_fd; + + for (int i = 0; i < arg->connection_attempt_fds_size; i++) { + connection_attempt_fd = arg->connection_attempt_fds[i]; + + if (connection_attempt_fd >= 0) { + int error = 0; + socklen_t len = sizeof(error); + getsockopt(connection_attempt_fd, SOL_SOCKET, SO_ERROR, &error, &len); + if (error == 0) shutdown(connection_attempt_fd, SHUT_RDWR); + close(connection_attempt_fd); + } + } + + if (arg->connection_attempt_fds) { + free(arg->connection_attempt_fds); + arg->connection_attempt_fds = NULL; + } + + return Qnil; +} + VALUE -rsock_init_inetsock(VALUE self, VALUE remote_host, VALUE remote_serv, VALUE local_host, VALUE local_serv, int type, VALUE resolv_timeout, VALUE connect_timeout) +rsock_init_inetsock(VALUE self, VALUE remote_host, VALUE remote_serv, VALUE local_host, VALUE local_serv, int type, VALUE resolv_timeout, VALUE connect_timeout, VALUE fast_fallback, VALUE test_mode_settings) { + if (type == INET_CLIENT && FAST_FALLBACK_INIT_INETSOCK_IMPL == 1 && RTEST(fast_fallback)) { + struct rb_addrinfo *local_res = NULL; + char *hostp, *portp; + char hbuf[NI_MAXHOST], pbuf[NI_MAXSERV]; + int additional_flags = 0; + hostp = host_str(remote_host, hbuf, sizeof(hbuf), &additional_flags); + portp = port_str(remote_serv, pbuf, sizeof(pbuf), &additional_flags); + + if (!is_specified_ip_address(hostp)) { + int target_families[2] = { 0, 0 }; + int resolving_family_size = 0; + + /* + * Maybe also accept a local address + */ + if (!NIL_P(local_host) || !NIL_P(local_serv)) { + local_res = rsock_addrinfo( + local_host, + local_serv, + AF_UNSPEC, + SOCK_STREAM, + 0 + ); + + struct addrinfo 
*tmp_p = local_res->ai; + for (tmp_p; tmp_p != NULL; tmp_p = tmp_p->ai_next) { + if (target_families[0] == 0 && tmp_p->ai_family == AF_INET6) { + target_families[0] = AF_INET6; + resolving_family_size++; + } + if (target_families[1] == 0 && tmp_p->ai_family == AF_INET) { + target_families[1] = AF_INET; + resolving_family_size++; + } + } + } else { + resolving_family_size = 2; + target_families[0] = AF_INET6; + target_families[1] = AF_INET; + } + + struct fast_fallback_inetsock_arg fast_fallback_arg; + memset(&fast_fallback_arg, 0, sizeof(fast_fallback_arg)); + + fast_fallback_arg.self = self; + fast_fallback_arg.io = Qnil; + fast_fallback_arg.remote.host = remote_host; + fast_fallback_arg.remote.serv = remote_serv; + fast_fallback_arg.remote.res = 0; + fast_fallback_arg.local.host = local_host; + fast_fallback_arg.local.serv = local_serv; + fast_fallback_arg.local.res = local_res; + fast_fallback_arg.type = type; + fast_fallback_arg.resolv_timeout = resolv_timeout; + fast_fallback_arg.connect_timeout = connect_timeout; + fast_fallback_arg.hostp = hostp; + fast_fallback_arg.portp = portp; + fast_fallback_arg.additional_flags = additional_flags; + fast_fallback_arg.cancelled = false; + + int resolving_families[resolving_family_size]; + int resolving_family_index = 0; + for (int i = 0; 2 > i; i++) { + if (target_families[i] != 0) { + resolving_families[resolving_family_index] = target_families[i]; + resolving_family_index++; + } + } + fast_fallback_arg.families = resolving_families; + fast_fallback_arg.family_size = resolving_family_size; + fast_fallback_arg.test_mode_settings = test_mode_settings; + + return rb_ensure(init_fast_fallback_inetsock_internal, (VALUE)&fast_fallback_arg, + fast_fallback_inetsock_cleanup, (VALUE)&fast_fallback_arg); + } + } + struct inetsock_arg arg; arg.self = self; arg.io = Qnil; @@ -184,10 +1304,13 @@ rsock_init_inetsock(VALUE self, VALUE remote_host, VALUE remote_serv, VALUE loca arg.type = type; arg.resolv_timeout = resolv_timeout; 
arg.connect_timeout = connect_timeout; + return rb_ensure(init_inetsock_internal, (VALUE)&arg, inetsock_cleanup, (VALUE)&arg); } +#endif + static ID id_numeric, id_hostname; int diff --git a/ext/socket/raddrinfo.c b/ext/socket/raddrinfo.c index 7ec700075e..a7cf1211ba 100644 --- a/ext/socket/raddrinfo.c +++ b/ext/socket/raddrinfo.c @@ -469,8 +469,8 @@ cancel_getaddrinfo(void *ptr) rb_nativethread_lock_unlock(&arg->lock); } -static int -do_pthread_create(pthread_t *th, void *(*start_routine) (void *), void *arg) +int +raddrinfo_pthread_create(pthread_t *th, void *(*start_routine) (void *), void *arg) { int limit = 3, ret; do { @@ -505,7 +505,7 @@ start: } pthread_t th; - if (do_pthread_create(&th, fork_safe_do_getaddrinfo, arg) != 0) { + if (raddrinfo_pthread_create(&th, fork_safe_do_getaddrinfo, arg) != 0) { int err = errno; free_getaddrinfo_arg(arg); errno = err; @@ -726,7 +726,7 @@ start: } pthread_t th; - if (do_pthread_create(&th, do_getnameinfo, arg) != 0) { + if (raddrinfo_pthread_create(&th, do_getnameinfo, arg) != 0) { int err = errno; free_getnameinfo_arg(arg); errno = err; @@ -822,7 +822,7 @@ str_is_number(const char *p) ((ptr)[0] == name[0] && \ rb_strlen_lit(name) == (len) && memcmp(ptr, name, len) == 0) -static char* +char* host_str(VALUE host, char *hbuf, size_t hbuflen, int *flags_ptr) { if (NIL_P(host)) { @@ -861,7 +861,7 @@ host_str(VALUE host, char *hbuf, size_t hbuflen, int *flags_ptr) } } -static char* +char* port_str(VALUE port, char *pbuf, size_t pbuflen, int *flags_ptr) { if (NIL_P(port)) { @@ -3024,6 +3024,106 @@ rsock_io_socket_addrinfo(VALUE io, struct sockaddr *addr, socklen_t len) UNREACHABLE_RETURN(Qnil); } +#if FAST_FALLBACK_INIT_INETSOCK_IMPL == 1 + +void +free_fast_fallback_getaddrinfo_shared(struct fast_fallback_getaddrinfo_shared **shared) +{ + free((*shared)->node); + (*shared)->node = NULL; + free((*shared)->service); + (*shared)->service = NULL; + close((*shared)->notify); + close((*shared)->wait); + 
rb_nativethread_lock_destroy((*shared)->lock); + free(*shared); + *shared = NULL; +} + +void +free_fast_fallback_getaddrinfo_entry(struct fast_fallback_getaddrinfo_entry **entry) +{ + if ((*entry)->ai) { + freeaddrinfo((*entry)->ai); + (*entry)->ai = NULL; + } + free(*entry); + *entry = NULL; +} + +void * +do_fast_fallback_getaddrinfo(void *ptr) +{ + struct fast_fallback_getaddrinfo_entry *entry = (struct fast_fallback_getaddrinfo_entry *)ptr; + struct fast_fallback_getaddrinfo_shared *shared = entry->shared; + int err = 0, need_free = 0, shared_need_free = 0; + + err = numeric_getaddrinfo(shared->node, shared->service, &entry->hints, &entry->ai); + + if (err != 0) { + err = getaddrinfo(shared->node, shared->service, &entry->hints, &entry->ai); + #ifdef __linux__ + /* On Linux (mainly Ubuntu 13.04) /etc/nsswitch.conf has mdns4 and + * it cause getaddrinfo to return EAI_SYSTEM/ENOENT. [ruby-list:49420] + */ + if (err == EAI_SYSTEM && errno == ENOENT) + err = EAI_NONAME; + #endif + } + + /* for testing HEv2 */ + if (entry->test_sleep_ms > 0) { + struct timespec sleep_ts; + sleep_ts.tv_sec = entry->test_sleep_ms / 1000; + sleep_ts.tv_nsec = (entry->test_sleep_ms % 1000) * 1000000L; + if (sleep_ts.tv_nsec >= 1000000000L) { + sleep_ts.tv_sec += sleep_ts.tv_nsec / 1000000000L; + sleep_ts.tv_nsec = sleep_ts.tv_nsec % 1000000000L; + } + nanosleep(&sleep_ts, NULL); + } + if (entry->test_ecode != 0) { + err = entry->test_ecode; + if (entry->ai) { + freeaddrinfo(entry->ai); + entry->ai = NULL; + } + } + + rb_nativethread_lock_lock(shared->lock); + { + entry->err = err; + if (*shared->cancelled) { + if (entry->ai) { + freeaddrinfo(entry->ai); + entry->ai = NULL; + } + } else { + const char notification = entry->family == AF_INET6 ? 
+ IPV6_HOSTNAME_RESOLVED : IPV4_HOSTNAME_RESOLVED; + + if ((write(shared->notify, &notification, strlen(&notification))) < 0) { + entry->err = errno; + entry->has_syserr = true; + } + } + if (--(entry->refcount) == 0) need_free = 1; + if (--(shared->refcount) == 0) shared_need_free = 1; + } + rb_nativethread_lock_unlock(shared->lock); + + if (need_free && entry) { + free_fast_fallback_getaddrinfo_entry(&entry); + } + if (shared_need_free && shared) { + free_fast_fallback_getaddrinfo_shared(&shared); + } + + return 0; +} + +#endif + /* * Addrinfo class */ diff --git a/ext/socket/rubysocket.h b/ext/socket/rubysocket.h index b4daa66071..97f3bc55d7 100644 --- a/ext/socket/rubysocket.h +++ b/ext/socket/rubysocket.h @@ -354,7 +354,7 @@ int rsock_socket(int domain, int type, int proto); int rsock_detect_cloexec(int fd); VALUE rsock_init_sock(VALUE sock, int fd); VALUE rsock_sock_s_socketpair(int argc, VALUE *argv, VALUE klass); -VALUE rsock_init_inetsock(VALUE sock, VALUE remote_host, VALUE remote_serv, VALUE local_host, VALUE local_serv, int type, VALUE resolv_timeout, VALUE connect_timeout); +VALUE rsock_init_inetsock(VALUE sock, VALUE remote_host, VALUE remote_serv, VALUE local_host, VALUE local_serv, int type, VALUE resolv_timeout, VALUE connect_timeout, VALUE fast_fallback, VALUE test_mode_settings); VALUE rsock_init_unixsock(VALUE sock, VALUE path, int server); struct rsock_send_arg { @@ -413,6 +413,45 @@ ssize_t rsock_recvmsg(int socket, struct msghdr *message, int flags); void rsock_discard_cmsg_resource(struct msghdr *mh, int msg_peek_p); #endif +char *host_str(VALUE host, char *hbuf, size_t hbuflen, int *flags_ptr); +char *port_str(VALUE port, char *pbuf, size_t pbuflen, int *flags_ptr); + +#ifndef FAST_FALLBACK_INIT_INETSOCK_IMPL +# if !defined(HAVE_PTHREAD_CREATE) || !defined(HAVE_PTHREAD_DETACH) || defined(__MINGW32__) || defined(__MINGW64__) +# define FAST_FALLBACK_INIT_INETSOCK_IMPL 0 +# else +# include "ruby/thread_native.h" +# define 
FAST_FALLBACK_INIT_INETSOCK_IMPL 1 +# define IPV6_HOSTNAME_RESOLVED '1' +# define IPV4_HOSTNAME_RESOLVED '2' +# define SELECT_CANCELLED '3' + +struct fast_fallback_getaddrinfo_shared +{ + int wait, notify, refcount, connection_attempt_fds_size; + int *connection_attempt_fds, *cancelled; + char *node, *service; + rb_nativethread_lock_t *lock; +}; + +struct fast_fallback_getaddrinfo_entry +{ + int family, err, refcount; + struct addrinfo hints; + struct addrinfo *ai; + struct fast_fallback_getaddrinfo_shared *shared; + int has_syserr; + long test_sleep_ms; + int test_ecode; +}; + +int raddrinfo_pthread_create(pthread_t *th, void *(*start_routine) (void *), void *arg); +void *do_fast_fallback_getaddrinfo(void *ptr); +void free_fast_fallback_getaddrinfo_entry(struct fast_fallback_getaddrinfo_entry **entry); +void free_fast_fallback_getaddrinfo_shared(struct fast_fallback_getaddrinfo_shared **shared); +# endif +#endif + void rsock_init_basicsocket(void); void rsock_init_ipsocket(void); void rsock_init_tcpsocket(void); diff --git a/ext/socket/sockssocket.c b/ext/socket/sockssocket.c index 1e94186cd0..1031812bef 100644 --- a/ext/socket/sockssocket.c +++ b/ext/socket/sockssocket.c @@ -34,7 +34,7 @@ socks_init(VALUE sock, VALUE host, VALUE port) init = 1; } - return rsock_init_inetsock(sock, host, port, Qnil, Qnil, INET_SOCKS, Qnil, Qnil); + return rsock_init_inetsock(sock, host, port, Qnil, Qnil, INET_SOCKS, Qnil, Qnil, Qfalse, Qnil); } #ifdef SOCKS5 diff --git a/ext/socket/tcpserver.c b/ext/socket/tcpserver.c index 04e5a0bb51..8206fe46a9 100644 --- a/ext/socket/tcpserver.c +++ b/ext/socket/tcpserver.c @@ -36,7 +36,7 @@ tcp_svr_init(int argc, VALUE *argv, VALUE sock) VALUE hostname, port; rb_scan_args(argc, argv, "011", &hostname, &port); - return rsock_init_inetsock(sock, hostname, port, Qnil, Qnil, INET_SERVER, Qnil, Qnil); + return rsock_init_inetsock(sock, hostname, port, Qnil, Qnil, INET_SERVER, Qnil, Qnil, Qfalse, Qnil); } /* diff --git a/ext/socket/tcpsocket.c 
b/ext/socket/tcpsocket.c index 03787272f3..3c03e89e0d 100644 --- a/ext/socket/tcpsocket.c +++ b/ext/socket/tcpsocket.c @@ -12,13 +12,43 @@ /* * call-seq: - * TCPSocket.new(remote_host, remote_port, local_host=nil, local_port=nil, connect_timeout: nil) + * TCPSocket.new(remote_host, remote_port, local_host=nil, local_port=nil, resolv_timeout: nil, connect_timeout: nil, fast_fallback: true) * * Opens a TCP connection to +remote_host+ on +remote_port+. If +local_host+ * and +local_port+ are specified, then those parameters are used on the local * end to establish the connection. * - * [:connect_timeout] specify the timeout in seconds. + * Starting from Ruby 3.4, this method operates according to the + * Happy Eyeballs Version 2 ({RFC 8305}[https://datatracker.ietf.org/doc/html/rfc8305]) + * algorithm by default, except on Windows. + * + * To make it behave the same as in Ruby 3.3 and earlier, + * explicitly specify the option +fast_fallback:false+. + * + * Happy Eyeballs Version 2 is not provided on Windows, + * and it behaves the same as in Ruby 3.3 and earlier. + * + * [:resolv_timeout] Specifies the timeout in seconds from when the hostname resolution starts. + * [:connect_timeout] This method sequentially attempts connecting to all candidate destination addresses.<br>The +connect_timeout+ specifies the timeout in seconds from the start of the connection attempt to the last candidate.<br>By default, all connection attempts continue until the timeout occurs.<br>When +fast_fallback:false+ is explicitly specified,<br>a timeout is set for each connection attempt and any connection attempt that exceeds its timeout will be canceled. + * [:fast_fallback] Enables the Happy Eyeballs Version 2 algorithm (enabled by default). 
+ * + * === Happy Eyeballs Version 2 + * Happy Eyeballs Version 2 ({RFC 8305}[https://datatracker.ietf.org/doc/html/rfc8305]) + * is an algorithm designed to improve client socket connectivity.<br> + * It aims for more reliable and efficient connections by performing hostname resolution + * and connection attempts in parallel, instead of serially. + * + * Starting from Ruby 3.4, this method operates as follows with this algorithm except on Windows: + * + * 1. Start resolving both IPv6 and IPv4 addresses concurrently. + * 2. Start connecting to one of the addresses that are obtained first.<br>If IPv4 addresses are obtained first, + * the method waits 50 ms for IPv6 name resolution to prioritize IPv6 connections. + * 3. After starting a connection attempt, wait 250 ms for the connection to be established.<br> + * If no connection is established within this time, a new connection is started every 250 ms<br> + * until a connection is established or there are no more candidate addresses.<br> + * (Although RFC 8305 strictly specifies sorting addresses,<br> + * this method only alternates between IPv6 / IPv4 addresses due to performance concerns) + * 4. Once a connection is established, all remaining connection attempts are canceled. 
*/ static VALUE tcp_init(int argc, VALUE *argv, VALUE sock) @@ -26,28 +56,35 @@ tcp_init(int argc, VALUE *argv, VALUE sock) VALUE remote_host, remote_serv; VALUE local_host, local_serv; VALUE opt; - static ID keyword_ids[2]; - VALUE kwargs[2]; + static ID keyword_ids[4]; + VALUE kwargs[4]; VALUE resolv_timeout = Qnil; VALUE connect_timeout = Qnil; + VALUE fast_fallback = Qtrue; + VALUE test_mode_settings = Qnil; if (!keyword_ids[0]) { CONST_ID(keyword_ids[0], "resolv_timeout"); CONST_ID(keyword_ids[1], "connect_timeout"); + CONST_ID(keyword_ids[2], "fast_fallback"); + CONST_ID(keyword_ids[3], "test_mode_settings"); } rb_scan_args(argc, argv, "22:", &remote_host, &remote_serv, &local_host, &local_serv, &opt); if (!NIL_P(opt)) { - rb_get_kwargs(opt, keyword_ids, 0, 2, kwargs); + rb_get_kwargs(opt, keyword_ids, 0, 4, kwargs); if (kwargs[0] != Qundef) { resolv_timeout = kwargs[0]; } if (kwargs[1] != Qundef) { connect_timeout = kwargs[1]; } + if (kwargs[2] != Qundef) { fast_fallback = kwargs[2]; } + if (kwargs[3] != Qundef) { test_mode_settings = kwargs[3]; } } return rsock_init_inetsock(sock, remote_host, remote_serv, local_host, local_serv, INET_CLIENT, - resolv_timeout, connect_timeout); + resolv_timeout, connect_timeout, fast_fallback, + test_mode_settings); } static VALUE |