From c34a4b2e54ad091e0e98d195c0a9bf70da47b91f Mon Sep 17 00:00:00 2001 From: Patrick J Cherry Date: Mon, 30 Jan 2012 12:27:05 +0000 Subject: Added possibility of using postgres databases. --- lib/mauve/alert.rb | 22 +++++++++++++++------- lib/mauve/datamapper.rb | 8 +++++++- lib/mauve/server.rb | 5 ++--- 3 files changed, 24 insertions(+), 11 deletions(-) (limited to 'lib/mauve') diff --git a/lib/mauve/alert.rb b/lib/mauve/alert.rb index 37a8f2e..3c9fbdc 100644 --- a/lib/mauve/alert.rb +++ b/lib/mauve/alert.rb @@ -14,7 +14,6 @@ module Mauve include DataMapper::Resource - property :id, Serial property :alert_id, Integer property :earliest, EpochTime belongs_to :alert, :model => "Alert" @@ -29,6 +28,16 @@ module Mauve # def self.create_view! the_distant_future = (Time.now + 2000.days).to_i # it is the year 2000 - the humans are dead + + case DataMapper.repository(:default).adapter.class.to_s + when "DataMapper::Adapters::PostgresAdapter" + ifnull = "COALESCE" + min = "LEAST" + else + ifnull = "IFNULL" + min = "MIN" + end + ["BEGIN TRANSACTION", "DROP VIEW IF EXISTS mauve_alert_earliest_dates", "CREATE VIEW @@ -37,12 +46,12 @@ module Mauve SELECT id AS alert_id, NULLIF( - MIN( - IFNULL(will_clear_at, '#{the_distant_future}'), - IFNULL(will_raise_at, '#{the_distant_future}'), - IFNULL(will_unacknowledge_at, '#{the_distant_future}') + #{min}( + #{ifnull}(will_clear_at, #{the_distant_future}), + #{ifnull}(will_raise_at, #{the_distant_future}), + #{ifnull}(will_unacknowledge_at, #{the_distant_future}) ), - '#{the_distant_future}' + #{the_distant_future} ) AS earliest FROM mauve_alerts WHERE @@ -54,7 +63,6 @@ module Mauve repository(:default).adapter.execute(statement.gsub(/\s+/, " ")) end end - end # diff --git a/lib/mauve/datamapper.rb b/lib/mauve/datamapper.rb index abf56c7..e7334da 100644 --- a/lib/mauve/datamapper.rb +++ b/lib/mauve/datamapper.rb @@ -6,7 +6,13 @@ require 'dm-core' require 'dm-migrations' require 'dm-serializer' -require 'dm-sqlite-adapter-with-mutex' +%w(dm-sqlite-adapter-with-mutex dm-postgres-adapter).each do |req| + begin + require req + rescue LoadError => err + # do not a lot. + end +end require 'dm-types' require 'dm-validations' diff --git a/lib/mauve/server.rb b/lib/mauve/server.rb index 233695d..f07b7ad 100644 --- a/lib/mauve/server.rb +++ b/lib/mauve/server.rb @@ -1,7 +1,7 @@ # encoding: UTF-8 require 'yaml' require 'socket' -# require 'mauve/datamapper' +require 'mauve/datamapper' require 'mauve/proto' require 'mauve/alert' require 'mauve/history' @@ -118,7 +118,6 @@ module Mauve # # m.auto_migrate! if m.respond_to?("auto_migrate!") # - # m.properties.each do |prop| next unless prop.is_a?(DataMapper::Property::EpochTime) logger.info("Updating #{c}.#{prop.name}") @@ -126,7 +125,7 @@ module Mauve DataMapper.repository(:default).adapter.execute("BEGIN TRANSACTION;") DataMapper.repository(:default).adapter.execute(statement) DataMapper.repository(:default).adapter.execute("COMMIT TRANSACTION;") - end + end if DataMapper.repository(:default).adapter.class.to_s == "DataMapper::Adapters::SqliteAdapter" end AlertHistory.migrate! -- cgit v1.2.1 From 8ed82944ebd764f1986b3155990b895045b584bc Mon Sep 17 00:00:00 2001 From: Patrick J Cherry Date: Mon, 30 Jan 2012 12:28:14 +0000 Subject: Bundled timer in with processor. What could go wrong..? --- lib/mauve/configuration_builders/server.rb | 19 ------ lib/mauve/processor.rb | 89 ++++++++++++++++++++----- lib/mauve/server.rb | 4 +- lib/mauve/timer.rb | 100 ----------------------------- 4 files changed, 76 insertions(+), 136 deletions(-) delete mode 100644 lib/mauve/timer.rb (limited to 'lib/mauve') diff --git a/lib/mauve/configuration_builders/server.rb b/lib/mauve/configuration_builders/server.rb index c000144..e3654b9 100644 --- a/lib/mauve/configuration_builders/server.rb +++ b/lib/mauve/configuration_builders/server.rb @@ -64,24 +64,6 @@ module Mauve end end - # - # This is the Timer singleton. - # - class Timer < ObjectBuilder - # - # This is the interval at which the Timer thread is run. This will limit - # the rate at which notifications can be sent, if set. - # - is_attribute "poll_every" - - # Sets up a Mauve::Timer singleton as the result - # - # @return [Mauve::Timer] - def builder_setup - @result = Mauve::Timer.instance - end - end - class Notifier < ObjectBuilder # # This is the interval at which the notification queue is polled for new @@ -163,7 +145,6 @@ module Mauve is_builder "web_interface", HTTPServer is_builder "listener", UDPServer is_builder "processor", Processor - is_builder "timer", Timer is_builder "notifier", Notifier is_builder "heartbeat", Heartbeat is_builder "pop3_server", Pop3Server diff --git a/lib/mauve/processor.rb b/lib/mauve/processor.rb index c8a1dfb..96c44aa 100644 --- a/lib/mauve/processor.rb +++ b/lib/mauve/processor.rb @@ -76,25 +76,87 @@ module Mauve # # flush the queue # - main_loop + do_processor end private - # This is the main loop that does the processing. - # def main_loop - - sz = Server.packet_buffer_size + do_processor + do_timer unless timer_should_stop? + end - sz.times do - Timer.instance.freeze if Timer.instance.alive? and !Timer.instance.frozen? + def do_timer + # + # Get the next alert. + # + next_alert = Alert.find_next_with_event + + # + # If we didn't find an alert, or the alert we found is due in the future, + # look for the next alert_changed object. + # + if next_alert.nil? or next_alert.due_at > Time.now + next_alert_changed = AlertChanged.find_next_with_event + end + if next_alert_changed.nil? and next_alert.nil? + next_to_notify = nil + + elsif next_alert.nil? or next_alert_changed.nil? + next_to_notify = (next_alert || next_alert_changed) + + else + next_to_notify = ( next_alert.due_at < next_alert_changed.due_at ? next_alert : next_alert_changed ) + + end + + # + # Nothing to notify? + # + if next_to_notify.nil? + # + # Sleep indefinitely + # + logger.info("Nothing to notify about -- snoozing for a while.") + sleep_loops = 600 + else # - # Hmm.. timer not frozen. + # La la la nothing to do. # - break unless Timer.instance.frozen? + logger.info("Next to notify: #{next_to_notify} #{next_to_notify.is_a?(AlertChanged) ? "(reminder)" : "(heartbeat)"} -- snoozing until #{next_to_notify.due_at.iso8601}") + sleep_loops = ((next_to_notify.due_at - Time.now).to_f / 0.1).round.to_i + end + + sleep_loops = 0 if sleep_loops.nil? or sleep_loops < 1 + # + # Ah-ha! Sleep with a break clause. + # + sleep_loops.times do + # + # Start again if the situation has changed. + # + break if timer_should_stop? + + # + # This is a rate-limiting step for alerts. + # + Kernel.sleep 0.1 + end + + return if timer_should_stop? or next_to_notify.nil? + + next_to_notify.poll + end + + # This is processor loop + # + def do_processor + + sz = Server.packet_buffer_size + + sz.times do data, client, received_at = Server.packet_pop # @@ -136,14 +198,11 @@ module Mauve ensure @transmission_id_cache[update.transmission_id.to_s] = Time.now end - end + end - ensure - # - # Thaw the timer - # - Timer.instance.thaw if Timer.instance.frozen? + def timer_should_stop? + (Server.packet_buffer_size > 0) end end diff --git a/lib/mauve/server.rb b/lib/mauve/server.rb index f07b7ad..3e82858 100644 --- a/lib/mauve/server.rb +++ b/lib/mauve/server.rb @@ -7,7 +7,7 @@ require 'mauve/alert' require 'mauve/history' require 'mauve/mauve_thread' require 'mauve/mauve_time' -require 'mauve/timer' +require 'mauve/notifier' require 'mauve/udp_server' require 'mauve/pop3_server' require 'mauve/processor' @@ -23,7 +23,7 @@ module Mauve # # This is the order in which the threads should be started. # - THREAD_CLASSES = [UDPServer, HTTPServer, Pop3Server, Processor, Timer, Notifier, Heartbeat] + THREAD_CLASSES = [UDPServer, HTTPServer, Pop3Server, Processor, Notifier, Heartbeat] attr_reader :hostname, :database, :initial_sleep attr_reader :packet_buffer, :notification_buffer, :started_at diff --git a/lib/mauve/timer.rb b/lib/mauve/timer.rb deleted file mode 100644 index 5a43b60..0000000 --- a/lib/mauve/timer.rb +++ /dev/null @@ -1,100 +0,0 @@ -# encoding: UTF-8 -require 'mauve/alert' -require 'mauve/notifier' -require 'mauve/mauve_thread' -require 'thread' -require 'log4r' - -module Mauve - - # - # This is the thread that looks for reminders and heartbeat alerts to poll. - # - class Timer < MauveThread - - include Singleton - - def initialize - # - # Set the default polling interval to zero.. - # - self.poll_every = 0 - - super - end - - private - - # This is the trigger for heartbeats and reminders. - # - # It looks up the next event, and sleeps until it is due. If an update - # comes in (via the processor) it is broken out of its sleep, and starts - # again when woken up. - # - def main_loop - # - # Get the next alert. - # - next_alert = Alert.find_next_with_event - - # - # If we didn't find an alert, or the alert we found is due in the future, - # look for the next alert_changed object. - # - if next_alert.nil? or next_alert.due_at > Time.now - next_alert_changed = AlertChanged.find_next_with_event - end - - if next_alert_changed.nil? and next_alert.nil? - next_to_notify = nil - - elsif next_alert.nil? or next_alert_changed.nil? - next_to_notify = (next_alert || next_alert_changed) - - else - next_to_notify = ( next_alert.due_at < next_alert_changed.due_at ? next_alert : next_alert_changed ) - - end - - # - # Nothing to notify? - # - if next_to_notify.nil? - # - # Sleep indefinitely - # - logger.info("Nothing to notify about -- snoozing for a while.") - sleep_loops = 600 - else - # - # La la la nothing to do. - # - logger.info("Next to notify: #{next_to_notify} #{next_to_notify.is_a?(AlertChanged) ? "(reminder)" : "(heartbeat)"} -- snoozing until #{next_to_notify.due_at.iso8601}") - sleep_loops = ((next_to_notify.due_at - Time.now).to_f / 0.1).round.to_i - end - - sleep_loops = 1 if sleep_loops.nil? or sleep_loops < 1 - - # - # Ah-ha! Sleep with a break clause. - # - sleep_loops.times do - # - # Start again if the situation has changed. - # - break if self.should_stop? - - # - # This is a rate-limiting step for alerts. - # - Kernel.sleep 0.1 - end - - return if self.should_stop? or next_to_notify.nil? - - next_to_notify.poll - end - - end - -end -- cgit v1.2.1