diff options
-rw-r--r-- | queue/zset/run_with_threads.rb | 75 |
1 files changed, 75 insertions, 0 deletions
diff --git a/queue/zset/run_with_threads.rb b/queue/zset/run_with_threads.rb new file mode 100644 index 0000000..cba3eda --- /dev/null +++ b/queue/zset/run_with_threads.rb @@ -0,0 +1,75 @@ + +#!/usr/bin/ruby + +require "redis" +require 'pp' + +class Arse + + def initialize(name) + @name = name + @redis = Redis.new(:host => "127.0.0.1") + end + + def fetch(timeout = 1) + loop do + job = nil + @redis.watch("zset") + + job = @redis.zrange('zset', '0', '0') + + if job.is_a?(Array) and !job.empty? + # We only have one entry in our array + job = job[0] + + res = @redis.multi do + # Remove from the queue + @redis.zrem('zset', job ); + end + job = nil if res.nil? + end + + @redis.unwatch + + return job if job.is_a?(String) + + sleep(timeout) + end + end + + def run + Thread.new do + while( x = fetch() ) + print "\n" if x == "test 1" + print "#{@name}:#{x}.. " + + $count[x] += 1 + + if ( rand(10) > 5 ) + sleep 1 + end + + end + + end + end + +end + +$count = Hash.new{|h,k| h[k] = 0} +$threads = [] + +Signal.trap("INT") do + pp $count + exit +end + +$threads = [Arse.new("a").run, Arse.new("b").run, Arse.new("c").run] + +while $threads.any?{|t| t.alive?} do + $threads.each do |t| + next if t.alive? + t.join + end + sleep 1 +end |