diff options
author | Koichi Sasada <[email protected]> | 2020-03-10 02:22:11 +0900 |
---|---|---|
committer | Koichi Sasada <[email protected]> | 2020-09-03 21:11:06 +0900 |
commit | 79df14c04b452411b9d17e26a398e491bca1a811 (patch) | |
tree | 7598cee0f105439efd5bb328a727b0fe27d7c666 /ractor.rb | |
parent | eeb5325d3bfd71301896360c17e8f51abcb9a7e5 (diff) |
Introduce Ractor mechanism for parallel execution
This commit introduces Ractor mechanism to run Ruby program in
parallel. See doc/ractor.md for more details about Ractor.
See ticket [Feature #17100] to see the implementation details
and discussions.
[Feature #17100]
This commit does not complete the implementation. You can find
many bugs on using Ractor. Also the specification will be changed
so that this feature is experimental. You will see a warning when
you make the first Ractor with `Ractor.new`.
I hope this feature can help programmers from thread-safety issues.
Notes
Notes:
Merged: https://github.com/ruby/ruby/pull/3365
Diffstat (limited to 'ractor.rb')
-rw-r--r-- | ractor.rb | 162 |
1 files changed, 162 insertions, 0 deletions
diff --git a/ractor.rb b/ractor.rb new file mode 100644 index 0000000000..893a3f14c6 --- /dev/null +++ b/ractor.rb @@ -0,0 +1,162 @@ +class Ractor + # Create a new Ractor with args and a block. + # args are passed via incoming channel. + # A block (Proc) will be isolated (can't acccess to outer variables) + # + # A ractor has default two channels: + # an incoming channel and an outgoing channel. + # + # Other ractors send objects to the ractor via the incoming channel and + # the ractor receives them. + # The ractor send objects via the outgoing channel and other ractors can + # receive them. + # + # The result of the block is sent via the outgoing channel + # and other + # + # r = Ractor.new do + # Ractor.recv # recv via r's mailbox => 1 + # Ractor.recv # recv via r's mailbox => 2 + # Ractor.yield 3 # yield a message (3) and wait for taking by another ractor. + # 'ok' # the return value will be yielded. + # # and r's incoming/outgoing ports are closed automatically. + # end + # r.send 1 # send a message (1) into r's mailbox. + # r << 2 # << is an alias of `send`. + # p r.take # take a message from r's outgoing port #=> 3 + # p r.take # => 'ok' + # p r.take # raise Ractor::ClosedError + # + # other options: + # name: Ractor's name + # + def self.new *args, name: nil, &block + b = block # TODO: builtin bug + raise ArgumentError, "must be called with a block" unless block + loc = caller_locations(1, 1).first + loc = "#{loc.path}:#{loc.lineno}" + __builtin_ractor_create(loc, name, args, b) + end + + # return current Ractor + def self.current + __builtin_cexpr! %q{ + rb_ec_ractor_ptr(ec)->self + } + end + + def self.count + __builtin_cexpr! %q{ + ULONG2NUM(GET_VM()->ractor.cnt); + } + end + + # Multiplex multiple Ractor communications. + # + # r, obj = Ractor.select(r1, r2) + # #=> wait for taking from r1 or r2 + # # returned obj is a taken object from Ractor r + # + # r, obj = Ractor.select(r1, r2, Ractor.current) + # #=> wait for taking from r1 or r2 + # # or recv from incoming queue + # # If recv is succeed, then obj is received value + # # and r is :recv (Ractor.current) + # + # r, obj = Ractor.select(r1, r2, Ractor.current, yield_value: obj) + # #=> wait for taking from r1 or r2 + # # or recv from incoming queue + # # or yield (Ractor.yield) obj + # # If yield is succeed, then obj is nil + # # and r is :yield + # + def self.select *ractors, yield_value: yield_unspecified = true, move: false + __builtin_cstmt! %q{ + const VALUE *rs = RARRAY_CONST_PTR_TRANSIENT(ractors); + VALUE rv; + VALUE v = ractor_select(ec, rs, RARRAY_LENINT(ractors), + yield_unspecified == Qtrue ? Qundef : yield_value, + (bool)RTEST(move) ? true : false, &rv); + return rb_ary_new_from_args(2, rv, v); + } + end + + # Receive an incoming message from Ractor's incoming queue. + def self.recv + __builtin_cexpr! %q{ + ractor_recv(ec, rb_ec_ractor_ptr(ec)) + } + end + + private def recv + __builtin_cexpr! %q{ + // TODO: check current actor + ractor_recv(ec, RACTOR_PTR(self)) + } + end + + # Send a message to a Ractor's incoming queue. + # + # # Example: + # r = Ractor.new do + # p Ractor.recv #=> 'ok' + # end + # r.send 'ok' # send to r's incoming queue. + def send obj, move: false + __builtin_cexpr! %q{ + ractor_send(ec, RACTOR_PTR(self), obj, move) + } + end + + # yield a message to the ractor's outgoing port. + def self.yield obj, move: false + __builtin_cexpr! %q{ + ractor_yield(ec, rb_ec_ractor_ptr(ec), obj, move) + } + end + + # Take a message from ractor's outgoing port. + # + # Example: + # r = Ractor.new{ 'oK' } + # p r.take #=> 'ok' + def take + __builtin_cexpr! %q{ + ractor_take(ec, RACTOR_PTR(self)) + } + end + + alias << send + + def inspect + loc = __builtin_cexpr! %q{ RACTOR_PTR(self)->loc } + name = __builtin_cexpr! %q{ RACTOR_PTR(self)->name } + id = __builtin_cexpr! %q{ INT2FIX(RACTOR_PTR(self)->id) } + "#<Ractor:##{id}#{name ? ' '+name : ''}#{loc ? " " + loc : ''}>" + end + + def name + __builtin_cexpr! %q{ RACTOR_PTR(self)->name } + end + + class RemoteError + attr_reader :ractor + end + + def close_incoming + __builtin_cexpr! %q{ + ractor_close_incoming(ec, RACTOR_PTR(self)); + } + end + + def close_outgoing + __builtin_cexpr! %q{ + ractor_close_outgoing(ec, RACTOR_PTR(self)); + } + end + + def close + close_incoming + close_outgoing + end +end |