diff options
| author | Patrick J Cherry <patrick@bytemark.co.uk> | 2015-12-21 17:49:27 +0000 | 
|---|---|---|
| committer | Patrick J Cherry <patrick@bytemark.co.uk> | 2015-12-21 17:49:27 +0000 | 
| commit | 02747d3567b1d393ca177365e97c209624a3aa10 (patch) | |
| tree | 4539d09139337e6fefa7909edbd28f77dca616c6 /queue/zset | |
| parent | bf480214a66d6df62c9bfcf5c448a375a67b3482 (diff) | |
Added a threaded PoC for a "zpop" implementation.
This follows the instructions from
http://redis.io/topics/transactions#using-a-hrefcommandswatchwatcha-to-implement-zpop
Diffstat (limited to 'queue/zset')
| -rw-r--r-- | queue/zset/run_with_threads.rb | 75 | 
1 files changed, 75 insertions, 0 deletions
diff --git a/queue/zset/run_with_threads.rb b/queue/zset/run_with_threads.rb new file mode 100644 index 0000000..cba3eda --- /dev/null +++ b/queue/zset/run_with_threads.rb @@ -0,0 +1,75 @@ + +#!/usr/bin/ruby + +require "redis" +require 'pp' + +class Arse + +  def initialize(name) +    @name = name +    @redis = Redis.new(:host => "127.0.0.1") +  end + +  def fetch(timeout = 1) +    loop do +      job = nil +      @redis.watch("zset") + +      job = @redis.zrange('zset', '0', '0') + +      if job.is_a?(Array) and !job.empty? +        # We only have one entry in our array +        job = job[0] + +        res = @redis.multi do +         # Remove from the queue +         @redis.zrem('zset', job ); +        end +        job = nil if res.nil? +      end + +      @redis.unwatch + +      return job if job.is_a?(String) + +      sleep(timeout)  +    end +  end + +  def run +    Thread.new do +      while( x = fetch() ) +        print "\n" if x == "test 1" +        print "#{@name}:#{x}.. " + +        $count[x] += 1 + +        if ( rand(10) > 5 ) +          sleep 1 +        end + +      end + +    end +  end + +end + +$count = Hash.new{|h,k| h[k] = 0} +$threads = [] + +Signal.trap("INT") do +  pp $count +  exit +end + +$threads = [Arse.new("a").run, Arse.new("b").run, Arse.new("c").run] + +while $threads.any?{|t| t.alive?} do +  $threads.each do |t| +    next if t.alive? +    t.join +  end +  sleep 1 +end  | 
