Ruby: Queue with thread-safe “each” method
Ruby has blessed us with many a feature and cool ways to use them, one of them being the Queue.
However, you can’t iterate a Queue, you need to pop it and get one element at a time, which has been the de facto way of doing things for a long time.
You see, Queues are especially useful with threads.
They are thread-safe so they take care of synchronization themselves.
But what if you want to take advantage of that while keeping their elements in them?
You can’t do that with “pop”; sure you can fill the Queue after pop’ing all its elements but that’s not thread safe.
Well, in all actuality that’s the main reason you want to use a Queue, put a number of tasks in them and let the threads remove each task and take care it if.
When your thread has been emptied out this means that you’re work is done.
However, wouldn’t it be great if Queue had a thread-safe “each” method?
So I figured what the hell, let’s see if I can remedy that.
Keep in mind that this is kind of a rape of the queue data structure, I’m not sure that you can call it a queue anymore.
Anyways, here’s MyQueue:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 | # # MyQueue extends Queue adding a thread safe "each" method. # # If you want to use the "each" method MyQueue must be terminated<br /> # with the "MyQueue::EOQ" constant. # # Example: # # require 'thread' # require 'myqueue' # # @q = MyQueue.new # # producer = Thread.new do # # 10.times { # |i| # @q < < i # } # # @q << MyQueue::EOQ # # end # # consumers = [] # 2.times { # |i| # consumers << Thread.new { # @q.each( ) { # |elem| # puts "Thread #{i.to_s}: " + elem.to_s # } # puts # } # # } # # consumers.each { |t| t.join } # # pp @q # # class MyQueue < Queue # this is the End-Of-Queue signal EOQ = self.hash # # Iterates through each element in the queue # # Use MyQueue::EOQ to terminate the queue. # # @param [Bool] non_block If 'true' an exception will be raised # if the queue has been exhausted.<br/> # Otherwise the calling thread will be # suspended until data is pushed into the queue. # def each( non_block = false ) @mutex.synchronize { cnt = 0 sz = self.size - 1 while true if cnt > sz raise ThreadError, "queue exhausted" if non_block @waiting.push Thread.current # recalculate the queue size sz = self.size - 1 @mutex.sleep else break if @que[cnt] == EOQ yield @que[cnt] cnt += 1 end end } end end |
Here’s an example:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | require 'thread' require 'myqueue' @q = MyQueue.new producer = Thread.new do 10.times { |i| @q < < i } @q << MyQueue::EOQ end consumers = [] 2.times { |i| consumers << Thread.new { @q.each( ) { |elem| puts "Thread #{i.to_s}: " + elem.to_s } puts } } consumers.each { |t| t.join } pp @q |
And here’s the output:
Thread 0: 0 Thread 0: 1 Thread 0: 2 Thread 0: 3 Thread 0: 4 Thread 0: 5 Thread 0: 6 Thread 0: 7 Thread 0: 8 Thread 0: 9 Thread 1: 0 Thread 1: 1 Thread 1: 2 Thread 1: 3 Thread 1: 4 Thread 1: 5 Thread 1: 6 Thread 1: 7 Thread 1: 8 Thread 1: 9 #<MyQueue:0x00000000eace50 @mutex=#<Mutex:0x00000000eacda8>, @que=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 7694316], @waiting=[]>
Posted in: Programming, Ruby, Temple of Knowledge
Leave a comment








