Files
desk-moloni/modules/desk_moloni/libraries/TaskWorker.php
Emanuel Almeida 8c4f68576f chore: add spec-kit and standardize signatures
- Added GitHub spec-kit for development workflow
- Standardized file signatures to Descomplicar® format
- Updated development configuration

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-12 01:27:37 +01:00

603 lines
18 KiB
PHP

/**
* Descomplicar® Crescimento Digital
* https://descomplicar.pt
*/
<?php
defined('BASEPATH') or exit('No direct script access allowed');
/**
* Task Worker Library
*
* Handles concurrent task execution for the queue processing system
* Provides worker management, task execution, and concurrency control
*
* @package DeskMoloni
* @subpackage Libraries
* @version 3.0.0
* @author Descomplicar®
*/
class TaskWorker
{
    /** @var object CodeIgniter super-object used to reach models and libraries */
    private $CI;
    /** @var string Unique identifier generated for this worker instance */
    private $worker_id;
    /** @var bool True while the worker loop is supposed to keep running */
    private $is_running = false;
    /** @var array|null Task currently being executed, or null when idle */
    private $current_task = null;
    /** @var int PHP memory limit in bytes, or -1 when no usable limit is known */
    private $memory_limit;
    /** @var int Per-task execution timeout in seconds (passed to set_time_limit) */
    private $execution_timeout;
    /** @var int Maximum number of tasks processed before the loop exits */
    private $max_tasks_per_worker = 100;
    /** @var int Number of tasks processed so far by this worker */
    private $task_count = 0;

    // Worker coordination
    /** @var string Path of the JSON lock/heartbeat file for this worker */
    private $worker_lock_file;
    /** @var int|false Process ID as reported by getmypid() */
    private $worker_pid;
    /** @var int Seconds between heartbeat updates of the lock file */
    private $heartbeat_interval = 30;

    // Task handlers
    /** @var array<string, callable> Map of task type => handler callable */
    private $task_handlers = [];

    /**
     * Constructor - Initialize worker
     *
     * Loads the queue/log models and sync libraries, generates the worker
     * identity, reads memory/timeout limits, registers task handlers and a
     * shutdown handler.
     */
    public function __construct()
    {
        $this->CI = &get_instance();

        // Load required models and libraries
        $this->CI->load->model('desk_moloni/desk_moloni_sync_queue_model', 'sync_queue_model');
        $this->CI->load->model('desk_moloni/desk_moloni_sync_log_model', 'sync_log_model');
        $this->CI->load->library('desk_moloni/moloni_api_client');
        $this->CI->load->library('desk_moloni/client_sync_service');
        $this->CI->load->library('desk_moloni/invoice_sync_service');

        // Generate unique worker ID
        $this->worker_id = uniqid('worker_', true);
        $this->worker_pid = getmypid();

        // Set memory and execution limits
        $this->memory_limit = $this->convert_to_bytes(ini_get('memory_limit'));
        $this->execution_timeout = (int) get_option('desk_moloni_worker_timeout', 300); // 5 minutes default

        // Initialize worker lock file
        $this->worker_lock_file = APPPATH . "logs/desk_moloni_worker_{$this->worker_id}.lock";

        // Register task handlers
        $this->register_task_handlers();

        // Register shutdown handler
        register_shutdown_function([$this, 'shutdown_handler']);

        log_message('info', "TaskWorker {$this->worker_id} initialized with PID {$this->worker_pid}");
    }

    /**
     * Start the worker process
     *
     * Creates the lock file, runs the worker loop until it stops, and always
     * performs cleanup afterwards.
     *
     * @param array $options Worker configuration options; 'max_tasks' overrides
     *                       the maximum number of tasks processed by this run
     * @return void
     */
    public function start($options = [])
    {
        $this->is_running = true;

        // Process options
        if (isset($options['max_tasks'])) {
            $this->max_tasks_per_worker = (int) $options['max_tasks'];
        }

        // Create worker lock file
        $this->create_lock_file();

        log_message('info', "TaskWorker {$this->worker_id} starting...");

        try {
            $this->worker_loop();
        } catch (Throwable $e) {
            // Throwable (not just Exception) so PHP Errors are logged as well
            log_message('error', "TaskWorker {$this->worker_id} error: " . $e->getMessage());
        } finally {
            // Mark the worker as stopped BEFORE cleanup so the shutdown handler
            // does not later report a normal exit as an "unexpected shutdown"
            // and run cleanup a second time.
            $this->is_running = false;
            $this->cleanup();
        }
    }

    /**
     * Stop the worker process
     *
     * Only clears the running flag; the loop exits at its next iteration check.
     */
    public function stop()
    {
        $this->is_running = false;
        log_message('info', "TaskWorker {$this->worker_id} stopping...");
    }

    /**
     * Main worker loop
     *
     * Repeats until stopped, the task cap is reached, or memory usage crosses
     * the threshold: refresh heartbeat, fetch the next task, execute it.
     */
    private function worker_loop()
    {
        $last_heartbeat = time();

        while ($this->is_running && $this->task_count < $this->max_tasks_per_worker) {
            // Check memory usage
            if ($this->is_memory_limit_exceeded()) {
                log_message('warning', "TaskWorker {$this->worker_id} memory limit exceeded, stopping");
                break;
            }

            // Update heartbeat
            if (time() - $last_heartbeat >= $this->heartbeat_interval) {
                $this->update_heartbeat();
                $last_heartbeat = time();
            }

            // Get next task from queue
            $task = $this->CI->sync_queue_model->get_next_task($this->worker_id);

            if (!$task) {
                // No tasks available, sleep briefly
                sleep(1);
                continue;
            }

            // Execute task
            $this->execute_task($task);
            $this->task_count++;

            // Brief pause between tasks
            usleep(100000); // 0.1 second
        }

        log_message('info', "TaskWorker {$this->worker_id} completed {$this->task_count} tasks");
    }

    /**
     * Execute a single task
     *
     * Marks the task as processing, dispatches it to its handler and records
     * the outcome (completed or failed) in the queue and the sync log. On
     * failure a retry is scheduled when the retry budget allows it.
     *
     * @param array $task Task data
     */
    private function execute_task($task)
    {
        $this->current_task = $task;
        $start_time = microtime(true);

        try {
            // Update task status to processing
            $this->CI->sync_queue_model->update_task_status($task['id'], 'processing', [
                'worker_id' => $this->worker_id,
                'started_at' => date('Y-m-d H:i:s'),
                'pid' => $this->worker_pid
            ]);

            log_message('info', "TaskWorker {$this->worker_id} executing task {$task['id']} ({$task['task_type']})");

            // Set execution timeout
            set_time_limit($this->execution_timeout);

            // Get appropriate task handler
            $handler = $this->get_task_handler($task['task_type']);
            if (!$handler) {
                throw new Exception("No handler found for task type: {$task['task_type']}");
            }

            // Execute task
            $result = call_user_func($handler, $task);
            $execution_time = microtime(true) - $start_time;

            // Update task as completed
            $this->CI->sync_queue_model->update_task_status($task['id'], 'completed', [
                'completed_at' => date('Y-m-d H:i:s'),
                'execution_time' => $execution_time,
                'result' => json_encode($result),
                'worker_id' => $this->worker_id
            ]);

            // Log successful execution
            $this->CI->sync_log_model->log_event([
                'task_id' => $task['id'],
                'event_type' => 'task_completed',
                'entity_type' => $task['entity_type'],
                'entity_id' => $task['entity_id'],
                'message' => "Task executed successfully by worker {$this->worker_id}",
                'execution_time' => $execution_time,
                'worker_id' => $this->worker_id
            ]);

            log_message('info', "TaskWorker {$this->worker_id} completed task {$task['id']} in " .
                number_format($execution_time, 3) . "s");
        } catch (Throwable $e) {
            // Catch Throwable so that PHP Errors raised by a handler (TypeError,
            // etc.) also mark the task as failed instead of leaving it stuck
            // in the 'processing' state.
            $execution_time = microtime(true) - $start_time;

            // Update task as failed
            $this->CI->sync_queue_model->update_task_status($task['id'], 'failed', [
                'failed_at' => date('Y-m-d H:i:s'),
                'error_message' => $e->getMessage(),
                'execution_time' => $execution_time,
                'worker_id' => $this->worker_id,
                'retry_count' => ($task['retry_count'] ?? 0) + 1
            ]);

            // Log error
            $this->CI->sync_log_model->log_event([
                'task_id' => $task['id'],
                'event_type' => 'task_failed',
                'entity_type' => $task['entity_type'],
                'entity_id' => $task['entity_id'],
                'message' => "Task failed: " . $e->getMessage(),
                'log_level' => 'error',
                'execution_time' => $execution_time,
                'worker_id' => $this->worker_id
            ]);

            log_message('error', "TaskWorker {$this->worker_id} failed task {$task['id']}: " . $e->getMessage());

            // Schedule retry if appropriate
            $this->schedule_retry($task, $e);
        }

        // Deliberately not in a finally block: if the failure bookkeeping above
        // itself throws, current_task stays set so cleanup() can release it.
        $this->current_task = null;
    }

    /**
     * Register task handlers
     *
     * Builds the task type => handler map used by get_task_handler().
     */
    private function register_task_handlers()
    {
        $this->task_handlers = [
            'client_sync' => [$this, 'handle_client_sync'],
            'invoice_sync' => [$this, 'handle_invoice_sync'],
            'oauth_refresh' => [$this, 'handle_oauth_refresh'],
            'cleanup' => [$this, 'handle_cleanup'],
            'notification' => [$this, 'handle_notification'],
            'bulk_sync' => [$this, 'handle_bulk_sync'],
            'data_validation' => [$this, 'handle_data_validation'],
            'mapping_discovery' => [$this, 'handle_mapping_discovery']
        ];
    }

    /**
     * Get task handler for task type
     *
     * @param string $task_type Task type
     * @return callable|null Handler function, or null when the type is unknown
     */
    private function get_task_handler($task_type)
    {
        return $this->task_handlers[$task_type] ?? null;
    }

    /**
     * Decode the JSON payload of a task into an array
     *
     * A missing, empty, malformed or non-array payload yields an empty array,
     * so handlers can always index the result safely.
     *
     * @param array $task Task data
     * @return array Decoded payload
     */
    private function decode_payload($task)
    {
        $raw = $task['payload'] ?? null;
        if (!is_string($raw) || $raw === '') {
            return [];
        }

        $decoded = json_decode($raw, true);

        return is_array($decoded) ? $decoded : [];
    }

    /**
     * Handle client synchronization task
     *
     * @param array $task Task data
     * @return array Result
     */
    private function handle_client_sync($task)
    {
        $client_id = $task['entity_id'];
        $payload = $this->decode_payload($task);

        return $this->CI->client_sync_service->sync_client($client_id, $payload);
    }

    /**
     * Handle invoice synchronization task
     *
     * @param array $task Task data
     * @return array Result
     */
    private function handle_invoice_sync($task)
    {
        $invoice_id = $task['entity_id'];
        $payload = $this->decode_payload($task);

        return $this->CI->invoice_sync_service->sync_invoice($invoice_id, $payload);
    }

    /**
     * Handle OAuth token refresh task
     *
     * @param array $task Task data (not used by this handler)
     * @return array Result
     */
    private function handle_oauth_refresh($task)
    {
        $this->CI->load->library('desk_moloni/moloni_oauth');
        $success = $this->CI->moloni_oauth->refresh_access_token();

        return [
            'success' => $success,
            'refreshed_at' => date('Y-m-d H:i:s')
        ];
    }

    /**
     * Handle cleanup task
     *
     * Payload 'type' selects what is cleaned: 'logs' (optional 'days',
     * default 30), 'queue' (optional 'status', default 'completed'), or
     * anything else for a general cleanup of both.
     *
     * @param array $task Task data
     * @return array Result
     */
    private function handle_cleanup($task)
    {
        $payload = $this->decode_payload($task);
        $cleanup_type = $payload['type'] ?? 'general';
        $cleaned = 0;

        switch ($cleanup_type) {
            case 'logs':
                $days = $payload['days'] ?? 30;
                $cleaned = $this->CI->sync_log_model->cleanup_old_logs($days);
                break;
            case 'queue':
                $status = $payload['status'] ?? 'completed';
                $cleaned = $this->CI->sync_queue_model->cleanup_old_tasks($status);
                break;
            default:
                // General cleanup
                $cleaned += $this->CI->sync_log_model->cleanup_old_logs(30);
                $cleaned += $this->CI->sync_queue_model->cleanup_old_tasks('completed');
        }

        return [
            'cleanup_type' => $cleanup_type,
            'items_cleaned' => $cleaned
        ];
    }

    /**
     * Handle notification task
     *
     * @param array $task Task data (not used by this placeholder)
     * @return array Result
     */
    private function handle_notification($task)
    {
        // Placeholder for notification handling
        return [
            'notification_sent' => false,
            'message' => 'Notification handling not yet implemented'
        ];
    }

    /**
     * Handle bulk synchronization task
     *
     * @param array $task Task data
     * @return array Result
     */
    private function handle_bulk_sync($task)
    {
        $payload = $this->decode_payload($task);
        $entity_type = $payload['entity_type'] ?? 'all';
        $batch_size = $payload['batch_size'] ?? 50;
        $processed = 0;
        $errors = 0;

        // Implementation would depend on entity type
        // For now, return a placeholder result
        return [
            'entity_type' => $entity_type,
            'batch_size' => $batch_size,
            'processed' => $processed,
            'errors' => $errors
        ];
    }

    /**
     * Handle data validation task
     *
     * @param array $task Task data (not used by this placeholder)
     * @return array Result
     */
    private function handle_data_validation($task)
    {
        // Placeholder for data validation
        return [
            'validated' => true,
            'issues_found' => 0
        ];
    }

    /**
     * Handle mapping discovery task
     *
     * @param array $task Task data
     * @return array Result
     */
    private function handle_mapping_discovery($task)
    {
        $payload = $this->decode_payload($task);
        $entity_type = $payload['entity_type'] ?? 'client';

        $this->CI->load->model('desk_moloni/desk_moloni_mapping_model', 'mapping_model');
        $discovered_mappings = $this->CI->mapping_model->discover_mappings($entity_type, true);

        return [
            'entity_type' => $entity_type,
            'discovered_count' => count($discovered_mappings),
            'mappings' => $discovered_mappings
        ];
    }

    /**
     * Schedule task retry
     *
     * Uses exponential backoff (2^retry * 60 seconds, capped at one hour)
     * while the retry count stays within 'desk_moloni_max_retries'.
     *
     * @param array     $task  Task data
     * @param Throwable $error Error that caused failure
     */
    private function schedule_retry($task, $error)
    {
        $retry_count = ($task['retry_count'] ?? 0) + 1;
        $max_retries = (int) get_option('desk_moloni_max_retries', 3);

        if ($retry_count <= $max_retries) {
            // Calculate backoff delay
            $delay = min(pow(2, $retry_count) * 60, 3600); // Exponential backoff, max 1 hour
            $this->CI->sync_queue_model->schedule_retry($task['id'], $delay);

            log_message('info', "TaskWorker {$this->worker_id} scheduled retry {$retry_count}/{$max_retries} " .
                "for task {$task['id']} in {$delay}s");
        } else {
            log_message('warning', "TaskWorker {$this->worker_id} task {$task['id']} exceeded max retries");
        }
    }

    /**
     * Create worker lock file
     *
     * Writes the worker identity and initial heartbeat as JSON. A write
     * failure is logged rather than silently ignored.
     */
    private function create_lock_file()
    {
        $lock_data = [
            'worker_id' => $this->worker_id,
            'pid' => $this->worker_pid,
            'started_at' => date('Y-m-d H:i:s'),
            'last_heartbeat' => time()
        ];

        // LOCK_EX keeps concurrent readers from seeing a half-written file
        if (file_put_contents($this->worker_lock_file, json_encode($lock_data), LOCK_EX) === false) {
            log_message('error', "TaskWorker {$this->worker_id} could not write lock file {$this->worker_lock_file}");
        }
    }

    /**
     * Update worker heartbeat
     *
     * Refreshes the heartbeat timestamp, task count and current task id in the
     * lock file. Does nothing when the lock file no longer exists.
     */
    private function update_heartbeat()
    {
        if (!file_exists($this->worker_lock_file)) {
            return;
        }

        $lock_data = json_decode((string) file_get_contents($this->worker_lock_file), true);
        if (!is_array($lock_data)) {
            // Unreadable or corrupted lock file: start again from an empty record
            $lock_data = [];
        }

        $lock_data['last_heartbeat'] = time();
        $lock_data['task_count'] = $this->task_count;
        $lock_data['current_task'] = $this->current_task['id'] ?? null;

        if (file_put_contents($this->worker_lock_file, json_encode($lock_data), LOCK_EX) === false) {
            log_message('error', "TaskWorker {$this->worker_id} could not update lock file {$this->worker_lock_file}");
        }
    }

    /**
     * Check if memory limit is exceeded
     *
     * @return bool True when current usage is above 80% of the known limit;
     *              always false when no usable limit is known
     */
    private function is_memory_limit_exceeded()
    {
        // Non-positive covers -1 (unlimited) and guards the division below
        if ($this->memory_limit <= 0) {
            return false; // No memory limit
        }

        $current_usage = memory_get_usage(true);
        $percentage = ($current_usage / $this->memory_limit) * 100;

        return $percentage > 80; // Stop at 80% memory usage
    }

    /**
     * Convert memory limit to bytes
     *
     * Accepts PHP shorthand notation (K, M, G suffix, case-insensitive) or a
     * plain byte count.
     *
     * @param string|false $val Memory limit string as returned by ini_get()
     * @return int Bytes, or -1 when the value is unlimited, missing or not positive
     */
    private function convert_to_bytes($val)
    {
        $val = trim((string) $val);

        // Empty covers ini_get() returning false or an empty setting
        if ($val === '' || $val === '-1') {
            return -1;
        }

        $last = strtolower($val[strlen($val) - 1]);
        $bytes = (int) $val;

        // Intentional fall-through: each unit multiplies by 1024 once more
        switch ($last) {
            case 'g':
                $bytes *= 1024;
                // no break
            case 'm':
                $bytes *= 1024;
                // no break
            case 'k':
                $bytes *= 1024;
        }

        return $bytes > 0 ? $bytes : -1;
    }

    /**
     * Cleanup worker resources
     *
     * Removes the lock file and releases the task still assigned to this
     * worker, if any. Safe to call more than once.
     */
    private function cleanup()
    {
        // Remove lock file
        if (file_exists($this->worker_lock_file)) {
            unlink($this->worker_lock_file);
        }

        // Release any pending tasks assigned to this worker
        if ($this->current_task) {
            $this->CI->sync_queue_model->release_task($this->current_task['id']);
            // Forget the task so a repeated cleanup does not release it twice
            $this->current_task = null;
        }

        log_message('info', "TaskWorker {$this->worker_id} cleanup completed");
    }

    /**
     * Shutdown handler
     *
     * Registered with register_shutdown_function(); performs cleanup only when
     * the script ends while the worker is still flagged as running.
     */
    public function shutdown_handler()
    {
        if ($this->is_running) {
            log_message('warning', "TaskWorker {$this->worker_id} unexpected shutdown");
            $this->is_running = false;
            $this->cleanup();
        }
    }

    /**
     * Get worker status
     *
     * @return array Worker status; includes 'lock_data' when the lock file exists
     */
    public function get_status()
    {
        $status = [
            'worker_id' => $this->worker_id,
            'pid' => $this->worker_pid,
            'is_running' => $this->is_running,
            'task_count' => $this->task_count,
            'max_tasks' => $this->max_tasks_per_worker,
            'current_task' => $this->current_task,
            'memory_usage' => memory_get_usage(true),
            'memory_limit' => $this->memory_limit,
            'execution_timeout' => $this->execution_timeout
        ];

        if (file_exists($this->worker_lock_file)) {
            $lock_data = json_decode(file_get_contents($this->worker_lock_file), true);
            $status['lock_data'] = $lock_data;
        }

        return $status;
    }

    /**
     * Check if worker is healthy
     *
     * @return bool True when the lock file exists and its last heartbeat is
     *              less than two heartbeat intervals old
     */
    public function is_healthy()
    {
        // Check if lock file exists and is recent
        if (!file_exists($this->worker_lock_file)) {
            return false;
        }

        $lock_data = json_decode(file_get_contents($this->worker_lock_file), true);
        $last_heartbeat = $lock_data['last_heartbeat'] ?? 0;

        // Worker is healthy if heartbeat is within 2 intervals
        return (time() - $last_heartbeat) < ($this->heartbeat_interval * 2);
    }
}