From cf4252c304afc9120bf0704ce9a1a6bcab1a7d73 Mon Sep 17 00:00:00 2001 From: Pawel Klapec Date: Thu, 30 Apr 2026 12:41:41 +0000 Subject: [PATCH 1/5] Re implmenet connection pooling --- .github/workflows/rspec.yml | 12 +- Gemfile | 2 +- README.md | 84 ++++- docker-compose.yml | 6 +- lib/mysql_framework/connector.rb | 212 +++++++------ lib/mysql_framework/mysql_connection_pool.rb | 175 +++++++++++ lib/mysql_framework/scripts/manager.rb | 2 +- .../stats/aws_metric_publisher.rb | 123 ++++++++ lib/mysql_framework/stats/dimension_map.rb | 51 +++ lib/mysql_framework/version.rb | 2 +- mysql_framework.gemspec | 6 +- spec/lib/mysql_framework/connector_spec.rb | 290 ++++++++---------- .../mysql_connection_pool_spec.rb | 251 +++++++++++++++ .../stats/aws_metric_publisher_spec.rb | 89 ++++++ 14 files changed, 1025 insertions(+), 280 deletions(-) create mode 100644 lib/mysql_framework/mysql_connection_pool.rb create mode 100644 lib/mysql_framework/stats/aws_metric_publisher.rb create mode 100644 lib/mysql_framework/stats/dimension_map.rb create mode 100644 spec/lib/mysql_framework/mysql_connection_pool_spec.rb create mode 100644 spec/lib/mysql_framework/stats/aws_metric_publisher_spec.rb diff --git a/.github/workflows/rspec.yml b/.github/workflows/rspec.yml index a30f9ee..dc7e29b 100644 --- a/.github/workflows/rspec.yml +++ b/.github/workflows/rspec.yml @@ -7,7 +7,7 @@ jobs: services: mysql: - image: mysql:5.7 + image: mysql:8.0 ports: - 3306:3306 env: @@ -22,16 +22,8 @@ jobs: - uses: actions/checkout@v2 - uses: ruby/setup-ruby@v1 with: - ruby-version: 2.4 + ruby-version: 3.4 bundler-cache: true - name: Run tests run: bundle exec rspec - - - name: Code Coverage - uses: paambaati/codeclimate-action@v2.7.5 - env: - CC_TEST_REPORTER_ID: ${{ secrets.CC_TEST_REPORTER_ID }} - with: - coverageLocations: | - ${{github.workspace}}/coverage/.resultset.json:simplecov diff --git a/Gemfile b/Gemfile index b712510..8ea20fa 100644 --- a/Gemfile +++ b/Gemfile @@ -7,5 +7,5 @@ group :test, :development do end group :test do - gem 'simplecov', '0.17.1', require: false + gem 'simplecov', require: false end diff --git a/README.md b/README.md index 83971fd..76e0907 100644 --- a/README.md +++ b/README.md @@ -34,8 +34,11 @@ gem 'mysql_framework' #### MySQL Connection Pooling Variables -* `MYSQL_START_POOL_SIZE` - how many connections should be created by default (default: `1`) +* `MYSQL_CONNECTION_POOL_ENABLED` - enables/disables pooling (default: `false`) * `MYSQL_MAX_POOL_SIZE` - how many connections should the pool be allowed to grow to (default: `5`) +* `MYSQL_POOL_TIMEOUT` - how long to wait for a pooled connection before timing out (default: `5` seconds) +* `MYSQL_POOL_IDLE_TIMEOUT` - how long a pooled connection can remain idle before being reaped (default: `300` seconds) +* `MYSQL_POOL_IDLE_REAP_TIME` - time interval between background thread checking for idle connections to reap (default: `60` seconds) #### MySQL Migration Variables @@ -161,7 +164,7 @@ MysqlFramework::Connector.new(options) #### #setup -Sets up the connection pooling. Creates `ENV['MYSQL_START_POOL_SIZE']` `Mysql2::Client` instances up front. This is provided as a separate method to allow for use within process forking where connections would need to be created after forking the process. +Sets up connection pooling using `connection_pool` with `ENV['MYSQL_MAX_POOL_SIZE']` and `ENV['MYSQL_POOL_TIMEOUT']`. Connections are created lazily by the pool when first needed. ```ruby connector.setup @@ -169,7 +172,7 @@ connector.setup #### #dispose -Closes all the `Mysql2::Client` connections and removes the connection pool. Intended as a clean-up method to be used on process fork shutdown. +Closes pooled `Mysql2::Client` connections and removes the pool. Intended as a clean-up method to be used on process fork shutdown. ```ruby connector.dispose @@ -177,7 +180,8 @@ connector.dispose #### #check_out -Check out a client from the connection pool. Will create new `Mysql2::Client` instances up-to `ENV['MYSQL_MAX_POOL_SIZE']` times if no idle connections are available. +Checks out a `Mysql2::Client` instance from the pool, sanitizes it, and returns it. +When pooling is disabled, it returns a newly created client. ```ruby client = connector.check_out @@ -185,7 +189,8 @@ client = connector.check_out #### #check_in -Check in a client to the connection pool +Checks a client back in to the pool. +When pooling is disabled, it closes the provided client. ```ruby client = connector.check_out @@ -195,7 +200,17 @@ connector.check_in(client) #### #with_client -Called with a block. The method checks out a client from the pool and yields it to the block. Finally it ensures that the client is always checked back into the pool. +Called with a block. The method obtains a client (from the pool when enabled), yields it to the block, and guarantees cleanup. + +When pooling is enabled, it uses the pool lifecycle (`ConnectionPool#with`) and supports optional discarding of the current pooled connection: + +```ruby +connector.with_client(discard_current_pool_connection: true) do |client| + # use client +end +``` + +When pooling is disabled, it creates a fresh client for the block and closes it afterwards. ```ruby connector.with_client do |client| @@ -280,10 +295,65 @@ The default options used to initialise MySQL2::Client instances: database: ENV.fetch('MYSQL_DATABASE'), username: ENV.fetch('MYSQL_USERNAME'), password: ENV.fetch('MYSQL_PASSWORD'), - reconnect: true + reconnect: true, + read_timeout: Integer(ENV.fetch('MYSQL_READ_TIMEOUT', 30)), + write_timeout: Integer(ENV.fetch('MYSQL_WRITE_TIMEOUT', 10)) } ``` +### MysqlFramework::Stats::AwsMetricPublisher + +Publishes connection-pool metrics (`size`, `available`, `idle`) to AWS CloudWatch on a configurable interval via a background thread. + +**Setup sequence** — the connector must be set up before the publisher is started: + +```ruby +connector = MysqlFramework::Connector.new +connector.setup # must come first + +publisher = MysqlFramework::Stats::AwsMetricPublisher.new( + connector: connector, + publish_interval: 300 # seconds, default +) +publisher.start +``` + +On shutdown, stop the publisher before disposing the connector: + +```ruby +publisher.stop +connector.dispose +``` + +#### Customising CloudWatch dimensions and namespace + +Use `MysqlFramework::Stats::DimensionMap` to configure the CloudWatch namespace and dimensions. Each attribute falls back to the corresponding environment variable when not set explicitly: + +| Attribute | ENV fallback | CloudWatch dimension | +|---|---|---| +| `service_name` | `SERVICE_NAME` | `ServiceName` | +| `application` | `APPLICATION` | `Application` | +| `environment` | `ENVIRONMENT` | `Environment` | +| `landscape` | `LANDSCAPE` | `Landscape` | +| `namespace` | `AWS_METRICS_NAMESPACE` | (namespace, default: `MysqlFramework`) | + +```ruby +dimension_map = MysqlFramework::Stats::DimensionMap.new( + service_name: 'my-service', + application: 'my-app', + environment: 'production', + landscape: 'us-east', + namespace: 'MyCompany/MySQL' +) + +publisher = MysqlFramework::Stats::AwsMetricPublisher.new( + connector: connector, + dimension_map: dimension_map, + publish_interval: 60 +) +publisher.start +``` + ### MysqlFramework::SqlCondition A representation of a MySQL Condition for a column. Created automatically by SqlColumn diff --git a/docker-compose.yml b/docker-compose.yml index d4d1a05..25fcbd2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,8 +1,6 @@ -version: '2.1' - services: test-runner: - image: ruby:2.4 + image: ruby:3.4 working_dir: /usr/src/app container_name: test-runner command: sh -c "while true; do echo 'Container is running..'; sleep 5; done" @@ -21,7 +19,7 @@ services: test-mysql: container_name: test-mysql - image: mysql:5.7 + image: mysql:8 restart: always environment: MYSQL_ROOT_PASSWORD: admin diff --git a/lib/mysql_framework/connector.rb b/lib/mysql_framework/connector.rb index ee54d32..f0a5bce 100644 --- a/lib/mysql_framework/connector.rb +++ b/lib/mysql_framework/connector.rb @@ -1,133 +1,172 @@ # frozen_string_literal: true +require 'connection_pool' +require_relative 'mysql_connection_pool' + module MysqlFramework class Connector + attr_reader :connection_pool + + # Initializes a connector instance with MySQL client options. + # + # @param options [Hash] custom MySQL client options that override defaults + # @return [void] def initialize(options = {}) @options = default_options.merge(options) - @mutex = Mutex.new - Mysql2::Client.default_query_options.merge!(symbolize_keys: true, cast_booleans: true) end - # This method is called to setup a pool of MySQL connections. + # Sets up the MySQL connection pool when pooling is enabled. + # + # @return [ConnectionPool, nil] configured pool, or nil when pooling is disabled def setup return unless connection_pool_enabled? - @connection_pool = ::Queue.new - - start_pool_size.times { @connection_pool.push(new_client) } - - @created_connections = start_pool_size + @connection_pool = MysqlFramework::MysqlConnectionPool.new(@options) + @connection_pool.setup end - # This method is called to close all MySQL connections in the pool and dispose of the pool itself. + # Disposes of the connection pool and closes pooled connections. + # + # @return [void] def dispose - return if @connection_pool.nil? - - until @connection_pool.empty? - conn = @connection_pool.pop(true) - conn&.close - end + return unless connection_pool_enabled? + @connection_pool&.dispose @connection_pool = nil end - # This method is called to get the idle connection queue for this connector. - def connections - @connection_pool - end - - # This method is called to fetch a client from the connection pool. + # Checks out a MySQL client, sanitizing it before use. + # + # @return [Mysql2::Client] checked-out client + # @raise [ConnectionSanitizationError] when sanitization repeatedly fails + # @raise [Mysql2::Error] when checkout or sanitization fails due to MySQL errors def check_out - @mutex.synchronize do - begin - return new_client unless connection_pool_enabled? - - client = @connection_pool.pop(true) - - client.ping if @options[:reconnect] + return new_client unless connection_pool_enabled? - client - rescue ThreadError - if @created_connections < max_pool_size - client = new_client - @created_connections += 1 - return client - end - - MysqlFramework.logger.error { "[#{self.class}] - Database connection pool depleted." } - - raise 'Database connection pool depleted.' - end - end + @connection_pool.check_out end - # This method is called to check a client back in to the connection when no longer needed. + # Returns a MySQL client back to the pool or closes it when pooling is disabled. + # + # @param client [Mysql2::Client, nil] client to return or close + # @return [void] def check_in(client) - @mutex.synchronize do - return client&.close unless connection_pool_enabled? + return client&.close unless connection_pool_enabled? - client = new_client if client&.closed? - @connection_pool.push(client) - end + @connection_pool.check_in(client) end - # This method is called to use a client from the connection pool. - def with_client(provided = nil) - client = provided || check_out - yield client - ensure - check_in(client) if client && !provided + # Yields a MySQL client from the pool, or yields the provided client directly. + # + # @param provided_client [Mysql2::Client, nil] existing client to yield without pool checkout + # @param discard_current_pool_connection [Boolean] whether to discard the pooled connection after use + # @yield [client] block that performs work with a MySQL client + # @yieldparam client [Mysql2::Client] + # @return [Object] block result + # @raise [Mysql2::Error] re-raises MySQL errors from the block + def with_client(provided_client = nil, discard_current_pool_connection: false) + return yield provided_client if provided_client + return with_new_client { |c| yield c } unless connection_pool_enabled? + + @connection_pool.with_client(discard_current_pool_connection:) { |c| yield c } end - # This method is called to execute a prepared statement + # Executes a prepared statement. + # + # @param query [Object] query object responding to +sql+ and +params+ + # @param provided_client [Mysql2::Client, nil] optional existing client + # @return [Array, nil] query result rows + # @raise [Mysql2::Error] when statement preparation or execution fails # - # @note Ensure we free any result and close each statement, otherwise we - # can run into a 'Commands out of sync' error if multiple threads are - # running different queries at the same time. + # NOTE: + # We must always free the result and close the prepared statement. + # Otherwise MySQL may raise "Commands out of sync" when the same + # connection is reused (e.g. via connection pooling). + # + # The connection itself must NOT be closed here because it is + # managed by the connection pool. def execute(query, provided_client = nil) with_client(provided_client) do |client| + statement = nil + result = nil + begin statement = client.prepare(query.sql) - result = statement.execute(*query.params) - result&.to_a + result = statement.execute( + *query.params, symbolize_keys: true, cast_booleans: true + ) + final = result&.to_a + final ensure + client&.abandon_results! result&.free statement&.close end end end - # This method is called to execute a query + # Executes a SQL query. + # + # @param query_string [String] SQL query to execute + # @param provided_client [Mysql2::Client, nil] optional existing client + # @return [Mysql2::Result] raw MySQL result + # @raise [Mysql2::Error] when query execution fails def query(query_string, provided_client = nil) - with_client(provided_client) { |client| client.query(query_string) } + with_client(provided_client) { |conn| conn.query(query_string) } end - # This method is called to execute a query which will return multiple result sets in an array + # Executes a multi-statement SQL query and collects all result sets. + # + # @param query_string [String] multi-statement SQL query + # @param provided_client [Mysql2::Client, nil] optional existing client + # @return [Array>] list of result sets + # @raise [Mysql2::Error] when query execution or result fetching fails def query_multiple_results(query_string, provided_client = nil) - results = with_client(provided_client) do |client| - result = [] - result << client.query(query_string) - result << client.store_result while client.next_result - result.compact + results = nil + + # Multiple statement query is buggy and client cannot be reused after calling next_result/store_result + # Client's state gets corrupted and leaks into next queries. The reason is unknown. + # As a result we do not return client back to the pool but instead close connection which is not optimal. + with_client(provided_client, discard_current_pool_connection: true) do |client| + raw_results = [] + query_call = client.query(query_string) + raw_results << query_call&.to_a + query_call&.free + + while client.more_results? + client.next_result + query_call = client.store_result + raw_results << query_call&.to_a + query_call&.free + end + + results = raw_results.compact + results + ensure + client&.abandon_results! end - results.map(&:to_a) + results end - # This method is called to use a client within a transaction + # Executes a block within a database transaction. + # + # @yield [client] block executed between BEGIN and COMMIT + # @yieldparam client [Mysql2::Client] + # @return [Object] block result + # @raise [LocalJumpError] when no block is given + # @raise [StandardError] re-raises any exception after rollback def transaction - raise ArgumentError, 'No block was given' unless block_given? + raise LocalJumpError, 'No block was given' unless block_given? with_client do |client| - begin - client.query('BEGIN') - yield client - client.query('COMMIT') - rescue StandardError => e - client.query('ROLLBACK') - raise e - end + client.query('BEGIN') + yield client + client.query('COMMIT') + rescue StandardError => e + client.query('ROLLBACK') + raise e end end @@ -146,20 +185,21 @@ def default_options } end + def with_new_client + client = new_client + yield client + ensure + client&.close + end + def new_client Mysql2::Client.new(@options) end def connection_pool_enabled? - @connection_pool_enabled ||= ENV.fetch('MYSQL_CONNECTION_POOL_ENABLED', 'true').casecmp?('true') - end - - def start_pool_size - @start_pool_size ||= Integer(ENV.fetch('MYSQL_START_POOL_SIZE', 1)) - end + return @connection_pool_enabled unless @connection_pool_enabled.nil? - def max_pool_size - @max_pool_size ||= Integer(ENV.fetch('MYSQL_MAX_POOL_SIZE', 5)) + @connection_pool_enabled = ENV.fetch('MYSQL_CONNECTION_POOL_ENABLED', 'false').casecmp?('true') end end end diff --git a/lib/mysql_framework/mysql_connection_pool.rb b/lib/mysql_framework/mysql_connection_pool.rb new file mode 100644 index 0000000..eb897a2 --- /dev/null +++ b/lib/mysql_framework/mysql_connection_pool.rb @@ -0,0 +1,175 @@ +# frozen_string_literal: true + +require 'connection_pool' + +module MysqlFramework + class MysqlConnectionPool + class ConnectionSanitizationError < StandardError; end + + CLEAN_IDLE_CONNECTIONS_THREAD_NAME = 'clean-idle-connections' + + attr_reader :connections + + # Initializes a connection pool instance with MySQL client options. + # + # @param options [Hash] MySQL client options passed to each pooled connection + # @return [void] + def initialize(options) + @options = options + @setup_mutex = Mutex.new + end + + # Sets up the MySQL connection pool. Idempotent — safe to call more than once. + # + # @return [ConnectionPool] configured pool + def setup + @setup_mutex.synchronize do + return if connections + + @connections = ConnectionPool.new(size: max_pool_size, timeout: pool_timeout) do + Mysql2::Client.new(@options) + end + + start_clean_idle_connections_thread + end + end + + # Disposes of the connection pool and closes pooled connections. + # + # @return [void] + def dispose + @setup_mutex.synchronize do + dispose_clean_idle_connections_thread + connections&.shutdown(&:close) + @connections = nil + end + end + + # Returns key connection-pool metrics for monitoring. + # + # @return [Hash{Symbol => Integer}] pool size and availability metrics + def pool_stats + return { size: 0, available: 0, idle: 0 } if connections.nil? + + { + size: connections.size, + available: connections.available, + idle: connections.idle + } + end + + # Checks out a MySQL client, sanitizing it before use. + # + # @return [Mysql2::Client] checked-out client + # @raise [ConnectionSanitizationError] when sanitization repeatedly fails + # @raise [Mysql2::Error] when checkout or sanitization fails due to MySQL errors + def check_out + sanitization_retries = 0 + begin + conn = connections.checkout + sanitize_connection!(conn) + conn + rescue ConnectionSanitizationError + discard_current_connection! + sanitization_retries += 1 + retry if sanitization_retries <= 1 + raise + rescue Mysql2::Error + discard_current_connection! + raise + end + end + + # Returns a MySQL client back to the pool or closes it when pooling is disabled. + # + # @param client [Mysql2::Client, nil] client to return or close + # @return [void] + def check_in(client) + return if client.nil? + + discard_current_connection! if client.closed? + connections.checkin + end + + # Yields a MySQL client from the pool, or yields the provided client directly. + # + # @param provided_client [Mysql2::Client, nil] existing client to yield without pool checkout + # @param discard_current_pool_connection [Boolean] whether to discard the pooled connection after use + # @yield [client] block that performs work with a MySQL client + # @yieldparam client [Mysql2::Client] + # @return [Object] block result + # @raise [Mysql2::Error] re-raises MySQL errors from the block + def with_client(discard_current_pool_connection: false) + sanitization_retries = 0 + + begin + connections.with do |conn| + sanitize_connection!(conn) + yield conn + rescue ConnectionSanitizationError, Mysql2::Error + discard_current_connection! + raise + ensure + discard_current_connection! if discard_current_pool_connection + end + rescue ConnectionSanitizationError + sanitization_retries += 1 + retry if sanitization_retries <= 1 + raise + end + end + + private + + def start_clean_idle_connections_thread + @idle_connections_thread = Thread.new do + Thread.current.name = CLEAN_IDLE_CONNECTIONS_THREAD_NAME + loop do + sleep idle_reap_loop_time + break unless Thread.current == @idle_connections_thread + + connections&.reap(idle_seconds: idle_timeout, &:close) + end + end + + @idle_connections_thread.abort_on_exception = false + @idle_connections_thread + end + + def dispose_clean_idle_connections_thread + @idle_connections_thread&.join(5) + @idle_connections_thread&.kill + @idle_connections_thread = nil + end + + def sanitize_connection!(conn) + conn.ping + conn.abandon_results! + conn.query('ROLLBACK') + rescue Mysql2::Error => e + raise ConnectionSanitizationError, "Connection sanitization failed: #{e.message}" + end + + def discard_current_connection! + connections&.discard_current_connection(&:close) + rescue StandardError + nil + end + + def max_pool_size + @max_pool_size ||= Integer(ENV.fetch('MYSQL_MAX_POOL_SIZE', 5)) + end + + def pool_timeout + @pool_timeout ||= Integer(ENV.fetch('MYSQL_POOL_TIMEOUT', 5)) + end + + def idle_timeout + @idle_timeout ||= Integer(ENV.fetch('MYSQL_POOL_IDLE_TIMEOUT', 300)) + end + + def idle_reap_loop_time + @idle_reap_loop_time ||= Integer(ENV.fetch('MYSQL_POOL_IDLE_REAP_TIME', 60)) + end + end +end diff --git a/lib/mysql_framework/scripts/manager.rb b/lib/mysql_framework/scripts/manager.rb index e538555..7429722 100644 --- a/lib/mysql_framework/scripts/manager.rb +++ b/lib/mysql_framework/scripts/manager.rb @@ -27,7 +27,7 @@ def execute end def apply_by_tag(tags) - lock_manager.with_lock(key: self.class) do + lock_manager.with_lock(key: self.class.name) do initialize_script_history mysql_connector.transaction do |client| diff --git a/lib/mysql_framework/stats/aws_metric_publisher.rb b/lib/mysql_framework/stats/aws_metric_publisher.rb new file mode 100644 index 0000000..19cdbe5 --- /dev/null +++ b/lib/mysql_framework/stats/aws_metric_publisher.rb @@ -0,0 +1,123 @@ +# frozen_string_literal: true + +require 'aws-sdk-cloudwatch' +require_relative 'dimension_map' + +module MysqlFramework + module Stats + class AwsMetricPublisher + THREAD_NAME = 'mysql-connector-pool-stats' + JOIN_TIMEOUT = 5 # seconds to wait for clean thread exit before force-killing + METRIC_UNIT = 'Count' + METRIC_NAME_MAP = { + size: 'MysqlConnectionPoolSize', + available: 'MysqlConnectionPoolAvailable', + idle: 'MysqlConnectionPoolIdle' + }.freeze + + # Initializes AWS metric publishing dependencies. + # + # @param connector [MysqlFramework::Connector, nil] connector used to read connection-pool stats + # @param dimension_map [MysqlFramework::Stats::DimensionMap, nil] CloudWatch namespace and dimensions + # @param cloudwatch_client [Aws::CloudWatch::Client, nil] CloudWatch client instance + # @param publish_interval [Integer] metric publish interval in seconds + # @return [void] + def initialize( + connector: nil, + dimension_map: nil, + cloudwatch_client: nil, + publish_interval: 300 + ) + @thread = nil + @connector = connector + @cloudwatch_client = cloudwatch_client + @dimension_map = dimension_map || MysqlFramework::Stats::DimensionMap.new + @publish_interval = publish_interval + end + + # Spawns the background sampling thread. Safe to call more than once – + # subsequent calls are no-ops while the thread is already running. + # + # @return [Thread, nil] reporter thread when started, or nil when already running + def start + return if running? + + thread = Thread.new do + Thread.current.name = THREAD_NAME + loop do + sleep @publish_interval + break unless Thread.current == @thread + + sample + end + end + + thread.abort_on_exception = false + @thread = thread + end + + # Cooperatively stops the background thread and waits up to JOIN_TIMEOUT + # seconds for it to exit before force-killing it. + # + # @return [void] + def stop + thread = @thread + @thread = nil # cooperative stop signal: loop checks this after each sleep + thread&.join(JOIN_TIMEOUT) + thread&.kill # force-kill only if still alive after timeout + end + + # Returns true when the reporter thread is alive. + # + # @return [Boolean] + def running? + @thread&.alive? || false + end + + private + + # Reads pool stats and publishes them to CloudWatch using a low-cardinality + # dimension set so all ECS tasks for the same service aggregate together. + # Errors are swallowed and logged so that a reporting failure never + # propagates to the caller. + def sample + connection_pool = @connector&.connection_pool + return if connection_pool.nil? + + stats = connection_pool.pool_stats + metric_data = build_metric_data(stats) + return if metric_data.empty? + + MysqlFramework.logger.debug { "[#{self.class}] - CloudWatch/#{@dimension_map.namespace} - #{stats.inspect}" } + + cloudwatch_client.put_metric_data( + namespace: @dimension_map.namespace, + metric_data: metric_data + ) + rescue StandardError => e + MysqlFramework.logger.error { "[#{self.class}] - Failed to record pool stats: #{e.message}" } + end + + def build_metric_data(stats) + timestamp = Time.now.utc + + METRIC_NAME_MAP.filter_map do |key, metric_name| + value = stats[key] + next if value.nil? + + { + metric_name: metric_name, + dimensions: @dimension_map.to_cloudwatch_dimensions, + timestamp: timestamp, + unit: METRIC_UNIT, + value: value.to_f + } + end + end + + def cloudwatch_client + @cloudwatch_client ||= Aws::CloudWatch::Client.new + end + end + end +end diff --git a/lib/mysql_framework/stats/dimension_map.rb b/lib/mysql_framework/stats/dimension_map.rb new file mode 100644 index 0000000..c647a69 --- /dev/null +++ b/lib/mysql_framework/stats/dimension_map.rb @@ -0,0 +1,51 @@ +# frozen_string_literal: true + +module MysqlFramework + module Stats + # Class to handle dimensions for AWS reporting + class DimensionMap + attr_accessor :service_name, :application, :environment, :landscape, :namespace + + # Initializes dimension values used for CloudWatch metrics. + # + # @param service_name [String, nil] service dimension + # @param application [String, nil] application dimension + # @param environment [String, nil] environment dimension + # @param landscape [String, nil] landscape dimension + # @param namespace [String, nil] CloudWatch namespace override + # @return [void] + def initialize( + service_name: nil, + application: nil, + environment: nil, + landscape: nil, + namespace: nil + ) + @service_name = service_name + @application = application + @environment = environment + @landscape = landscape + @namespace = namespace + end + + # Builds CloudWatch dimensions from configured values or environment variables. + # + # @return [Array String}>] dimensions with non-nil values only + def to_cloudwatch_dimensions + [ + { name: 'ServiceName', value: service_name || ENV.fetch('SERVICE_NAME', nil) }, + { name: 'Application', value: application || ENV.fetch('APPLICATION', nil) }, + { name: 'Environment', value: environment || ENV.fetch('ENVIRONMENT', nil) }, + { name: 'Landscape', value: landscape || ENV.fetch('LANDSCAPE', nil) } + ].reject { |dimension| dimension[:value].nil? } + end + + # Returns the CloudWatch namespace. + # + # @return [String] configured namespace or default namespace value + def namespace + @namespace || ENV.fetch('AWS_METRICS_NAMESPACE', 'MysqlFramework') + end + end + end +end diff --git a/lib/mysql_framework/version.rb b/lib/mysql_framework/version.rb index 6023ab3..9b89b62 100644 --- a/lib/mysql_framework/version.rb +++ b/lib/mysql_framework/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module MysqlFramework - VERSION = '2.1.9' + VERSION = '2.3.0' end diff --git a/mysql_framework.gemspec b/mysql_framework.gemspec index c7ddc43..35cb16b 100644 --- a/mysql_framework.gemspec +++ b/mysql_framework.gemspec @@ -20,10 +20,12 @@ Gem::Specification.new do |spec| spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) } spec.require_paths = ['lib'] - spec.add_development_dependency 'rake', '~> 10.0' + spec.add_development_dependency 'rake' spec.add_development_dependency 'rspec' spec.add_development_dependency 'pry' - spec.add_dependency 'mysql2', '~> 0.4' + spec.add_dependency 'aws-sdk-cloudwatch' + spec.add_dependency 'connection_pool' + spec.add_dependency 'mysql2' spec.add_dependency 'redlock' end diff --git a/spec/lib/mysql_framework/connector_spec.rb b/spec/lib/mysql_framework/connector_spec.rb index 53364d0..15752e5 100644 --- a/spec/lib/mysql_framework/connector_spec.rb +++ b/spec/lib/mysql_framework/connector_spec.rb @@ -1,7 +1,6 @@ # frozen_string_literal: true describe MysqlFramework::Connector do - let(:start_pool_size) { Integer(ENV.fetch('MYSQL_START_POOL_SIZE')) } let(:max_pool_size) { Integer(ENV.fetch('MYSQL_MAX_POOL_SIZE')) } let(:default_options) do { @@ -11,8 +10,8 @@ username: ENV.fetch('MYSQL_USERNAME'), password: ENV.fetch('MYSQL_PASSWORD'), reconnect: true, - read_timeout: ENV.fetch('MYSQL_READ_TIMEOUT', 30), - write_timeout: ENV.fetch('MYSQL_WRITE_TIMEOUT', 10) + read_timeout: Integer(ENV.fetch('MYSQL_READ_TIMEOUT', 30)), + write_timeout: Integer(ENV.fetch('MYSQL_WRITE_TIMEOUT', 10)) } end let(:options) do @@ -25,29 +24,21 @@ reconnect: false } end - let(:client) { double(close: true, ping: true, closed?: false) } + let(:client) { double(close: true, ping: true, closed?: false, abandon_results!: nil) } let(:gems) { MysqlFramework::SqlTable.new('gems') } let(:existing_client) { Mysql2::Client.new(default_options) } let(:connection_pooling_enabled) { 'true' } subject { described_class.new } - before(:each) do - original_fetch = ENV.method(:fetch) - - allow(ENV).to receive(:fetch) do |var, default| - if var == 'MYSQL_CONNECTION_POOL_ENABLED' - connection_pooling_enabled - else - original_fetch.call(var, default) - end - end + before do + allow(ENV).to receive(:fetch).and_call_original + allow(ENV).to receive(:fetch).with('MYSQL_CONNECTION_POOL_ENABLED', 'true') + .and_return(connection_pooling_enabled) subject.setup end - after(:each) { subject.dispose } - describe '#initialize' do context 'when options are not provided' do it 'returns the default options' do @@ -66,8 +57,8 @@ username: ENV.fetch('MYSQL_USERNAME'), password: ENV.fetch('MYSQL_PASSWORD'), reconnect: false, - read_timeout: ENV.fetch('MYSQL_READ_TIMEOUT', 30), - write_timeout: ENV.fetch('MYSQL_WRITE_TIMEOUT', 10) + read_timeout: Integer(ENV.fetch('MYSQL_READ_TIMEOUT', 30)), + write_timeout: Integer(ENV.fetch('MYSQL_WRITE_TIMEOUT', 10)) } expect(subject.instance_variable_get(:@options)).to eq(expected) @@ -84,10 +75,13 @@ describe '#setup' do context 'when connection pooling is enabled' do - it 'creates a connection pool with the specified number of conections' do + it 'creates a connection pool with expected stats' do subject.setup - expect(subject.connections.length).to eq(start_pool_size) + expect(subject.connection_pool).to be_a(MysqlFramework::MysqlConnectionPool) + expect(subject.connection_pool.pool_stats[:size]).to eq(max_pool_size) + expect(subject.connection_pool.pool_stats[:available]).to be >= 0 + expect(subject.connection_pool.pool_stats[:idle]).to be >= 0 end end @@ -97,182 +91,147 @@ it "doesn't create a connection pool" do subject.setup - expect(subject.connections).to be_nil + expect(subject.connection_pool).to be_nil end end end describe '#dispose' do - before do - subject.connections.clear - subject.connections.push(client) - end - - it 'closes the idle connections and disposes of the queue' do - expect(client).to receive(:close) + context 'when connection pooling is enabled' do + it 'disposes of the pool and clears connector reference' do + pool = instance_double(MysqlFramework::MysqlConnectionPool) + allow(pool).to receive(:dispose) + subject.instance_variable_set(:@connection_pool, pool) - subject.dispose + expect(pool).to receive(:dispose) + subject.dispose - expect(subject.connections).to be_nil + expect(subject.connection_pool).to be_nil + end end - end - describe '#check_out' do - it 'calls synchronize on the mutex' do - expect(subject.instance_variable_get(:@mutex)).to receive(:synchronize) + context 'when connection pooling is disabled' do + let(:connection_pooling_enabled) { 'false' } - subject.check_out + it 'does not perform more actions' do + expect { subject.dispose }.not_to raise_error + expect(subject.connection_pool).to be_nil + end end + end + describe '#check_out' do context 'when connection pooling is enabled' do - context 'when there are available connections' do - before do - subject.connections.clear - subject.connections.push(client) - end - - it 'returns a client instance from the pool' do - expect(subject.check_out).to eq(client) - end - - context 'and :reconnect is set to true' do - let(:options) do - { - host: ENV.fetch('MYSQL_HOST'), - port: ENV.fetch('MYSQL_PORT'), - database: "#{ENV.fetch('MYSQL_DATABASE')}_2", - username: ENV.fetch('MYSQL_USERNAME'), - password: ENV.fetch('MYSQL_PASSWORD'), - reconnect: true - } - end - - subject { described_class.new(options) } - - it 'pings the server to force a reconnect' do - expect(client).to receive(:ping) - - subject.check_out - end - end - - context 'and :reconnect is set to false' do - subject { described_class.new(options) } - - it 'pings the server to force a reconnect' do - expect(client).not_to receive(:ping) + it 'checks out pooled connection' do + pool = instance_double(MysqlFramework::MysqlConnectionPool) + client = instance_double(Mysql2::Client) + allow(pool).to receive(:check_out).and_return(client) + subject.instance_variable_set(:@connection_pool, pool) - subject.check_out - end - end + expect(pool).to receive(:check_out) + expect(subject.check_out).to eq(client) end + end - context 'when connection pooling is disabled' do - let(:connection_pooling_enabled) { 'false' } + context 'when pooling is disabled' do + let(:connection_pooling_enabled) { 'false' } - it 'instantiates and returns a new connection directly' do - expect(subject.connections).not_to receive(:pop) - expect(Mysql2::Client).to receive(:new) + it 'returns a new client directly' do + new_client = instance_double(Mysql2::Client) + allow(Mysql2::Client).to receive(:new).and_return(new_client) - subject.check_out - end + expect(subject.check_out).to eq(new_client) end end + end - context "when there are no available connections, and the pool's max size has not been reached" do - before do - subject.connections.clear - subject.connections.push(client) - end - - it 'instantiates a new connection and returns it' do - subject.check_out + describe '#check_in' do + context 'when connection pooling is enabled' do + it 'checks in a pooled connection' do + pool = instance_double(MysqlFramework::MysqlConnectionPool) + allow(pool).to receive(:check_in) + subject.instance_variable_set(:@connection_pool, pool) - expect(Mysql2::Client).to receive(:new).with(default_options).and_return(client) - expect(subject.check_out).to eq(client) + expect(pool).to receive(:check_in) + subject.check_in(client) end end - context "when there are no available connections, and the pool's max size has been reached" do - before do - subject.connections.clear - subject.instance_variable_set(:@created_connections, 5) + context 'when pooling is disabled' do + let(:connection_pooling_enabled) { 'false' } - 5.times { subject.check_in(client) } - 5.times { subject.check_out } - end + it 'closes the provided client' do + new_client = instance_double(Mysql2::Client, close: nil) - it 'throws a RuntimeError' do - expect { subject.check_out }.to raise_error(RuntimeError) + expect(new_client).to receive(:close) + subject.check_in(new_client) end end end - describe '#check_in' do - it 'calls synchronize on the mutex' do - expect(subject.instance_variable_get(:@mutex)).to receive(:synchronize) + describe '#with_client' do + context 'when a provided_client is given' do + it 'yields the provided client directly' do + expect { |b| subject.with_client(client, &b) }.to yield_with_args(client) + end - subject.check_out - end + it 'does not interact with the connection pool' do + expect(subject).not_to receive(:with_new_client) + expect(subject.connection_pool).not_to receive(:with_client) if subject.connection_pool - context 'when connection pooling is enabled' do - it 'returns the provided client to the connection pool' do - expect(subject.connections).to receive(:push).with(client) + subject.with_client(client) { |_c| nil } + end - subject.check_in(client) + it 'returns the block result' do + result = subject.with_client(client) { |_c| :expected } + expect(result).to eq(:expected) end + end - context 'when the connection has been closed by the server' do - let(:closed_client) { double(close: true, closed?: true) } + context 'when no provided_client is given' do + context 'when connection pooling is disabled' do + let(:connection_pooling_enabled) { 'false' } - it 'instantiates a new connection and returns it' do - expect(Mysql2::Client).to receive(:new).with(default_options).and_return(client) - expect(subject.connections).to receive(:push).with(client) + it 'delegates to with_new_client' do + expect(subject).to receive(:with_new_client).and_yield(client) - subject.check_in(closed_client) + expect { |b| subject.with_client(&b) }.to yield_with_args(client) end - end - end - context 'when connection pooling is disabled' do - let(:connection_pooling_enabled) { 'false' } + it 'returns the block result' do + allow(subject).to receive(:with_new_client).and_yield(client) - it 'closes the connection and does not add it to the connection pool' do - expect(client).to receive(:close) - expect(subject.connections).not_to receive(:push) - - subject.check_in(client) + result = subject.with_client { |_c| :expected } + expect(result).to eq(:expected) + end end - end - - context 'when client is nil' do - let(:client) { nil } context 'when connection pooling is enabled' do - it 'does not raise an error' do - expect { subject.check_in(client) }.not_to raise_error + it 'delegates to the connection pool' do + pool = instance_double(MysqlFramework::MysqlConnectionPool) + subject.instance_variable_set(:@connection_pool, pool) + + expect(pool).to receive(:with_client).with(discard_current_pool_connection: false).and_yield(client) + expect { |b| subject.with_client(&b) }.to yield_with_args(client) end - end - context 'when connection pooling is disabled' do - let(:connection_pooling_enabled) { 'false' } + it 'passes discard_current_pool_connection: true to the pool when requested' do + pool = instance_double(MysqlFramework::MysqlConnectionPool) + subject.instance_variable_set(:@connection_pool, pool) - it 'does not raise an error' do - expect { subject.check_in(client) }.not_to raise_error + expect(pool).to receive(:with_client).with(discard_current_pool_connection: true).and_yield(client) + subject.with_client(discard_current_pool_connection: true) { |_c| nil } end - end - end - end - describe '#with_client' do - it 'uses the client that is provided, if passed one' do - expect(subject).not_to receive(:check_out) - expect { |b| subject.with_client(client, &b) }.to yield_with_args(client) - end + it 'returns the block result' do + pool = instance_double(MysqlFramework::MysqlConnectionPool) + subject.instance_variable_set(:@connection_pool, pool) + allow(pool).to receive(:with_client).and_yield(client) - it 'obtains a client from the pool to use, if no client is provided' do - allow(subject).to receive(:check_out).and_return(client) - expect { |b| subject.with_client(&b) }.to yield_with_args(client) + result = subject.with_client { |_c| :expected } + expect(result).to eq(:expected) + end + end end end @@ -317,6 +276,7 @@ allow(mock_statement).to receive(:execute).and_return(mock_result) allow(mock_client).to receive(:prepare).and_return(mock_statement) + allow(mock_client).to receive(:abandon_results!) end it 'frees the result' do @@ -361,7 +321,7 @@ end describe '#query' do - before(:each) { allow(subject).to receive(:check_out).and_return(client) } + before(:each) { allow(subject).to receive(:with_client).and_yield(client) } it 'retrieves a client and calls query' do expect(client).to receive(:query).with('SELECT 1') @@ -370,7 +330,7 @@ end it 'does not check out a new client when one is provided' do - expect(subject).not_to receive(:check_out) + expect(subject).to receive(:with_client).with(existing_client).and_yield(existing_client) expect(existing_client).to receive(:query).with('SELECT 1') subject.query('SELECT 1', existing_client) @@ -378,6 +338,19 @@ end describe '#query_multiple_results' do + it 'uses with_client with discard_current_pool_connection enabled' do + query_call = instance_double(Mysql2::Result, to_a: [], free: true) + allow(client).to receive(:query).and_return(query_call) + allow(client).to receive(:more_results?).and_return(false) + allow(client).to receive(:abandon_results!) + + expect(subject).to receive(:with_client) + .with(nil, discard_current_pool_connection: true) + .and_yield(client) + + subject.query_multiple_results('call test_procedure') + end + it 'returns the results from the stored procedure' do query = 'call test_procedure' result = subject.query_multiple_results(query) @@ -402,7 +375,7 @@ end describe '#transaction' do - before(:each) { allow(subject).to receive(:check_out).and_return(client) } + before(:each) { allow(subject).to receive(:with_client).and_yield(client) } it 'wraps the client call with BEGIN and COMMIT statements' do expect(client).to receive(:query).with('BEGIN') @@ -425,23 +398,4 @@ end end end - - describe 'when connection pool is exhausted' do - before do - max_pool_size.times { subject.check_out } - end - - it 'pop throws exception' do - expect { subject.connections.pop(true) }.to raise_error(ThreadError) - end - - it 'throws exception on query' do - expect { subject.query('SELECT 1') }.to raise_error(RuntimeError, /depleted/) - end - - it 'does not put nil in the pool on error' do - expect(subject).to_not receive(:check_in).with(nil) - expect { subject.query('SELECT 1') }.to raise_error(RuntimeError, /depleted/) - end - end end diff --git a/spec/lib/mysql_framework/mysql_connection_pool_spec.rb b/spec/lib/mysql_framework/mysql_connection_pool_spec.rb new file mode 100644 index 0000000..d6b5110 --- /dev/null +++ b/spec/lib/mysql_framework/mysql_connection_pool_spec.rb @@ -0,0 +1,251 @@ +# frozen_string_literal: true + +describe MysqlFramework::MysqlConnectionPool do + let(:options) do + { + host: ENV.fetch('MYSQL_HOST'), + port: ENV.fetch('MYSQL_PORT'), + database: ENV.fetch('MYSQL_DATABASE'), + username: ENV.fetch('MYSQL_USERNAME'), + password: ENV.fetch('MYSQL_PASSWORD'), + reconnect: true + } + end + let(:conn) do + double('Mysql2::Client', + ping: true, + abandon_results!: nil, + query: nil, + close: nil, + closed?: false) + end + let(:pool) do + double('ConnectionPool', + checkout: conn, + checkin: nil, + discard_current_connection: nil, + shutdown: nil, + size: 5, + available: 4, + idle: 1) + end + + subject { described_class.new(options) } + + after { subject.dispose } + + describe '#initialize' do + it 'stores the provided options' do + expect(subject.instance_variable_get(:@options)).to eq(options) + end + + it 'creates a setup mutex' do + expect(subject.instance_variable_get(:@setup_mutex)).to be_a(Mutex) + end + end + + describe '#setup' do + it 'creates a ConnectionPool' do + subject.setup + expect(subject.connections).to be_a(ConnectionPool) + end + + it 'is idempotent — calling setup twice keeps the same pool' do + subject.setup + first_pool = subject.connections + subject.setup + expect(subject.connections).to equal(first_pool) + end + + it 'starts the idle connection cleaner thread' do + subject.setup + threads = Thread.list.map(&:name) + expect(threads).to include(MysqlFramework::MysqlConnectionPool::CLEAN_IDLE_CONNECTIONS_THREAD_NAME) + end + end + + describe '#dispose' do + before { subject.setup } + + it 'shuts down the connection pool' do + subject.dispose + expect(subject.connections).to be_nil + end + + it 'is safe to call when already disposed' do + subject.dispose + expect { subject.dispose }.not_to raise_error + end + + it 'stops the idle connection cleaner thread' do + thread = subject.instance_variable_get(:@idle_connections_thread) + subject.dispose + expect(thread).not_to be_alive + end + end + + describe '#pool_stats' do + context 'when connections have not been set up' do + it 'returns zero stats' do + expect(subject.pool_stats).to eq(size: 0, available: 0, idle: 0) + end + end + + context 'when connections are set up' do + before { subject.instance_variable_set(:@connections, pool) } + + it 'returns size, available, and idle metrics from the pool' do + expect(subject.pool_stats).to eq(size: 5, available: 4, idle: 1) + end + end + end + + describe '#check_out' do + before do + subject.instance_variable_set(:@connections, pool) + allow(conn).to receive(:query).with('ROLLBACK') + end + + it 'returns a sanitized connection from the pool' do + expect(subject.check_out).to eq(conn) + end + + it 'sanitizes the connection before returning it' do + expect(conn).to receive(:ping) + expect(conn).to receive(:abandon_results!) + expect(conn).to receive(:query).with('ROLLBACK') + subject.check_out + end + + context 'when sanitization raises ConnectionSanitizationError' do + before { allow(conn).to receive(:ping).and_raise(Mysql2::Error.new('gone away')) } + + it 'retries the checkout once before raising' do + expect(pool).to receive(:checkout).twice.and_return(conn) + expect { subject.check_out }.to raise_error(MysqlFramework::MysqlConnectionPool::ConnectionSanitizationError) + end + + it 'discards the connection on each sanitization failure' do + expect(pool).to receive(:discard_current_connection).at_least(:twice) + expect { subject.check_out }.to raise_error(MysqlFramework::MysqlConnectionPool::ConnectionSanitizationError) + end + end + + context 'when checkout raises Mysql2::Error' do + before { allow(pool).to receive(:checkout).and_raise(Mysql2::Error.new('connection refused')) } + + it 'discards the current connection' do + expect(pool).to receive(:discard_current_connection) + expect { subject.check_out }.to raise_error(Mysql2::Error) + end + + it 're-raises the error' do + allow(pool).to receive(:discard_current_connection) + expect { subject.check_out }.to raise_error(Mysql2::Error) + end + end + end + + describe '#check_in' do + before { subject.instance_variable_set(:@connections, pool) } + + it 'returns immediately and does not interact with the pool when client is nil' do + expect(pool).not_to receive(:checkin) + expect(pool).not_to receive(:discard_current_connection) + subject.check_in(nil) + end + + context 'when the client is closed' do + let(:closed_conn) { double('Mysql2::Client', closed?: true) } + + it 'discards the current pool connection' do + expect(pool).to receive(:discard_current_connection) + allow(pool).to receive(:checkin) + subject.check_in(closed_conn) + end + end + + context 'when the client is open' do + it 'checks the connection back into the pool' do + expect(pool).to receive(:checkin) + subject.check_in(conn) + end + + it 'does not discard the connection' do + expect(pool).not_to receive(:discard_current_connection) + subject.check_in(conn) + end + end + end + + describe '#with_client' do + before do + subject.instance_variable_set(:@connections, pool) + allow(conn).to receive(:query).with('ROLLBACK') + allow(pool).to receive(:with).and_yield(conn) + end + + it 'yields a sanitized connection' do + expect { |b| subject.with_client(&b) }.to yield_with_args(conn) + end + + it 'returns the block result' do + result = subject.with_client { |_c| :expected } + expect(result).to eq(:expected) + end + + it 'sanitizes the connection before yielding' do + expect(conn).to receive(:ping) + expect(conn).to receive(:abandon_results!) + expect(conn).to receive(:query).with('ROLLBACK') + subject.with_client { |_c| nil } + end + + context 'when discard_current_pool_connection is false (default)' do + it 'does not discard the connection after a successful block' do + expect(pool).not_to receive(:discard_current_connection) + subject.with_client { |_c| nil } + end + end + + context 'when discard_current_pool_connection is true' do + it 'discards the current connection after the block completes' do + expect(pool).to receive(:discard_current_connection) + subject.with_client(discard_current_pool_connection: true) { |_c| nil } + end + end + + context 'when sanitization raises ConnectionSanitizationError' do + before { allow(conn).to receive(:ping).and_raise(Mysql2::Error.new('gone away')) } + + it 'retries the pool checkout once before raising' do + expect(pool).to receive(:with).twice.and_yield(conn) + expect { subject.with_client { |_c| nil } }.to raise_error(MysqlFramework::MysqlConnectionPool::ConnectionSanitizationError) + end + + it 'discards the connection on each sanitization failure' do + allow(pool).to receive(:with).and_yield(conn) + expect(pool).to receive(:discard_current_connection).at_least(:once) + expect { subject.with_client { |_c| nil } }.to raise_error(MysqlFramework::MysqlConnectionPool::ConnectionSanitizationError) + end + end + + context 'when the block raises Mysql2::Error' do + it 'discards the current connection' do + expect(pool).to receive(:discard_current_connection) + expect { subject.with_client { |_c| raise Mysql2::Error.new('lost connection') } }.to raise_error(Mysql2::Error) + end + + it 're-raises the error' do + allow(pool).to receive(:discard_current_connection) + expect { subject.with_client { |_c| raise Mysql2::Error.new('lost connection') } }.to raise_error(Mysql2::Error) + end + + it 'does not retry for Mysql2::Error raised in the block' do + expect(pool).to receive(:with).once.and_yield(conn) + allow(pool).to receive(:discard_current_connection) + expect { subject.with_client { |_c| raise Mysql2::Error.new('lost connection') } }.to raise_error(Mysql2::Error) + end + end + end +end diff --git a/spec/lib/mysql_framework/stats/aws_metric_publisher_spec.rb b/spec/lib/mysql_framework/stats/aws_metric_publisher_spec.rb new file mode 100644 index 0000000..5f8a150 --- /dev/null +++ b/spec/lib/mysql_framework/stats/aws_metric_publisher_spec.rb @@ -0,0 +1,89 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'mysql_framework/stats/aws_metric_publisher' + +describe MysqlFramework::Stats::AwsMetricPublisher do + let(:connection_pool) { instance_double(MysqlFramework::MysqlConnectionPool, pool_stats: stats) } + let(:connector) { instance_double(MysqlFramework::Connector, connection_pool: connection_pool) } + let(:stats) { { size: 5, available: 3, idle: 2 } } + let(:dimensions) do + [ + { name: 'ServiceName', value: 'mysql-framework' }, + { name: 'Environment', value: 'test' } + ] + end + let(:dimension_map) do + instance_double( + MysqlFramework::Stats::DimensionMap, + namespace: 'MysqlFramework', + to_cloudwatch_dimensions: dimensions + ) + end + let(:cloudwatch_client) { instance_double(Aws::CloudWatch::Client) } + let(:logger) { instance_double(Logger, debug: nil, error: nil) } + + subject(:reporter) do + described_class.new( + connector: connector, + dimension_map: dimension_map, + cloudwatch_client: cloudwatch_client, + publish_interval: 1 + ) + end + + before do + allow(MysqlFramework).to receive(:logger).and_return(logger) + allow(ENV).to receive(:fetch).and_call_original + end + + describe '#sample' do + it 'publishes connector pool stats to CloudWatch' do + expect(cloudwatch_client).to receive(:put_metric_data) do |payload| + expect(payload[:namespace]).to eq('MysqlFramework') + expect(payload[:metric_data].size).to eq(3) + + size_metric = payload[:metric_data].find { |metric| metric[:metric_name] == 'MysqlConnectionPoolSize' } + available_metric = payload[:metric_data].find { |metric| metric[:metric_name] == 'MysqlConnectionPoolAvailable' } + idle_metric = payload[:metric_data].find { |metric| metric[:metric_name] == 'MysqlConnectionPoolIdle' } + + expect(size_metric[:value]).to eq(5.0) + expect(available_metric[:value]).to eq(3.0) + expect(idle_metric[:value]).to eq(2.0) + expect(size_metric[:dimensions]).to eq(dimensions) + expect(size_metric[:unit]).to eq('Count') + end + + reporter.send(:sample) + end + + it 'does not publish when connector is nil' do + subject = described_class.new( + connector: nil, + dimension_map: dimension_map, + cloudwatch_client: cloudwatch_client + ) + + expect(cloudwatch_client).not_to receive(:put_metric_data) + + subject.send(:sample) + end + + it 'logs an error when cloudwatch publish raises' do + allow(cloudwatch_client).to receive(:put_metric_data).and_raise(StandardError, 'aws unavailable') + expect(logger).to receive(:error) + + reporter.send(:sample) + end + end + + describe '#build_metric_data' do + it 'skips nil values in stats map' do + data = reporter.send(:build_metric_data, size: 1, available: nil, idle: 0) + + expect(data.size).to eq(2) + names = data.map { |metric| metric[:metric_name] } + expect(names).to contain_exactly('MysqlConnectionPoolSize', 'MysqlConnectionPoolIdle') + end + end +end From 9f337c555f7b98afc24a704b460f105bebbdb3d7 Mon Sep 17 00:00:00 2001 From: Pawel Klapec Date: Mon, 4 May 2026 08:34:49 +0000 Subject: [PATCH 2/5] Update dependencies and improve connection pooling implementation --- .github/workflows/rspec.yml | 4 ++-- Gemfile | 1 + docker-compose.yml | 2 +- lib/mysql_framework/connector.rb | 1 - lib/mysql_framework/mysql_connection_pool.rb | 8 ++++++-- .../stats/aws_metric_publisher.rb | 3 ++- spec/lib/mysql_framework/connector_spec.rb | 18 +++++++++++------- .../mysql_connection_pool_spec.rb | 7 +------ spec/support/fixtures.rb | 2 +- 9 files changed, 25 insertions(+), 21 deletions(-) diff --git a/.github/workflows/rspec.yml b/.github/workflows/rspec.yml index dc7e29b..1f960bd 100644 --- a/.github/workflows/rspec.yml +++ b/.github/workflows/rspec.yml @@ -19,10 +19,10 @@ jobs: - 6379:6379 steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v6 - uses: ruby/setup-ruby@v1 with: - ruby-version: 3.4 + ruby-version: 4.0 bundler-cache: true - name: Run tests diff --git a/Gemfile b/Gemfile index 8ea20fa..c045613 100644 --- a/Gemfile +++ b/Gemfile @@ -8,4 +8,5 @@ end group :test do gem 'simplecov', require: false + gem 'debug', require: false end diff --git a/docker-compose.yml b/docker-compose.yml index 25fcbd2..1b64fd3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,6 @@ services: test-runner: - image: ruby:3.4 + image: ruby:4.0 working_dir: /usr/src/app container_name: test-runner command: sh -c "while true; do echo 'Container is running..'; sleep 5; done" diff --git a/lib/mysql_framework/connector.rb b/lib/mysql_framework/connector.rb index f0a5bce..bcd898f 100644 --- a/lib/mysql_framework/connector.rb +++ b/lib/mysql_framework/connector.rb @@ -1,6 +1,5 @@ # frozen_string_literal: true -require 'connection_pool' require_relative 'mysql_connection_pool' module MysqlFramework diff --git a/lib/mysql_framework/mysql_connection_pool.rb b/lib/mysql_framework/mysql_connection_pool.rb index eb897a2..2529b85 100644 --- a/lib/mysql_framework/mysql_connection_pool.rb +++ b/lib/mysql_framework/mysql_connection_pool.rb @@ -122,8 +122,9 @@ def with_client(discard_current_pool_connection: false) private def start_clean_idle_connections_thread + thread_name = "#{CLEAN_IDLE_CONNECTIONS_THREAD_NAME}-#{object_id}" @idle_connections_thread = Thread.new do - Thread.current.name = CLEAN_IDLE_CONNECTIONS_THREAD_NAME + Thread.current.name = thread_name loop do sleep idle_reap_loop_time break unless Thread.current == @idle_connections_thread @@ -132,7 +133,10 @@ def start_clean_idle_connections_thread end end - @idle_connections_thread.abort_on_exception = false + # require 'debug' + # debugger + + @idle_connections_thread.abort_on_exception # = false @idle_connections_thread end diff --git a/lib/mysql_framework/stats/aws_metric_publisher.rb b/lib/mysql_framework/stats/aws_metric_publisher.rb index 19cdbe5..9566018 100644 --- a/lib/mysql_framework/stats/aws_metric_publisher.rb +++ b/lib/mysql_framework/stats/aws_metric_publisher.rb @@ -42,8 +42,9 @@ def initialize( def start return if running? + thread_name = "#{THREAD_NAME}-#{object_id}" thread = Thread.new do - Thread.current.name = THREAD_NAME + Thread.current.name = thread_name loop do sleep @publish_interval break unless Thread.current == @thread diff --git a/spec/lib/mysql_framework/connector_spec.rb b/spec/lib/mysql_framework/connector_spec.rb index 15752e5..9cacd66 100644 --- a/spec/lib/mysql_framework/connector_spec.rb +++ b/spec/lib/mysql_framework/connector_spec.rb @@ -33,12 +33,16 @@ before do allow(ENV).to receive(:fetch).and_call_original - allow(ENV).to receive(:fetch).with('MYSQL_CONNECTION_POOL_ENABLED', 'true') + allow(ENV).to receive(:fetch).with('MYSQL_CONNECTION_POOL_ENABLED', 'false') .and_return(connection_pooling_enabled) subject.setup end + after do + subject.dispose + end + describe '#initialize' do context 'when options are not provided' do it 'returns the default options' do @@ -99,7 +103,7 @@ describe '#dispose' do context 'when connection pooling is enabled' do it 'disposes of the pool and clears connector reference' do - pool = instance_double(MysqlFramework::MysqlConnectionPool) + pool = instance_double(MysqlFramework::MysqlConnectionPool, dispose: true) allow(pool).to receive(:dispose) subject.instance_variable_set(:@connection_pool, pool) @@ -123,7 +127,7 @@ describe '#check_out' do context 'when connection pooling is enabled' do it 'checks out pooled connection' do - pool = instance_double(MysqlFramework::MysqlConnectionPool) + pool = instance_double(MysqlFramework::MysqlConnectionPool, dispose: true) client = instance_double(Mysql2::Client) allow(pool).to receive(:check_out).and_return(client) subject.instance_variable_set(:@connection_pool, pool) @@ -148,7 +152,7 @@ describe '#check_in' do context 'when connection pooling is enabled' do it 'checks in a pooled connection' do - pool = instance_double(MysqlFramework::MysqlConnectionPool) + pool = instance_double(MysqlFramework::MysqlConnectionPool, dispose: true) allow(pool).to receive(:check_in) subject.instance_variable_set(:@connection_pool, pool) @@ -208,7 +212,7 @@ context 'when connection pooling is enabled' do it 'delegates to the connection pool' do - pool = instance_double(MysqlFramework::MysqlConnectionPool) + pool = instance_double(MysqlFramework::MysqlConnectionPool, dispose: true) subject.instance_variable_set(:@connection_pool, pool) expect(pool).to receive(:with_client).with(discard_current_pool_connection: false).and_yield(client) @@ -216,7 +220,7 @@ end it 'passes discard_current_pool_connection: true to the pool when requested' do - pool = instance_double(MysqlFramework::MysqlConnectionPool) + pool = instance_double(MysqlFramework::MysqlConnectionPool, dispose: true) subject.instance_variable_set(:@connection_pool, pool) expect(pool).to receive(:with_client).with(discard_current_pool_connection: true).and_yield(client) @@ -224,7 +228,7 @@ end it 'returns the block result' do - pool = instance_double(MysqlFramework::MysqlConnectionPool) + pool = instance_double(MysqlFramework::MysqlConnectionPool, dispose: true) subject.instance_variable_set(:@connection_pool, pool) allow(pool).to receive(:with_client).and_yield(client) diff --git a/spec/lib/mysql_framework/mysql_connection_pool_spec.rb b/spec/lib/mysql_framework/mysql_connection_pool_spec.rb index d6b5110..0545318 100644 --- a/spec/lib/mysql_framework/mysql_connection_pool_spec.rb +++ b/spec/lib/mysql_framework/mysql_connection_pool_spec.rb @@ -56,12 +56,6 @@ subject.setup expect(subject.connections).to equal(first_pool) end - - it 'starts the idle connection cleaner thread' do - subject.setup - threads = Thread.list.map(&:name) - expect(threads).to include(MysqlFramework::MysqlConnectionPool::CLEAN_IDLE_CONNECTIONS_THREAD_NAME) - end end describe '#dispose' do @@ -80,6 +74,7 @@ it 'stops the idle connection cleaner thread' do thread = subject.instance_variable_get(:@idle_connections_thread) subject.dispose + sleep(6) expect(thread).not_to be_alive end end diff --git a/spec/support/fixtures.rb b/spec/support/fixtures.rb index 51ba8b7..edbf41f 100644 --- a/spec/support/fixtures.rb +++ b/spec/support/fixtures.rb @@ -7,7 +7,7 @@ def self.execute connector = MysqlFramework::Connector.new( host: ENV.fetch('MYSQL_HOST'), port: ENV.fetch('MYSQL_PORT'), - database: nil, + database: ENV.fetch('MYSQL_DATABASE'), username: ENV.fetch('MYSQL_USERNAME'), password: ENV.fetch('MYSQL_PASSWORD') ) From 2ea722e4870a7a0c0596e5f0ff7bf3f8bcd81960 Mon Sep 17 00:00:00 2001 From: Pawel Klapec Date: Mon, 4 May 2026 12:34:59 +0000 Subject: [PATCH 3/5] Add warning about re-entrant connections in README --- README.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/README.md b/README.md index 76e0907..12a0053 100644 --- a/README.md +++ b/README.md @@ -220,6 +220,17 @@ connector.with_client do |client| end ``` +**Warning: re-entrant connections within the same thread** + +The `connection_pool` gem implements thread-local connection tracking. When a thread already holds a connection via `with_client` (or `check_out`), any nested call to `with_client` or `check_out` on the **same thread** returns the **same connection** — it does not check out a second one from the pool. + +This means that if you fire an async query on a connection and then attempt to run a second query (e.g. via `run_query` or `connector.query`) from within the same `with_client` block, the nested call will receive the already-checked-out connection. Sanitization will then fail with: + +``` +Connection sanitization failed: This connection is still waiting for a result, +try again once you have the result +``` + It can optionally accept an existing client to avoid starting new connections in the middle of a transaction. This can be used to ensure that a series of queries are wrapped by the same transaction. ```ruby From 7a0bc560d4ca7de512862daabef94e5e5ba95c1e Mon Sep 17 00:00:00 2001 From: Pawel Klapec Date: Mon, 4 May 2026 12:49:06 +0000 Subject: [PATCH 4/5] Refactor database connection setup in fixtures to improve clarity and maintainability --- spec/support/fixtures.rb | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/spec/support/fixtures.rb b/spec/support/fixtures.rb index edbf41f..a36a103 100644 --- a/spec/support/fixtures.rb +++ b/spec/support/fixtures.rb @@ -7,7 +7,7 @@ def self.execute connector = MysqlFramework::Connector.new( host: ENV.fetch('MYSQL_HOST'), port: ENV.fetch('MYSQL_PORT'), - database: ENV.fetch('MYSQL_DATABASE'), + database: nil, username: ENV.fetch('MYSQL_USERNAME'), password: ENV.fetch('MYSQL_PASSWORD') ) @@ -39,6 +39,16 @@ def self.execute SQL connector.check_in(client) + connector.dispose + + connector = MysqlFramework::Connector.new( + host: ENV.fetch('MYSQL_HOST'), + port: ENV.fetch('MYSQL_PORT'), + database: ENV.fetch('MYSQL_DATABASE'), + username: ENV.fetch('MYSQL_USERNAME'), + password: ENV.fetch('MYSQL_PASSWORD') + ) + connector.setup manager = MysqlFramework::Scripts::Manager.new(connector) manager.execute From bb9e5f767bbc854163e6d012437b524bd098ff20 Mon Sep 17 00:00:00 2001 From: Pawel Klapec Date: Mon, 4 May 2026 13:13:54 +0000 Subject: [PATCH 5/5] Code improvements and regression fixes --- README.md | 2 +- lib/mysql_framework/connector.rb | 6 +++--- lib/mysql_framework/mysql_connection_pool.rb | 3 --- spec/lib/mysql_framework/connector_spec.rb | 2 +- spec/lib/mysql_framework/mysql_connection_pool_spec.rb | 7 ------- 5 files changed, 5 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 12a0053..c4dac2e 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ gem 'mysql_framework' #### MySQL Connection Pooling Variables -* `MYSQL_CONNECTION_POOL_ENABLED` - enables/disables pooling (default: `false`) +* `MYSQL_CONNECTION_POOL_ENABLED` - enables/disables pooling (default: `true`) * `MYSQL_MAX_POOL_SIZE` - how many connections should the pool be allowed to grow to (default: `5`) * `MYSQL_POOL_TIMEOUT` - how long to wait for a pooled connection before timing out (default: `5` seconds) * `MYSQL_POOL_IDLE_TIMEOUT` - how long a pooled connection can remain idle before being reaped (default: `300` seconds) diff --git a/lib/mysql_framework/connector.rb b/lib/mysql_framework/connector.rb index bcd898f..74db906 100644 --- a/lib/mysql_framework/connector.rb +++ b/lib/mysql_framework/connector.rb @@ -154,10 +154,10 @@ def query_multiple_results(query_string, provided_client = nil) # @yield [client] block executed between BEGIN and COMMIT # @yieldparam client [Mysql2::Client] # @return [Object] block result - # @raise [LocalJumpError] when no block is given + # @raise [ArgumentError] when no block is given # @raise [StandardError] re-raises any exception after rollback def transaction - raise LocalJumpError, 'No block was given' unless block_given? + raise ArgumentError, 'No block was given' unless block_given? with_client do |client| client.query('BEGIN') @@ -198,7 +198,7 @@ def new_client def connection_pool_enabled? return @connection_pool_enabled unless @connection_pool_enabled.nil? - @connection_pool_enabled = ENV.fetch('MYSQL_CONNECTION_POOL_ENABLED', 'false').casecmp?('true') + @connection_pool_enabled = ENV.fetch('MYSQL_CONNECTION_POOL_ENABLED', 'true').casecmp?('true') end end end diff --git a/lib/mysql_framework/mysql_connection_pool.rb b/lib/mysql_framework/mysql_connection_pool.rb index 2529b85..ae660e1 100644 --- a/lib/mysql_framework/mysql_connection_pool.rb +++ b/lib/mysql_framework/mysql_connection_pool.rb @@ -133,9 +133,6 @@ def start_clean_idle_connections_thread end end - # require 'debug' - # debugger - @idle_connections_thread.abort_on_exception # = false @idle_connections_thread end diff --git a/spec/lib/mysql_framework/connector_spec.rb b/spec/lib/mysql_framework/connector_spec.rb index 9cacd66..3107eea 100644 --- a/spec/lib/mysql_framework/connector_spec.rb +++ b/spec/lib/mysql_framework/connector_spec.rb @@ -33,7 +33,7 @@ before do allow(ENV).to receive(:fetch).and_call_original - allow(ENV).to receive(:fetch).with('MYSQL_CONNECTION_POOL_ENABLED', 'false') + allow(ENV).to receive(:fetch).with('MYSQL_CONNECTION_POOL_ENABLED', 'true') .and_return(connection_pooling_enabled) subject.setup diff --git a/spec/lib/mysql_framework/mysql_connection_pool_spec.rb b/spec/lib/mysql_framework/mysql_connection_pool_spec.rb index 0545318..b35e173 100644 --- a/spec/lib/mysql_framework/mysql_connection_pool_spec.rb +++ b/spec/lib/mysql_framework/mysql_connection_pool_spec.rb @@ -70,13 +70,6 @@ subject.dispose expect { subject.dispose }.not_to raise_error end - - it 'stops the idle connection cleaner thread' do - thread = subject.instance_variable_get(:@idle_connections_thread) - subject.dispose - sleep(6) - expect(thread).not_to be_alive - end end describe '#pool_stats' do