summaryrefslogtreecommitdiff
path: root/lib/custodian/queue.rb
blob: c8dfed87fc5d5e762d9a30d5e1db0ff007c52628 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#
# Best-effort load of the client libraries the queue depends upon.
#
# A missing library is reported on standard output rather than
# aborting the load of this file.
#
['redis'].each do |name|
  begin
    require name
  rescue LoadError
    puts("Failed to load the #{name} library - queue access will fail!")
  end
end



module Custodian


  #
  # An abstraction layer for our queue.
  #
  # Every method here raises a RuntimeError; concrete queue types
  # subclass this and override each of them.
  #
  class QueueType

    #
    # Retrieve a job from the queue.
    #
    def fetch(_timeout)
      raise 'Subclasses must implement this method!'
    end


    #
    # Add a new job to the queue.
    #
    def add(_job_string)
      raise 'Subclasses must implement this method!'
    end


    #
    # Get the size of the queue
    #
    def size?
      raise 'Subclasses must implement this method!'
    end


    #
    # Empty the queue
    #
    def flush!
      raise 'Subclasses must implement this method!'
    end
  end




  #
  # This is a simple queue which uses Redis for storage.
  #
  # Pending jobs are held as the members of a single Redis set, so a
  # given job string is queued at most once and jobs are handed out
  # in no particular order.
  #
  class RedisQueueType < QueueType

    #
    # The Redis key naming the set which holds the pending jobs.
    #
    QUEUE_KEY = 'custodian_queue'.freeze


    #
    # Connect to the server on localhost, unless QUEUE_ADDRESS is set
    # in the environment, in which case that host is used instead.
    #
    def initialize
      host = ENV['QUEUE_ADDRESS'] || '127.0.0.1'
      @redis = Redis.new(:host => host)
    end


    #
    #  Fetch a job from the queue.
    #
    #  This does not return until a job is available: the set is
    # polled, and after every empty poll we sleep for +timeout+
    # seconds before trying again.
    #
    #  Returns the job which was removed from the queue.
    #
    def fetch(timeout = 1)
      loop do

        # SPOP removes and returns a random member, or nil if empty.
        job = @redis.spop(QUEUE_KEY)
        return job unless job.nil?

        sleep(timeout)
      end
    end


    #
    #  Add a new job to the queue.
    #
    #  Returns nil, without touching the set, if the job is already
    # queued; otherwise returns whatever the Redis client returns
    # from SADD.
    #
    def add(test)
      return if @redis.sismember(QUEUE_KEY, test)

      @redis.sadd(QUEUE_KEY, test)
    end


    #
    #  How many jobs in the queue?
    #
    #  The queue is a plain set, so the count must come from SCARD;
    # ZCARD only applies to sorted sets.
    #
    def size?
      @redis.scard(QUEUE_KEY)
    end


    #
    #  Empty the queue, discarding all pending jobs.
    #
    def flush!
      @redis.del(QUEUE_KEY)
    end

  end

end