From 59d42019c258af13057b0947a86ea49b520a5494 Mon Sep 17 00:00:00 2001 From: Tyler Knappe Date: Tue, 1 Nov 2016 22:05:58 +0000 Subject: [PATCH 1/6] Add in config option to purge all queues on startup. --- .../consuming/adapters/rabbit_mq/worker.rb | 27 +++++++++++-------- lib/emque/consuming/configuration.rb | 8 +++--- spec/configuration_spec.rb | 6 ++--- 3 files changed, 24 insertions(+), 17 deletions(-) diff --git a/lib/emque/consuming/adapters/rabbit_mq/worker.rb b/lib/emque/consuming/adapters/rabbit_mq/worker.rb index 422cb9c..01a1a39 100644 --- a/lib/emque/consuming/adapters/rabbit_mq/worker.rb +++ b/lib/emque/consuming/adapters/rabbit_mq/worker.rb @@ -33,17 +33,22 @@ def initialize(connection, topic) self.queue = channel - .queue( - "emque.#{config.app_name}.#{topic}", - :durable => config.adapter.options[:durable], - :auto_delete => config.adapter.options[:auto_delete], - :arguments => { - "x-dead-letter-exchange" => "#{config.app_name}.error" - } - ) - .bind( - channel.fanout(topic, :durable => true, :auto_delete => false) - ) + .queue( + "emque.#{config.app_name}.#{topic}", + :durable => config.adapter.options[:durable], + :auto_delete => config.adapter.options[:auto_delete], + :arguments => { + "x-dead-letter-exchange" => "#{config.app_name}.error" + } + ) + .bind( + channel.fanout(topic, :durable => true, :auto_delete => false) + ) + + if config.purge_queues_on_start + logger.info "#{log_prefix} is purging it's queue" + queue.purge + end end def start diff --git a/lib/emque/consuming/configuration.rb b/lib/emque/consuming/configuration.rb index 9e4f6f0..84736d2 100644 --- a/lib/emque/consuming/configuration.rb +++ b/lib/emque/consuming/configuration.rb @@ -5,9 +5,9 @@ module Consuming class Configuration attr_accessor :app_name, :adapter, :auto_shutdown, :delayed_message_workers, :enable_delayed_message, :error_handlers, - :error_limit, :error_expiration, :retryable_errors, - :retryable_error_limit, :status, :status_port, :status_host, - :socket_path, :shutdown_handlers + :error_limit, :error_expiration, :purge_queues_on_start, + :retryable_errors, :retryable_error_limit, :status, :status_port, + :status_host, :socket_path, :shutdown_handlers attr_writer :env, :log_level def initialize @@ -19,6 +19,7 @@ def initialize @error_limit = 5 @error_expiration = 3600 # 60 minutes @log_level = nil + @purge_queues_on_start = false @retryable_errors = [] @retryable_error_limit = 3 @status_port = 10000 @@ -56,6 +57,7 @@ def to_hsh :error_handlers, :error_limit, :error_expiration, + :purge_queues_on_start, :log_level, :retryable_errors, :retryable_error_limit, diff --git a/spec/configuration_spec.rb b/spec/configuration_spec.rb index b9c22f9..34f2bce 100644 --- a/spec/configuration_spec.rb +++ b/spec/configuration_spec.rb @@ -63,9 +63,9 @@ accessors = [ :app_name, :adapter, :auto_shutdown, :delayed_message_workers, :env, :enable_delayed_message, :error_handlers, :error_limit, - :error_expiration, :log_level, :retryable_errors, - :retryable_error_limit, :status_port, :status_host, :status, - :socket_path, :shutdown_handlers + :error_expiration, :purge_queues_on_start, :log_level, + :retryable_errors, :retryable_error_limit, :status_port, :status_host, + :status, :socket_path, :shutdown_handlers ] config = Emque::Consuming::Configuration.new From cf8b3a5293bb6c190ccebd74a35d58a7dc74c288 Mon Sep 17 00:00:00 2001 From: Tyler Knappe Date: Tue, 1 Nov 2016 22:08:41 +0000 Subject: [PATCH 2/6] Bump version and add changelog. --- CHANGELOG.md | 1 + lib/emque/consuming/version.rb | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index de40060..c75286a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ # Emque Consuming CHANGELOG +- [Add in a configuration option to purge queues on startup](https://github.com/emque/emque-consuming/pull/61) 1.3.0 - [Add error logging when an exception is thrown.](https://github.com/emque/emque-consuming/pull/65) 1.2.3 - [Remove double ack when consuming a message and ending up in an error state. This was causing consumers to die silently.](https://github.com/emque/emque-consuming/pull/59) 1.2.1 - [Add in the ability to retry errors and back off with an exponential delay](https://github.com/emque/emque-consuming/pull/55) 1.2.0 diff --git a/lib/emque/consuming/version.rb b/lib/emque/consuming/version.rb index 54c79a0..8976499 100644 --- a/lib/emque/consuming/version.rb +++ b/lib/emque/consuming/version.rb @@ -1,5 +1,5 @@ module Emque module Consuming - VERSION = "1.2.3" + VERSION = "1.3.0" end end From 67524b1a7834c1608496ddd5a1246a9fcc5121b6 Mon Sep 17 00:00:00 2001 From: Tyler Knappe Date: Wed, 2 Nov 2016 17:54:43 +0000 Subject: [PATCH 3/6] Initial tests for purge_queues_on_start configuration option. --- spec/application_spec.rb | 1 - spec/configuration_spec.rb | 13 +++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/spec/application_spec.rb b/spec/application_spec.rb index 2b29d5e..28f0a34 100644 --- a/spec/application_spec.rb +++ b/spec/application_spec.rb @@ -38,7 +38,6 @@ app.notice_error({ :test => "failure" }) app.notice_error({ :test => "another failure" }) - end end end diff --git a/spec/configuration_spec.rb b/spec/configuration_spec.rb index 34f2bce..7cbff89 100644 --- a/spec/configuration_spec.rb +++ b/spec/configuration_spec.rb @@ -53,6 +53,19 @@ end end + describe "purge_queues_on_start" do + it "has a default" do + config = Emque::Consuming::Configuration.new + expect(config.purge_queues_on_start).to eq(false) + end + + it "prefers the assigned value" do + config = Emque::Consuming::Configuration.new + config.purge_queues_on_start = true + expect(config.purge_queues_on_start).to eq(true) + end + end + describe "#to_hsh" do it "returns a hash" do config = Emque::Consuming::Configuration.new From a818c9188dd1e143d76c4cdb3aee57ef5bf98798 Mon Sep 17 00:00:00 2001 From: Tyler Knappe Date: Thu, 1 Dec 2016 00:01:22 +0000 Subject: [PATCH 4/6] WIP -- can start consumers, need to figure out how to publish messages. --- lib/templates/config/application.rb.tt | 2 +- spec/adapters/rabbit_mq/worker/worker_spec.rb | 27 +++++++++++++++++++ spec/dummy/config/application.rb | 8 ++++++ spec/dummy/consumers/events_consumer.rb | 6 +++++ spec/spec_helper.rb | 1 + 5 files changed, 43 insertions(+), 1 deletion(-) create mode 100644 spec/adapters/rabbit_mq/worker/worker_spec.rb create mode 100644 spec/dummy/consumers/events_consumer.rb diff --git a/lib/templates/config/application.rb.tt b/lib/templates/config/application.rb.tt index d18634d..ea319ba 100644 --- a/lib/templates/config/application.rb.tt +++ b/lib/templates/config/application.rb.tt @@ -15,7 +15,7 @@ module <%= @name %> [].tap { |options_out| if @options.has_key?(:app_name) options_out << "config.app_name = \"#{@options[:app_name]}\"" - end + end if @options.has_key?(:error_limit) options_out << "config.error_limit = #{@options[:error_limit]}" end diff --git a/spec/adapters/rabbit_mq/worker/worker_spec.rb b/spec/adapters/rabbit_mq/worker/worker_spec.rb new file mode 100644 index 0000000..3671429 --- /dev/null +++ b/spec/adapters/rabbit_mq/worker/worker_spec.rb @@ -0,0 +1,27 @@ +require "spec_helper" +require "emque/consuming/adapters/rabbit_mq/worker" +require "pry" + +describe Emque::Consuming::Adapters::RabbitMq::Worker do + describe "#initialize" do + Dummy::Application.config.purge_queues_on_start = false + Dummy::Application.config.set_adapter(:rabbit_mq) + Dummy::Application.router.map do + topic "events" => EventsConsumer do + map "events.new" => "new_event" + end + end + + app = Dummy::Application.new + connection = Bunny.new + connection.start + manager = Dummy::Application.config.adapter.manager.new + app.start + + # expect(queue.message_count).to be eq(10) + + # app.manager.workers.first.last.first.send(:queue).publish("tsting", :routing_key => "events") + # app.manager.workers.first.last.first.send(:channel).default_exchange.publish("test") + # app.manager.workers.first.last.first.send(:queue).message_count + end +end diff --git a/spec/dummy/config/application.rb b/spec/dummy/config/application.rb index ed3a430..ae0a736 100644 --- a/spec/dummy/config/application.rb +++ b/spec/dummy/config/application.rb @@ -1,10 +1,18 @@ require "emque/consuming" +require "emque/consuming/adapters/rabbit_mq/manager" ENV["EMQUE_ENV"] = "test" module Emque module Consuming module Adapters + module RabbitMq + def self.default_options; {}; end + def self.load; end + def self.manager + Emque::Consuming::Adapters::RabbitMq::Manager + end + end module TestAdapter def self.default_options; {}; end def self.load; end diff --git a/spec/dummy/consumers/events_consumer.rb b/spec/dummy/consumers/events_consumer.rb new file mode 100644 index 0000000..5d2e1ef --- /dev/null +++ b/spec/dummy/consumers/events_consumer.rb @@ -0,0 +1,6 @@ +class EventsConsumer + include Emque::Consuming.consumer + + def new_event(message) + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 780ccdd..ec3ec36 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -13,6 +13,7 @@ require "timecop" require "fileutils" require_relative "dummy/config/application" +require_relative "dummy/consumers/events_consumer" module VerifyAndResetHelpers def verify(object) From 6f5fc812a24fdf2ea16753722c705f01610f8c80 Mon Sep 17 00:00:00 2001 From: Randy Girard Date: Tue, 13 Jun 2017 10:43:53 -0400 Subject: [PATCH 5/6] Adds spec updates. --- .../consuming/adapters/rabbit_mq/manager.rb | 2 + spec/adapters/rabbit_mq/worker/worker_spec.rb | 99 ++++++++++++++++--- spec/dummy/config/application.rb | 4 +- .../{events_consumer.rb => spec_consumer.rb} | 4 +- spec/spec_helper.rb | 2 +- 5 files changed, 91 insertions(+), 20 deletions(-) rename spec/dummy/consumers/{events_consumer.rb => spec_consumer.rb} (50%) diff --git a/lib/emque/consuming/adapters/rabbit_mq/manager.rb b/lib/emque/consuming/adapters/rabbit_mq/manager.rb index 7f6f374..66c8bbc 100644 --- a/lib/emque/consuming/adapters/rabbit_mq/manager.rb +++ b/lib/emque/consuming/adapters/rabbit_mq/manager.rb @@ -1,4 +1,6 @@ require "bunny" +require_relative "error_worker" +require_relative "delayed_message_worker" module Emque module Consuming diff --git a/spec/adapters/rabbit_mq/worker/worker_spec.rb b/spec/adapters/rabbit_mq/worker/worker_spec.rb index 3671429..699a66c 100644 --- a/spec/adapters/rabbit_mq/worker/worker_spec.rb +++ b/spec/adapters/rabbit_mq/worker/worker_spec.rb @@ -2,26 +2,95 @@ require "emque/consuming/adapters/rabbit_mq/worker" require "pry" +module Emque + module Consuming + module Adapters + module RabbitMq + class Worker + def start + logger.info "#{log_prefix} starting..." + logger.info "RabbitMQ Worker: Skipping consuming during test." + logger.debug "#{log_prefix} started" + end + end + end + end + end +end + describe Emque::Consuming::Adapters::RabbitMq::Worker do describe "#initialize" do - Dummy::Application.config.purge_queues_on_start = false - Dummy::Application.config.set_adapter(:rabbit_mq) - Dummy::Application.router.map do - topic "events" => EventsConsumer do - map "events.new" => "new_event" - end + before do + @connection = Bunny.new + @connection.start + @channel = @connection.create_channel + @channel.queue_delete("emque.dummy.spec") + @fanout = @channel.fanout("dummy.spec", + :durable => true, + :auto_delete => false + ) + @queue = @channel + .queue("emque.dummy.spec", { + :durable => true, + :auto_delete => false, + :arguments => { + "x-dead-letter-exchange" => "dummy.error" + } + }).bind(@fanout) + @queue.publish(Oj.dump({ + :metadata => { + :topic => "spec", + :type => "dummy.spec" + } + })) + end - app = Dummy::Application.new - connection = Bunny.new - connection.start - manager = Dummy::Application.config.adapter.manager.new - app.start + after do + @channel.queue_delete("emque.dummy.spec") + @connection.close + end - # expect(queue.message_count).to be eq(10) + it "should not purge queues on start" do + Dummy::Application.config.purge_queues_on_start = false + Dummy::Application.config.set_adapter(:rabbit_mq) + Dummy::Application.router.map do + topic "spec" => SpecConsumer do; end + end + app = Dummy::Application.new + connection = Bunny.new + connection.start + connection.with_channel do |channel| + @queue = channel.queue("emque.dummy.spec", :passive => true) + expect(@queue.message_count).to eq(1) + end + app.start + sleep 0.3 + connection.with_channel do |channel| + @queue = channel.queue("emque.dummy.spec", :passive => true) + expect(@queue.message_count).to eq(1) + end + end - # app.manager.workers.first.last.first.send(:queue).publish("tsting", :routing_key => "events") - # app.manager.workers.first.last.first.send(:channel).default_exchange.publish("test") - # app.manager.workers.first.last.first.send(:queue).message_count + it "should purge queues on start" do + Dummy::Application.config.purge_queues_on_start = true + Dummy::Application.config.set_adapter(:rabbit_mq) + Dummy::Application.router.map do + topic "spec" => SpecConsumer do; end + end + app = Dummy::Application.new + connection = Bunny.new + connection.start + connection.with_channel do |channel| + @queue = channel.queue("emque.dummy.spec", :passive => true) + expect(@queue.message_count).to eq(1) + end + app.start + sleep 0.3 + connection.with_channel do |channel| + @queue = channel.queue("emque.dummy.spec", :passive => true) + expect(@queue.message_count).to eq(0) + end + end end end diff --git a/spec/dummy/config/application.rb b/spec/dummy/config/application.rb index ae0a736..db4f32e 100644 --- a/spec/dummy/config/application.rb +++ b/spec/dummy/config/application.rb @@ -7,14 +7,14 @@ module Emque module Consuming module Adapters module RabbitMq - def self.default_options; {}; end + def self.default_options; { :durable => true }; end def self.load; end def self.manager Emque::Consuming::Adapters::RabbitMq::Manager end end module TestAdapter - def self.default_options; {}; end + def self.default_options; { :durable => true }; end def self.load; end def self.manager Emque::Consuming::Adapters::TestAdapter::Manager diff --git a/spec/dummy/consumers/events_consumer.rb b/spec/dummy/consumers/spec_consumer.rb similarity index 50% rename from spec/dummy/consumers/events_consumer.rb rename to spec/dummy/consumers/spec_consumer.rb index 5d2e1ef..2f6e877 100644 --- a/spec/dummy/consumers/events_consumer.rb +++ b/spec/dummy/consumers/spec_consumer.rb @@ -1,6 +1,6 @@ -class EventsConsumer +class SpecConsumer include Emque::Consuming.consumer - def new_event(message) + def test(message) end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index ec3ec36..ecd5666 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -13,7 +13,7 @@ require "timecop" require "fileutils" require_relative "dummy/config/application" -require_relative "dummy/consumers/events_consumer" +require_relative "dummy/consumers/spec_consumer" module VerifyAndResetHelpers def verify(object) From 5a141b6f59976d82b6b785f778a646e19854af62 Mon Sep 17 00:00:00 2001 From: Randy Girard Date: Tue, 13 Jun 2017 11:40:16 -0400 Subject: [PATCH 6/6] Removed newline and unnecessary method. --- spec/adapters/rabbit_mq/worker/worker_spec.rb | 1 - spec/dummy/consumers/spec_consumer.rb | 3 --- 2 files changed, 4 deletions(-) diff --git a/spec/adapters/rabbit_mq/worker/worker_spec.rb b/spec/adapters/rabbit_mq/worker/worker_spec.rb index 699a66c..3f2135e 100644 --- a/spec/adapters/rabbit_mq/worker/worker_spec.rb +++ b/spec/adapters/rabbit_mq/worker/worker_spec.rb @@ -43,7 +43,6 @@ def start :type => "dummy.spec" } })) - end after do diff --git a/spec/dummy/consumers/spec_consumer.rb b/spec/dummy/consumers/spec_consumer.rb index 2f6e877..c645dc0 100644 --- a/spec/dummy/consumers/spec_consumer.rb +++ b/spec/dummy/consumers/spec_consumer.rb @@ -1,6 +1,3 @@ class SpecConsumer include Emque::Consuming.consumer - - def test(message) - end end