[ruby-core:44504] Re: [ruby-trunk - Feature #6293][Assigned] new queue / blocking queues
On Thu, Apr 19, 2012 at 03:20:50PM +0900, SASADA Koichi wrote:
> Hi,
>
> (2012/04/17 0:24), Aaron Patterson wrote:
> >>> So calling pop() means we're doing a not not blocking call. :(
> >>
> >> How about to add try_pop?
> >
> > try_pop seems fine, but it still seems strange to combine blocking
> > and non-blocking queues (but maybe *I* am the one who is strange).
> >
> > In the case of BlockingQueue#pop in the patch I submitted, it
> > allows a timeout. I don't think it's a feature that should be
> > abandoned.
>
> I understand that ::Queue#pop should receive timeout extra parameter.

So the interface would be:

    queue.pop(true, 10)
    # or
    queue.pop(false, 10)

It seems confusing. I do not think it is as clear as:

    queue = BlockingQueue.new
    queue.pop 10

I'm not sure the clarity of ::Queue API really matters though.

The queues I've submitted do not allow `nil`. In this way, we can have
non-blocking reads that do not throw exceptions. I think the queues
I've submitted have different (but better) semantics.

> However, I'm not sure we need to separate (Blocking|Unblocking)Queue yet.

I would like them so that I can change queue types without changing the
code that pops off the queue. I simply change the queue I instantiate,
and the rest of my code should not have to change.

> >> How about to implement Queue#to_a method that generate array which
> >> contains queues containing objects?
> >
> > That seems fine! Then we can eliminate Enumerable mixed in. :-)
>
> Yes. And it is clear semantics.
>
> ::Queue#each can be implemented on at least two semantics:
> 1) block the end of queue. (like IO)
> 2) return when reach end of queue. (like Array)
>
> Against IO, Queue#each with semantics (1) can't stop. It is similar
> to the cycle object (generated by Enumerator#cycle).

I agree. I've attached an updated patch that uses to_a and removes
Enumerable.

-- 
Aaron Patterson
http://tenderlovemaking.com/
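The interchangeability argument in the message above (change the queue you instantiate, leave the popping code alone) and the point about disallowing `nil` can be illustrated with a small sketch. `SketchQueue`, `SketchBlockingQueue`, and `drain` are hypothetical names invented for this illustration; they are not the classes in the attached patch, and the blocking variant here waits on a `ConditionVariable` rather than using `Timeout`, which is an assumption of the sketch.

```ruby
# Hypothetical stand-ins for illustration only; not the classes from the patch.
class SketchQueue
  def initialize
    @items = []
    @mutex = Mutex.new
  end

  # nil is rejected so that poll can use nil to mean "nothing available".
  def push(obj)
    raise ArgumentError, "nil is not allowed" if obj.nil?
    @mutex.synchronize { @items.push(obj) }
    self
  end

  # Non-blocking read: returns nil instead of raising when empty.
  def poll
    @mutex.synchronize { @items.shift }
  end
end

class SketchBlockingQueue < SketchQueue
  def initialize
    super
    @cond = ConditionVariable.new
  end

  def push(obj)
    raise ArgumentError, "nil is not allowed" if obj.nil?
    @mutex.synchronize do
      @items.push(obj)
      @cond.signal # wake one waiting reader, if any
    end
    self
  end

  # Waits up to +timeout+ seconds for an item and returns nil on expiry.
  # Without a timeout it behaves like the non-blocking poll.
  def poll(timeout = nil)
    @mutex.synchronize do
      if timeout
        deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
        while @items.empty?
          remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
          break if remaining <= 0
          @cond.wait(@mutex, remaining)
        end
      end
      @items.shift
    end
  end
end

# Consumer code that does not care which queue type it was handed.
def drain(queue)
  items = []
  while (item = queue.poll)
    items << item
  end
  items
end

plain = SketchQueue.new
plain.push(1).push(2)
p drain(plain)       # prints [1, 2]

blocking = SketchBlockingQueue.new
blocking.push(:a)
p drain(blocking)    # prints [:a]
p blocking.poll(0.1) # prints nil once the timeout expires
```

Because `nil` can never be stored, a `nil` return from `poll` is unambiguous, so `drain` needs neither a rescue clause nor a non-blocking flag.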
Attachments (1)
diff --git a/lib/thread.rb b/lib/thread.rb
index 88e86ab..9857599 100644
--- a/lib/thread.rb
+++ b/lib/thread.rb
@@ -129,7 +129,7 @@ end
#
# consumer = Thread.new do
# 5.times do |i|
-# value = queue.pop
+# value = queue.shift
# sleep rand(i/2) # simulate expense
# puts "consumed #{value}"
# end
diff --git a/lib/thread/queue.rb b/lib/thread/queue.rb
new file mode 100644
index 0000000..3b8d436
--- /dev/null
+++ b/lib/thread/queue.rb
@@ -0,0 +1,187 @@
+require 'timeout'
+
+class Thread
+ # Thread::Queue is a thread-safe FIFO queue. It provides a way to synchronize
+ # communication between threads. This queue does not block when items are
+ # removed (see Thread::Queue#remove).
+ #
+ # This queue does not allow nil elements.
+ class Queue
+ class NoSuchElementError < StandardError
+ end
+
+ #
+ # Creates a new queue.
+ #
+ def initialize
+ @que = []
+ @que.taint # enable tainted communication
+ self.taint
+ @mutex = Mutex.new
+ end
+
+ def to_a
+ @mutex.synchronize { @que.dup }
+ end
+
+ #
+ # Adds +obj+ to the tail of the queue.
+ #
+ # Raises an ArgumentError if +obj+ is nil.
+ #
+ def add(obj)
+ raise ArgumentError if obj.nil?
+ @mutex.synchronize { @que.push obj }
+ self
+ end
+
+ alias :push :add
+ alias :<< :add
+
+ def offer(obj, timeout = nil)
+ add obj
+ end
+
+ # Retrieves data from the queue head, and removes it.
+ #
+ # Raises a NoSuchElementError if the queue is empty.
+ def remove
+ @mutex.synchronize {
+ raise NoSuchElementError if empty?
+ @que.shift
+ }
+ end
+
+ alias :pop :remove
+ alias :shift :remove
+ alias :deq :remove
+
+ # Retrieves data from the queue head, and removes it.
+ #
+ # Returns nil if this queue is empty.
+ def poll
+ @mutex.synchronize {
+ if empty?
+ nil
+ else
+ @que.shift
+ end
+ }
+ end
+
+ # Retrieves data from the queue head, but does not remove it.
+ #
+ # Returns nil if the queue is empty.
+ def peek
+ @mutex.synchronize { @que.first }
+ end
+
+ # Retrieves data from the queue head, but does not remove it.
+ #
+ # Raises NoSuchElementError if the queue is empty.
+ def element
+ @mutex.synchronize {
+ if empty?
+ raise NoSuchElementError
+ else
+ @que.first
+ end
+ }
+ end
+
+ #
+ # Returns +true+ if the queue is empty.
+ #
+ def empty?
+ @que.empty?
+ end
+
+ #
+ # Removes all objects from the queue.
+ #
+ def clear
+ @mutex.synchronize { @que.clear }
+ end
+
+ #
+ # Returns the length of the queue.
+ #
+ def length
+ @que.length
+ end
+ alias :size :length
+ end
+
+ # Thread::BlockingQueue is a thread-safe FIFO queue. It provides a way to
+ # synchronize communication between threads.
+ #
+ # This queue does not allow nil elements.
+ class BlockingQueue < Queue
+ def initialize
+ @waiting = []
+ @waiting.taint
+ super
+ end
+
+ # Retrieves data from the queue head, and removes it.
+ #
+ # If the queue is empty, take will block until there is something
+ # in the queue.
+ def take
+ @mutex.synchronize {
+ while true
+ if @que.empty?
+ # @waiting.include? check is necessary for avoiding a race against
+ # Thread.wakeup [Bug 5195]
+ @waiting.push Thread.current unless @waiting.include?(Thread.current)
+ @mutex.sleep
+ else
+ return @que.shift
+ end
+ end
+ }
+ end
+
+ alias :pop :take
+ alias :shift :take
+ alias :deq :take
+
+ # Adds +obj+ to the tail of the queue.
+ #
+ # Raises an ArgumentError if +obj+ is nil.
+ #
+ def add(obj)
+ raise ArgumentError if obj.nil?
+
+ @mutex.synchronize {
+ @que.push obj
+ begin
+ t = @waiting.shift
+ t.wakeup if t
+ rescue ThreadError
+ retry
+ end
+ }
+ self
+ end
+
+ alias :push :add
+ alias :<< :add
+
+ # Retrieves data from the queue head, and removes it.
+ #
+ # Blocks for +timeout+ seconds if the queue is empty, and returns nil if
+ # the timeout expires.
+ def poll(timeout = nil)
+ return super() unless timeout
+
+ begin
+ Timeout.timeout(timeout) do
+ take
+ end
+ rescue Timeout::Error
+ nil
+ end
+ end
+ end
+end
diff --git a/test/thread/helper.rb b/test/thread/helper.rb
new file mode 100644
index 0000000..d3c1258
--- /dev/null
+++ b/test/thread/helper.rb
@@ -0,0 +1,102 @@
+require 'minitest/autorun'
+require 'thread/queue'
+
+class Thread
+ class TestCase < MiniTest::Unit::TestCase
+ class Latch
+ def initialize
+ @mutex = Mutex.new
+ @cond = ConditionVariable.new
+ end
+
+ def release
+ @mutex.synchronize { @cond.broadcast }
+ end
+
+ def await
+ @mutex.synchronize { @cond.wait @mutex }
+ end
+ end
+
+ attr_reader :queue
+
+ POISON = Object.new
+
+ def grind(num_threads, num_objects, num_iterations, klass, *args)
+ from_workers = klass.new(*args)
+ to_workers = klass.new(*args)
+
+ to_consumers = num_threads.times.map {
+ Thread.new {
+ while object = to_workers.pop
+ break if object == POISON
+ from_workers.push object
+ end
+ }
+ }
+
+ from_consumer = Thread.new {
+ num_iterations.times {
+ num_objects.times { from_workers.pop }
+ }
+ }
+
+ num_iterations.times {
+ num_objects.times { to_workers.push 99 }
+ }
+ num_threads.times { to_workers.push POISON }
+
+ to_consumers.each { |t| t.join }
+
+ from_consumer.join
+
+ assert_equal 0, from_workers.size
+ assert_equal 0, to_workers.size
+ end
+
+ def non_block_grind(num_threads, num_objects, num_iterations, klass, *args)
+ from_workers = klass.new(*args)
+ to_workers = klass.new(*args)
+
+ to_latch = Latch.new
+ from_latch = Latch.new
+
+ to_consumers = num_threads.times.map {
+ Thread.new {
+ to_latch.await
+
+ while object = to_workers.pop
+ break if object == POISON
+ from_workers.push object
+ end
+ }
+ }
+
+ from_consumer = Thread.new {
+ from_latch.await
+
+ num_iterations.times {
+ num_objects.times { from_workers.pop }
+ }
+ }
+
+ num_iterations.times {
+ num_objects.times { to_workers.push 99 }
+ }
+ num_threads.times { to_workers.push POISON }
+
+ Thread.pass until to_consumers.all? { |c| c.status == "sleep" }
+ Thread.pass until from_consumer.status == "sleep"
+
+ to_latch.release
+
+ to_consumers.each { |t| t.join }
+
+ from_latch.release
+ from_consumer.join
+
+ assert_equal 0, from_workers.size
+ assert_equal 0, to_workers.size
+ end
+ end
+end
diff --git a/test/thread/test_blocking_queue.rb b/test/thread/test_blocking_queue.rb
new file mode 100644
index 0000000..f80b063
--- /dev/null
+++ b/test/thread/test_blocking_queue.rb
@@ -0,0 +1,93 @@
+require 'helper'
+
+class Thread
+ class TestBlockingQueue < TestCase
+ attr_reader :queue
+
+ def setup
+ @queue = Thread::BlockingQueue.new
+ super
+ end
+
+ def test_add_returns_self
+ assert_equal queue, queue.add(1)
+ end
+
+ def test_queue
+ grind(5, 1000, 15, Thread::BlockingQueue)
+ end
+
+ def test_offer
+ assert queue.offer(1)
+ assert_equal 1, queue.length
+ end
+
+ def test_clear
+ 10.times { |i| queue << i }
+ assert_equal 10, queue.length
+ queue.clear
+ assert_equal 0, queue.length
+ assert queue.empty?
+ end
+
+ def test_add
+ queue.add "foo"
+ assert_equal "foo", queue.take
+ assert queue.empty?
+ end
+
+ def test_add_nil
+ assert_raises(ArgumentError) do
+ queue.add nil
+ end
+ end
+
+ def test_remove_empty
+ assert queue.empty?
+ t = Thread.new { queue.take }
+ queue << 1
+ assert_equal 1, t.join.value
+ end
+
+ def test_poll
+ queue.add "foo"
+ assert_equal "foo", queue.poll
+ end
+
+ def test_poll_empty
+ assert_nil queue.poll
+ end
+
+ def test_poll_timeout
+ assert_nil queue.poll(1)
+
+ t = Thread.new { queue.poll(10) }
+ queue << "foo"
+ assert_equal "foo", t.join.value
+ end
+
+ def test_peek
+ queue.add "foo"
+ assert_equal "foo", queue.peek
+ assert_equal "foo", queue.take
+ end
+
+ def test_peek_empty
+ assert queue.empty?
+ assert_nil queue.peek
+ end
+
+ def test_element
+ queue.add "foo"
+ assert_equal "foo", queue.element
+ assert_equal "foo", queue.take
+ end
+
+ def test_element_empty
+ assert queue.empty?
+ assert_raises(Queue::NoSuchElementError) do
+ queue.element
+ end
+ end
+ end
+end
diff --git a/test/thread/test_non_block_queue.rb b/test/thread/test_non_block_queue.rb
new file mode 100644
index 0000000..fa22a74
--- /dev/null
+++ b/test/thread/test_non_block_queue.rb
@@ -0,0 +1,90 @@
+require 'helper'
+
+class Thread
+ class TestQueue < TestCase
+ alias :grind :non_block_grind
+
+ def setup
+ super
+ @queue = Thread::Queue.new
+ end
+
+ def test_queue
+ grind(5, 1000, 15, Thread::Queue)
+ end
+
+ def test_add_returns_self
+ assert_equal queue, queue.add(1)
+ end
+
+ def test_offer
+ assert queue.offer(1)
+ end
+
+ def test_clear
+ 10.times { |i| queue << i }
+ assert_equal 10, queue.length
+ queue.clear
+ assert_equal 0, queue.length
+ assert queue.empty?
+ end
+
+ def test_add
+ queue.add "foo"
+ assert_equal "foo", queue.remove
+ assert queue.empty?
+ end
+
+ def test_add_nil
+ assert_raises(ArgumentError) do
+ queue.add nil
+ end
+ end
+
+ def test_remove_empty
+ assert queue.empty?
+ assert_raises(Queue::NoSuchElementError) do
+ queue.remove
+ end
+ end
+
+ def test_poll
+ queue.add "foo"
+ assert_equal "foo", queue.poll
+ end
+
+ def test_poll_empty
+ assert_nil queue.poll
+ end
+
+ def test_peek
+ queue.add "foo"
+ assert_equal "foo", queue.peek
+ assert_equal "foo", queue.remove
+ end
+
+ def test_peek_empty
+ assert queue.empty?
+ assert_nil queue.peek
+ end
+
+ def test_element
+ queue.add "foo"
+ assert_equal "foo", queue.element
+ assert_equal "foo", queue.remove
+ end
+
+ def test_element_empty
+ assert queue.empty?
+ assert_raises(Queue::NoSuchElementError) do
+ queue.element
+ end
+ end
+
+ def test_offer_optionally_takes_timeout
+ assert queue.empty?
+ queue.offer 0, 10
+ assert_equal 1, queue.length
+ end
+ end
+end
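
The Latch helper in test/thread/helper.rb only wakes threads that are already waiting when release is called, which is presumably why non_block_grind spins on Thread.pass until every thread reports "sleep" before releasing. A minimal sketch of a latch that remembers it was released, so the ordering of await and release no longer matters, might look like this. `StickyLatch` is a hypothetical name and is not part of the patch.

```ruby
# Hypothetical alternative to the test helper's Latch; not part of the patch.
class StickyLatch
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @released = false
  end

  # Records the release and wakes every thread currently waiting.
  def release
    @mutex.synchronize do
      @released = true
      @cond.broadcast
    end
  end

  # Returns immediately if release already happened; otherwise waits.
  # The loop also guards against spurious wakeups.
  def await
    @mutex.synchronize do
      @cond.wait(@mutex) until @released
    end
  end
end

latch = StickyLatch.new
latch.release                      # released before anyone waits
Thread.new { latch.await }.join    # still returns instead of hanging
```

With a latch like this, the test would not need to poll thread status before calling release, at the cost of the latch being single-use.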