/** * Descomplicar® Crescimento Digital * https://descomplicar.pt */ redis = $redis; $this->model = $model; $this->entity_mapping = $entity_mapping; $this->error_handler = $error_handler; $this->retry_handler = $retry_handler; // Set memory and time limits ini_set('memory_limit', '512M'); set_time_limit(self::TIME_LIMIT); log_activity('Enhanced QueueProcessor initialized with dependency injection'); } /** * Add item to sync queue * * @param string $entity_type * @param int $entity_id * @param string $action * @param string $direction * @param int $priority * @param array $data * @param int $delay_seconds * @return string|false Queue job ID */ public function add_to_queue($entity_type, $entity_id, $action, $direction = 'perfex_to_moloni', $priority = self::PRIORITY_NORMAL, $data = [], $delay_seconds = 0) { // Validate parameters if (!$this->validate_queue_params($entity_type, $action, $direction, $priority)) { return false; } // Generate unique job ID $job_id = $this->generate_job_id($entity_type, $entity_id, $action); // Check for duplicate pending job if ($this->is_job_pending($job_id)) { log_activity("Job {$job_id} already pending, updating priority if higher"); return $this->update_job_priority($job_id, $priority) ? $job_id : false; } // Create job data $job_data = [ 'id' => $job_id, 'entity_type' => $entity_type, 'entity_id' => $entity_id, 'action' => $action, 'direction' => $direction, 'priority' => $priority, 'data' => $data, 'attempts' => 0, 'max_attempts' => self::MAX_ATTEMPTS, 'created_at' => time(), 'scheduled_at' => time() + $delay_seconds, 'status' => self::STATUS_PENDING, 'processing_node' => gethostname() ]; $job_json = json_encode($job_data); try { // Add to appropriate queue if ($delay_seconds > 0) { // Add to delay queue with score as execution time $this->redis->zAdd(self::REDIS_PREFIX . self::QUEUE_DELAY, $job_data['scheduled_at'], $job_json); } elseif ($priority >= self::PRIORITY_HIGH) { // Add to priority queue $this->redis->lPush(self::REDIS_PREFIX . self::QUEUE_PRIORITY, $job_json); } else { // Add to main queue $this->redis->lPush(self::REDIS_PREFIX . self::QUEUE_MAIN, $job_json); } // Store job data for tracking $this->redis->hSet(self::REDIS_PREFIX . 'jobs', $job_id, $job_json); // Update statistics $this->redis->hIncrBy(self::REDIS_PREFIX . 'stats', 'total_queued', 1); $this->redis->hIncrBy(self::REDIS_PREFIX . 'stats', "queued_{$entity_type}", 1); log_activity("Added {$entity_type} #{$entity_id} to sync queue: {$job_id} (priority: {$priority})"); return $job_id; } catch (\Exception $e) { $this->error_handler->log_error('queue', 'QUEUE_ADD_FAILED', $e->getMessage(), [ 'entity_type' => $entity_type, 'entity_id' => $entity_id, 'job_id' => $job_id ]); return false; } } /** * Process queue items * * @param int $limit * @param int $time_limit * @return array */ public function process_queue($limit = self::BATCH_SIZE, $time_limit = self::TIME_LIMIT) { $start_time = microtime(true); $processed = 0; $success = 0; $errors = 0; $details = []; try { // Check if queue processing is paused if ($this->is_queue_paused()) { return [ 'processed' => 0, 'success' => 0, 'errors' => 0, 'message' => 'Queue processing is paused', 'execution_time' => 0 ]; } // Move delayed jobs to main queue if ready $this->process_delayed_jobs(); // Process jobs while ($processed < $limit && (microtime(true) - $start_time) < ($time_limit - 30)) { $job = $this->get_next_job(); if (!$job) { break; // No more jobs } // Check memory usage if (memory_get_usage(true) > self::MEMORY_LIMIT) { log_message('warning', 'Memory limit approaching, stopping queue processing'); break; } $result = $this->process_job($job); $processed++; if ($result['success']) { $success++; } else { $errors++; } $details[] = [ 'job_id' => $job['id'], 'entity_type' => $job['entity_type'], 'entity_id' => $job['entity_id'], 'action' => $job['action'], 'direction' => $job['direction'], 'success' => $result['success'], 'message' => $result['message'], 'execution_time' => $result['execution_time'] ?? 0 ]; } $execution_time = microtime(true) - $start_time; // Update statistics $this->redis->hIncrBy(self::REDIS_PREFIX . 'stats', 'total_processed', $processed); $this->redis->hIncrBy(self::REDIS_PREFIX . 'stats', 'total_success', $success); $this->redis->hIncrBy(self::REDIS_PREFIX . 'stats', 'total_errors', $errors); log_activity("Queue processing completed: {$processed} processed, {$success} success, {$errors} errors in {$execution_time}s"); return [ 'processed' => $processed, 'success' => $success, 'errors' => $errors, 'details' => $details, 'execution_time' => $execution_time ]; } catch (\Exception $e) { $this->error_handler->log_error('queue', 'QUEUE_PROCESSING_FAILED', $e->getMessage()); return [ 'processed' => $processed, 'success' => $success, 'errors' => $errors + 1, 'message' => $e->getMessage(), 'execution_time' => microtime(true) - $start_time ]; } } /** * Get next job from queue * * @return array|null */ protected function get_next_job() { // First check priority queue $job_json = $this->redis->rPop(self::REDIS_PREFIX . self::QUEUE_PRIORITY); // Then check main queue if (!$job_json) { $job_json = $this->redis->rPop(self::REDIS_PREFIX . self::QUEUE_MAIN); } if (!$job_json) { return null; } $job = json_decode($job_json, true); // Move to processing queue $this->redis->hSet(self::REDIS_PREFIX . self::QUEUE_PROCESSING, $job['id'], $job_json); $this->redis->expire(self::REDIS_PREFIX . self::QUEUE_PROCESSING, self::PROCESSING_TIMEOUT); return $job; } /** * Process single job * * @param array $job * @return array */ protected function process_job($job) { $start_time = microtime(true); try { // Update job status $job['status'] = self::STATUS_PROCESSING; $job['started_at'] = time(); $job['attempts']++; $this->update_job_data($job); // Execute sync operation $result = $this->execute_sync_operation($job); if ($result['success']) { // Mark as completed $job['status'] = self::STATUS_COMPLETED; $job['completed_at'] = time(); $job['result'] = $result; $this->complete_job($job); log_activity("Job {$job['id']} processed successfully: {$job['entity_type']} #{$job['entity_id']} {$job['action']}"); return [ 'success' => true, 'message' => $result['message'], 'execution_time' => microtime(true) - $start_time ]; } else { throw new \Exception($result['message']); } } catch (\Exception $e) { $execution_time = microtime(true) - $start_time; if ($job['attempts'] >= $job['max_attempts']) { // Move to dead letter queue $job['status'] = self::STATUS_FAILED; $job['failed_at'] = time(); $job['error'] = $e->getMessage(); $this->move_to_dead_letter_queue($job); log_message('error', "Job {$job['id']} failed permanently after {$job['attempts']} attempts: " . $e->getMessage()); return [ 'success' => false, 'message' => "Failed permanently: " . $e->getMessage(), 'execution_time' => $execution_time ]; } else { // Schedule retry with exponential backoff $retry_delay = $this->retry_handler->calculate_retry_delay($job['attempts']); $job['status'] = self::STATUS_RETRYING; $job['retry_at'] = time() + $retry_delay; $job['last_error'] = $e->getMessage(); $this->schedule_retry($job, $retry_delay); log_message('info', "Job {$job['id']} scheduled for retry #{$job['attempts']} in {$retry_delay}s: " . $e->getMessage()); return [ 'success' => false, 'message' => "Retry #{$job['attempts']} scheduled: " . $e->getMessage(), 'execution_time' => $execution_time ]; } } } /** * Execute sync operation * * @param array $job * @return array */ protected function execute_sync_operation($job) { // Load appropriate sync service $sync_service = $this->get_sync_service($job['entity_type']); if (!$sync_service) { throw new \Exception("No sync service available for entity type: {$job['entity_type']}"); } // Execute sync based on direction switch ($job['direction']) { case 'perfex_to_moloni': return $sync_service->sync_perfex_to_moloni($job['entity_id'], $job['action'] === 'update', $job['data']); case 'moloni_to_perfex': return $sync_service->sync_moloni_to_perfex($job['entity_id'], $job['action'] === 'update', $job['data']); case 'bidirectional': // Handle bidirectional sync with conflict detection return $this->handle_bidirectional_sync($sync_service, $job); default: throw new \Exception("Unknown sync direction: {$job['direction']}"); } } /** * Handle bidirectional sync with conflict detection * * @param object $sync_service * @param array $job * @return array */ protected function handle_bidirectional_sync($sync_service, $job) { // Get entity mapping $mapping = $this->entity_mapping->get_mapping_by_perfex_id($job['entity_type'], $job['entity_id']); if (!$mapping) { // No mapping exists, sync from Perfex to Moloni return $sync_service->sync_perfex_to_moloni($job['entity_id'], false, $job['data']); } // Check for conflicts $conflict_check = $sync_service->check_sync_conflicts($mapping); if ($conflict_check['has_conflict']) { // Mark mapping as conflict and require manual resolution $this->entity_mapping->update_mapping_status($mapping->id, EntityMappingService::STATUS_CONFLICT, $conflict_check['conflict_details']); return [ 'success' => false, 'message' => 'Sync conflict detected, manual resolution required', 'conflict_details' => $conflict_check['conflict_details'] ]; } // Determine sync direction based on modification timestamps $sync_direction = $this->determine_sync_direction($mapping, $job); if ($sync_direction === 'perfex_to_moloni') { return $sync_service->sync_perfex_to_moloni($job['entity_id'], true, $job['data']); } else { return $sync_service->sync_moloni_to_perfex($mapping->moloni_id, true, $job['data']); } } /** * Determine sync direction based on timestamps * * @param object $mapping * @param array $job * @return string */ protected function determine_sync_direction($mapping, $job) { $perfex_modified = strtotime($mapping->last_sync_perfex ?: '1970-01-01'); $moloni_modified = strtotime($mapping->last_sync_moloni ?: '1970-01-01'); // If one side was never synced, sync from the other if ($perfex_modified === false || $perfex_modified < 1) { return 'moloni_to_perfex'; } if ($moloni_modified === false || $moloni_modified < 1) { return 'perfex_to_moloni'; } // Sync from most recently modified return $perfex_modified > $moloni_modified ? 'perfex_to_moloni' : 'moloni_to_perfex'; } /** * Get sync service for entity type * * @param string $entity_type * @return object|null */ protected function get_sync_service($entity_type) { $service_class = null; switch ($entity_type) { case EntityMappingService::ENTITY_CUSTOMER: $service_class = 'DeskMoloni\\Libraries\\ClientSyncService'; break; case EntityMappingService::ENTITY_PRODUCT: $service_class = 'DeskMoloni\\Libraries\\ProductSyncService'; break; case EntityMappingService::ENTITY_INVOICE: $service_class = 'DeskMoloni\\Libraries\\InvoiceSyncService'; break; case EntityMappingService::ENTITY_ESTIMATE: $service_class = 'DeskMoloni\\Libraries\\EstimateSyncService'; break; case EntityMappingService::ENTITY_CREDIT_NOTE: $service_class = 'DeskMoloni\\Libraries\\CreditNoteSyncService'; break; } if ($service_class && class_exists($service_class)) { return new $service_class(); } return null; } /** * Complete job successfully * * @param array $job */ protected function complete_job($job) { // Remove from processing queue $this->redis->hDel(self::REDIS_PREFIX . self::QUEUE_PROCESSING, $job['id']); // Update job data $this->update_job_data($job); // Set expiration for completed job (7 days) $this->redis->expire(self::REDIS_PREFIX . 'jobs:' . $job['id'], 7 * 24 * 3600); } /** * Schedule job retry * * @param array $job * @param int $delay_seconds */ protected function schedule_retry($job, $delay_seconds) { // Remove from processing queue $this->redis->hDel(self::REDIS_PREFIX . self::QUEUE_PROCESSING, $job['id']); // Add to delay queue $this->redis->zAdd(self::REDIS_PREFIX . self::QUEUE_DELAY, time() + $delay_seconds, json_encode($job)); // Update job data $this->update_job_data($job); } /** * Move job to dead letter queue * * @param array $job */ protected function move_to_dead_letter_queue($job) { // Remove from processing queue $this->redis->hDel(self::REDIS_PREFIX . self::QUEUE_PROCESSING, $job['id']); // Add to dead letter queue $this->redis->lPush(self::REDIS_PREFIX . self::QUEUE_DEAD_LETTER, json_encode($job)); // Update job data $this->update_job_data($job); // Log to error handler $this->error_handler->log_error('queue', 'JOB_DEAD_LETTER', 'Job moved to dead letter queue', [ 'job_id' => $job['id'], 'entity_type' => $job['entity_type'], 'entity_id' => $job['entity_id'], 'attempts' => $job['attempts'], 'error' => $job['error'] ?? 'Unknown error' ]); } /** * Process delayed jobs that are ready */ protected function process_delayed_jobs() { $current_time = time(); // Get jobs that are ready to process $ready_jobs = $this->redis->zRangeByScore( self::REDIS_PREFIX . self::QUEUE_DELAY, 0, $current_time, ['limit' => [0, 100]] ); foreach ($ready_jobs as $job_json) { $job = json_decode($job_json, true); // Remove from delay queue $this->redis->zRem(self::REDIS_PREFIX . self::QUEUE_DELAY, $job_json); // Add to appropriate queue based on priority if ($job['priority'] >= self::PRIORITY_HIGH) { $this->redis->lPush(self::REDIS_PREFIX . self::QUEUE_PRIORITY, $job_json); } else { $this->redis->lPush(self::REDIS_PREFIX . self::QUEUE_MAIN, $job_json); } } } /** * Update job data in Redis * * @param array $job */ protected function update_job_data($job) { $this->redis->hSet(self::REDIS_PREFIX . 'jobs', $job['id'], json_encode($job)); } /** * Generate unique job ID * * @param string $entity_type * @param int $entity_id * @param string $action * @return string */ protected function generate_job_id($entity_type, $entity_id, $action) { return "{$entity_type}_{$entity_id}_{$action}_" . uniqid(); } /** * Check if job is already pending * * @param string $job_id * @return bool */ protected function is_job_pending($job_id) { return $this->redis->hExists(self::REDIS_PREFIX . 'jobs', $job_id); } /** * Update job priority * * @param string $job_id * @param int $new_priority * @return bool */ protected function update_job_priority($job_id, $new_priority) { $job_json = $this->redis->hGet(self::REDIS_PREFIX . 'jobs', $job_id); if (!$job_json) { return false; } $job = json_decode($job_json, true); if ($new_priority <= $job['priority']) { return true; // No update needed } $job['priority'] = $new_priority; $this->update_job_data($job); return true; } /** * Validate queue parameters * * @param string $entity_type * @param string $action * @param string $direction * @param int $priority * @return bool */ protected function validate_queue_params($entity_type, $action, $direction, $priority) { $valid_entities = [ EntityMappingService::ENTITY_CUSTOMER, EntityMappingService::ENTITY_PRODUCT, EntityMappingService::ENTITY_INVOICE, EntityMappingService::ENTITY_ESTIMATE, EntityMappingService::ENTITY_CREDIT_NOTE ]; $valid_actions = ['create', 'update', 'delete']; $valid_directions = ['perfex_to_moloni', 'moloni_to_perfex', 'bidirectional']; $valid_priorities = [self::PRIORITY_LOW, self::PRIORITY_NORMAL, self::PRIORITY_HIGH, self::PRIORITY_CRITICAL]; if (!in_array($entity_type, $valid_entities)) { log_message('error', "Invalid entity type: {$entity_type}"); return false; } if (!in_array($action, $valid_actions)) { log_message('error', "Invalid action: {$action}"); return false; } if (!in_array($direction, $valid_directions)) { log_message('error', "Invalid direction: {$direction}"); return false; } if (!in_array($priority, $valid_priorities)) { log_message('error', "Invalid priority: {$priority}"); return false; } return true; } /** * Get queue statistics * * @return array */ public function get_queue_statistics() { $stats = $this->redis->hGetAll(self::REDIS_PREFIX . 'stats'); return [ 'pending_main' => $this->redis->lLen(self::REDIS_PREFIX . self::QUEUE_MAIN), 'pending_priority' => $this->redis->lLen(self::REDIS_PREFIX . self::QUEUE_PRIORITY), 'delayed' => $this->redis->zCard(self::REDIS_PREFIX . self::QUEUE_DELAY), 'processing' => $this->redis->hLen(self::REDIS_PREFIX . self::QUEUE_PROCESSING), 'dead_letter' => $this->redis->lLen(self::REDIS_PREFIX . self::QUEUE_DEAD_LETTER), 'total_queued' => (int)($stats['total_queued'] ?? 0), 'total_processed' => (int)($stats['total_processed'] ?? 0), 'total_success' => (int)($stats['total_success'] ?? 0), 'total_errors' => (int)($stats['total_errors'] ?? 0), 'success_rate' => $this->calculate_success_rate($stats), 'memory_usage' => memory_get_usage(true), 'peak_memory' => memory_get_peak_usage(true) ]; } /** * Calculate success rate * * @param array $stats * @return float */ protected function calculate_success_rate($stats) { $total_processed = (int)($stats['total_processed'] ?? 0); $total_success = (int)($stats['total_success'] ?? 0); return $total_processed > 0 ? round(($total_success / $total_processed) * 100, 2) : 0; } /** * Check if queue is paused * * @return bool */ public function is_queue_paused() { return $this->redis->get(self::REDIS_PREFIX . 'paused') === '1'; } /** * Pause queue processing */ public function pause_queue() { $this->redis->set(self::REDIS_PREFIX . 'paused', '1'); log_activity('Queue processing paused'); } /** * Resume queue processing */ public function resume_queue() { $this->redis->del(self::REDIS_PREFIX . 'paused'); log_activity('Queue processing resumed'); } /** * Clear all queues (development/testing only) */ public function clear_all_queues() { if (ENVIRONMENT === 'production') { throw new \Exception('Cannot clear queues in production environment'); } $keys = [ self::REDIS_PREFIX . self::QUEUE_MAIN, self::REDIS_PREFIX . self::QUEUE_PRIORITY, self::REDIS_PREFIX . self::QUEUE_DELAY, self::REDIS_PREFIX . self::QUEUE_PROCESSING, self::REDIS_PREFIX . 'jobs', self::REDIS_PREFIX . 'stats' ]; foreach ($keys as $key) { $this->redis->del($key); } log_activity('All queues cleared (development mode)'); } /** * Requeue dead letter jobs * * @param int $limit * @return array */ public function requeue_dead_letter_jobs($limit = 10) { $results = [ 'total' => 0, 'success' => 0, 'errors' => 0 ]; for ($i = 0; $i < $limit; $i++) { $job_json = $this->redis->rPop(self::REDIS_PREFIX . self::QUEUE_DEAD_LETTER); if (!$job_json) { break; } $job = json_decode($job_json, true); $results['total']++; // Reset job for retry $job['attempts'] = 0; $job['status'] = self::STATUS_PENDING; unset($job['error'], $job['failed_at']); // Add back to queue if ($job['priority'] >= self::PRIORITY_HIGH) { $this->redis->lPush(self::REDIS_PREFIX . self::QUEUE_PRIORITY, json_encode($job)); } else { $this->redis->lPush(self::REDIS_PREFIX . self::QUEUE_MAIN, json_encode($job)); } $this->update_job_data($job); $results['success']++; log_activity("Requeued dead letter job: {$job['id']}"); } return $results; } /** * Health check for queue system * * @return array */ public function health_check() { $health = [ 'status' => 'healthy', 'checks' => [] ]; try { // Check Redis connection $this->redis->ping(); $health['checks']['redis'] = 'ok'; } catch (\Exception $e) { $health['status'] = 'unhealthy'; $health['checks']['redis'] = 'failed: ' . $e->getMessage(); } // Check queue sizes $stats = $this->get_queue_statistics(); if ($stats['dead_letter'] > 100) { $health['status'] = 'warning'; $health['checks']['dead_letter'] = "high count: {$stats['dead_letter']}"; } else { $health['checks']['dead_letter'] = 'ok'; } if ($stats['processing'] > 50) { $health['status'] = 'warning'; $health['checks']['processing'] = "high count: {$stats['processing']}"; } else { $health['checks']['processing'] = 'ok'; } // Check memory usage $memory_usage_percent = (memory_get_usage(true) / self::MEMORY_LIMIT) * 100; if ($memory_usage_percent > 80) { $health['status'] = 'warning'; $health['checks']['memory'] = "high usage: {$memory_usage_percent}%"; } else { $health['checks']['memory'] = 'ok'; } return $health; } }