Ruby: Queue with thread-safe “each” method

Ruby has blessed us with many a feature and cool ways to use them, one of them being the Queue.
However, you can’t iterate a Queue, you need to pop it and get one element at a time, which has been the de facto way of doing things for a long time.

You see, Queues are especially useful with threads.
They are thread-safe so they take care of synchronization themselves.

But what if you want to take advantage of that while keeping their elements in them?
You can’t do that with “pop”; sure you can fill the Queue after pop’ing all its elements but that’s not thread safe.

Well, in all actuality that’s the main reason you want to use a Queue, put a number of tasks in them and let the threads remove each task and take care it if.

When your thread has been emptied out this means that you’re work is done.

However, wouldn’t it be great if Queue had a thread-safe “each” method?
So I figured what the hell, let’s see if I can remedy that.
Keep in mind that this is kind of a rape of the queue data structure, I’m not sure that you can call it a queue anymore.

Anyways, here’s MyQueue:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#
# MyQueue extends Queue adding a thread safe "each" method.
#
# If you want to use the "each" method MyQueue must be terminated<br />
# with the "MyQueue::EOQ" constant.
#
# Example:
#
#  require 'thread'
#  require 'myqueue'
#
#  @q = MyQueue.new
#  
#  producer = Thread.new do
#    
#    10.times {
#       |i|
#       @q < < i
#    }
#    
#    @q << MyQueue::EOQ
#  
#  end
#  
#  consumers = []
#  2.times {
#      |i|
#      consumers << Thread.new {
#          @q.each( ) {
#              |elem|
#              puts "Thread #{i.to_s}: " + elem.to_s
#          }
#          puts
#      }
#  
#  }
#  
#  consumers.each { |t| t.join }
#  
#  pp @q
#
#
class  MyQueue < Queue

    # this is the End-Of-Queue signal
    EOQ = self.hash
   
    #
    # Iterates through each element in the queue
    #
    # Use MyQueue::EOQ to terminate the queue.
    #
    # @param  [Bool]  non_block   If 'true' an exception will be raised
    #                               if the queue has been exhausted.<br/>
    #                               Otherwise the calling thread will be
    #                               suspended until data is pushed into the queue.
    #
    def each( non_block = false )
        @mutex.synchronize {
           
            cnt = 0
            sz  = self.size - 1

            while true
               
                if cnt > sz
                   
                    raise ThreadError, "queue exhausted" if non_block
                    @waiting.push Thread.current
                   
                    # recalculate the queue size
                    sz = self.size - 1
                   
                    @mutex.sleep
                else
                    break if @que[cnt] == EOQ
                   
                    yield @que[cnt]
                    cnt += 1
                end
               
            end
        }
    end  

end

Here’s an example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
 require 'thread'
 require 'myqueue'

 @q = MyQueue.new

 producer = Thread.new do

   10.times {
      |i|
      @q < < i
   }

   @q << MyQueue::EOQ

 end

 consumers = []
 2.times {
     |i|
     consumers << Thread.new {
         @q.each( ) {
             |elem|
             puts "Thread #{i.to_s}: " + elem.to_s
         }
         puts
     }

 }

 consumers.each { |t| t.join }

 pp @q

And here’s the output:

Thread 0: 0
Thread 0: 1
Thread 0: 2
Thread 0: 3
Thread 0: 4
Thread 0: 5
Thread 0: 6
Thread 0: 7
Thread 0: 8
Thread 0: 9

Thread 1: 0
Thread 1: 1
Thread 1: 2
Thread 1: 3
Thread 1: 4
Thread 1: 5
Thread 1: 6
Thread 1: 7
Thread 1: 8
Thread 1: 9

#<MyQueue:0x00000000eace50
 @mutex=#<Mutex:0x00000000eacda8>,
 @que=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 7694316],
 @waiting=[]>

SociBook del.icio.us Digg Facebook Google Yahoo Buzz StumbleUpon

Posted in: Programming, Ruby, Temple of Knowledge

Tags: , , , ,



addLeave a comment