diff --git a/CHANGELOG.md b/CHANGELOG.md index de40060..c75286a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ # Emque Consuming CHANGELOG +- [Add in a configuration option to purge queues on startup](https://github.com/emque/emque-consuming/pull/61) 1.3.0 - [Add error logging when an exception is thrown.](https://github.com/emque/emque-consuming/pull/65) 1.2.3 - [Remove double ack when consuming a message and ending up in an error state. This was causing consumers to die silently.](https://github.com/emque/emque-consuming/pull/59) 1.2.1 - [Add in the ability to retry errors and back off with an exponential delay](https://github.com/emque/emque-consuming/pull/55) 1.2.0 diff --git a/lib/emque/consuming/adapters/rabbit_mq/manager.rb b/lib/emque/consuming/adapters/rabbit_mq/manager.rb index 7f6f374..66c8bbc 100644 --- a/lib/emque/consuming/adapters/rabbit_mq/manager.rb +++ b/lib/emque/consuming/adapters/rabbit_mq/manager.rb @@ -1,4 +1,6 @@ require "bunny" +require_relative "error_worker" +require_relative "delayed_message_worker" module Emque module Consuming diff --git a/lib/emque/consuming/adapters/rabbit_mq/worker.rb b/lib/emque/consuming/adapters/rabbit_mq/worker.rb index 422cb9c..01a1a39 100644 --- a/lib/emque/consuming/adapters/rabbit_mq/worker.rb +++ b/lib/emque/consuming/adapters/rabbit_mq/worker.rb @@ -33,17 +33,22 @@ def initialize(connection, topic) self.queue = channel - .queue( - "emque.#{config.app_name}.#{topic}", - :durable => config.adapter.options[:durable], - :auto_delete => config.adapter.options[:auto_delete], - :arguments => { - "x-dead-letter-exchange" => "#{config.app_name}.error" - } - ) - .bind( - channel.fanout(topic, :durable => true, :auto_delete => false) - ) + .queue( + "emque.#{config.app_name}.#{topic}", + :durable => config.adapter.options[:durable], + :auto_delete => config.adapter.options[:auto_delete], + :arguments => { + "x-dead-letter-exchange" => "#{config.app_name}.error" + } + ) + .bind( + channel.fanout(topic, :durable => true, :auto_delete => false) + ) + + if config.purge_queues_on_start + logger.info "#{log_prefix} is purging it's queue" + queue.purge + end end def start diff --git a/lib/emque/consuming/configuration.rb b/lib/emque/consuming/configuration.rb index 9e4f6f0..84736d2 100644 --- a/lib/emque/consuming/configuration.rb +++ b/lib/emque/consuming/configuration.rb @@ -5,9 +5,9 @@ module Consuming class Configuration attr_accessor :app_name, :adapter, :auto_shutdown, :delayed_message_workers, :enable_delayed_message, :error_handlers, - :error_limit, :error_expiration, :retryable_errors, - :retryable_error_limit, :status, :status_port, :status_host, - :socket_path, :shutdown_handlers + :error_limit, :error_expiration, :purge_queues_on_start, + :retryable_errors, :retryable_error_limit, :status, :status_port, + :status_host, :socket_path, :shutdown_handlers attr_writer :env, :log_level def initialize @@ -19,6 +19,7 @@ def initialize @error_limit = 5 @error_expiration = 3600 # 60 minutes @log_level = nil + @purge_queues_on_start = false @retryable_errors = [] @retryable_error_limit = 3 @status_port = 10000 @@ -56,6 +57,7 @@ def to_hsh :error_handlers, :error_limit, :error_expiration, + :purge_queues_on_start, :log_level, :retryable_errors, :retryable_error_limit, diff --git a/lib/emque/consuming/version.rb b/lib/emque/consuming/version.rb index 54c79a0..8976499 100644 --- a/lib/emque/consuming/version.rb +++ b/lib/emque/consuming/version.rb @@ -1,5 +1,5 @@ module Emque module Consuming - VERSION = "1.2.3" + VERSION = "1.3.0" end end diff --git a/lib/templates/config/application.rb.tt b/lib/templates/config/application.rb.tt index d18634d..ea319ba 100644 --- a/lib/templates/config/application.rb.tt +++ b/lib/templates/config/application.rb.tt @@ -15,7 +15,7 @@ module <%= @name %> [].tap { |options_out| if @options.has_key?(:app_name) options_out << "config.app_name = \"#{@options[:app_name]}\"" - end + end if @options.has_key?(:error_limit) options_out << "config.error_limit = #{@options[:error_limit]}" end diff --git a/spec/adapters/rabbit_mq/worker/worker_spec.rb b/spec/adapters/rabbit_mq/worker/worker_spec.rb new file mode 100644 index 0000000..3f2135e --- /dev/null +++ b/spec/adapters/rabbit_mq/worker/worker_spec.rb @@ -0,0 +1,95 @@ +require "spec_helper" +require "emque/consuming/adapters/rabbit_mq/worker" +require "pry" + +module Emque + module Consuming + module Adapters + module RabbitMq + class Worker + def start + logger.info "#{log_prefix} starting..." + logger.info "RabbitMQ Worker: Skipping consuming during test." + logger.debug "#{log_prefix} started" + end + end + end + end + end +end + +describe Emque::Consuming::Adapters::RabbitMq::Worker do + describe "#initialize" do + before do + @connection = Bunny.new + @connection.start + @channel = @connection.create_channel + @channel.queue_delete("emque.dummy.spec") + @fanout = @channel.fanout("dummy.spec", + :durable => true, + :auto_delete => false + ) + @queue = @channel + .queue("emque.dummy.spec", { + :durable => true, + :auto_delete => false, + :arguments => { + "x-dead-letter-exchange" => "dummy.error" + } + }).bind(@fanout) + @queue.publish(Oj.dump({ + :metadata => { + :topic => "spec", + :type => "dummy.spec" + } + })) + end + + after do + @channel.queue_delete("emque.dummy.spec") + @connection.close + end + + it "should not purge queues on start" do + Dummy::Application.config.purge_queues_on_start = false + Dummy::Application.config.set_adapter(:rabbit_mq) + Dummy::Application.router.map do + topic "spec" => SpecConsumer do; end + end + app = Dummy::Application.new + connection = Bunny.new + connection.start + connection.with_channel do |channel| + @queue = channel.queue("emque.dummy.spec", :passive => true) + expect(@queue.message_count).to eq(1) + end + app.start + sleep 0.3 + connection.with_channel do |channel| + @queue = channel.queue("emque.dummy.spec", :passive => true) + expect(@queue.message_count).to eq(1) + end + end + + it "should purge queues on start" do + Dummy::Application.config.purge_queues_on_start = true + Dummy::Application.config.set_adapter(:rabbit_mq) + Dummy::Application.router.map do + topic "spec" => SpecConsumer do; end + end + app = Dummy::Application.new + connection = Bunny.new + connection.start + connection.with_channel do |channel| + @queue = channel.queue("emque.dummy.spec", :passive => true) + expect(@queue.message_count).to eq(1) + end + app.start + sleep 0.3 + connection.with_channel do |channel| + @queue = channel.queue("emque.dummy.spec", :passive => true) + expect(@queue.message_count).to eq(0) + end + end + end +end diff --git a/spec/application_spec.rb b/spec/application_spec.rb index 2b29d5e..28f0a34 100644 --- a/spec/application_spec.rb +++ b/spec/application_spec.rb @@ -38,7 +38,6 @@ app.notice_error({ :test => "failure" }) app.notice_error({ :test => "another failure" }) - end end end diff --git a/spec/configuration_spec.rb b/spec/configuration_spec.rb index b9c22f9..7cbff89 100644 --- a/spec/configuration_spec.rb +++ b/spec/configuration_spec.rb @@ -53,6 +53,19 @@ end end + describe "purge_queues_on_start" do + it "has a default" do + config = Emque::Consuming::Configuration.new + expect(config.purge_queues_on_start).to eq(false) + end + + it "prefers the assigned value" do + config = Emque::Consuming::Configuration.new + config.purge_queues_on_start = true + expect(config.purge_queues_on_start).to eq(true) + end + end + describe "#to_hsh" do it "returns a hash" do config = Emque::Consuming::Configuration.new @@ -63,9 +76,9 @@ accessors = [ :app_name, :adapter, :auto_shutdown, :delayed_message_workers, :env, :enable_delayed_message, :error_handlers, :error_limit, - :error_expiration, :log_level, :retryable_errors, - :retryable_error_limit, :status_port, :status_host, :status, - :socket_path, :shutdown_handlers + :error_expiration, :purge_queues_on_start, :log_level, + :retryable_errors, :retryable_error_limit, :status_port, :status_host, + :status, :socket_path, :shutdown_handlers ] config = Emque::Consuming::Configuration.new diff --git a/spec/dummy/config/application.rb b/spec/dummy/config/application.rb index ed3a430..db4f32e 100644 --- a/spec/dummy/config/application.rb +++ b/spec/dummy/config/application.rb @@ -1,12 +1,20 @@ require "emque/consuming" +require "emque/consuming/adapters/rabbit_mq/manager" ENV["EMQUE_ENV"] = "test" module Emque module Consuming module Adapters + module RabbitMq + def self.default_options; { :durable => true }; end + def self.load; end + def self.manager + Emque::Consuming::Adapters::RabbitMq::Manager + end + end module TestAdapter - def self.default_options; {}; end + def self.default_options; { :durable => true }; end def self.load; end def self.manager Emque::Consuming::Adapters::TestAdapter::Manager diff --git a/spec/dummy/consumers/spec_consumer.rb b/spec/dummy/consumers/spec_consumer.rb new file mode 100644 index 0000000..c645dc0 --- /dev/null +++ b/spec/dummy/consumers/spec_consumer.rb @@ -0,0 +1,3 @@ +class SpecConsumer + include Emque::Consuming.consumer +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 780ccdd..ecd5666 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -13,6 +13,7 @@ require "timecop" require "fileutils" require_relative "dummy/config/application" +require_relative "dummy/consumers/spec_consumer" module VerifyAndResetHelpers def verify(object)