diff --git a/Gemfile b/Gemfile index 96b2a9e1c..1de200de4 100644 --- a/Gemfile +++ b/Gemfile @@ -7,6 +7,10 @@ gem "rake-compiler" gem "concurrent-ruby" group :test do + # Async bus/service for circuit breaker sync (PoC) + gem "async-bus", "~> 0.3" + gem "async-service", "~> 0.14" + gem "benchmark-memory" gem "benchmark-ips" gem "memory_profiler" diff --git a/Gemfile.lock b/Gemfile.lock index af0bd1c8c..a4699bc88 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -33,6 +33,23 @@ GEM remote: https://rubygems.org/ specs: ast (2.4.3) + async (2.35.3) + console (~> 1.29) + fiber-annotation + io-event (~> 1.11) + metrics (~> 0.12) + traces (~> 0.18) + async-bus (0.3.1) + async + io-endpoint + io-stream + msgpack + async-container (0.29.0) + async (~> 2.22) + async-service (0.17.0) + async + async-container (~> 0.28) + string-format (~> 0.2) base64 (0.3.0) benchmark-ips (2.14.0) benchmark-memory (0.2.0) @@ -42,12 +59,20 @@ GEM coderay (1.1.3) concurrent-ruby (1.3.5) connection_pool (3.0.1) + console (1.34.2) + fiber-annotation + fiber-local (~> 1.1) + json date (3.4.1) debug (1.11.0) irb (~> 1.10) reline (>= 0.3.8) drb (2.2.3) erb (5.0.1) + fiber-annotation (0.2.0) + fiber-local (1.1.0) + fiber-storage + fiber-storage (1.0.1) google-protobuf (4.33.0-aarch64-linux-gnu) bigdecimal rake (>= 13) @@ -80,6 +105,9 @@ GEM i18n (1.14.7) concurrent-ruby (~> 1.0) io-console (0.8.0) + io-endpoint (0.16.0) + io-event (1.14.2) + io-stream (0.11.1) irb (1.15.2) pp (>= 0.6.0) rdoc (>= 4.0.0) @@ -90,9 +118,11 @@ GEM logger (1.7.0) memory_profiler (1.1.0) method_source (1.1.0) + metrics (0.15.0) minitest (5.26.2) mocha (2.8.0) ruby2_keywords (>= 0.0.5) + msgpack (1.8.0) mutex_m (0.3.0) mysql2 (0.5.7) bigdecimal @@ -162,9 +192,11 @@ GEM ruby-progressbar (1.13.0) ruby2_keywords (0.0.5) securerandom (0.4.1) + string-format (0.2.0) stringio (3.1.7) timeout (0.4.4) toxiproxy (2.0.2) + traces (0.18.2) trilogy (2.9.0) tzinfo (2.0.6) concurrent-ruby (~> 1.0) @@ -187,6 +219,8 @@ PLATFORMS DEPENDENCIES 
activerecord! + async-bus (~> 0.3) + async-service (~> 0.14) benchmark-ips benchmark-memory bigdecimal diff --git a/bin/semian_server b/bin/semian_server new file mode 100755 index 000000000..04b76431d --- /dev/null +++ b/bin/semian_server @@ -0,0 +1,86 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +# Semian Sync Server +# +# This server aggregates circuit breaker state across all connected workers +# and broadcasts state changes back to them. +# +# == Dynamic Resource Registration +# +# The server starts with NO predefined resources. Resources are registered +# dynamically by clients when they create Semian resources with `sync_scope: :shared`. +# This allows the server to be deployed as a sidecar without knowing all resources upfront. +# +# == Usage +# +# bundle exec ruby bin/semian_server +# +# == Environment Variables +# +# SEMIAN_SYNC_SOCKET - Path to Unix socket (default: /var/run/semian/semian.sock) +# +# == Client Configuration +# +# Clients connect and register resources when they call: +# +# Semian.register(:mysql_shard_0, +# error_threshold: 6, +# error_timeout: 45, +# success_threshold: 2, +# sync_scope: :shared # This triggers registration with the server +# ) +# + +require "bundler/setup" +require "fileutils" + +$LOAD_PATH.unshift(File.expand_path("../lib", __dir__)) +require "semian/sync/server" + +def main + socket_path = ENV["SEMIAN_SYNC_SOCKET"] || "/var/run/semian/semian.sock" + + # Ensure socket directory exists + socket_dir = File.dirname(socket_path) + FileUtils.mkdir_p(socket_dir) unless File.directory?(socket_dir) + + puts "=" * 60 + puts "Semian Sync Server" + puts "=" * 60 + puts "" + puts "Socket: #{socket_path}" + puts "" + puts "Resources: (none - registered dynamically by clients)" + puts "" + puts "Clients will register resources when they create Semian" + puts "resources with sync_scope: :shared" + puts "" + puts "To enable clients, set:" + puts " SEMIAN_SYNC_ENABLED=1" + puts " SEMIAN_SYNC_SOCKET=#{socket_path}" + puts "" 
+ puts "Press Ctrl+C to stop" + puts "=" * 60 + puts "" + + # Start server with empty resources - clients register dynamically + server = Semian::Sync::CircuitBreakerServer.new( + socket_path: socket_path, + resources: {}, + ) + + # Handle shutdown signals: INT and TERM stop the server and exit with status 0. + # NOTE(review): server.stop runs inside a signal handler here; confirm it never takes a + # Mutex, which Ruby refuses in trap context. + shutdown = proc do + puts "\nShutting down..." + server.stop + exit(0) + end + + trap("INT", &shutdown) + trap("TERM", &shutdown) + + server.start +end + +main if __FILE__ == $PROGRAM_NAME diff --git a/examples/circuit_breaker_sync/README.md b/examples/circuit_breaker_sync/README.md new file mode 100644 index 000000000..6d1515564 --- /dev/null +++ b/examples/circuit_breaker_sync/README.md @@ -0,0 +1,220 @@ +# Circuit Breaker Sync Demo + +This demo showcases the shared circuit breaker coordination using the `Semian::Sync` module. Circuit breaker state is synchronized across multiple clients via a central server using async-bus for transparent RPC and bidirectional communication. + +**Note:** This demo uses async-service to manage the server lifecycle and async-bus for client-server communication. + +## Prerequisites + +```bash +bundle install +``` + +## Running the Demo + +### 1. Start the Server + +In one terminal: + +```bash +bundle exec async-service examples/circuit_breaker_sync/demo_server.rb +``` + +The server will: +- Listen on `/tmp/semian_demo.sock` +- Manage a single circuit breaker with default config (3 errors to open, 10s timeout, 2 successes to close) +- Log state transitions and client connections + +### 2. 
Start Client(s) + +#### Interactive Mode + +Open separate terminals for each client: + +```bash +# Client 1 +bundle exec ruby examples/circuit_breaker_sync/demo_client.rb client1 + +# Client 2 +bundle exec ruby examples/circuit_breaker_sync/demo_client.rb client2 +``` + +Interactive commands: +- `e` - Simulate an error (triggers circuit breaker) +- `s` - Simulate a success +- `a` - Attempt acquire (will raise if open) +- `?` - Get current state +- `q` - Quit + +#### Automated Mode + +Run an automated demo that triggers circuit state changes: + +```bash +bundle exec ruby examples/circuit_breaker_sync/demo_client.rb client1 auto +``` + +The automated demo will: +1. Report 3 errors to open the circuit +2. Wait 11 seconds for timeout +3. Report 2 successes to close the circuit + +## Expected Output + +### Server Terminal + +``` +0.0s info: SemianSyncService [oid=0x...] [ec=0x...] [pid=12345] + | Semian Sync Server listening on /tmp/semian_demo.sock +0.01s info: SemianSyncService [oid=0x...] [ec=0x...] [pid=12345] + | Client connected (total: 1) +``` + +### Client Terminal (Interactive) + +``` +============================================================ +Semian Circuit Breaker Sync Demo Client +============================================================ + +Socket: /tmp/semian_demo.sock +Resource: demo_resource + +Mode: Interactive (use 'auto' argument for automated demo) + +[client1] Registering resource with sync_scope: :shared... +[client1] Resource registered! +[client1] Circuit breaker type: Semian::Sync::CircuitBreaker +[client1] Current state: closed + +[client1] Commands: + e - Simulate an error (triggers circuit breaker) + s - Simulate a success + a - Attempt acquire (will raise if open) + ? - Get current state + q - Quit + +[client1] > e +[client1] Error reported. State: closed +[client1] > e +[client1] Error reported. State: closed +[client1] > e +[client1] Error reported. 
State: open +``` + +### Client Terminal (Automated) + +``` +============================================================ +Semian Circuit Breaker Sync Demo Client +============================================================ + +Socket: /tmp/semian_demo.sock +Resource: demo_resource + +Mode: Automated demo + +[client1] Registering resource with sync_scope: :shared... +[client1] Resource registered! +[client1] Circuit breaker type: Semian::Sync::CircuitBreaker +[client1] Initial state: closed + +[client1] Starting automated demo... +[client1] Will report 3 errors to open circuit, wait for half-open, then close it + +[client1] Reporting error 1/3... +[client1] State after error: closed +[client1] Reporting error 2/3... +[client1] State after error: closed +[client1] Reporting error 3/3... +[client1] State after error: open + +[client1] Circuit should be OPEN now. Waiting 11 seconds for timeout... +[client1] State after timeout: half_open + +[client1] Reporting successes to close circuit... +[client1] Reporting success 1/2... +[client1] State after success: half_open +[client1] Reporting success 2/2... +[client1] State after success: closed + +[client1] Demo complete! +``` + +## Multi-Client Demo + +To see state synchronization across multiple clients: + +1. Start the server +2. Start two clients in interactive mode (`client1` and `client2`) +3. Report errors from `client1` to open the circuit +4. 
Observe that `client2` also sees the open state + +## Architecture + +### Using the Real Semian API + +The demo client uses the standard Semian API with `sync_scope: :shared`: + +```ruby +ENV["SEMIAN_SYNC_ENABLED"] = "1" +ENV["SEMIAN_SYNC_SOCKET"] = "/tmp/semian_demo.sock" + +resource = Semian.register( + :my_resource, + error_threshold: 3, + error_timeout: 10, + success_threshold: 2, + exceptions: [MyError], + sync_scope: :shared, # This enables shared circuit breaker +) + +resource.acquire do + # Your code here - errors are automatically reported +end +``` + +### Async-Bus Communication + +``` +┌─────────────────┐ ┌─────────────────────────────────┐ +│ Demo Client │ │ async-service │ +│ │ │ ┌─────────────────────────────┐│ +│ ┌─────────────┐ │ socket │ │ SemianSyncService ││ +│ │ Subscriber │◄├──────────┼──┤ ││ +│ │ Controller │ │ callback │ │ ┌───────────────────────┐ ││ +│ └─────────────┘ │ │ │ │ CircuitBreakerController│ ││ +│ │ │ │ │ │ ││ +│ circuit_breaker├──────────┼──┤ │ - state transitions │ ││ +│ proxy │ RPC │ │ │ - error/success track│ ││ +│ │ │ │ │ - subscriber mgmt │ ││ +└─────────────────┘ │ │ └───────────────────────┘ ││ + │ └─────────────────────────────┘│ + └─────────────────────────────────┘ +``` + +### Key Patterns Demonstrated + +1. **Real Semian API**: Uses `Semian.register(..., sync_scope: :shared)` for shared circuit breakers +2. **async-service**: Manages server lifecycle with health checking and graceful shutdown +3. **Controller Binding**: Server binds `CircuitBreakerController` for client access +4. **Transparent RPC**: Client calls methods on proxy as if they were local +5. **Bidirectional Communication**: Client binds `SubscriberController`, server calls back via proxy +6. 
**State Machine**: closed -> open -> half_open -> closed + +### Code Structure + +- `demo_server.rb`: async-service configuration with `SemianSyncService` +- `demo_client.rb`: Uses real Semian API with `sync_scope: :shared` + +## What This Validates + +- Semian.register with sync_scope: :shared creates a `Semian::Sync::CircuitBreaker` +- async-service manages server lifecycle properly +- async-bus server accepts multiple concurrent connections +- Transparent RPC for circuit breaker operations +- Bidirectional communication via subscriber callbacks +- State change broadcasts to all subscribed clients +- Circuit breaker state machine works correctly +- Dead subscriber cleanup on notification failure +- Client auto-reconnect with queue flushing and resubscription diff --git a/examples/circuit_breaker_sync/demo_client.rb b/examples/circuit_breaker_sync/demo_client.rb new file mode 100644 index 000000000..6de4ae35a --- /dev/null +++ b/examples/circuit_breaker_sync/demo_client.rb @@ -0,0 +1,258 @@ +# frozen_string_literal: true + +# Demo client for Circuit Breaker Sync using the real Semian API +# Run: bundle exec ruby examples/circuit_breaker_sync/demo_client.rb [client_name] [mode] +# +# This demo validates: +# - Semian.register with sync_scope: :shared creates a synced circuit breaker +# - The circuit breaker reports errors/successes to the server +# - State change notifications are received from the server +# - Multiple clients see synchronized state + +$LOAD_PATH.unshift(File.expand_path("../../lib", __dir__)) + +require "semian" + +Semian.logger = Logger.new($stdout) +Semian.logger.level = Logger::INFO + +SOCKET_PATH = "/tmp/semian_demo.sock" +RESOURCE_NAME = :demo_resource + +# Set up environment for sync +ENV["SEMIAN_SYNC_ENABLED"] = "1" +ENV["SEMIAN_SYNC_SOCKET"] = SOCKET_PATH + +# Custom error class for demo +class DemoError < StandardError + def marks_semian_circuits? 
+ true + end +end + +# Interactive demo client using real Semian API +class DemoClient + def initialize(name) + @name = name + @resource = nil + end + + def start + puts "[#{@name}] Registering resource with sync_scope: :shared..." + + # Register a synced circuit breaker using the real Semian API + @resource = Semian.register( + RESOURCE_NAME, + bulkhead: false, # Disable bulkhead for demo simplicity + error_threshold: 3, + error_timeout: 10, + success_threshold: 2, + exceptions: [DemoError], + sync_scope: :shared, + ) + + puts "[#{@name}] Resource registered!" + puts "[#{@name}] Circuit breaker type: #{@resource.circuit_breaker.class}" + puts "[#{@name}] Current state: #{@resource.circuit_breaker.state.value}" + puts "" + + # Run interactive loop + interactive_loop + rescue Errno::ENOENT + puts "[#{@name}] Error: Server socket not found at #{SOCKET_PATH}" + puts "[#{@name}] Make sure the server is running first:" + puts " bundle exec async-service examples/circuit_breaker_sync/demo_server.rb" + rescue Errno::ECONNREFUSED + puts "[#{@name}] Error: Connection refused" + rescue => e + puts "[#{@name}] Error: #{e.class} - #{e.message}" + puts e.backtrace.first(5).join("\n") + ensure + Semian.destroy(RESOURCE_NAME) if @resource + end + + private + + def interactive_loop + puts "[#{@name}] Commands:" + puts " e - Simulate an error (triggers circuit breaker)" + puts " s - Simulate a success" + puts " a - Attempt acquire (will raise if open)" + puts " ? - Get current state" + puts " q - Quit" + puts "" + + loop do + print "[#{@name}] > " + input = $stdin.gets&.strip&.downcase + + case input + when "e" + simulate_error + when "s" + simulate_success + when "a" + attempt_acquire + when "?" + show_state + when "q", nil + puts "[#{@name}] Goodbye!" + exit(0) + else + puts "[#{@name}] Unknown command. 
Use e/s/a/?/q" + end + rescue => e + puts "[#{@name}] Command error: #{e.message}" + end + end + + def simulate_error + @resource.acquire do + raise DemoError, "Simulated error" + end + rescue DemoError + puts "[#{@name}] Error reported. State: #{@resource.circuit_breaker.state.value}" + rescue Semian::OpenCircuitError + puts "[#{@name}] Circuit is OPEN - request blocked" + end + + def simulate_success + @resource.acquire do + # Success - do nothing + end + puts "[#{@name}] Success reported. State: #{@resource.circuit_breaker.state.value}" + rescue Semian::OpenCircuitError + puts "[#{@name}] Circuit is OPEN - cannot report success" + end + + def attempt_acquire + @resource.acquire do + puts "[#{@name}] Acquire succeeded!" + end + rescue Semian::OpenCircuitError + puts "[#{@name}] Circuit is OPEN - acquire blocked" + end + + def show_state + puts "[#{@name}] Current state: #{@resource.circuit_breaker.state.value}" + end +end + +# Automated demo that triggers circuit state changes +class AutomatedDemoClient + def initialize(name) + @name = name + @resource = nil + end + + def start + puts "[#{@name}] Registering resource with sync_scope: :shared..." + + # Register a synced circuit breaker using the real Semian API + @resource = Semian.register( + RESOURCE_NAME, + bulkhead: false, + error_threshold: 3, + error_timeout: 10, + success_threshold: 2, + exceptions: [DemoError], + sync_scope: :shared, + ) + + puts "[#{@name}] Resource registered!" + puts "[#{@name}] Circuit breaker type: #{@resource.circuit_breaker.class}" + puts "[#{@name}] Initial state: #{@resource.circuit_breaker.state.value}" + puts "" + + # Demo sequence + puts "[#{@name}] Starting automated demo..." + puts "[#{@name}] Will report 3 errors to open circuit, wait for half-open, then close it" + puts "" + + # Report errors to open the circuit + 3.times do |i| + sleep(1) + puts "[#{@name}] Reporting error #{i + 1}/3..." 
+ begin + @resource.acquire { raise DemoError, "Error #{i + 1}" } + rescue DemoError + # Expected: acquire re-raises the simulated error after handling it + rescue Semian::OpenCircuitError + puts "[#{@name}] Circuit already open!" + end + puts "[#{@name}] State after error: #{@resource.circuit_breaker.state.value}" + end + + puts "" + puts "[#{@name}] Circuit should be OPEN now. Waiting 11 seconds for timeout..." + sleep(11) + + # Print the locally cached state. Reading state.value does not query the server, so this + # shows half_open only once the server's state-change broadcast has been received. + puts "[#{@name}] State after timeout: #{@resource.circuit_breaker.state.value}" + puts "" + + # Report successes to close the circuit + puts "[#{@name}] Reporting successes to close circuit..." + 2.times do |i| + sleep(1) + puts "[#{@name}] Reporting success #{i + 1}/2..." + begin + @resource.acquire { } # Success + rescue Semian::OpenCircuitError + puts "[#{@name}] Circuit still open, cannot report success" + end + puts "[#{@name}] State after success: #{@resource.circuit_breaker.state.value}" + end + + puts "" + puts "[#{@name}] Demo complete!" + + # Keep alive for a bit + sleep(2) + rescue Errno::ENOENT + puts "[#{@name}] Error: Server not running. 
Start it first:" + puts " bundle exec async-service examples/circuit_breaker_sync/demo_server.rb" + rescue => e + puts "[#{@name}] Error: #{e.class} - #{e.message}" + puts e.backtrace.first(5).join("\n") + ensure + Semian.destroy(RESOURCE_NAME) if @resource + end +end + +# Run the client +if __FILE__ == $PROGRAM_NAME + require "async" + + # Parse arguments: support both "auto" and "client_name auto" + if ARGV[0] == "auto" + mode = "auto" + client_name = ARGV[1] || "client-#{Process.pid}" + else + client_name = ARGV[0] || "client-#{Process.pid}" + mode = ARGV[1] + end + + puts "=" * 60 + puts "Semian Circuit Breaker Sync Demo Client" + puts "=" * 60 + puts "" + puts "Socket: #{SOCKET_PATH}" + puts "Resource: #{RESOURCE_NAME}" + puts "" + + if mode == "auto" + puts "Mode: Automated demo" + puts "" + client = AutomatedDemoClient.new(client_name) + else + puts "Mode: Interactive (use 'auto' argument for automated demo)" + puts "" + client = DemoClient.new(client_name) + end + + # Run inside Async reactor to keep async-bus connection alive + Async do + client.start + end +end diff --git a/examples/circuit_breaker_sync/demo_server.rb b/examples/circuit_breaker_sync/demo_server.rb new file mode 100644 index 000000000..59676e342 --- /dev/null +++ b/examples/circuit_breaker_sync/demo_server.rb @@ -0,0 +1,123 @@ +#!/usr/bin/env async-service +# frozen_string_literal: true + +# Demo server for Circuit Breaker Sync using async-service +# Run: bundle exec async-service examples/circuit_breaker_sync/demo_server.rb +# +# This demo validates: +# - Server accepts multiple client connections via async-bus +# - Circuit breaker state machine (closed -> open -> half_open -> closed) +# - State change broadcasts to all subscribed clients +# - Proper lifecycle management via async-service + +$LOAD_PATH.unshift(File.expand_path("../../lib", __dir__)) + +require "async/service/managed_service" +require "async/service/managed_environment" +require "async/bus/server" +require 
"semian/sync/server" + +# Service that runs the Semian circuit breaker sync server +class SemianSyncService < Async::Service::ManagedService + def run(instance, evaluator) + socket_path = evaluator.socket_path + + # Clean up any existing socket + File.unlink(socket_path) if File.exist?(socket_path) + + # Create the circuit breaker controller with configured resources + controller = Semian::Sync::CircuitBreakerController.new + evaluator.resources.each do |name, config| + controller.register_resource( + name, + error_threshold: config[:error_threshold], + error_timeout: config[:error_timeout], + success_threshold: config[:success_threshold], + ) + Console.info(self) do + "Registered resource: #{name} (errors: #{config[:error_threshold]}, " \ + "timeout: #{config[:error_timeout]}s, successes: #{config[:success_threshold]})" + end + end + + # Create async-bus server + endpoint = IO::Endpoint.unix(socket_path) + server = Async::Bus::Server.new(endpoint) + client_count = 0 + + Console.info(self) { "Semian Sync Server listening on #{socket_path}" } + instance.ready! 
+ + # Start background tasks and the accept loop (readiness was already signalled above) + Async do |task| + # Timeout checker: calls controller.check_timeouts once per second + task.async do + loop do + sleep(1) + controller.check_timeouts + end + end + + # Periodic stats logger: logs controller.statistics every 10 seconds + task.async do + loop do + sleep(10) + stats = controller.statistics + Console.info(self) do + "Stats: clients=#{client_count}, resources=#{stats[:resources]}, " \ + "open_circuits=#{stats[:open_circuits]}, subscribers=#{stats[:total_subscribers]}" + end + end + end + + # Accept connections: every connection gets the same controller bound as :circuit_breaker + server.accept do |connection| + client_count += 1 + Console.info(self) { "Client connected (total: #{client_count})" } + connection.bind(:circuit_breaker, controller) + + # NOTE(review): the ensure below runs as soon as this block returns, so client_count is only + # accurate if async-bus keeps this block active for the whole connection - confirm against + # Async::Bus::Server#accept; otherwise the count drops back right after bind. + rescue => e + Console.error(self) { "Connection error: #{e.class} - #{e.message}" } + ensure + client_count -= 1 + Console.info(self) { "Client disconnected (remaining: #{client_count})" } + end + end + + server + end + + private def format_title(evaluator, server) + "semian-sync [#{evaluator.socket_path}]" + end +end + +# Environment configuration for the Semian service +module SemianEnvironment + include Async::Service::ManagedEnvironment + + # Socket path, overridable via SEMIAN_SOCKET_PATH. Note this is a different variable from + # SEMIAN_SYNC_SOCKET, which bin/semian_server reads and demo_client.rb sets. + def socket_path + ENV.fetch("SEMIAN_SOCKET_PATH", "/tmp/semian_demo.sock") + end + + # Resources registered on the controller at startup, keyed by resource name. + def resources + { + demo_resource: { + error_threshold: 3, + error_timeout: 10, + success_threshold: 2, + }, + } + end + + # Presumably the number of service instances async-service should run - TODO confirm. + def count + 1 + end +end + +# Define the service +service "semian-sync" do + service_class SemianSyncService + include SemianEnvironment +end diff --git a/lib/semian.rb b/lib/semian.rb index c135d8ec6..5dc96dc17 100644 --- a/lib/semian.rb +++ b/lib/semian.rb @@ -197,6 +197,11 @@ def to_s # +exceptions+: An array of exception classes that should be accounted as resource errors. Default []. # (circuit breaker) # + # +sync_scope+: Controls circuit breaker state synchronization scope. 
(circuit breaker) + # - +:local+ (default): Circuit breaker state is local to this process + # - +:shared+: Circuit breaker state is synchronized across all connected workers + # via Sync::CircuitBreakerServer. Requires SEMIAN_SYNC_ENABLED to be set; otherwise falls back to +:local+. + # + # Returns the registered resource. def register(name, **options) return UnprotectedResource.new(name) if ENV.key?("SEMIAN_DISABLED") @@ -302,9 +307,28 @@ def consumers def create_circuit_breaker(name, **options) return if ENV.key?("SEMIAN_CIRCUIT_BREAKER_DISABLED") + return unless options.fetch(:circuit_breaker, true) exceptions = options[:exceptions] || [] + sync_scope = options.fetch(:sync_scope, :local) + + # Use synced circuit breaker for shared scope + if sync_scope == :shared && ENV["SEMIAN_SYNC_ENABLED"] + require_relative "semian/sync/circuit_breaker" unless defined?(Sync::CircuitBreaker) + Sync::Client.setup! + + return Sync::CircuitBreaker.new( + name, + exceptions: Array(exceptions) + [::Semian::BaseError], + success_threshold: options[:success_threshold], + error_threshold: options[:error_threshold], + error_timeout: options[:error_timeout], + half_open_resource_timeout: options[:half_open_resource_timeout], + ) + end + + # Default: local circuit breaker CircuitBreaker.new( name, success_threshold: options[:success_threshold], diff --git a/lib/semian/sync/README.md b/lib/semian/sync/README.md new file mode 100644 index 000000000..03bd75305 --- /dev/null +++ b/lib/semian/sync/README.md @@ -0,0 +1,239 @@ +# Semian::Sync - Shared Circuit Breaker Coordination + +Enables circuit breaker state to be synchronized across multiple worker processes. +Errors from all workers are aggregated by a central server, and state changes +are broadcast back to all workers. 
+ +## Architecture + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ SERVER PROCESS │ +│ ┌─────────────────────────────────────────────────────────────────────┐ │ +│ │ CircuitBreakerServer │ │ +│ │ - Listens on Unix socket │ │ +│ │ - Accepts connections │ │ +│ │ - Runs background tasks (timeout checks, stats) │ │ +│ │ │ │ +│ │ ┌───────────────────────────────────────────────────────────────┐ │ │ +│ │ │ CircuitBreakerController │ │ │ +│ │ │ - @resources: { name → {state, errors[], thresholds...} } │ │ │ +│ │ │ - @subscribers: { name → [proxy1, proxy2, ...] } │ │ │ +│ │ │ - State machine logic (closed→open→half_open→closed) │ │ │ +│ │ │ - Bound to connections as :circuit_breaker │ │ │ +│ │ └───────────────────────────────────────────────────────────────┘ │ │ +│ └─────────────────────────────────────────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────────────────────┘ + ▲ + │ Unix Socket + ▼ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ WORKER PROCESS │ +│ │ +│ ┌──────────────────────────┐ ┌────────────────────────────────────┐ │ +│ │ CircuitBreaker │ │ Client (module) │ │ +│ │ (user-facing API) │ │ - @state_cache │ │ +│ │ │ │ - @subscriptions │ │ +│ │ acquire { } │─────▶│ - @report_queue │ │ +│ │ mark_failed(error) │ │ │ │ +│ │ mark_success │ │ ┌──────────────────────────────┐ │ │ +│ │ request_allowed? 
│ │ │ SemianBusClient │ │ │ +│ │ │ │ │ - Auto-reconnect │ │ │ +│ │ @state (cached locally) │◀─────│ │ - @circuit_breaker (proxy) │ │ │ +│ └──────────────────────────┘ │ │ - @subscriber_proxy │ │ │ +│ │ │ │ │ │ +│ │ │ ┌────────────────────────┐ │ │ │ +│ │ │ │ SubscriberController │ │ │ │ +│ │ │ │ on_state_change(r, s) │ │ │ │ +│ │ │ └────────────────────────┘ │ │ │ +│ │ └──────────────────────────────┘ │ │ +│ └────────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────────────────────┘ +``` + +## Class Responsibilities + +| Class | File | Purpose | +|-------|------|---------| +| `CircuitBreaker` | circuit_breaker.rb | User-facing API, delegates to Client, caches state locally | +| `Client` | client.rb | Module managing state cache, report queue, delegates to SemianBusClient | +| `SemianBusClient` | client.rb | Owns connection lifecycle (run task, reconnect), holds RPC proxies | +| `SubscriberController` | client.rb | Receives state change callbacks from server | +| `CircuitBreakerController` | server.rb | State machine logic, RPC interface, manages all resources | +| `CircuitBreakerServer` | server.rb | Socket lifecycle, background tasks, owns controller | + +## Data Flow + +### 1. Resource Registration (startup) + +``` +CircuitBreaker.new(:mysql) + │ + ▼ +Client.register_resource(:mysql, config) + │ + ▼ RPC +CircuitBreakerController.register_resource(:mysql, ...) + │ + ├─► Creates @resources[:mysql] = {state: :closed, errors: [], ...} + │ + ▼ +Returns {registered: true, state: "closed"} + │ + ▼ +Client.subscribe_to_updates(:mysql) { |state| ... } + │ + ▼ RPC (passes subscriber_proxy) +CircuitBreakerController.subscribe(:mysql, proxy) + │ + ▼ +Stores proxy in @subscribers[:mysql] +``` + +### 2. 
Error Reporting (request fails) + +``` +CircuitBreaker.acquire { raise Error } + │ + ▼ +CircuitBreaker.mark_failed(error) + │ + ▼ +Client.report_error_async(:mysql, timestamp) + │ + ▼ RPC +CircuitBreakerController.report_error(:mysql, timestamp) + │ + ├─► Adds to errors[], checks threshold + │ + ├─► If threshold reached: state = :open + │ + ▼ +notify_subscribers(:mysql, :open) + │ + ▼ RPC callback to each proxy +SubscriberController.on_state_change("mysql", "open") + │ + ▼ +Client.handle_state_change → updates @state_cache + │ + ▼ +Invokes registered callbacks + │ + ▼ +CircuitBreaker.@state = :open +``` + +### 3. State Check (before request) + +``` +CircuitBreaker.acquire { ... } + │ + ▼ +CircuitBreaker.request_allowed? + │ + ├─► If @state == :open, refresh from server + │ │ + │ ▼ RPC + │ Client.get_state(:mysql) + │ │ + │ ▼ + │ CircuitBreakerController.get_state(:mysql) + │ + ▼ +Returns true if :closed or :half_open +``` + +### 4. Timeout Transition (background) + +``` +CircuitBreakerServer background task (every 1s) + │ + ▼ +CircuitBreakerController.check_timeouts + │ + ├─► For each :open resource where timeout elapsed + │ + ├─► state = :half_open + │ + ▼ +notify_subscribers(:mysql, :half_open) + │ + ▼ RPC callback +All connected clients receive state update +``` + +## Key Concepts + +### State Machine (lives on server) + +``` +closed ──(error_threshold errors)──► open + │ + │ (error_timeout elapsed) + ▼ +closed ◄──(success_threshold)─── half_open + │ + │ (any error) + ▼ + open +``` + +### Why Shared State? + +The **state machine lives on the server** (`CircuitBreakerController`). Clients just: +1. Report events (errors/successes) +2. Cache state locally for fast reads +3. Receive broadcasts when state changes + +This allows multiple worker processes to share circuit breaker state - if one worker +sees enough errors to trip the threshold, ALL workers' circuits open simultaneously. 
+ +### Graceful Degradation + +- Workers continue with local state cache if server unavailable +- Reports queued (up to 1000) for later delivery +- Auto-reconnect with exponential backoff +- Subscriptions automatically restored on reconnect + +## Usage + +### 1. Start the Server + +```bash +bundle exec ruby bin/semian_server +``` + +Or set environment variables: +```bash +SEMIAN_SYNC_SOCKET=/var/run/semian/semian.sock bundle exec ruby bin/semian_server +``` + +### 2. Configure Clients + +```ruby +ENV["SEMIAN_SYNC_ENABLED"] = "1" +ENV["SEMIAN_SYNC_SOCKET"] = "/var/run/semian/semian.sock" + +resource = Semian.register( + :mysql_primary, + error_threshold: 3, + error_timeout: 10, + success_threshold: 2, + sync_scope: :shared, # Enables shared circuit breaker +) + +resource.acquire do + # Your code here - errors are automatically reported +end +``` + +## Files + +``` +lib/semian/sync/ +├── README.md # This file +├── client.rb # Client (module), SemianBusClient, SubscriberController +├── server.rb # CircuitBreakerServer, CircuitBreakerController +└── circuit_breaker.rb # CircuitBreaker (user-facing API) +``` diff --git a/lib/semian/sync/circuit_breaker.rb b/lib/semian/sync/circuit_breaker.rb new file mode 100644 index 000000000..911f8ef52 --- /dev/null +++ b/lib/semian/sync/circuit_breaker.rb @@ -0,0 +1,172 @@ +# frozen_string_literal: true + +# Semian::Sync::CircuitBreaker - Circuit breaker with server-delegated state +# +# Same interface as Semian::CircuitBreaker but delegates state to a central server: +# - Reports errors/successes to CircuitBreakerServer via Client +# - Receives state broadcasts from server +# - Caches state locally for fast access +# +# Used automatically when registering with sync_scope: :shared: +# +# Semian.register(:mysql_primary, +# error_threshold: 3, error_timeout: 10, success_threshold: 2, +# sync_scope: :shared +# ) + +require_relative "client" + +module Semian + module Sync + class CircuitBreaker + attr_reader :name, 
:half_open_resource_timeout, :error_timeout, :last_error + + def initialize(name, exceptions:, success_threshold:, error_threshold:, + error_timeout:, half_open_resource_timeout: nil, **_options) + @name = name.to_sym + @exceptions = Array(exceptions) + [::Semian::BaseError] + @error_threshold = error_threshold + @error_timeout = error_timeout + @success_threshold = success_threshold + @half_open_resource_timeout = half_open_resource_timeout + @last_error = nil + + # Local state cache: starts closed, then follows the server through update_state + @state = :closed + + setup_sync! + end + + # Runs the block under circuit breaker protection and returns its result. + # Raises OpenCircuitError when request_allowed? is false. Rescued errors are always re-raised; + # they are passed to mark_failed unless the error's marks_semian_circuits? returns false. + def acquire(resource = nil, &block) + raise OpenCircuitError unless request_allowed? + + result = nil + begin + result = maybe_with_half_open_resource_timeout(resource, &block) + rescue *@exceptions => error + if !error.respond_to?(:marks_semian_circuits?) || error.marks_semian_circuits? + mark_failed(error) + end + raise error + else + mark_success + end + result + end + + # True while the cached state is closed or half-open; an open cache is refreshed first. + def request_allowed? + refresh_state_if_stale + closed? || half_open? + end + + # Remembers the error and hands it to Client.report_error_async with a monotonic-clock timestamp. + def mark_failed(error) + @last_error = error + timestamp = Process.clock_gettime(Process::CLOCK_MONOTONIC) + Client.report_error_async(@name, timestamp) + end + + # Reports a success via Client.report_success_async; skipped unless the cached state is half-open. + def mark_success + return unless half_open? + Client.report_success_async(@name) + end + + # Resets only the local cache (state and last_error); no Client call is made here. + def reset + @state = :closed + @last_error = nil + end + + def destroy + # No local resources to clean up + end + + def closed? + @state == :closed + end + + def open? + @state == :open + end + + def half_open? + @state == :half_open + end + + # For compatibility with code expecting a state object: returns self, which responds to value. + def state + self + end + + # The cached state as a symbol. + def value + @state + end + + # True whenever the cached state is not closed. + def in_use? + !closed? + end + + private + + def setup_sync! 
+ # Register this resource and its thresholds via Client.register_resource + result = Client.register_resource(@name, { + error_threshold: @error_threshold, + error_timeout: @error_timeout, + success_threshold: @success_threshold, + }) + + # Adopt the state from the registration reply, when one is present + if result && result[:state] + update_state(result[:state].to_sym) + end + + # Subscribe so that later state changes reach update_state + Client.subscribe_to_updates(@name) do |new_state| + update_state(new_state) + end + end + + # Stores the new state (normalized with to_sym); logs and notifies only on an actual change. + def update_state(new_state) + old_state = @state + @state = new_state.to_sym + + if old_state != @state + log_state_transition(old_state, @state) + notify_state_transition(@state) + end + end + + def refresh_state_if_stale + return unless open? + + # Only while the cached state is open: ask Client.get_state whether the server has moved on + # (e.g. to half_open). A nil reply leaves the cache untouched; update_state applies to_sym. + server_state = Client.get_state(@name) + if server_state && server_state != @state + update_state(server_state) + end + end + + def log_state_transition(old_state, new_state) + Semian.logger.info( + "[Semian::Sync::CircuitBreaker] State transition from #{old_state} to #{new_state}. " \ + "name=\"#{@name}\"", + ) + end + + def notify_state_transition(new_state) + Semian.notify(:state_change, self, nil, nil, state: new_state) + end + + # Calls the block, wrapped in resource.with_resource_timeout when the state is half-open, + # a half_open_resource_timeout is set and the resource responds to with_resource_timeout. + def maybe_with_half_open_resource_timeout(resource, &block) + if half_open? && @half_open_resource_timeout && resource.respond_to?(:with_resource_timeout) + resource.with_resource_timeout(@half_open_resource_timeout) do + block.call + end + else + block.call + end + end + end + end +end diff --git a/lib/semian/sync/client.rb b/lib/semian/sync/client.rb new file mode 100644 index 000000000..43d4d86e8 --- /dev/null +++ b/lib/semian/sync/client.rb @@ -0,0 +1,408 @@ +# frozen_string_literal: true + +# Semian::Sync::Client - Client-side coordination for shared circuit breakers +# +# Manages communication with the CircuitBreakerServer via async-bus RPC. +# Clients report errors/successes and receive state change broadcasts. 
+# +# Components: +# - SubscriberController: Receives state change callbacks from the server +# - SemianBusClient: Persistent connection with auto-reconnect +# - Client: Module managing connection, caching, and report queuing +# +# Example: +# Semian::Sync::Client.configure("/var/run/semian/semian.sock") +# Semian::Sync::Client.report_error_async(:mysql_primary, Time.now.to_f) +# Semian::Sync::Client.subscribe_to_updates(:mysql_primary) { |state| ... } + +require "async" +require "async/bus/client" +require "async/bus/controller" +require "async/condition" +require "io/endpoint/unix_endpoint" + +module Semian + module Sync + # Receives state change callbacks from the server via RPC. + # Passed by reference to server's subscribe method for bidirectional communication. + class SubscriberController < Async::Bus::Controller + def initialize(client) + @client = client + end + + # Called by server when a resource's state changes (via RPC callback). + # Must return a serializable value for RPC response. + def on_state_change(resource_name, new_state) + @client.handle_state_change(resource_name, new_state) + true + end + end + + # Persistent async-bus connection with auto-reconnect. + # Owns the full connection lifecycle including the run task. + # + # Usage: + # bus_client = SemianBusClient.new(endpoint, client_manager) + # bus_client.start_connection(timeout: 5.0) + # bus_client.circuit_breaker.report_error(...) + # bus_client.disconnect + # + class SemianBusClient < Async::Bus::Client + attr_reader :circuit_breaker, :connection, :subscriber_proxy + + def initialize(endpoint, client_manager) + super(endpoint) + @client_manager = client_manager + @run_task = nil + @connection = nil + @circuit_breaker = nil + @subscriber = nil + @subscriber_proxy = nil + @connection_ready = Async::Condition.new + @connection_failed = false + end + + def connected? + @connection != nil && @circuit_breaker != nil + end + + # Start the async-bus run loop and wait for connection. 
+ def start_connection(timeout: 5.0) + return if @run_task + + @run_task = run + wait_for_connection(timeout: timeout) + end + + # Cleanly disconnect and stop the run task. + def disconnect + @run_task&.stop rescue nil + @run_task = nil + @connection&.close rescue nil + @connection = nil + @circuit_breaker = nil + end + + def mark_failed! + @connection = nil + @circuit_breaker = nil + @connection_failed = true + @connection_ready.signal + end + + # Called by async-bus on connection (initial and reconnections). + protected def connected!(connection) + @connection_failed = false + @connection = connection + @circuit_breaker = connection[:circuit_breaker] + + # Bind subscriber controller for receiving server callbacks + @subscriber = SubscriberController.new(@client_manager) + @subscriber_proxy = connection.bind(:subscriber, @subscriber) + + @client_manager.log_info("Connected to server") + @client_manager.on_connected(self) + @connection_ready.signal + rescue => e + @client_manager.log_error("Setup error in connected!: #{e.message}") + @connection = nil + @circuit_breaker = nil + @connection_failed = true + @connection_ready.signal + end + + private + + # Block until connected or timeout. Returns immediately if already connected. + def wait_for_connection(timeout: 5.0) + return true if connected? + return false if @connection_failed + + Async do |task| + task.with_timeout(timeout) do + @connection_ready.wait until connected? || @connection_failed + end + rescue Async::TimeoutError + # Timeout waiting for connection + end + + connected? + end + end + + # Module managing server communication for shared circuit breakers. + # + # Responsibilities: + # - Auto-reconnecting connection via async-bus + # - Error/success reporting via RPC + # - State change callbacks via subscriber controller + # - Local state caching for fast access + # - Report queuing when disconnected (up to MAX_QUEUE_SIZE) + # + # Designed for Async context (e.g., Falcon workers). 
Uses cooperative fibers, + # no threads or mutexes needed. + module Client + extend self + + MAX_QUEUE_SIZE = 1000 + + # Module-level state + @socket_path = nil + @bus_client = nil + @state_cache = {} + @subscriptions = {} + @subscribed_resources = [] + @report_queue = [] + @setup_complete = false + + def enabled? + ENV["SEMIAN_SYNC_ENABLED"] == "1" || ENV["SEMIAN_SYNC_ENABLED"] == "true" + end + + def socket_path + @socket_path || ENV["SEMIAN_SYNC_SOCKET"] || "/var/run/semian/semian.sock" + end + + # Initialize client connection. Idempotent. + def setup! + return unless enabled? + return if @setup_complete + + configure(socket_path) + at_exit { disconnect rescue nil } + @setup_complete = true + end + + def configure(path) + @socket_path = path + end + + def connected? + @bus_client&.connected? || false + end + + def disconnect + @bus_client&.disconnect + @bus_client = nil + end + + # Called by SubscriberController when server broadcasts a state change. + def handle_state_change(resource_name, new_state) + resource = resource_name.to_sym + state = new_state.to_sym + + @state_cache[resource] = state + + @subscriptions[resource]&.each do |callback| + callback.call(state) + rescue => e + log_error("Callback error for #{resource}: #{e.message}") + end + end + + # Called by SemianBusClient on connection/reconnection. + # Flushes queued reports, re-registers subscriptions, syncs state. + def on_connected(bus_client) + flush_queued_reports + resubscribe_resources + sync_open_states + end + + # Report an error for circuit breaker tracking. Queues if disconnected. + def report_error_async(resource_name, timestamp) + ensure_connection + + if connected? 
+ begin + @bus_client.circuit_breaker.report_error(resource_name.to_s, timestamp) + rescue => e + log_error("Report error failed: #{e.message}") + queue_report({ type: :error, resource: resource_name, timestamp: timestamp }) + end + else + queue_report({ type: :error, resource: resource_name, timestamp: timestamp }) + end + end + + # Report a success. Only meaningful in half-open state. Queues if disconnected. + def report_success_async(resource_name) + ensure_connection + + if connected? + begin + @bus_client.circuit_breaker.report_success(resource_name.to_s) + rescue => e + log_error("Report success failed: #{e.message}") + queue_report({ type: :success, resource: resource_name }) + end + else + queue_report({ type: :success, resource: resource_name }) + end + end + + # Get current state from server. Falls back to cache when disconnected. + def get_state(resource_name) + ensure_connection + + if connected? + begin + state = @bus_client.circuit_breaker.get_state(resource_name.to_s) + @state_cache[resource_name.to_sym] = state&.to_sym + state&.to_sym + rescue => e + log_error("Get state failed: #{e.message}") + @state_cache[resource_name.to_sym] + end + else + @state_cache[resource_name.to_sym] + end + end + + # Register callback for state changes. Persisted across reconnections. + def subscribe_to_updates(resource_name, &block) + resource = resource_name.to_sym + + @subscriptions[resource] ||= [] + @subscriptions[resource] << block + @subscribed_resources << resource unless @subscribed_resources.include?(resource) + + subscribe_on_server(resource) if connected? + end + + # Register resource with server. Idempotent. Called for sync_scope: :shared resources. + # Returns { registered: bool, state: string } or nil if failed. + def register_resource(resource_name, config) + ensure_connection + return nil unless connected? 
+ + begin + result = @bus_client.circuit_breaker.register_resource( + resource_name.to_s, + error_threshold: config[:error_threshold], + error_timeout: config[:error_timeout], + success_threshold: config[:success_threshold], + ) + + if result && result[:state] + @state_cache[resource_name.to_sym] = result[:state].to_sym + end + + log_info("Registered resource #{resource_name}: #{result}") + result + rescue => e + log_error("Register resource failed for #{resource_name}: #{e.message}") + nil + end + end + + def log_info(message) + Semian.logger&.info("[Semian::Sync::Client] #{message}") + end + + def log_error(message) + Semian.logger&.error("[Semian::Sync::Client] ERROR: #{message}") + end + + def reset! + disconnect + @socket_path = nil + @bus_client = nil + @state_cache = {} + @subscriptions = {} + @subscribed_resources = [] + @report_queue = [] + @setup_complete = false + end + + private + + # Lazily connect to server. Connection lifecycle managed by SemianBusClient. + def ensure_connection + return if connected? + return if @socket_path.nil? + + # Clean up stale bus_client that failed to connect + if @bus_client && !@bus_client.connected? + @bus_client.disconnect rescue nil + @bus_client = nil + end + + return if @bus_client # Already attempting connection + + # Create client first so @bus_client is set when on_connected callback fires + endpoint = IO::Endpoint.unix(@socket_path) + @bus_client = SemianBusClient.new(endpoint, self) + @bus_client.start_connection(timeout: 5.0) + + # Clear reference if connection failed + @bus_client = nil unless @bus_client.connected? + rescue Errno::ENOENT, Errno::ECONNREFUSED => e + log_info("Server not available: #{e.message}") + @bus_client&.mark_failed! + @bus_client = nil + rescue => e + log_error("Connection error: #{e.class} - #{e.message}") + @bus_client&.mark_failed! 
+ @bus_client = nil + end + + def subscribe_on_server(resource_name) + return unless @bus_client&.circuit_breaker && @bus_client&.subscriber_proxy + + begin + @bus_client.circuit_breaker.subscribe(resource_name.to_s, @bus_client.subscriber_proxy) + rescue => e + log_error("Subscribe failed for #{resource_name}: #{e.message}") + end + end + + def resubscribe_resources + @subscribed_resources.each { |r| subscribe_on_server(r) } + end + + # Fetch non-closed states from server to sync cache after reconnection. + def sync_open_states + return unless @bus_client&.circuit_breaker + + begin + open_states = @bus_client.circuit_breaker.get_open_states + open_states.each do |resource, state| + @state_cache[resource.to_sym] = state.to_sym + end + rescue => e + log_error("Sync open states failed: #{e.message}") + end + end + + # Queue report for later delivery. Drops oldest when full. + def queue_report(report) + if @report_queue.size < MAX_QUEUE_SIZE + @report_queue << report + else + @report_queue.shift + @report_queue << report + log_info("Report queue full, dropped oldest report") + end + end + + # Send queued reports after reconnecting. 
+ def flush_queued_reports + return unless @bus_client&.circuit_breaker + + queued = @report_queue.dup + @report_queue.clear + + queued.each do |report| + case report[:type] + when :error + @bus_client.circuit_breaker.report_error(report[:resource].to_s, report[:timestamp]) + when :success + @bus_client.circuit_breaker.report_success(report[:resource].to_s) + end + rescue => e + log_error("Flush report failed: #{e.message}") + end + end + end + end +end diff --git a/lib/semian/sync/server.rb b/lib/semian/sync/server.rb new file mode 100644 index 000000000..f5c297ed6 --- /dev/null +++ b/lib/semian/sync/server.rb @@ -0,0 +1,354 @@ +# frozen_string_literal: true + +# Semian::Sync::CircuitBreakerServer - Server-side circuit breaker coordination +# +# Aggregates error/success reports from all connected clients and broadcasts +# state changes back to them via async-bus RPC. +# +# Components: +# - CircuitBreakerController: State machine logic, exposed via async-bus +# - CircuitBreakerServer: Connection management and background tasks +# +# State Machine: +# closed -> open: error_threshold errors within error_timeout +# open -> half_open: after error_timeout seconds +# half_open -> closed: success_threshold successes +# half_open -> open: any error +# +# Example: +# server = Semian::Sync::CircuitBreakerServer.new( +# socket_path: "/var/run/semian/semian.sock", +# resources: { mysql_primary: { error_threshold: 3, error_timeout: 10, success_threshold: 2 } } +# ) +# server.start + +require "async" +require "async/bus/server" +require "async/bus/controller" +require "io/endpoint/unix_endpoint" +require "fileutils" +require "console" + +module Semian + module Sync + # Circuit breaker state machine exposed via async-bus RPC. + # Clients call methods on this controller to report errors/successes, + # query state, and subscribe to state change broadcasts. 
+ class CircuitBreakerController < Async::Bus::Controller + def initialize + @resources = {} + @subscribers = {} # resource_name => [subscriber_proxies] + end + + # Report an error. Returns new state if changed, nil otherwise. + def report_error(resource_name, timestamp) + resource = @resources[resource_name.to_sym] + return nil unless resource + + prev_state = resource[:state] + + # Add error to sliding window + resource[:errors] << timestamp + resource[:last_error_at] = timestamp + + # Remove old errors outside the window + cutoff = timestamp - resource[:error_timeout] + resource[:errors].reject! { |t| t < cutoff } + + # Check state transitions + state_change = nil + if resource[:state] == :closed && resource[:errors].size >= resource[:error_threshold] + resource[:state] = :open + resource[:successes] = 0 + state_change = :open + elsif resource[:state] == :half_open + resource[:state] = :open + resource[:successes] = 0 + state_change = :open + end + + Console.info(self) do + "Error reported for #{resource_name}: errors=#{resource[:errors].size}/#{resource[:error_threshold]}, " \ + "state=#{resource[:state]}, subscribers=#{@subscribers[resource_name.to_sym]&.size || 0}" + end + + if state_change + Console.info(self) { "State transition: #{resource_name} #{prev_state} -> #{state_change}" } + notify_subscribers(resource_name.to_sym, state_change) + end + state_change + end + + # Report a success. Returns new state if changed, nil otherwise. + # Only meaningful in half_open state. 
# @param resource_name [String, Symbol] name of a registered resource
# @return [Symbol, nil] :closed when this success closes the circuit; nil when the
#   resource is unknown, is not half-open, or is still below its success_threshold
def report_success(resource_name)
  entry = @resources[resource_name.to_sym]
  return nil unless entry

  state_before = entry[:state]

  # Anything other than half_open ignores successes entirely.
  unless state_before == :half_open
    Console.info(self) do
      "Success reported for #{resource_name}: ignored (state=#{entry[:state]}, not half_open)"
    end
    return nil
  end

  entry[:successes] += 1

  # Enough consecutive probes succeeded: close the circuit and start from a clean slate.
  new_state = nil
  if entry[:successes] >= entry[:success_threshold]
    new_state = :closed
    entry[:state] = new_state
    entry[:errors].clear
    entry[:successes] = 0
  end

  Console.info(self) do
    "Success reported for #{resource_name}: successes=#{entry[:successes]}/#{entry[:success_threshold]}, " \
    "state=#{entry[:state]}, subscribers=#{@subscribers[resource_name.to_sym]&.size || 0}"
  end

  return nil unless new_state

  Console.info(self) { "State transition: #{resource_name} #{state_before} -> #{new_state}" }
  notify_subscribers(resource_name.to_sym, new_state)
  new_state
end

# Look up the current state of a resource.
#
# @param resource_name [String, Symbol]
# @return [Symbol, nil] the stored state, or nil when the resource is not registered
def get_state(resource_name)
  entry = @resources[resource_name.to_sym]
  entry && entry[:state]
end

# Collect every resource whose circuit is not closed.
#
# @return [Hash{Symbol => Symbol}] resource name => state, closed resources omitted
def get_open_states
  @resources
    .reject { |_name, entry| entry[:state] == :closed }
    .transform_values { |entry| entry[:state] }
end

# Register a resource dynamically. Idempotent.
# Called by clients with sync_scope: :shared. Returns { registered: bool, state: string }.
# @param name [String, Symbol] resource identifier; stored under its Symbol form
# @param error_threshold [Integer] errors inside the window that open the circuit
# @param error_timeout [Numeric] seconds; sliding error window and time spent open
# @param success_threshold [Integer] half-open successes needed to close again
# @return [Hash] { registered: true, state: "closed" } for a new resource, or
#   { registered: false, state: <current state as String> } when it already exists
def register_resource(name, error_threshold:, error_timeout:, success_threshold:)
  resource_sym = name.to_sym

  # Already registered - keep the original configuration and report current state
  if @resources.key?(resource_sym)
    Console.debug(self) { "Resource #{name} already registered, returning existing state" }
    return {
      registered: false,
      state: @resources[resource_sym][:state].to_s,
    }
  end

  # Register new resource
  @resources[resource_sym] = {
    name: resource_sym,
    error_threshold: error_threshold,
    error_timeout: error_timeout,
    success_threshold: success_threshold,
    errors: [],
    successes: 0,
    state: :closed,
    last_error_at: nil,
  }
  @subscribers[resource_sym] ||= []

  Console.info(self) do
    "Registered resource: #{name} (errors: #{error_threshold}, timeout: #{error_timeout}s, successes: #{success_threshold})"
  end

  {
    registered: true,
    state: "closed",
  }
end

# Subscribe to state changes. Subscriber must implement on_state_change(resource, state).
#
# Subscribing the same subscriber twice keeps a single entry, so one state change
# produces one notification per subscriber. If the circuit is currently not closed,
# the current state is pushed to the subscriber right away (best effort).
#
# @param resource_name [String, Symbol]
# @param subscriber [#on_state_change] receives (resource_name String, state String)
# @return [true]
def subscribe(resource_name, subscriber)
  resource = resource_name.to_sym
  subscribers = (@subscribers[resource] ||= [])

  # Membership uses the same element equality Array#delete relies on in #unsubscribe.
  subscribers << subscriber unless subscribers.include?(subscriber)

  Console.info(self) do
    "Client subscribed to #{resource_name}: total_subscribers=#{subscribers.size}"
  end

  # Send current state if not closed
  current_state = @resources.dig(resource, :state)
  if current_state && current_state != :closed
    Console.info(self) { "Sending current state #{current_state} to new subscriber" }
    begin
      # Strings, matching what state-change broadcasts send over RPC.
      subscriber.on_state_change(resource.to_s, current_state.to_s)
    rescue StandardError => e
      # Best effort: a failed initial push must not fail the subscription itself.
      Console.warn(self) { "Initial state push failed: #{e.class} - #{e.message}" }
    end
  end

  true
end

# Remove a subscriber from a resource. Unknown resources are ignored.
#
# @param resource_name [String, Symbol]
# @param subscriber [Object] the object previously passed to #subscribe
# @return [true]
def unsubscribe(resource_name, subscriber)
  resource = resource_name.to_sym
  @subscribers[resource]&.delete(subscriber)
  true
end

# Transition open circuits to half-open if timeout elapsed.
+ def check_timeouts + state_changes = [] + now = Process.clock_gettime(Process::CLOCK_MONOTONIC) + + @resources.each do |name, resource| + next unless resource[:state] == :open + next unless resource[:last_error_at] + + elapsed = now - resource[:last_error_at] + if elapsed >= resource[:error_timeout] + resource[:state] = :half_open + state_changes << { resource: name, state: :half_open } + end + end + + state_changes.each do |change| + Console.info(self) do + "State transition: #{change[:resource]} open -> half_open (timeout elapsed), " \ + "subscribers=#{@subscribers[change[:resource]]&.size || 0}" + end + notify_subscribers(change[:resource], change[:state]) + end + + state_changes + end + + def statistics + { + resources: @resources.size, + open_circuits: @resources.count { |_, r| r[:state] == :open }, + total_subscribers: @subscribers.values.sum(&:size), + } + end + + def resources + @resources + end + + private + + def notify_subscribers(resource_name, new_state) + entries = @subscribers[resource_name]&.dup || [] + dead_subscribers = [] + + Console.info(self) { "Broadcasting #{new_state} to #{entries.size} subscriber(s) for #{resource_name}" } + + entries.each_with_index do |subscriber, idx| + Console.debug(self) { "Subscriber #{idx + 1} is #{subscriber.class}" } + # Convert symbol to string for RPC serialization + subscriber.on_state_change(resource_name.to_s, new_state.to_s) + Console.debug(self) { "Notified subscriber #{idx + 1}/#{entries.size}" } + rescue IOError, Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN => e + # Client disconnected - mark for removal + Console.warn(self) { "Subscriber disconnected (#{e.class}), removing" } + dead_subscribers << subscriber + rescue Async::Stop, Async::TimeoutError => e + # Task stopped or timed out - client likely gone + Console.warn(self) { "Subscriber task stopped (#{e.class}), removing" } + dead_subscribers << subscriber + rescue => e + # Log unexpected errors but DON'T remove subscriber - client might still be 
valid + Console.error(self) { "Failed to notify subscriber: #{e.class} - #{e.message}" } + # Don't add to dead_subscribers - transient errors shouldn't disconnect clients + end + + # Remove dead subscribers + if dead_subscribers.any? + dead_subscribers.each do |subscriber| + @subscribers[resource_name]&.delete(subscriber) + end + Console.info(self) { "Removed #{dead_subscribers.size} dead subscriber(s), remaining: #{@subscribers[resource_name]&.size || 0}" } + end + end + end + + # Manages circuit breaker coordination via async-bus Unix socket. + # Accepts client connections and binds CircuitBreakerController for RPC. + # Runs background tasks for timeout checks and periodic stats logging. + class CircuitBreakerServer + attr_reader :socket_path, :controller + + def initialize(socket_path:, resources: {}) + @socket_path = socket_path + @controller = CircuitBreakerController.new + @running = false + @client_count = 0 + + resources.each { |name, config| register_resource(name, **config) } + end + + def register_resource(name, error_threshold:, error_timeout:, success_threshold:, **_opts) + @controller.register_resource( + name, + error_threshold: error_threshold, + error_timeout: error_timeout, + success_threshold: success_threshold, + ) + end + + def resources + @controller.resources + end + + # Start server and block until stop is called. 
+ def start + # Clean up any existing socket + File.unlink(@socket_path) if File.exist?(@socket_path) + + endpoint = IO::Endpoint.unix(@socket_path) + bus_server = Async::Bus::Server.new(endpoint) + + @running = true + + Console.info(self) { "Semian Sync Server listening on #{@socket_path}" } + + Async do |task| + # Background task: check for timeout transitions (open -> half_open) + task.async do + while @running + sleep(1) + @controller.check_timeouts + end + end + + # Background task: periodic stats logging + task.async do + while @running + sleep(10) + stats = @controller.statistics + Console.info(self) do + "Stats: clients=#{@client_count}, resources=#{stats[:resources]}, " \ + "open_circuits=#{stats[:open_circuits]}, subscribers=#{stats[:total_subscribers]}" + end + end + end + + # Accept client connections + bus_server.accept do |connection| + @client_count += 1 + Console.info(self) { "Client connected (total: #{@client_count})" } + connection.bind(:circuit_breaker, @controller) + rescue => e + Console.error(self) { "Connection error: #{e.class} - #{e.message}" } + ensure + @client_count -= 1 + Console.info(self) { "Client disconnected (remaining: #{@client_count})" } + end + end + end + + def stop + @running = false + end + + def running? 
+ @running + end + end + end +end diff --git a/test/circuit_breaker_sync_test.rb b/test/circuit_breaker_sync_test.rb new file mode 100644 index 000000000..29a68e6c9 --- /dev/null +++ b/test/circuit_breaker_sync_test.rb @@ -0,0 +1,560 @@ +# frozen_string_literal: true + +require "rubygems" +require "bundler/setup" +require "minitest/autorun" + +$VERBOSE = true +require "semian" +require "semian/sync/circuit_breaker" +require "semian/sync/server" +require "async/bus/server" +require "io/endpoint/bound_endpoint" + +Semian.logger = Logger.new(nil, Logger::FATAL) + +class CircuitBreakerSyncServerTest < Minitest::Test + def setup + @socket_path = "/tmp/semian_test_#{Process.pid}_#{rand(10000)}.sock" + @server = Semian::Sync::CircuitBreakerServer.new( + socket_path: @socket_path, + resources: { + test_resource: { + error_threshold: 3, + error_timeout: 5, + success_threshold: 2, + }, + }, + ) + end + + def teardown + @server.stop if @server.running? + File.delete(@socket_path) if File.exist?(@socket_path) + end + + def test_register_resource + assert @server.resources.key?(:test_resource) + assert_equal :closed, @server.resources[:test_resource][:state] + end + + def test_resource_transitions_to_open_after_error_threshold + now = Time.now.to_f + + # Send 2 errors - should stay closed (using public RPC-accessible method) + @server.controller.report_error(:test_resource, now) + assert_equal :closed, @server.resources[:test_resource][:state] + + @server.controller.report_error(:test_resource, now + 1) + assert_equal :closed, @server.resources[:test_resource][:state] + + # Send 3rd error - should open + @server.controller.report_error(:test_resource, now + 2) + assert_equal :open, @server.resources[:test_resource][:state] + end + + def test_old_errors_expire + now = Time.now.to_f + + # Send 2 errors + @server.controller.report_error(:test_resource, now) + @server.controller.report_error(:test_resource, now + 1) + + # Send error 6 seconds later (outside 5s error_timeout window) 
+ @server.controller.report_error(:test_resource, now + 6) + + # Should still be closed (only 1 recent error) + assert_equal :closed, @server.resources[:test_resource][:state] + end + + def test_error_in_half_open_reopens_circuit + now = Time.now.to_f + + # Open the circuit + 3.times { |i| @server.controller.report_error(:test_resource, now + i) } + assert_equal :open, @server.resources[:test_resource][:state] + + # Manually transition to half-open (via internal access for testing) + @server.resources[:test_resource][:state] = :half_open + + # Error in half-open should reopen + @server.controller.report_error(:test_resource, now + 10) + assert_equal :open, @server.resources[:test_resource][:state] + end + + def test_success_in_half_open_closes_circuit + # Put circuit in half-open state (via internal access for testing) + @server.resources[:test_resource][:state] = :half_open + + # Send successes + @server.controller.report_success(:test_resource) + assert_equal :half_open, @server.resources[:test_resource][:state] + + @server.controller.report_success(:test_resource) + assert_equal :closed, @server.resources[:test_resource][:state] + end + + def test_success_in_closed_state_is_ignored + @server.controller.report_success(:test_resource) + assert_equal :closed, @server.resources[:test_resource][:state] + assert_equal 0, @server.resources[:test_resource][:successes] + end + + # === Dynamic Resource Registration Tests === + + def test_dynamic_register_resource_creates_new_resource + # Create server with no initial resources + empty_server = Semian::Sync::CircuitBreakerServer.new( + socket_path: "/tmp/semian_empty_#{Process.pid}.sock", + resources: {}, + ) + + # Server starts empty + assert_equal 0, empty_server.resources.size + + # Register resource dynamically via public RPC method + result = empty_server.controller.register_resource( + "dynamic_resource", + error_threshold: 5, + error_timeout: 30, + success_threshold: 3, + ) + + # Should report as newly registered + 
assert result[:registered], "Should report as newly registered" + assert_equal "closed", result[:state] + + # Resource should now exist + assert_equal 1, empty_server.resources.size + assert empty_server.resources.key?(:dynamic_resource) + + # Check configuration was applied correctly + resource = empty_server.resources[:dynamic_resource] + assert_equal 5, resource[:error_threshold] + assert_equal 30, resource[:error_timeout] + assert_equal 3, resource[:success_threshold] + assert_equal :closed, resource[:state] + end + + def test_dynamic_register_resource_is_idempotent + # Create server with no initial resources + empty_server = Semian::Sync::CircuitBreakerServer.new( + socket_path: "/tmp/semian_empty_#{Process.pid}.sock", + resources: {}, + ) + + # Register resource first time + result1 = empty_server.controller.register_resource( + "idempotent_resource", + error_threshold: 3, + error_timeout: 10, + success_threshold: 2, + ) + assert result1[:registered], "First registration should report as new" + + # Change state to open (simulating circuit breaker opening) + empty_server.resources[:idempotent_resource][:state] = :open + + # Register same resource again with different config + result2 = empty_server.controller.register_resource( + "idempotent_resource", + error_threshold: 10, + error_timeout: 60, + success_threshold: 5, + ) + + # Should NOT be marked as newly registered + refute result2[:registered], "Second registration should not be marked as new" + + # Should return current state + assert_equal "open", result2[:state] + + # Original config should be preserved (not overwritten) + resource = empty_server.resources[:idempotent_resource] + assert_equal 3, resource[:error_threshold], "Original config should be preserved" + assert_equal 10, resource[:error_timeout], "Original config should be preserved" + end + + def test_dynamic_resource_can_report_errors_after_registration + # Create server with no initial resources + empty_server = 
Semian::Sync::CircuitBreakerServer.new( + socket_path: "/tmp/semian_empty_#{Process.pid}.sock", + resources: {}, + ) + + # Register resource dynamically + empty_server.controller.register_resource( + "error_test_resource", + error_threshold: 2, + error_timeout: 10, + success_threshold: 1, + ) + + # Report errors to open the circuit + now = Time.now.to_f + empty_server.controller.report_error(:error_test_resource, now) + assert_equal :closed, empty_server.resources[:error_test_resource][:state] + + empty_server.controller.report_error(:error_test_resource, now + 1) + assert_equal :open, empty_server.resources[:error_test_resource][:state] + end + + def test_subscribe_and_unsubscribe + subscriber1 = Object.new + subscriber2 = Object.new + + @server.controller.subscribe(:test_resource, subscriber1) + @server.controller.subscribe(:test_resource, subscriber2) + + # Should have 2 subscribers + assert_equal 2, @server.controller.statistics[:total_subscribers] + + # Unsubscribe one + @server.controller.unsubscribe(:test_resource, subscriber1) + assert_equal 1, @server.controller.statistics[:total_subscribers] + + # Unsubscribe the other + @server.controller.unsubscribe(:test_resource, subscriber2) + assert_equal 0, @server.controller.statistics[:total_subscribers] + end +end + +class CircuitBreakerSyncClientTest < Minitest::Test + def setup + Semian::Sync::Client.reset! + @received_states = [] + end + + def teardown + Semian::Sync::Client.reset! + end + + def test_queue_reports_when_disconnected + refute Semian::Sync::Client.connected? 
+ + # Report should be queued, not fail + Semian::Sync::Client.report_error_async(:test_resource, Time.now.to_f) + + queue = Semian::Sync::Client.instance_variable_get(:@report_queue) + assert_equal 1, queue.size + assert_equal :error, queue.first[:type] + end + + def test_queue_overflow_drops_oldest + # Fill queue beyond max (default 1000, but we'll test the behavior) + max_queue = Semian::Sync::Client::MAX_QUEUE_SIZE + (max_queue + 5).times do |i| + Semian::Sync::Client.report_error_async("resource_#{i}", Time.now.to_f) + end + + queue = Semian::Sync::Client.instance_variable_get(:@report_queue) + assert_equal max_queue, queue.size + + # First few should be dropped, keeping newest + assert_equal "resource_5", queue.first[:resource] + end + + def test_subscribe_to_updates + Semian::Sync::Client.subscribe_to_updates(:test_resource) do |state| + @received_states << state + end + + # Simulate receiving a state update + Semian::Sync::Client.handle_state_change(:test_resource, :open) + + assert_equal [:open], @received_states + end + + def test_multiple_subscriptions + received1 = [] + received2 = [] + + Semian::Sync::Client.subscribe_to_updates(:test_resource) { |s| received1 << s } + Semian::Sync::Client.subscribe_to_updates(:test_resource) { |s| received2 << s } + + Semian::Sync::Client.handle_state_change(:test_resource, :open) + + assert_equal [:open], received1 + assert_equal [:open], received2 + end + + def test_callback_error_doesnt_break_other_callbacks + callback_called = false + + Semian::Sync::Client.subscribe_to_updates(:test_resource) { raise "intentional error" } + Semian::Sync::Client.subscribe_to_updates(:test_resource) { callback_called = true } + + Semian::Sync::Client.handle_state_change(:test_resource, :open) + + assert callback_called, "Second callback should be called despite first failing" + end + + def test_register_resource_returns_nil_when_disconnected + refute Semian::Sync::Client.connected? 
+ + # Should return nil when not connected (can't register) + result = Semian::Sync::Client.register_resource(:test_resource, { + error_threshold: 3, + error_timeout: 10, + success_threshold: 2, + }) + + assert_nil result, "Should return nil when not connected" + end +end + +# Integration tests following async-bus test patterns: +# Both server and client run within a single Async reactor. +# Server is started as an async task, client connects within the same reactor. +class CircuitBreakerSyncIntegrationTest < Minitest::Test + def setup + @original_env = ENV["SEMIAN_SYNC_ENABLED"] + ENV["SEMIAN_SYNC_ENABLED"] = "1" + Semian::Sync::Client.reset! + end + + def teardown + Semian::Sync::Client.disconnect + Semian::Sync::Client.reset! + + if @original_env + ENV["SEMIAN_SYNC_ENABLED"] = @original_env + else + ENV.delete("SEMIAN_SYNC_ENABLED") + end + end + + def test_client_connects_to_server + connected = false + + Dir.mktmpdir do |dir| + socket_path = File.join(dir, "semian.sock") + + Async do |task| + # Create server components directly (following async-bus pattern) + # Pre-bind the endpoint to ensure socket is ready before client connects + endpoint = IO::Endpoint.unix(socket_path) + bound_endpoint = endpoint.bound + + bus_server = Async::Bus::Server.new(bound_endpoint) + controller = Semian::Sync::CircuitBreakerController.new + controller.register_resource(:test_resource, error_threshold: 3, error_timeout: 5, success_threshold: 2) + + # Start server as async task within the same reactor + server_task = task.async do + bus_server.accept do |connection| + connection.bind(:circuit_breaker, controller) + end + end + + # Small yield to let server start accepting + sleep(0.01) + + # Configure client + Semian::Sync::Client.configure(socket_path) + + # Trigger connection by calling get_state (which calls ensure_connection) + Semian::Sync::Client.get_state(:test_resource) + + # Check connection status + connected = Semian::Sync::Client.connected? 
+ + # Clean up + server_task.stop + bound_endpoint.close + end + end + + assert connected, "Client should be connected to server" + end + + def test_client_receives_state_broadcasts + received_states = [] + + Dir.mktmpdir do |dir| + socket_path = File.join(dir, "semian.sock") + + Async do |task| + # Create server components with pre-bound endpoint + endpoint = IO::Endpoint.unix(socket_path) + bound_endpoint = endpoint.bound + + bus_server = Async::Bus::Server.new(bound_endpoint) + controller = Semian::Sync::CircuitBreakerController.new + controller.register_resource(:test_resource, error_threshold: 3, error_timeout: 5, success_threshold: 2) + + # Start server as async task + server_task = task.async do + bus_server.accept do |connection| + connection.bind(:circuit_breaker, controller) + end + end + + # Small yield to let server start + sleep(0.01) + + # Subscribe to updates before connecting + Semian::Sync::Client.subscribe_to_updates(:test_resource) do |state| + received_states << state + end + + # Configure client and trigger connection + Semian::Sync::Client.configure(socket_path) + + # Report errors (this triggers connection via ensure_connection) + now = Time.now.to_f + 3.times do |i| + Semian::Sync::Client.report_error_async(:test_resource, now + i) + end + + # Verify connected after reporting + assert Semian::Sync::Client.connected?, "Client should be connected" + + # Small yield to allow state change notification to propagate + sleep(0.01) + + # Clean up + server_task.stop + bound_endpoint.close + end + end + + assert_includes received_states, :open, "Client should have received open state" + end + + def test_client_queues_when_server_unavailable + queued = false + + Dir.mktmpdir do |dir| + socket_path = File.join(dir, "semian.sock") + + Async do + # Configure client with non-existent socket + Semian::Sync::Client.configure(socket_path) + + # Should not be connected (no server) + refute Semian::Sync::Client.connected? 
+ + # Report should be queued, not fail + Semian::Sync::Client.report_error_async(:test_resource, Time.now.to_f) + + # Check queue + queue = Semian::Sync::Client.instance_variable_get(:@report_queue) + queued = queue.size == 1 && queue.first[:type] == :error + end + end + + assert queued, "Report should be queued when server unavailable" + end + + def test_client_registers_resource_dynamically + registration_result = nil + + Dir.mktmpdir do |dir| + socket_path = File.join(dir, "semian.sock") + + Async do |task| + # Create server with NO initial resources (empty) + endpoint = IO::Endpoint.unix(socket_path) + bound_endpoint = endpoint.bound + + bus_server = Async::Bus::Server.new(bound_endpoint) + controller = Semian::Sync::CircuitBreakerController.new + + # Verify server starts empty + assert_equal 0, controller.resources.size + + # Start server as async task + server_task = task.async do + bus_server.accept do |connection| + connection.bind(:circuit_breaker, controller) + end + end + + sleep(0.01) + + # Configure client and connect + Semian::Sync::Client.configure(socket_path) + + # Register resource dynamically via client + registration_result = Semian::Sync::Client.register_resource(:dynamic_mysql_shard, { + error_threshold: 6, + error_timeout: 45, + success_threshold: 2, + }) + + # Verify resource was registered on server + assert controller.resources.key?(:dynamic_mysql_shard), "Resource should be registered on server" + assert_equal :closed, controller.resources[:dynamic_mysql_shard][:state] + assert_equal 6, controller.resources[:dynamic_mysql_shard][:error_threshold] + + # Clean up + server_task.stop + bound_endpoint.close + end + end + + # Verify registration result + refute_nil registration_result + assert registration_result[:registered], "Should report as newly registered" + assert_equal "closed", registration_result[:state] + end + + def test_multiple_clients_can_register_same_resource + results = [] + + Dir.mktmpdir do |dir| + socket_path = 
File.join(dir, "semian.sock") + + Async do |task| + # Create server with NO initial resources + endpoint = IO::Endpoint.unix(socket_path) + bound_endpoint = endpoint.bound + + bus_server = Async::Bus::Server.new(bound_endpoint) + controller = Semian::Sync::CircuitBreakerController.new + + # Start server + server_task = task.async do + bus_server.accept do |connection| + connection.bind(:circuit_breaker, controller) + end + end + + sleep(0.01) + + # First client registers + Semian::Sync::Client.configure(socket_path) + result1 = Semian::Sync::Client.register_resource(:shared_resource, { + error_threshold: 3, + error_timeout: 10, + success_threshold: 2, + }) + results << result1 + + # Reset client to simulate second worker + Semian::Sync::Client.reset! + Semian::Sync::Client.configure(socket_path) + + # Second client tries to register same resource + result2 = Semian::Sync::Client.register_resource(:shared_resource, { + error_threshold: 3, + error_timeout: 10, + success_threshold: 2, + }) + results << result2 + + # Only one resource should exist on server + assert_equal 1, controller.resources.size + + # Clean up + server_task.stop + bound_endpoint.close + end + end + + # First registration should be new, second should be existing + assert results[0][:registered], "First registration should be new" + refute results[1][:registered], "Second registration should find existing" + end +end