- Bump DESK_MOLONI version to 3.0.1 across module
- Normalize hooks to after_client_* and instantiate PerfexHooks safely
- Fix OAuthController view path and API client class name
- Add missing admin views for webhook config/logs; adjust view loading
- Harden client portal routes and admin routes mapping
- Make Dashboard/Logs/Queue tolerant to optional model methods
- Align log details query with existing schema; avoid broken joins

This makes the module operational in Perfex (admin + client), reduces 404s, and avoids fatal errors due to inconsistent tables/methods.
<?php

/**
 * Enhanced Queue Processor
 * Redis-based queue processor with exponential backoff retry logic and conflict resolution
 *
 * @package DeskMoloni
 * @subpackage Libraries
 * @category QueueProcessor
 * @author Descomplicar® - PHP Fullstack Engineer
 * @version 1.0.0
 */

defined('BASEPATH') or exit('No direct script access allowed');

class QueueProcessor
{
    protected $CI;
    protected $redis;
    protected $model;
    protected $entity_mapping;
    protected $error_handler;
    protected $retry_handler;

    // Queue configuration
    const REDIS_PREFIX = 'desk_moloni:queue:';
    const QUEUE_MAIN = 'main';
    const QUEUE_PRIORITY = 'priority';
    const QUEUE_DELAY = 'delay';
    const QUEUE_DEAD_LETTER = 'dead_letter';
    const QUEUE_PROCESSING = 'processing';

    // Queue priorities
    const PRIORITY_LOW = 1;
    const PRIORITY_NORMAL = 2;
    const PRIORITY_HIGH = 3;
    const PRIORITY_CRITICAL = 4;

    // Processing status
    const STATUS_PENDING = 'pending';
    const STATUS_PROCESSING = 'processing';
    const STATUS_COMPLETED = 'completed';
    const STATUS_FAILED = 'failed';
    const STATUS_RETRYING = 'retrying';

    // Retry configuration
    const MAX_ATTEMPTS = 5;
    const RETRY_DELAYS = [30, 120, 300, 900, 1800]; // 30s, 2m, 5m, 15m, 30m
    const BATCH_SIZE = 20;
    const MEMORY_LIMIT = 512 * 1024 * 1024; // 512MB
    const TIME_LIMIT = 300; // 5 minutes
    const PROCESSING_TIMEOUT = 600; // 10 minutes
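    /*
     * Note: RETRY_DELAYS is not read directly in this class; the per-attempt
     * delay comes from RetryHandler::calculate_retry_delay(). A minimal sketch
     * of the kind of lookup that handler is assumed to perform (illustrative
     * only, not the actual RetryHandler implementation):
     *
     *   $delays = QueueProcessor::RETRY_DELAYS;
     *   $delay  = $delays[min($attempt, count($delays)) - 1];
     *
     * i.e. attempt 1 => 30s, attempt 2 => 120s, ..., attempt 5 and above => 1800s.
     */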
    public function __construct()
    {
        $this->CI = &get_instance();
        $this->CI->load->model('desk_moloni_model');
        $this->model = $this->CI->desk_moloni_model;

        // Initialize Redis connection
        $this->init_redis();

        // Initialize supporting services
        $this->entity_mapping = new EntityMappingService();
        $this->error_handler = new ErrorHandler();
        $this->retry_handler = new RetryHandler();

        // Set memory and time limits
        ini_set('memory_limit', '512M');
        set_time_limit(self::TIME_LIMIT);

        log_activity('Enhanced QueueProcessor initialized with Redis backend');
    }

    /**
     * Initialize Redis connection
     */
    protected function init_redis()
    {
        if (!extension_loaded('redis')) {
            throw new \Exception('Redis extension not loaded');
        }

        $this->redis = new \Redis();

        $redis_host = get_option('desk_moloni_redis_host', '127.0.0.1');
        $redis_port = (int)get_option('desk_moloni_redis_port', 6379);
        $redis_password = get_option('desk_moloni_redis_password', '');
        $redis_db = (int)get_option('desk_moloni_redis_db', 0);

        if (!$this->redis->connect($redis_host, $redis_port, 2.5)) {
            throw new \Exception('Failed to connect to Redis server');
        }

        if (!empty($redis_password)) {
            $this->redis->auth($redis_password);
        }

        $this->redis->select($redis_db);

        log_activity("Connected to Redis server at {$redis_host}:{$redis_port}");
    }
    /**
     * Add item to sync queue
     *
     * @param string $entity_type
     * @param int $entity_id
     * @param string $action
     * @param string $direction
     * @param int $priority
     * @param array $data
     * @param int $delay_seconds
     * @return string|false Queue job ID
     */
    public function add_to_queue($entity_type, $entity_id, $action, $direction = 'perfex_to_moloni', $priority = self::PRIORITY_NORMAL, $data = [], $delay_seconds = 0)
    {
        // Validate parameters
        if (!$this->validate_queue_params($entity_type, $action, $direction, $priority)) {
            return false;
        }

        // Generate unique job ID
        $job_id = $this->generate_job_id($entity_type, $entity_id, $action);

        // Check for duplicate pending job
        if ($this->is_job_pending($job_id)) {
            log_activity("Job {$job_id} already pending, updating priority if higher");
            return $this->update_job_priority($job_id, $priority) ? $job_id : false;
        }

        // Create job data
        $job_data = [
            'id' => $job_id,
            'entity_type' => $entity_type,
            'entity_id' => $entity_id,
            'action' => $action,
            'direction' => $direction,
            'priority' => $priority,
            'data' => $data,
            'attempts' => 0,
            'max_attempts' => self::MAX_ATTEMPTS,
            'created_at' => time(),
            'scheduled_at' => time() + $delay_seconds,
            'status' => self::STATUS_PENDING,
            'processing_node' => gethostname()
        ];

        $job_json = json_encode($job_data);

        try {
            // Add to appropriate queue
            if ($delay_seconds > 0) {
                // Add to delay queue with score as execution time
                $this->redis->zAdd(self::REDIS_PREFIX . self::QUEUE_DELAY, $job_data['scheduled_at'], $job_json);
            } elseif ($priority >= self::PRIORITY_HIGH) {
                // Add to priority queue
                $this->redis->lPush(self::REDIS_PREFIX . self::QUEUE_PRIORITY, $job_json);
            } else {
                // Add to main queue
                $this->redis->lPush(self::REDIS_PREFIX . self::QUEUE_MAIN, $job_json);
            }

            // Store job data for tracking
            $this->redis->hSet(self::REDIS_PREFIX . 'jobs', $job_id, $job_json);

            // Update statistics
            $this->redis->hIncrBy(self::REDIS_PREFIX . 'stats', 'total_queued', 1);
            $this->redis->hIncrBy(self::REDIS_PREFIX . 'stats', "queued_{$entity_type}", 1);

            log_activity("Added {$entity_type} #{$entity_id} to sync queue: {$job_id} (priority: {$priority})");

            return $job_id;

        } catch (\Exception $e) {
            $this->error_handler->log_error('queue', 'QUEUE_ADD_FAILED', $e->getMessage(), [
                'entity_type' => $entity_type,
                'entity_id' => $entity_id,
                'job_id' => $job_id
            ]);

            return false;
        }
    }
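    /*
     * Illustrative only: how a Perfex hook handler might enqueue a sync job
     * through this class. The hook wiring below is an assumption based on the
     * module's after_client_* hook registration, not code from this file.
     *
     *   hooks()->add_action('after_client_added', function ($client_id) {
     *       $processor = new QueueProcessor();
     *       $processor->add_to_queue(
     *           EntityMappingService::ENTITY_CUSTOMER,
     *           (int) $client_id,
     *           'create',
     *           'perfex_to_moloni',
     *           QueueProcessor::PRIORITY_HIGH
     *       );
     *   });
     */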
    /**
     * Process queue items
     *
     * @param int $limit
     * @param int $time_limit
     * @return array
     */
    public function process_queue($limit = self::BATCH_SIZE, $time_limit = self::TIME_LIMIT)
    {
        $start_time = microtime(true);
        $processed = 0;
        $success = 0;
        $errors = 0;
        $details = [];

        try {
            // Check if queue processing is paused
            if ($this->is_queue_paused()) {
                return [
                    'processed' => 0,
                    'success' => 0,
                    'errors' => 0,
                    'message' => 'Queue processing is paused',
                    'execution_time' => 0
                ];
            }

            // Move delayed jobs to main queue if ready
            $this->process_delayed_jobs();

            // Process jobs
            while ($processed < $limit && (microtime(true) - $start_time) < ($time_limit - 30)) {
                $job = $this->get_next_job();

                if (!$job) {
                    break; // No more jobs
                }

                // Check memory usage
                if (memory_get_usage(true) > self::MEMORY_LIMIT) {
                    log_message('warning', 'Memory limit approaching, stopping queue processing');
                    break;
                }

                $result = $this->process_job($job);
                $processed++;

                if ($result['success']) {
                    $success++;
                } else {
                    $errors++;
                }

                $details[] = [
                    'job_id' => $job['id'],
                    'entity_type' => $job['entity_type'],
                    'entity_id' => $job['entity_id'],
                    'action' => $job['action'],
                    'direction' => $job['direction'],
                    'success' => $result['success'],
                    'message' => $result['message'],
                    'execution_time' => $result['execution_time'] ?? 0
                ];
            }

            $execution_time = microtime(true) - $start_time;

            // Update statistics
            $this->redis->hIncrBy(self::REDIS_PREFIX . 'stats', 'total_processed', $processed);
            $this->redis->hIncrBy(self::REDIS_PREFIX . 'stats', 'total_success', $success);
            $this->redis->hIncrBy(self::REDIS_PREFIX . 'stats', 'total_errors', $errors);

            log_activity("Queue processing completed: {$processed} processed, {$success} success, {$errors} errors in {$execution_time}s");

            return [
                'processed' => $processed,
                'success' => $success,
                'errors' => $errors,
                'details' => $details,
                'execution_time' => $execution_time
            ];

        } catch (\Exception $e) {
            $this->error_handler->log_error('queue', 'QUEUE_PROCESSING_FAILED', $e->getMessage());

            return [
                'processed' => $processed,
                'success' => $success,
                'errors' => $errors + 1,
                'message' => $e->getMessage(),
                'execution_time' => microtime(true) - $start_time
            ];
        }
    }
    /**
     * Get next job from queue
     *
     * @return array|null
     */
    protected function get_next_job()
    {
        // First check priority queue
        $job_json = $this->redis->rPop(self::REDIS_PREFIX . self::QUEUE_PRIORITY);

        // Then check main queue
        if (!$job_json) {
            $job_json = $this->redis->rPop(self::REDIS_PREFIX . self::QUEUE_MAIN);
        }

        if (!$job_json) {
            return null;
        }

        $job = json_decode($job_json, true);

        // Move to processing queue
        $this->redis->hSet(self::REDIS_PREFIX . self::QUEUE_PROCESSING, $job['id'], $job_json);
        $this->redis->expire(self::REDIS_PREFIX . self::QUEUE_PROCESSING, self::PROCESSING_TIMEOUT);

        return $job;
    }

    /**
     * Process single job
     *
     * @param array $job
     * @return array
     */
    protected function process_job($job)
    {
        $start_time = microtime(true);

        try {
            // Update job status
            $job['status'] = self::STATUS_PROCESSING;
            $job['started_at'] = time();
            $job['attempts']++;

            $this->update_job_data($job);

            // Execute sync operation
            $result = $this->execute_sync_operation($job);

            if ($result['success']) {
                // Mark as completed
                $job['status'] = self::STATUS_COMPLETED;
                $job['completed_at'] = time();
                $job['result'] = $result;

                $this->complete_job($job);

                log_activity("Job {$job['id']} processed successfully: {$job['entity_type']} #{$job['entity_id']} {$job['action']}");

                return [
                    'success' => true,
                    'message' => $result['message'],
                    'execution_time' => microtime(true) - $start_time
                ];
            } else {
                throw new \Exception($result['message']);
            }

        } catch (\Exception $e) {
            $execution_time = microtime(true) - $start_time;

            if ($job['attempts'] >= $job['max_attempts']) {
                // Move to dead letter queue
                $job['status'] = self::STATUS_FAILED;
                $job['failed_at'] = time();
                $job['error'] = $e->getMessage();

                $this->move_to_dead_letter_queue($job);

                log_message('error', "Job {$job['id']} failed permanently after {$job['attempts']} attempts: " . $e->getMessage());

                return [
                    'success' => false,
                    'message' => "Failed permanently: " . $e->getMessage(),
                    'execution_time' => $execution_time
                ];

            } else {
                // Schedule retry with exponential backoff
                $retry_delay = $this->retry_handler->calculate_retry_delay($job['attempts']);
                $job['status'] = self::STATUS_RETRYING;
                $job['retry_at'] = time() + $retry_delay;
                $job['last_error'] = $e->getMessage();

                $this->schedule_retry($job, $retry_delay);

                log_message('info', "Job {$job['id']} scheduled for retry #{$job['attempts']} in {$retry_delay}s: " . $e->getMessage());

                return [
                    'success' => false,
                    'message' => "Retry #{$job['attempts']} scheduled: " . $e->getMessage(),
                    'execution_time' => $execution_time
                ];
            }
        }
    }
    /**
     * Execute sync operation
     *
     * @param array $job
     * @return array
     */
    protected function execute_sync_operation($job)
    {
        // Load appropriate sync service
        $sync_service = $this->get_sync_service($job['entity_type']);

        if (!$sync_service) {
            throw new \Exception("No sync service available for entity type: {$job['entity_type']}");
        }

        // Execute sync based on direction
        switch ($job['direction']) {
            case 'perfex_to_moloni':
                return $sync_service->sync_perfex_to_moloni($job['entity_id'], $job['action'] === 'update', $job['data']);

            case 'moloni_to_perfex':
                return $sync_service->sync_moloni_to_perfex($job['entity_id'], $job['action'] === 'update', $job['data']);

            case 'bidirectional':
                // Handle bidirectional sync with conflict detection
                return $this->handle_bidirectional_sync($sync_service, $job);

            default:
                throw new \Exception("Unknown sync direction: {$job['direction']}");
        }
    }

    /**
     * Handle bidirectional sync with conflict detection
     *
     * @param object $sync_service
     * @param array $job
     * @return array
     */
    protected function handle_bidirectional_sync($sync_service, $job)
    {
        // Get entity mapping
        $mapping = $this->entity_mapping->get_mapping_by_perfex_id($job['entity_type'], $job['entity_id']);

        if (!$mapping) {
            // No mapping exists, sync from Perfex to Moloni
            return $sync_service->sync_perfex_to_moloni($job['entity_id'], false, $job['data']);
        }

        // Check for conflicts
        $conflict_check = $sync_service->check_sync_conflicts($mapping);

        if ($conflict_check['has_conflict']) {
            // Mark mapping as conflict and require manual resolution
            $this->entity_mapping->update_mapping_status($mapping->id, EntityMappingService::STATUS_CONFLICT, $conflict_check['conflict_details']);

            return [
                'success' => false,
                'message' => 'Sync conflict detected, manual resolution required',
                'conflict_details' => $conflict_check['conflict_details']
            ];
        }

        // Determine sync direction based on modification timestamps
        $sync_direction = $this->determine_sync_direction($mapping, $job);

        if ($sync_direction === 'perfex_to_moloni') {
            return $sync_service->sync_perfex_to_moloni($job['entity_id'], true, $job['data']);
        } else {
            return $sync_service->sync_moloni_to_perfex($mapping->moloni_id, true, $job['data']);
        }
    }

    /**
     * Determine sync direction based on timestamps
     *
     * @param object $mapping
     * @param array $job
     * @return string
     */
    protected function determine_sync_direction($mapping, $job)
    {
        $perfex_modified = strtotime($mapping->last_sync_perfex ?: '1970-01-01');
        $moloni_modified = strtotime($mapping->last_sync_moloni ?: '1970-01-01');

        // If one side was never synced, sync from the other
        if ($perfex_modified === false || $perfex_modified < 1) {
            return 'moloni_to_perfex';
        }

        if ($moloni_modified === false || $moloni_modified < 1) {
            return 'perfex_to_moloni';
        }

        // Sync from most recently modified
        return $perfex_modified > $moloni_modified ? 'perfex_to_moloni' : 'moloni_to_perfex';
    }

    /**
     * Get sync service for entity type
     *
     * @param string $entity_type
     * @return object|null
     */
    protected function get_sync_service($entity_type)
    {
        $service_class = null;

        switch ($entity_type) {
            case EntityMappingService::ENTITY_CUSTOMER:
                $service_class = 'DeskMoloni\\Libraries\\ClientSyncService';
                break;
            case EntityMappingService::ENTITY_PRODUCT:
                $service_class = 'DeskMoloni\\Libraries\\ProductSyncService';
                break;
            case EntityMappingService::ENTITY_INVOICE:
                $service_class = 'DeskMoloni\\Libraries\\InvoiceSyncService';
                break;
            case EntityMappingService::ENTITY_ESTIMATE:
                $service_class = 'DeskMoloni\\Libraries\\EstimateSyncService';
                break;
            case EntityMappingService::ENTITY_CREDIT_NOTE:
                $service_class = 'DeskMoloni\\Libraries\\CreditNoteSyncService';
                break;
        }

        if ($service_class && class_exists($service_class)) {
            return new $service_class();
        }

        return null;
    }
    /**
     * Complete job successfully
     *
     * @param array $job
     */
    protected function complete_job($job)
    {
        // Remove from processing queue
        $this->redis->hDel(self::REDIS_PREFIX . self::QUEUE_PROCESSING, $job['id']);

        // Update job data
        $this->update_job_data($job);

        // Retain the completed payload for 7 days. Redis cannot expire a single
        // hash field, so the job is copied to its own key with a TTL and removed
        // from the shared jobs hash.
        $this->redis->setEx(self::REDIS_PREFIX . 'jobs:' . $job['id'], 7 * 24 * 3600, json_encode($job));
        $this->redis->hDel(self::REDIS_PREFIX . 'jobs', $job['id']);
    }
    /**
     * Schedule job retry
     *
     * @param array $job
     * @param int $delay_seconds
     */
    protected function schedule_retry($job, $delay_seconds)
    {
        // Remove from processing queue
        $this->redis->hDel(self::REDIS_PREFIX . self::QUEUE_PROCESSING, $job['id']);

        // Add to delay queue
        $this->redis->zAdd(self::REDIS_PREFIX . self::QUEUE_DELAY, time() + $delay_seconds, json_encode($job));

        // Update job data
        $this->update_job_data($job);
    }

    /**
     * Move job to dead letter queue
     *
     * @param array $job
     */
    protected function move_to_dead_letter_queue($job)
    {
        // Remove from processing queue
        $this->redis->hDel(self::REDIS_PREFIX . self::QUEUE_PROCESSING, $job['id']);

        // Add to dead letter queue
        $this->redis->lPush(self::REDIS_PREFIX . self::QUEUE_DEAD_LETTER, json_encode($job));

        // Update job data
        $this->update_job_data($job);

        // Log to error handler
        $this->error_handler->log_error('queue', 'JOB_DEAD_LETTER', 'Job moved to dead letter queue', [
            'job_id' => $job['id'],
            'entity_type' => $job['entity_type'],
            'entity_id' => $job['entity_id'],
            'attempts' => $job['attempts'],
            'error' => $job['error'] ?? 'Unknown error'
        ]);
    }

    /**
     * Process delayed jobs that are ready
     */
    protected function process_delayed_jobs()
    {
        $current_time = time();

        // Get jobs that are ready to process
        $ready_jobs = $this->redis->zRangeByScore(
            self::REDIS_PREFIX . self::QUEUE_DELAY,
            0,
            $current_time,
            ['limit' => [0, 100]]
        );

        foreach ($ready_jobs as $job_json) {
            $job = json_decode($job_json, true);

            // Remove from delay queue
            $this->redis->zRem(self::REDIS_PREFIX . self::QUEUE_DELAY, $job_json);

            // Add to appropriate queue based on priority
            if ($job['priority'] >= self::PRIORITY_HIGH) {
                $this->redis->lPush(self::REDIS_PREFIX . self::QUEUE_PRIORITY, $job_json);
            } else {
                $this->redis->lPush(self::REDIS_PREFIX . self::QUEUE_MAIN, $job_json);
            }
        }
    }

    /**
     * Update job data in Redis
     *
     * @param array $job
     */
    protected function update_job_data($job)
    {
        $this->redis->hSet(self::REDIS_PREFIX . 'jobs', $job['id'], json_encode($job));
    }

    /**
     * Generate unique job ID
     *
     * @param string $entity_type
     * @param int $entity_id
     * @param string $action
     * @return string
     */
    protected function generate_job_id($entity_type, $entity_id, $action)
    {
        return "{$entity_type}_{$entity_id}_{$action}_" . uniqid();
    }

    /**
     * Check if job is already pending
     *
     * @param string $job_id
     * @return bool
     */
    protected function is_job_pending($job_id)
    {
        return $this->redis->hExists(self::REDIS_PREFIX . 'jobs', $job_id);
    }

    /**
     * Update job priority
     *
     * @param string $job_id
     * @param int $new_priority
     * @return bool
     */
    protected function update_job_priority($job_id, $new_priority)
    {
        $job_json = $this->redis->hGet(self::REDIS_PREFIX . 'jobs', $job_id);

        if (!$job_json) {
            return false;
        }

        $job = json_decode($job_json, true);

        if ($new_priority <= $job['priority']) {
            return true; // No update needed
        }

        $job['priority'] = $new_priority;
        $this->update_job_data($job);

        return true;
    }
    /**
     * Validate queue parameters
     *
     * @param string $entity_type
     * @param string $action
     * @param string $direction
     * @param int $priority
     * @return bool
     */
    protected function validate_queue_params($entity_type, $action, $direction, $priority)
    {
        $valid_entities = [
            EntityMappingService::ENTITY_CUSTOMER,
            EntityMappingService::ENTITY_PRODUCT,
            EntityMappingService::ENTITY_INVOICE,
            EntityMappingService::ENTITY_ESTIMATE,
            EntityMappingService::ENTITY_CREDIT_NOTE
        ];

        $valid_actions = ['create', 'update', 'delete'];
        $valid_directions = ['perfex_to_moloni', 'moloni_to_perfex', 'bidirectional'];
        $valid_priorities = [self::PRIORITY_LOW, self::PRIORITY_NORMAL, self::PRIORITY_HIGH, self::PRIORITY_CRITICAL];

        if (!in_array($entity_type, $valid_entities)) {
            log_message('error', "Invalid entity type: {$entity_type}");
            return false;
        }

        if (!in_array($action, $valid_actions)) {
            log_message('error', "Invalid action: {$action}");
            return false;
        }

        if (!in_array($direction, $valid_directions)) {
            log_message('error', "Invalid direction: {$direction}");
            return false;
        }

        if (!in_array($priority, $valid_priorities)) {
            log_message('error', "Invalid priority: {$priority}");
            return false;
        }

        return true;
    }

    /**
     * Get queue statistics
     *
     * @return array
     */
    public function get_queue_statistics()
    {
        $stats = $this->redis->hGetAll(self::REDIS_PREFIX . 'stats');

        return [
            'pending_main' => $this->redis->lLen(self::REDIS_PREFIX . self::QUEUE_MAIN),
            'pending_priority' => $this->redis->lLen(self::REDIS_PREFIX . self::QUEUE_PRIORITY),
            'delayed' => $this->redis->zCard(self::REDIS_PREFIX . self::QUEUE_DELAY),
            'processing' => $this->redis->hLen(self::REDIS_PREFIX . self::QUEUE_PROCESSING),
            'dead_letter' => $this->redis->lLen(self::REDIS_PREFIX . self::QUEUE_DEAD_LETTER),
            'total_queued' => (int)($stats['total_queued'] ?? 0),
            'total_processed' => (int)($stats['total_processed'] ?? 0),
            'total_success' => (int)($stats['total_success'] ?? 0),
            'total_errors' => (int)($stats['total_errors'] ?? 0),
            'success_rate' => $this->calculate_success_rate($stats),
            'memory_usage' => memory_get_usage(true),
            'peak_memory' => memory_get_peak_usage(true)
        ];
    }

    /**
     * Calculate success rate
     *
     * @param array $stats
     * @return float
     */
    protected function calculate_success_rate($stats)
    {
        $total_processed = (int)($stats['total_processed'] ?? 0);
        $total_success = (int)($stats['total_success'] ?? 0);

        return $total_processed > 0 ? round(($total_success / $total_processed) * 100, 2) : 0;
    }
    /**
     * Check if queue is paused
     *
     * @return bool
     */
    public function is_queue_paused()
    {
        return $this->redis->get(self::REDIS_PREFIX . 'paused') === '1';
    }

    /**
     * Pause queue processing
     */
    public function pause_queue()
    {
        $this->redis->set(self::REDIS_PREFIX . 'paused', '1');
        log_activity('Queue processing paused');
    }

    /**
     * Resume queue processing
     */
    public function resume_queue()
    {
        $this->redis->del(self::REDIS_PREFIX . 'paused');
        log_activity('Queue processing resumed');
    }

    /**
     * Clear all queues (development/testing only)
     */
    public function clear_all_queues()
    {
        if (ENVIRONMENT === 'production') {
            throw new \Exception('Cannot clear queues in production environment');
        }

        $keys = [
            self::REDIS_PREFIX . self::QUEUE_MAIN,
            self::REDIS_PREFIX . self::QUEUE_PRIORITY,
            self::REDIS_PREFIX . self::QUEUE_DELAY,
            self::REDIS_PREFIX . self::QUEUE_PROCESSING,
            self::REDIS_PREFIX . 'jobs',
            self::REDIS_PREFIX . 'stats'
        ];

        foreach ($keys as $key) {
            $this->redis->del($key);
        }

        log_activity('All queues cleared (development mode)');
    }

    /**
     * Requeue dead letter jobs
     *
     * @param int $limit
     * @return array
     */
    public function requeue_dead_letter_jobs($limit = 10)
    {
        $results = [
            'total' => 0,
            'success' => 0,
            'errors' => 0
        ];

        for ($i = 0; $i < $limit; $i++) {
            $job_json = $this->redis->rPop(self::REDIS_PREFIX . self::QUEUE_DEAD_LETTER);

            if (!$job_json) {
                break;
            }

            $job = json_decode($job_json, true);
            $results['total']++;

            // Reset job for retry
            $job['attempts'] = 0;
            $job['status'] = self::STATUS_PENDING;
            unset($job['error'], $job['failed_at']);

            // Add back to queue
            if ($job['priority'] >= self::PRIORITY_HIGH) {
                $this->redis->lPush(self::REDIS_PREFIX . self::QUEUE_PRIORITY, json_encode($job));
            } else {
                $this->redis->lPush(self::REDIS_PREFIX . self::QUEUE_MAIN, json_encode($job));
            }

            $this->update_job_data($job);
            $results['success']++;

            log_activity("Requeued dead letter job: {$job['id']}");
        }

        return $results;
    }
    /**
     * Health check for queue system
     *
     * @return array
     */
    public function health_check()
    {
        $health = [
            'status' => 'healthy',
            'checks' => []
        ];

        try {
            // Check Redis connection
            $this->redis->ping();
            $health['checks']['redis'] = 'ok';
        } catch (\Exception $e) {
            $health['status'] = 'unhealthy';
            $health['checks']['redis'] = 'failed: ' . $e->getMessage();
        }

        // Check queue sizes
        $stats = $this->get_queue_statistics();

        if ($stats['dead_letter'] > 100) {
            $health['status'] = 'warning';
            $health['checks']['dead_letter'] = "high count: {$stats['dead_letter']}";
        } else {
            $health['checks']['dead_letter'] = 'ok';
        }

        if ($stats['processing'] > 50) {
            $health['status'] = 'warning';
            $health['checks']['processing'] = "high count: {$stats['processing']}";
        } else {
            $health['checks']['processing'] = 'ok';
        }

        // Check memory usage
        $memory_usage_percent = (memory_get_usage(true) / self::MEMORY_LIMIT) * 100;

        if ($memory_usage_percent > 80) {
            $health['status'] = 'warning';
            $health['checks']['memory'] = "high usage: {$memory_usage_percent}%";
        } else {
            $health['checks']['memory'] = 'ok';
        }

        return $health;
    }
}
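/*
 * Illustrative usage sketch (assumption: a cron task or controller wires these
 * calls together; the methods below are the public API defined in this class).
 *
 *   $processor = new QueueProcessor();
 *
 *   // Enqueue an invoice for synchronisation to Moloni.
 *   $processor->add_to_queue(
 *       EntityMappingService::ENTITY_INVOICE,
 *       $invoice_id,
 *       'create',
 *       'perfex_to_moloni',
 *       QueueProcessor::PRIORITY_NORMAL
 *   );
 *
 *   // Typically run from the module's cron hook: drain one batch of jobs.
 *   $result = $processor->process_queue(QueueProcessor::BATCH_SIZE);
 *
 *   // Monitoring views can expose these directly.
 *   $stats  = $processor->get_queue_statistics();
 *   $health = $processor->health_check();
 */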