diff --git a/composer.json b/composer.json index b2339bc..820aaf7 100644 --- a/composer.json +++ b/composer.json @@ -31,7 +31,7 @@ }, "autoload": { "psr-4": { - "Soderlind\\RedisQueueDemo\\": "src/" + "Soderlind\\RedisQueue\\": "src/" } } } \ No newline at end of file diff --git a/redis-queue-demo.php b/redis-queue-demo.php index ae1ddfa..d6da4ee 100644 --- a/redis-queue-demo.php +++ b/redis-queue-demo.php @@ -1,17 +1,17 @@ enqueue_job( $job_type, $payload, $options ); -} - -function redis_queue_process_jobs( $queue = 'default', $max_jobs = null ) { - $instance = redis_queue_demo(); - if ( ! $instance->get_queue_manager() || ! $instance->get_job_processor() ) { - return false; - } - $worker = new \Soderlind\RedisQueueDemo\Workers\Sync_Worker( $instance->get_queue_manager(), $instance->get_job_processor() ); - return $worker->process_jobs( (array) $queue, $max_jobs ); -} +// All logic handled by redis-queue.php. diff --git a/redis-queue.php b/redis-queue.php new file mode 100644 index 0000000..8c59b5d --- /dev/null +++ b/redis-queue.php @@ -0,0 +1,75 @@ +enqueue_job( $job_type, $payload, $options ); +} + +function redis_queue_process_jobs( $queue = 'default', $max_jobs = null ) { + $instance = redis_queue(); + if ( ! $instance->get_queue_manager() || ! $instance->get_job_processor() ) { + return false; + } + $worker = new \Soderlind\RedisQueue\Workers\Sync_Worker( $instance->get_queue_manager(), $instance->get_job_processor() ); + return $worker->process_jobs( (array) $queue, $max_jobs ); +} diff --git a/src/Core/Job_Processor.php b/src/Core/Job_Processor.php index a392ace..f88e886 100644 --- a/src/Core/Job_Processor.php +++ b/src/Core/Job_Processor.php @@ -1,10 +1,10 @@ handle_failed_job( $job_id, $job, $result, 1, null ); } if ( function_exists( 'do_action' ) ) { - \do_action( 'redis_queue_demo_job_processed', $job_id, $job, $result ); + \do_action( 'redis_queue_job_processed', $job_id, $job, $result ); } return $result; } catch (Exception $e) { @@ -62,7 +62,7 @@ public function process_job( $job_data ): Job_Result { $this->mark_job_failed( $job_id, $result ); } if ( function_exists( 'do_action' ) ) { - \do_action( 'redis_queue_demo_job_failed', $job_id, $e, $job_data ); + \do_action( 'redis_queue_job_failed', $job_id, $e, $job_data ); } return $result; } finally { @@ -76,7 +76,7 @@ public function process_jobs( $queues = [ 'default' ], $max_jobs = 10 ): array { $start_time = microtime( true ); $start_memory = memory_get_usage( true ); if ( function_exists( 'do_action' ) ) { - \do_action( 'redis_queue_demo_batch_start', $queues, $max_jobs ); + \do_action( 'redis_queue_batch_start', $queues, $max_jobs ); } while ( $processed < $max_jobs ) { $job_data = $this->queue_manager->dequeue( $queues ); @@ -93,7 +93,7 @@ public function process_jobs( $queues = [ 'default' ], $max_jobs = 10 ): array { $total_time = microtime( true ) - $start_time; $total_memory = memory_get_peak_usage( true ) - $start_memory; if ( function_exists( 'do_action' ) ) { - \do_action( 'redis_queue_demo_batch_complete', $results, $processed, $total_time, $total_memory ); + \do_action( 'redis_queue_batch_complete', $results, $processed, $total_time, $total_memory ); } return [ 'processed' => $processed, 'total_time' => $total_time, 'total_memory' => $total_memory, 'results' => $results ]; } @@ -145,9 +145,9 @@ private function sanitize_job_data( array $job_data ): array { if ( is_string( $job_type ) && $job_type !== '' ) { // Canonical job type mapping only (legacy variants removed). $map = [ - 'email' => 'Soderlind\\RedisQueueDemo\\Jobs\\Email_Job', - 'image_processing' => 'Soderlind\\RedisQueueDemo\\Jobs\\Image_Processing_Job', - 'api_sync' => 'Soderlind\\RedisQueueDemo\\Jobs\\API_Sync_Job', + 'email' => 'Soderlind\\RedisQueue\\Jobs\\Email_Job', + 'image_processing' => 'Soderlind\\RedisQueue\\Jobs\\Image_Processing_Job', + 'api_sync' => 'Soderlind\\RedisQueue\\Jobs\\API_Sync_Job', ]; $key = $map[ strtolower( $job_type ) ] ?? null; if ( $key ) { @@ -180,7 +180,7 @@ private function get_job_class( $job_type ) { 'image_processing' => 'Soderlind\\RedisQueueDemo\\Jobs\\Image_Processing_Job', 'api_sync' => 'Soderlind\\RedisQueueDemo\\Jobs\\API_Sync_Job', ]; - $job_classes = function_exists( 'apply_filters' ) ? \apply_filters( 'redis_queue_demo_job_classes', $base_map ) : $base_map; + $job_classes = function_exists( 'apply_filters' ) ? \apply_filters( 'redis_queue_job_classes', $base_map ) : $base_map; if ( isset( $job_classes[ $job_type_normalized ] ) ) { return $job_classes[ $job_type_normalized ]; } @@ -199,7 +199,7 @@ private function handle_successful_job( $job_id, Job_Result $result ): void { $table = $wpdb->prefix . 'redis_queue_jobs'; $wpdb->update( $table, [ 'status' => 'completed', 'result' => ( function_exists( 'wp_json_encode' ) ? \wp_json_encode( $result->to_array() ) : json_encode( $result->to_array() ) ), 'updated_at' => ( function_exists( 'current_time' ) ? \current_time( 'mysql' ) : date( 'Y-m-d H:i:s' ) ) ], [ 'job_id' => $job_id ], [ '%s', '%s', '%s' ], [ '%s' ] ); if ( function_exists( 'do_action' ) ) { - \do_action( 'redis_queue_demo_job_completed', $job_id, $result ); + \do_action( 'redis_queue_job_completed', $job_id, $result ); } } private function handle_failed_job( $job_id, Queue_Job $job, Job_Result $result, $attempt, $exception = null ): void { diff --git a/src/Core/Redis_Queue.php b/src/Core/Redis_Queue.php new file mode 100644 index 0000000..57081de --- /dev/null +++ b/src/Core/Redis_Queue.php @@ -0,0 +1,201 @@ +init_hooks(); + } + + private function init_hooks(): void { + \register_activation_hook( REDIS_QUEUE_PLUGIN_FILE, [ $this, 'activate' ] ); + \register_deactivation_hook( REDIS_QUEUE_PLUGIN_FILE, [ $this, 'deactivate' ] ); + \add_action( 'plugins_loaded', [ $this, 'load_textdomain' ] ); + \add_action( 'init', [ $this, 'init' ] ); + \add_action( 'rest_api_init', [ $this, 'init_rest_api' ] ); + } + + public function init(): void { + $this->load_dependencies(); + $this->init_components(); + \do_action( 'redis_queue_init', $this ); + } + + private function load_dependencies(): void { + // Autoloaded via Composer. + } + + private function init_components(): void { + $this->queue_manager = new Redis_Queue_Manager(); + $this->job_processor = new Job_Processor( $this->queue_manager ); + $this->rest_controller = new \Soderlind\RedisQueue\API\REST_Controller( $this->queue_manager, $this->job_processor ); + if ( \is_admin() ) { + $this->admin_interface = new \Soderlind\RedisQueue\Admin\Admin_Interface( $this->queue_manager, $this->job_processor ); + if ( method_exists( $this->admin_interface, 'init' ) ) { + $this->admin_interface->init(); + } + } + } + + public function init_rest_api(): void { + if ( $this->rest_controller ) { + $this->rest_controller->register_routes(); + } + } + + public function load_textdomain(): void { + \load_plugin_textdomain( 'redis-queue', false, dirname( REDIS_QUEUE_PLUGIN_BASENAME ) . '/languages' ); + } + + public function activate(): void { + if ( \version_compare( PHP_VERSION, REDIS_QUEUE_MIN_PHP, '<' ) ) { + \deactivate_plugins( REDIS_QUEUE_PLUGIN_BASENAME ); + \wp_die( \esc_html__( 'Redis Queue requires a newer PHP version.', 'redis-queue' ), \esc_html__( 'Plugin Activation Error', 'redis-queue' ), [ 'back_link' => true ] ); + } + if ( ! \extension_loaded( 'redis' ) && ! \class_exists( 'Predis\\Client' ) ) { + \deactivate_plugins( REDIS_QUEUE_PLUGIN_BASENAME ); + \wp_die( \esc_html__( 'Redis Queue requires either the Redis extension or Predis library.', 'redis-queue' ), \esc_html__( 'Plugin Activation Error', 'redis-queue' ), [ 'back_link' => true ] ); + } + $this->create_tables(); + $this->set_default_options(); + \flush_rewrite_rules(); + \do_action( 'redis_queue_activate' ); + } + + public function deactivate(): void { + \wp_clear_scheduled_hook( 'redis_queue_process_jobs' ); + \flush_rewrite_rules(); + \do_action( 'redis_queue_deactivate' ); + } + + private function create_tables(): void { + global $wpdb; + $charset_collate = $wpdb->get_charset_collate(); + $table_name = $wpdb->prefix . 'redis_queue_jobs'; + $sql = "CREATE TABLE $table_name ( + id bigint(20) unsigned NOT NULL AUTO_INCREMENT, + job_id varchar(255) NOT NULL, + job_type varchar(100) NOT NULL, + queue_name varchar(100) NOT NULL DEFAULT 'default', + priority int(11) NOT NULL DEFAULT 50, + status varchar(20) NOT NULL DEFAULT 'queued', + payload longtext, + result longtext, + attempts int(11) NOT NULL DEFAULT 0, + max_attempts int(11) NOT NULL DEFAULT 3, + timeout int(11) NOT NULL DEFAULT 300, + created_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + processed_at datetime NULL, + failed_at datetime NULL, + error_message text, + PRIMARY KEY (id), + UNIQUE KEY job_id (job_id), + KEY status (status), + KEY queue_name (queue_name), + KEY priority (priority), + KEY created_at (created_at) + ) $charset_collate;"; + require_once ABSPATH . 'wp-admin/includes/upgrade.php'; + \dbDelta( $sql ); + } + + private function set_default_options(): void { + $default_options = [ + 'redis_host' => '127.0.0.1', + 'redis_port' => 6379, + 'redis_password' => '', + 'redis_database' => 0, + 'default_queue' => 'default', + 'max_jobs_per_run' => 10, + 'worker_timeout' => 300, + 'max_retries' => 3, + 'retry_backoff' => [ 60, 300, 900 ], + 'enable_logging' => true, + 'cleanup_completed_jobs' => true, + 'cleanup_after_days' => 7, + ]; + foreach ( $default_options as $option => $value ) { + $option_name = 'redis_queue_' . $option; + if ( false === \get_option( $option_name ) ) { + \add_option( $option_name, $value ); + } + } + } + + public function get_option( $option, $default = null ) { + return \get_option( 'redis_queue_' . $option, $default ); + } + public function update_option( $option, $value ) { + return \update_option( 'redis_queue_' . $option, $value ); + } + public function get_queue_manager() { + return $this->queue_manager; + } + public function get_job_processor() { + return $this->job_processor; + } + + public function enqueue_job( $job_type, $payload = [], $options = [] ) { + if ( ! $this->queue_manager ) { + return false; + } + $job = $this->create_job_instance( $job_type, $payload ); + if ( ! $job ) { + return false; + } + if ( isset( $options[ 'priority' ] ) ) { + $job->set_priority( (int) $options[ 'priority' ] ); + } + if ( isset( $options[ 'queue' ] ) ) { + $job->set_queue_name( $options[ 'queue' ] ); + } + if ( isset( $options[ 'delay' ] ) ) { + $job->set_delay_until( time() + (int) $options[ 'delay' ] ); + } + return $this->queue_manager->enqueue( $job ); + } + + private function create_job_instance( $job_type, $payload ) { + switch ( $job_type ) { + case 'email': + return new \Soderlind\RedisQueue\Jobs\Email_Job( $payload ); + case 'image_processing': + return new \Soderlind\RedisQueue\Jobs\Image_Processing_Job( $payload ); + case 'api_sync': + return new \Soderlind\RedisQueue\Jobs\API_Sync_Job( $payload ); + default: + return \apply_filters( 'redis_queue_create_job', null, $job_type, $payload ); + } + } +} diff --git a/src/Core/Redis_Queue_Manager.php b/src/Core/Redis_Queue_Manager.php index d8828a1..6d64d2e 100644 --- a/src/Core/Redis_Queue_Manager.php +++ b/src/Core/Redis_Queue_Manager.php @@ -1,9 +1,9 @@ connect(); - // One-time repair of legacy Redis job entries missing serialized_job.class + // One-time repair of legacy Redis job entries missing serialized_job.class (carried forward under new option prefix) if ( function_exists( 'get_option' ) && function_exists( 'update_option' ) ) { - $flag = \get_option( 'redis_queue_demo_repair_v1', '0' ); + $flag = \get_option( 'redis_queue_repair_v1', '0' ); if ( '0' === $flag ) { $this->repair_redis_jobs(); - \update_option( 'redis_queue_demo_repair_v1', '1' ); + \update_option( 'redis_queue_repair_v1', '1' ); + } + // Migrate legacy redis keys prefix redis_queue_demo: -> redis_queue: + $migrated_keys = \get_option( 'redis_queue_migrated_redis_keys_v2', '0' ); + if ( '0' === $migrated_keys && $this->connected ) { + $this->migrate_redis_keys_prefix(); + \update_option( 'redis_queue_migrated_redis_keys_v2', '1' ); } } } private function connect(): bool { try { - $host = \redis_queue_demo()->get_option( 'redis_host', '127.0.0.1' ); - $port = \redis_queue_demo()->get_option( 'redis_port', 6379 ); - $password = \redis_queue_demo()->get_option( 'redis_password', '' ); - $database = \redis_queue_demo()->get_option( 'redis_database', 0 ); + $host = \redis_queue()->get_option( 'redis_host', '127.0.0.1' ); + $port = \redis_queue()->get_option( 'redis_port', 6379 ); + $password = \redis_queue()->get_option( 'redis_password', '' ); + $database = \redis_queue()->get_option( 'redis_database', 0 ); if ( \extension_loaded( 'redis' ) ) { $this->redis = new \Redis(); @@ -56,7 +62,7 @@ private function connect(): bool { if ( $this->connected ) { if ( function_exists( 'do_action' ) ) { - \do_action( 'redis_queue_demo_connected', $this ); + \do_action( 'redis_queue_connected', $this ); } } return $this->connected; @@ -102,7 +108,7 @@ public function enqueue( Queue_Job $job, $delay = null ) { $this->redis->zadd( $queue_key, $priority, json_encode( $job_data ) ); } if ( function_exists( 'do_action' ) ) { - \do_action( 'redis_queue_demo_job_enqueued', $job_id, $job ); + \do_action( 'redis_queue_job_enqueued', $job_id, $job ); } return $job_id; } catch (Exception $e) { @@ -136,9 +142,9 @@ public function dequeue( $queues = [ 'default' ] ) { if ( empty( $decoded[ 'serialized_job' ][ 'class' ] ?? '' ) ) { $jt = $decoded[ 'job_type' ] ?? ''; $map = [ - 'email' => 'Soderlind\\RedisQueueDemo\\Jobs\\Email_Job', - 'image_processing' => 'Soderlind\\RedisQueueDemo\\Jobs\\Image_Processing_Job', - 'api_sync' => 'Soderlind\\RedisQueueDemo\\Jobs\\API_Sync_Job', + 'email' => 'Soderlind\\RedisQueue\\Jobs\\Email_Job', + 'image_processing' => 'Soderlind\\RedisQueue\\Jobs\\Image_Processing_Job', + 'api_sync' => 'Soderlind\\RedisQueue\\Jobs\\API_Sync_Job', ]; $inferred = $map[ strtolower( $jt ) ] ?? null; if ( ! $inferred && str_contains( $jt, '\\' ) && class_exists( $jt ) ) { @@ -157,7 +163,7 @@ public function dequeue( $queues = [ 'default' ] ) { } $this->update_job_status( $decoded[ 'job_id' ], 'processing' ); if ( function_exists( 'do_action' ) ) { - \do_action( 'redis_queue_demo_job_dequeued', $decoded ); + \do_action( 'redis_queue_job_dequeued', $decoded ); } return $decoded; } @@ -216,7 +222,7 @@ public function clear_queue( $queue_name ) { $queue_key = $this->get_queue_key( $queue_name ); $result = $this->redis->del( $queue_key ); if ( function_exists( 'do_action' ) ) { - \do_action( 'redis_queue_demo_queue_cleared', $queue_name ); + \do_action( 'redis_queue_queue_cleared', $queue_name ); } return $result > 0; } catch (Exception $e) { @@ -225,6 +231,44 @@ public function clear_queue( $queue_name ) { } } + /** + * Migrate Redis keys from legacy prefix redis_queue_demo: to redis_queue: + */ + private function migrate_redis_keys_prefix(): void { + if ( ! $this->is_connected() ) { + return; + } + try { + $legacy_prefix = 'redis_queue_demo:'; + $scan = 0; + if ( $this->redis instanceof \Redis ) { + // Use SCAN cursor for native Redis extension. + do { + $keys = $this->redis->scan( $scan, $legacy_prefix . '*' ); + if ( $keys ) { + foreach ( $keys as $legacy_key ) { + $new_key = 'redis_queue:' . substr( $legacy_key, strlen( $legacy_prefix ) ); + if ( $new_key !== $legacy_key && ! $this->redis->exists( $new_key ) ) { + $this->redis->rename( $legacy_key, $new_key ); + } + } + } + } while ( $scan > 0 ); + } else { + // Predis fallback: KEYS (acceptable for small key counts; documented as best-effort). + $keys = $this->redis->keys( $legacy_prefix . '*' ); + foreach ( $keys as $legacy_key ) { + $new_key = 'redis_queue:' . substr( $legacy_key, strlen( $legacy_prefix ) ); + if ( $new_key !== $legacy_key && ! $this->redis->exists( $new_key ) ) { + $this->redis->rename( $legacy_key, $new_key ); + } + } + } + } catch (Exception $e) { + \error_log( 'Redis Queue: Key prefix migration failed - ' . $e->getMessage() ); + } + } + private function process_delayed_jobs(): int { if ( ! $this->is_connected() ) { return 0; @@ -267,9 +311,9 @@ private function repair_redis_jobs(): void { $pattern = $this->queue_prefix . 'queue:*'; $keys = $this->redis->keys( $pattern ); $map = [ - 'email' => 'Soderlind\\RedisQueueDemo\\Jobs\\Email_Job', - 'image_processing' => 'Soderlind\\RedisQueueDemo\\Jobs\\Image_Processing_Job', - 'api_sync' => 'Soderlind\\RedisQueueDemo\\Jobs\\API_Sync_Job', + 'email' => 'Soderlind\\RedisQueue\\Jobs\\Email_Job', + 'image_processing' => 'Soderlind\\RedisQueue\\Jobs\\Image_Processing_Job', + 'api_sync' => 'Soderlind\\RedisQueue\\Jobs\\API_Sync_Job', ]; $fixed = 0; $removed = 0; diff --git a/src/Workers/Sync_Worker.php b/src/Workers/Sync_Worker.php index 36a122a..34d0262 100644 --- a/src/Workers/Sync_Worker.php +++ b/src/Workers/Sync_Worker.php @@ -1,8 +1,8 @@ config[ 'max_jobs_per_run' ]; } function_exists( '\do_action' ) && \do_action( 'redis_queue_demo_worker_start', $this, $queues, $max_jobs ); + function_exists( '\do_action' ) && \do_action( 'redis_queue_worker_start', $this, $queues, $max_jobs ); try { $results = $this->job_processor->process_jobs( $queues, $max_jobs ); $this->stats[ 'jobs_processed' ] += $results[ 'processed' ]; @@ -66,6 +67,7 @@ function_exists( '\do_action' ) && \do_action( 'redis_queue_demo_worker_start', } $this->state = 'idle'; function_exists( '\do_action' ) && \do_action( 'redis_queue_demo_worker_complete', $this, $results ); + function_exists( '\do_action' ) && \do_action( 'redis_queue_worker_complete', $this, $results ); return [ 'success' => true, 'processed' => $results[ 'processed' ], @@ -77,6 +79,7 @@ function_exists( '\do_action' ) && \do_action( 'redis_queue_demo_worker_complete } catch (Exception $e) { $this->state = 'error'; function_exists( '\do_action' ) && \do_action( 'redis_queue_demo_worker_error', $this, $e ); + function_exists( '\do_action' ) && \do_action( 'redis_queue_worker_error', $this, $e ); return [ 'success' => false, 'error' => $e ? $e->getMessage() : 'Unknown error occurred', 'code' => $e ? $e->getCode() : 0 ]; } catch (Throwable $e) { $this->state = 'error'; @@ -139,9 +142,9 @@ public function process_job_by_id( $job_id ) { $job_class = $job_type; } else { $map = [ - 'email' => 'Soderlind\\RedisQueueDemo\\Jobs\\Email_Job', - 'image_processing' => 'Soderlind\\RedisQueueDemo\\Jobs\\Image_Processing_Job', - 'api_sync' => 'Soderlind\\RedisQueueDemo\\Jobs\\API_Sync_Job', + 'email' => 'Soderlind\\RedisQueue\\Jobs\\Email_Job', + 'image_processing' => 'Soderlind\\RedisQueue\\Jobs\\Image_Processing_Job', + 'api_sync' => 'Soderlind\\RedisQueue\\Jobs\\API_Sync_Job', ]; $job_class = $map[ $job_type ] ?? null; } @@ -232,9 +235,9 @@ public function should_stop() { private function parse_config( $config ) { $defaults = [ - 'max_jobs_per_run' => \redis_queue_demo()->get_option( 'max_jobs_per_run', 10 ), + 'max_jobs_per_run' => \redis_queue()->get_option( 'max_jobs_per_run', 10 ), 'memory_limit' => ini_get( 'memory_limit' ), - 'max_execution_time' => \redis_queue_demo()->get_option( 'worker_timeout', 300 ), + 'max_execution_time' => \redis_queue()->get_option( 'worker_timeout', 300 ), 'sleep_interval' => 1, 'retry_failed_jobs' => true, 'cleanup_on_shutdown' => true,