class Raindrops::Aggregate::PMQ
Aggregate + POSIX message queues support for Ruby 1.9+ and Linux
This class is duck-type compatible with Aggregate and allows us to aggregate and share statistics from multiple processes/threads, aided by POSIX message queues. It is designed to be used with the Raindrops::LastDataRecv Rack application, but can be used independently on compatible runtimes.
Unlike the core of raindrops, this is only supported on Ruby 1.9+ and Linux 2.6+. Using this class requires the following additional RubyGems or libraries:
- aggregate (tested with 0.2.2)
- posix_mq (tested with 1.0.0)
Design
There is one master thread which aggregates statistics. Individual worker processes or threads will write to a shared POSIX message queue (default: “/raindrops”) that the master reads from. At a predefined interval, the master thread will write out to a shared, anonymous temporary file that workers may read from.
Setting :worker_interval and :master_interval to 1 will result in perfect accuracy, but at the cost of high synchronization overhead. Larger intervals mean less frequent messaging for higher performance but lower accuracy.
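The trade-off between interval size and messaging overhead can be sketched without posix_mq. In the example below a plain Array stands in for the message queue, and the BatchingWorker class with its sent and pending methods is hypothetical, chosen only to mirror the buffering described above.

```ruby
# Hypothetical stand-in for a worker: buffers samples locally and "sends"
# one message per `interval` samples. `sent` is an Array standing in for
# the POSIX message queue (an assumption of this sketch).
class BatchingWorker
  attr_reader :sent

  def initialize(interval)
    @interval = interval
    @buffer = []
    @sent = []
  end

  # Add a sample; emit the whole buffer as one message once it is full.
  def <<(val)
    @buffer << val
    if @buffer.size >= @interval
      @sent << @buffer.dup
      @buffer.clear
    end
    self
  end

  # Samples recorded locally but not yet visible to the master.
  def pending
    @buffer.size
  end
end

precise = BatchingWorker.new(1)
batched = BatchingWorker.new(10)
25.times do |i|
  precise << i
  batched << i
end

puts "interval 1:  #{precise.sent.size} messages, #{precise.pending} pending"
puts "interval 10: #{batched.sent.size} messages, #{batched.pending} pending"
```

With an interval of 1 every sample becomes its own message and nothing is left pending. With an interval of 10 far fewer messages are sent, but any samples still sitting in the local buffer are invisible to the master until the next flush.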
Attributes
nr_dropped [R]
Returns the number of messages dropped when sending to the POSIX message queue, if non-blocking operation was requested with :lossy.
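The drop-counting behaviour of :lossy can be illustrated with the standard library alone. This is only a sketch: SizedQueue stands in for a bounded POSIX message queue, and the try_send lambda is a hypothetical helper that returns false instead of blocking when the queue is full.

```ruby
# SizedQueue stands in for a bounded POSIX message queue (assumption:
# posix_mq itself is not used in this sketch).
queue = SizedQueue.new(2)
nr_dropped = 0

# Hypothetical non-blocking send: true on success, false if the queue is full.
try_send = lambda do |msg|
  begin
    queue.push(msg, true) # non_block = true raises ThreadError when full
    true
  rescue ThreadError
    false
  end
end

# Nothing is reading from the queue, so only the first two sends succeed.
5.times { |i| try_send.call(i) or nr_dropped += 1 }

puts "queued=#{queue.size} dropped=#{nr_dropped}"
```

A lossy worker never stalls when the master falls behind. The cost is that some samples never reach the master, and the counter records how many.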
Public Class Methods
Source
# File lib/raindrops/aggregate/pmq.rb, line 63
def initialize(params = {})
  opts = {
    :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops",
    :worker_interval => 10,
    :master_interval => 5,
    :lossy => false,
    :mq_attr => nil,
    :mq_umask => 0666,
    :aggregate => Aggregate.new,
  }.merge! params
  @master_interval = opts[:master_interval]
  @worker_interval = opts[:worker_interval]
  @aggregate = opts[:aggregate]
  @worker_queue = @worker_interval ? [] : nil
  @mutex = Mutex.new

  @mq_name = opts[:queue]
  mq = POSIX_MQ.new @mq_name, :w, opts[:mq_umask], opts[:mq_attr]
  Tempfile.open("raindrops_pmq") do |t|
    @wr = File.open(t.path, "wb")
    @rd = File.open(t.path, "rb")
  end
  @wr.sync = true
  @cached_aggregate = @aggregate
  flush_master
  @mq_send = if opts[:lossy]
    @nr_dropped = 0
    mq.nonblock = true
    mq.method :trysend
  else
    mq.method :send
  end
end
Creates a new Raindrops::Aggregate::PMQ object:
Raindrops::Aggregate::PMQ.new(options = {}) -> aggregate
options is a hash that accepts the following keys:
- :queue - name of the POSIX message queue (default: “/raindrops”)
- :worker_interval - interval to send to the master (default: 10)
- :master_interval - interval for the master to write out (default: 5)
- :lossy - workers drop packets if master cannot keep up (default: false)
- :aggregate - Aggregate object (default: Aggregate.new)
- :mq_umask - umask for creating the POSIX message queue (default: 0666)
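The way these defaults combine with caller-supplied options, and with the RAINDROPS_MQUEUE environment variable read by the constructor, can be sketched as follows. The pmq_options helper is hypothetical and covers only a subset of the keys. It takes the environment as a parameter so that the example does not depend on the real process environment.

```ruby
# Hypothetical helper mirroring the default-merging done in the constructor.
# `env` defaults to ENV but can be any Hash-like object for illustration.
def pmq_options(params = {}, env = ENV)
  {
    :queue => env["RAINDROPS_MQUEUE"] || "/raindrops",
    :worker_interval => 10,
    :master_interval => 5,
    :lossy => false,
  }.merge(params)
end

defaults = pmq_options({}, {})
from_env = pmq_options({}, { "RAINDROPS_MQUEUE" => "/my_app" })
explicit = pmq_options({ :queue => "/other", :lossy => true },
                       { "RAINDROPS_MQUEUE" => "/my_app" })

puts defaults[:queue]
puts from_env[:queue]
puts explicit[:queue]
```

An explicit :queue option takes precedence over the environment variable, which in turn takes precedence over the built-in “/raindrops” default.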
Public Instance Methods
Source
# File lib/raindrops/aggregate/pmq.rb, line 98
def << val
  if q = @worker_queue
    q << val
    if q.size >= @worker_interval
      mq_send(q) or @nr_dropped += 1
      q.clear
    end
  else
    mq_send(val) or @nr_dropped += 1
  end
end
adds a sample to the underlying Aggregate object
Source
# File lib/raindrops/aggregate/pmq.rb, line 149
def aggregate
  @cached_aggregate ||= begin
    flush
    Marshal.load(synchronize(@rd, RDLOCK) do |rd|
      dst = StringIO.new
      dst.binmode
      IO.copy_stream(rd, dst, rd.size, 0)
      dst.string
    end)
  end
end
Loads the last shared Aggregate from the master thread/process
Source
# File lib/raindrops/aggregate/pmq.rb, line 213
def count; aggregate.count; end
proxy for Aggregate#count
Source
# File lib/raindrops/aggregate/pmq.rb, line 240
def each; aggregate.each { |*args| yield(*args) }; end
proxy for Aggregate#each
Source
# File lib/raindrops/aggregate/pmq.rb, line 243
def each_nonzero; aggregate.each_nonzero { |*args| yield(*args) }; end
proxy for Aggregate#each_nonzero
Source
# File lib/raindrops/aggregate/pmq.rb, line 204
def flush
  if q = @local_queue && ! q.empty?
    mq_send q
    q.clear
  end
  nil
end
Flushes the local queue of the worker process, sending all pending data to the master. There is no need to call this explicitly, as :worker_interval defines how frequently your queue will be flushed.
Source
# File lib/raindrops/aggregate/pmq.rb, line 164
def flush_master
  dump = Marshal.dump @aggregate
  synchronize(@wr, WRLOCK) do |wr|
    wr.truncate 0
    wr.rewind
    wr.write(dump)
  end
end
Flushes the currently aggregated statistics to a temporary file. There is no need to call this explicitly, as :master_interval defines how frequently your data will be flushed for workers to read.
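The file-based hand-off between the master (writer) and the workers (readers) can be sketched with the standard library. This is an illustration under stated assumptions, not the library's implementation: the synchronize helper and the RDLOCK/WRLOCK constants are not shown in the source above, so File#flock is used as a stand-in, a plain Hash stands in for the Aggregate object, and the publish and snapshot helpers are hypothetical.

```ruby
require "tempfile"

# Hypothetical stand-in for the aggregated statistics.
stats = { :count => 3, :sum => 42 }

# Two descriptors on the same file, one for writing and one for reading,
# mirroring the @wr/@rd pair in the constructor. The file is removed when
# the block ends, but the open descriptors stay usable on POSIX systems.
wr = rd = nil
Tempfile.create("pmq_sketch") do |t|
  wr = File.open(t.path, "wb")
  rd = File.open(t.path, "rb")
end
wr.sync = true

# Writer side: replace the file contents under an exclusive lock.
# File#flock is an assumption standing in for the real locking helper.
def publish(wr, obj)
  dump = Marshal.dump(obj)
  wr.flock(File::LOCK_EX)
  wr.truncate(0)
  wr.rewind
  wr.write(dump)
ensure
  wr.flock(File::LOCK_UN)
end

# Reader side: load the latest snapshot under a shared lock.
def snapshot(rd)
  rd.flock(File::LOCK_SH)
  rd.rewind
  Marshal.load(rd.read)
ensure
  rd.flock(File::LOCK_UN)
end

publish(wr, stats)
first = snapshot(rd)

stats[:count] += 1
publish(wr, stats)
second = snapshot(rd)

puts first[:count], second[:count]
```

Readers always see a complete snapshot, because the writer truncates and rewrites the file only while holding the exclusive lock. A reader's view is as stale as the time since the last publish, which is what :master_interval controls.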
Source
# File lib/raindrops/aggregate/pmq.rb, line 122
def master_loop
  buf = ""
  a = @aggregate
  nr = 0
  mq = POSIX_MQ.new @mq_name, :r # this one is always blocking
  begin
    if (nr -= 1) < 0
      nr = @master_interval
      flush_master
    end
    mq.shift(buf)
    data = begin
      Marshal.load(buf) or return
    rescue ArgumentError, TypeError
      next
    end
    Array === data ? data.each { |x| a << x } : a << data
  rescue Errno::EINTR
  rescue => e
    warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}"
    break
  end while true
ensure
  flush_master
end
Starts running a master loop, usually in a dedicated thread or process:
Thread.new { agg.master_loop }
Any worker can call agg.stop_master_loop to stop the master loop (possibly causing the thread or process to exit).
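The message protocol the master loop relies on (single samples, batched Arrays, and a marshaled false as the stop signal) can be sketched without posix_mq. Here Thread::Queue stands in for the POSIX message queue and a plain Array stands in for the Aggregate object. Both are assumptions made for illustration.

```ruby
# Queue stands in for the POSIX message queue and `samples` for the
# Aggregate object (both are assumptions of this sketch).
mq = Queue.new
samples = []

master = Thread.new do
  loop do
    data = Marshal.load(mq.pop)
    break unless data # a marshaled `false` stops the loop
    # Workers may send one sample or a batch (Array) of samples.
    Array === data ? samples.concat(data) : samples << data
  end
end

mq << Marshal.dump(1)         # a single sample
mq << Marshal.dump([2, 3, 4]) # a batched message
mq << Marshal.dump(false)     # stop request, as stop_master_loop sends
master.join

puts samples.inspect
```

Because the stop request travels through the same queue as the samples, every message queued before it is processed before the loop exits.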
Source
# File lib/raindrops/aggregate/pmq.rb, line 216
def max; aggregate.max; end
proxy for Aggregate#max
Source
# File lib/raindrops/aggregate/pmq.rb, line 225
def mean; aggregate.mean; end
proxy for Aggregate#mean
Source
# File lib/raindrops/aggregate/pmq.rb, line 219
def min; aggregate.min; end
proxy for Aggregate#min
Source
# File lib/raindrops/aggregate/pmq.rb, line 234
def outliers_high; aggregate.outliers_high; end
proxy for Aggregate#outliers_high
Source
# File lib/raindrops/aggregate/pmq.rb, line 231
def outliers_low; aggregate.outliers_low; end
proxy for Aggregate#outliers_low
Source
# File lib/raindrops/aggregate/pmq.rb, line 228
def std_dev; aggregate.std_dev; end
proxy for Aggregate#std_dev
Source
# File lib/raindrops/aggregate/pmq.rb, line 175
def stop_master_loop
  sleep 0.1 until mq_send(false)
rescue Errno::EINTR
  retry
end
Stops the currently running master loop. May be called from any worker thread or process.
Source
# File lib/raindrops/aggregate/pmq.rb, line 222
def sum; aggregate.sum; end
proxy for Aggregate#sum
Source
# File lib/raindrops/aggregate/pmq.rb, line 237
def to_s(*args); aggregate.to_s(*args); end
proxy for Aggregate#to_s