#!/usr/bin/env python3
"""
wp-translate-ptpt.py

Efficient WordPress translation pipeline for European Portuguese (PT-PT).

The pipeline parses gettext ``.po`` files, optionally machine-translates
untranslated entries through a LibreTranslate-compatible HTTP API, repairs
brand names that were translated literally, converts PT-BR vocabulary to
PT-PT, validates the result and writes the file back (compiling a ``.mo``
with ``msgfmt`` on a best-effort basis).

Author: Descomplicar® Crescimento Digital
Date: 2026-02-23
Version: 1.1.0
"""

import os
import sys
import re
import json
import time
import sqlite3
import hashlib
import argparse
import subprocess
import shutil
import copy
from pathlib import Path
from typing import Any, Dict, List, Tuple, Optional
from dataclasses import dataclass, field
from urllib.request import Request, urlopen
from urllib.error import URLError

# Version
__version__ = "1.1.0"


# =============================================================================
# Data Classes
# =============================================================================

@dataclass
class PoEntry:
    """Represents a .po file entry.

    Strings are stored exactly as they appear between the quotes in the file
    (escape sequences such as ``\\n`` or ``\\"`` are NOT decoded); multi-line
    strings are stored concatenated.
    """
    msgid: str = ""
    msgid_plural: str = ""
    msgstr: str = ""
    msgstr_plural: Dict[int, str] = field(default_factory=dict)
    comments: List[str] = field(default_factory=list)
    line_start: int = 0   # 0-based index of the first line of the entry
    msgid_line: int = 0   # 0-based index of the ``msgid`` line
    msgstr_line: int = 0  # 0-based index of the last ``msgstr`` line seen
    msgctxt: str = ""     # optional message context (appended last for compatibility)


@dataclass
class ProcessResult:
    """Result of processing a .po file."""
    success: bool
    total: int = 0
    translated: int = 0
    cached: int = 0
    brands_fixed: int = 0
    errors: List[Dict] = field(default_factory=list)
    error: str = ""


# =============================================================================
# PT-BR to PT-PT Conversion Rules
# =============================================================================

# Regex pattern -> lower-case replacement. Rules are applied in insertion
# order, so whole-word gerund rules MUST come before the orthography prefix
# rules (otherwise ``\batualiz`` rewrites "atualizando" before the gerund rule
# can see it).
PTBR_TO_PTPT = {
    # Verbs
    r'\bsalvar\b': 'guardar',
    r'\bsalvo\b': 'guardado',
    r'\bsalva\b': 'guardada',
    r'\bdeletar\b': 'eliminar',
    r'\bdeletado\b': 'eliminado',
    r'\bdeletada\b': 'eliminada',
    r'\bgerenciar\b': 'gerir',
    r'\bgerenciamento\b': 'gestão',
    r'\bgerenciado\b': 'gerido',
    r'\bhabilitar\b': 'activar',
    r'\bhabilitado\b': 'activado',
    r'\bhabilitada\b': 'activada',
    r'\bdesabilitar\b': 'desactivar',
    r'\bdesabilitado\b': 'desactivado',
    r'\bdesabilitada\b': 'desactivada',
    r'\bacessar\b': 'aceder',
    r'\bacessado\b': 'acedido',
    r'\bbaixar\b': 'transferir',
    r'\bcadastrar\b': 'registar',
    r'\bcadastro\b': 'registo',
    r'\bcadastrado\b': 'registado',
    r'\bcompartilhar\b': 'partilhar',
    r'\bcompartilhado\b': 'partilhado',
    # The look-behind keeps the rule idempotent: without it every run would
    # turn "pré-visualizar" into "pré-pré-visualizar".
    r'(?<!pré-)\bvisualizar\b': 'pré-visualizar',

    # Nouns
    r'\bsenha\b': 'palavra-passe',
    r'\bsenhas\b': 'palavras-passe',
    r'\barquivo\b': 'ficheiro',
    r'\barquivos\b': 'ficheiros',
    r'\btela\b': 'ecrã',
    r'\btelas\b': 'ecrãs',
    r'\bcelular\b': 'telemóvel',
    r'\busuário\b': 'utilizador',
    r'\busuários\b': 'utilizadores',
    r'\bconfiguração\b': 'definição',
    r'\bconfigurações\b': 'definições',
    r'\blixeira\b': 'lixo',
    r'\bequipe\b': 'equipa',

    # Gerund to infinitive (before orthography, see note above)
    r'\bprocessando\b': 'a processar',
    r'\bcarregando\b': 'a carregar',
    r'\batualizando\b': 'a actualizar',
    r'\bgerando\b': 'a gerar',
    r'\bsalvando\b': 'a guardar',
    r'\bdeletando\b': 'a eliminar',

    # Orthography (silent consonants)
    r'\batualiz': 'actualiz',
    r'\bfatura': 'factura',
    r'\bselecion': 'seleccion',
    r'\bação\b': 'acção',
    r'\bações\b': 'acções',
    r'\bprojeto\b': 'projecto',
    r'\bprojetos\b': 'projectos',
    r'\bdireção\b': 'direcção',
    r'\bproteção\b': 'protecção',
    r'\bcoleção\b': 'colecção',
    r'\bcorreção\b': 'correcção',
    r'\bótimo\b': 'óptimo',
}


def _match_case(matched: str, replacement: str) -> str:
    """Return *replacement* re-cased to follow the casing of *matched*."""
    if len(matched) > 1 and matched.isupper():
        return replacement.upper()
    if matched[:1].isupper():
        return replacement[:1].upper() + replacement[1:]
    return replacement


def apply_ptbr_fixes(text: str) -> Tuple[str, int]:
    """Apply PT-BR to PT-PT conversions.

    Matching is case-insensitive and the casing of the matched text
    (capitalised / ALL CAPS / lower) is carried over to the replacement.

    Returns:
        ``(fixed_text, rules_fired)`` where ``rules_fired`` is the number of
        distinct rules that changed the text (not the number of
        substitutions).
    """
    fixed = text
    count = 0
    for pattern, replacement in PTBR_TO_PTPT.items():
        fixed, hits = re.subn(
            pattern,
            # Bind the replacement as a default to avoid late binding.
            lambda m, repl=replacement: _match_case(m.group(0), repl),
            fixed,
            flags=re.IGNORECASE,
        )
        if hits:
            count += 1
    return fixed, count


# =============================================================================
# Seed Brands
# =============================================================================

SEED_BRANDS = [
    # Plugins from current library
    "Fluent Forms", "FluentCRM", "Fluent SMTP", "Fluent Booking",
    "FluentCampaign Pro", "Fluent Support", "Rank Math", "Rank Math Pro",
    "Element Pack", "Element Pack Lite", "Elementor", "Elementor Pro",
    "ElementsKit", "ElementsKit Lite", "Happy Addons",
    "Happy Elementor Addons", "WooCommerce", "WPForms", "WPForms Lite",
    "Wordfence", "UpdraftPlus", "Real Cookie Banner", "Loco Translate",
    "WP Fastest Cache", "Forminator", "Bit Integrations", "Bit Social",
    "Bit Pi", "KiviCare", "KiviCare Pro", "Astra", "Branda", "TablePress",
    "AI Engine", "BetterDocs", "Cookie Notice", "Docket Cache",
    "Envato Elements", "Email Candy Pro", "Eventin Pro", "Fast Indexing API",
    "FileBird", "FileBird Document Library", "GUM Elementor Addon",
    "HappyFiles Pro", "Insert Headers and Footers", "Iqonic Extensions",
    "Iqonic Layouts", "JEG Elementor Kit", "Jet Engine", "JWT Authentication",
    "LoginPress", "MainWP BackWPup Extension", "MetForm",
    "PowerPack Elements", "Print My Blog",
    "Product Import Export for WooCommerce", "Shipper",
    "SkyBoot Custom Icons", "Testimonial Pro", "Ultimate Branding",
    "Uncanny Automator", "WebP Express", "WholesaleX",
    "WooCommerce Dashboard Stats", "Woo Save Abandoned Carts", "WPConsent",
    "WP Defender", "WP Event Solution", "WP Hummingbird", "WP Mail SMTP",
    "WPMU DEV SEO", "WPMU DEV Updates", "WP Optimize", "WP Rocket",
    "WP Security Audit Log", "WP Smush Pro", "WPFunnels", "WPFunnels Pro",

    # Common services
    "Google", "Facebook", "Instagram", "Twitter", "LinkedIn", "PayPal",
    "Stripe", "Mailchimp", "Zapier", "HubSpot", "OpenAI", "ChatGPT",
    "YouTube", "TikTok", "Gmail", "Outlook",

    # WordPress core
    "WordPress", "Gutenberg", "Jetpack",
]


# =============================================================================
# CacheManager
# =============================================================================

class CacheManager:
    """Manages SQLite cache for translations and brands."""

    # Expected schema: table -> list of required columns
    EXPECTED_SCHEMA = {
        "brands": ["id", "name", "variations", "auto_detected",
                   "confidence_score", "last_seen", "plugin_slug"],
        "translations": ["msgid_hash", "msgid", "msgstr", "plugin_name",
                         "validated", "timestamp"],
        "corrections": ["id", "original", "corrected", "rule_applied",
                        "plugin_name", "timestamp"],
        "translation_backups": ["id", "msgid_hash", "msgid",
                                "original_msgstr", "new_msgstr",
                                "plugin_name", "po_file", "timestamp"],
    }

    def __init__(self, db_path: str):
        """Initialize database connection, validate and create tables.

        Raises:
            sqlite3.DatabaseError: if the database cannot be opened, fails
                the integrity check, or its schema cannot be migrated.
        """
        self.db_path = db_path
        self.conn = self._safe_connect(db_path)
        self._init_db()
        self._validate_schema()

    @staticmethod
    def _hash_msgid(msgid: str) -> str:
        """Return the cache key for *msgid* (MD5 hex digest, not security-related)."""
        return hashlib.md5(msgid.encode()).hexdigest()

    def _safe_connect(self, db_path: str) -> sqlite3.Connection:
        """Establish connection with integrity check.

        The connection is closed again if anything fails, so a failed
        attempt does not leak a handle.
        """
        conn = None
        try:
            conn = sqlite3.connect(db_path)
            # Check database integrity
            result = conn.execute("PRAGMA integrity_check").fetchone()
            if result[0] != "ok":
                raise sqlite3.DatabaseError(
                    f"Base de dados corrompida ({db_path}): {result[0]}"
                )
            # Enable WAL for better concurrency
            conn.execute("PRAGMA journal_mode=WAL")
            return conn
        except sqlite3.DatabaseError as e:
            if conn is not None:
                conn.close()
            # Our own "corrupted" error is already descriptive: re-raise as is.
            if "corrompida" in str(e):
                raise
            raise sqlite3.DatabaseError(
                f"Erro ao conectar a base de dados ({db_path}): {e}"
            ) from e

    def _init_db(self):
        """Create database schema."""
        # Brands table
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS brands (
                id INTEGER PRIMARY KEY,
                name TEXT UNIQUE NOT NULL,
                variations TEXT,
                auto_detected BOOLEAN DEFAULT 0,
                confidence_score REAL DEFAULT 1.0,
                last_seen TIMESTAMP,
                plugin_slug TEXT
            )
        """)

        # Translations cache
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS translations (
                msgid_hash TEXT PRIMARY KEY,
                msgid TEXT,
                msgstr TEXT,
                plugin_name TEXT,
                validated BOOLEAN DEFAULT 0,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Corrections history
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS corrections (
                id INTEGER PRIMARY KEY,
                original TEXT,
                corrected TEXT,
                rule_applied TEXT,
                plugin_name TEXT,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Translation backups (new table in v1.1.0)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS translation_backups (
                id INTEGER PRIMARY KEY,
                msgid_hash TEXT NOT NULL,
                msgid TEXT,
                original_msgstr TEXT,
                new_msgstr TEXT,
                plugin_name TEXT,
                po_file TEXT,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        self.conn.commit()

    def _validate_schema(self):
        """Validate that all expected tables and columns exist.

        Missing columns are added as ``TEXT`` (simple forward migration).

        Raises:
            sqlite3.DatabaseError: if a table is missing or a column cannot
                be added.
        """
        for table_name, expected_cols in self.EXPECTED_SCHEMA.items():
            if not self._table_exists(table_name):
                raise sqlite3.DatabaseError(
                    f"Tabela '{table_name}' nao existe apos init_db. "
                    f"BD possivelmente corrompida: {self.db_path}"
                )
            actual_cols = self._get_columns(table_name)
            missing = sorted(set(expected_cols) - set(actual_cols))
            # Try to add the missing columns (migration). Names come from
            # EXPECTED_SCHEMA only, never from user input.
            for col in missing:
                try:
                    self.conn.execute(
                        f"ALTER TABLE {table_name} ADD COLUMN {col} TEXT"
                    )
                except sqlite3.OperationalError as e:
                    raise sqlite3.DatabaseError(
                        f"Coluna(s) em falta na tabela '{table_name}': "
                        f"{', '.join(missing)}. Migracao falhou."
                    ) from e
        self.conn.commit()

    def _table_exists(self, table_name: str) -> bool:
        """Check if a table exists in the database."""
        cursor = self.conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
            (table_name,)
        )
        return cursor.fetchone() is not None

    def _get_columns(self, table_name: str) -> List[str]:
        """Get list of column names for a table."""
        cursor = self.conn.execute(f"PRAGMA table_info({table_name})")
        return [row[1] for row in cursor.fetchall()]

    def verify_health(self) -> Dict[str, Any]:
        """Run full health check on the database.

        Returns:
            A dict with keys ``healthy`` (bool), ``integrity`` (str),
            ``tables`` (per-table ``exists`` / ``rows`` / ``columns_ok`` and
            optionally ``missing_columns``) and ``errors`` (list of str).
        """
        report = {"healthy": True, "tables": {}, "integrity": "ok", "errors": []}

        # Check integrity
        try:
            result = self.conn.execute("PRAGMA integrity_check").fetchone()
            report["integrity"] = result[0]
            if result[0] != "ok":
                report["healthy"] = False
                report["errors"].append(f"Integridade: {result[0]}")
        except Exception as e:
            report["healthy"] = False
            report["errors"].append(f"Erro integrity_check: {e}")

        # Check tables and row counts
        for table_name, expected_cols in self.EXPECTED_SCHEMA.items():
            table_info = {"exists": False, "rows": 0, "columns_ok": False}
            if self._table_exists(table_name):
                table_info["exists"] = True
                try:
                    count = self.conn.execute(
                        f"SELECT COUNT(*) FROM {table_name}"
                    ).fetchone()[0]
                    table_info["rows"] = count
                except Exception as e:
                    report["errors"].append(f"Erro contagem {table_name}: {e}")
                    report["healthy"] = False

                actual_cols = self._get_columns(table_name)
                missing = set(expected_cols) - set(actual_cols)
                table_info["columns_ok"] = len(missing) == 0
                if missing:
                    table_info["missing_columns"] = list(missing)
                    report["healthy"] = False
            else:
                report["healthy"] = False
                report["errors"].append(f"Tabela '{table_name}' nao existe")

            report["tables"][table_name] = table_info

        return report

    def get_cached_translation(self, msgid: str) -> Optional[str]:
        """Retrieve cached translation for msgid.

        Only rows flagged ``validated = 1`` are returned.
        """
        cursor = self.conn.execute(
            "SELECT msgstr FROM translations WHERE msgid_hash = ? AND validated = 1",
            (self._hash_msgid(msgid),)
        )
        result = cursor.fetchone()
        return result[0] if result else None

    def save_translation(self, msgid: str, msgstr: str, plugin_name: str,
                         validated: bool = False):
        """Save translation to cache (replacing any previous row for msgid)."""
        self.conn.execute(
            """INSERT OR REPLACE INTO translations
               (msgid_hash, msgid, msgstr, plugin_name, validated, timestamp)
               VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)""",
            (self._hash_msgid(msgid), msgid, msgstr, plugin_name, validated)
        )
        self.conn.commit()

    def log_correction(self, original: str, corrected: str, rule: str,
                       plugin_name: str):
        """Log a correction to history."""
        self.conn.execute(
            """INSERT INTO corrections
               (original, corrected, rule_applied, plugin_name, timestamp)
               VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)""",
            (original, corrected, rule, plugin_name)
        )
        self.conn.commit()

    def backup_translation(self, msgid: str, original_msgstr: str,
                           new_msgstr: str, plugin_name: str, po_file: str):
        """Record the previous translation before it is overwritten.

        Nothing is stored when the text did not change. An empty
        ``original_msgstr`` IS stored so that "untranslated -> translated"
        transitions can be found again later.
        """
        original_msgstr = original_msgstr or ""
        if original_msgstr == new_msgstr:
            return
        self.conn.execute(
            """INSERT INTO translation_backups
               (msgid_hash, msgid, original_msgstr, new_msgstr,
                plugin_name, po_file, timestamp)
               VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)""",
            (self._hash_msgid(msgid), msgid, original_msgstr, new_msgstr,
             plugin_name, po_file)
        )
        self.conn.commit()

    def close(self):
        """Close database connection."""
        self.conn.close()


# =============================================================================
# BrandProtector
# =============================================================================

class BrandProtector:
    """Detects and protects brand names from literal translation."""

    # Known literal translations: correct brand -> wrong renderings.
    # Order matters: longer brands must precede brands they contain
    # ("Real Cookie Banner" before "Cookie Banner").
    LITERAL_TRANSLATIONS = {
        "Fluent Forms": ["Formulários Fluentes", "Formas Fluentes"],
        "FluentCRM": ["CRM Fluente"],
        "Fluent SMTP": ["SMTP Fluente"],
        "Fluent Booking": ["Reserva Fluente"],
        "Rank Math": ["Matemática de Classificação", "SEO Matemática"],
        "Element Pack": ["Pacote de Elementos"],
        "ElementsKit": ["Kit de Elementos"],
        "Happy Addons": ["Complementos Felizes"],
        "Happy Elementor Addons": ["Complementos Elementor Felizes"],
        "Real Cookie Banner": ["Banner de Biscoito Real", "Bandeira de Biscoito Real"],
        "Cookie Banner": ["Banner de Biscoito"],
        "Loco Translate": ["Loco Traduzir"],
        "WP Fastest Cache": ["Cache Mais Rápido WP"],
        "Bit Integrations": ["Integrações Bit"],
        "Bit Social": ["Social Bit"],
        "Wordfence": ["Cerca de Palavras"],
    }

    def __init__(self, db_path: str):
        """Initialize with database path (``":memory:"`` disables the DB)."""
        self.db_path = db_path
        self.cache = CacheManager(db_path) if db_path != ":memory:" else None
        self.known_brands = self._load_brands()

    def _load_brands(self) -> List[str]:
        """Load known brands from database, falling back to built-in names."""
        if not self.cache:
            return list(self.LITERAL_TRANSLATIONS.keys())

        cursor = self.cache.conn.execute("SELECT name FROM brands")
        brands = [row[0] for row in cursor.fetchall()]
        return brands if brands else list(self.LITERAL_TRANSLATIONS.keys())

    def detect_brand_patterns(self, text: str) -> List[str]:
        """Detect possible brand names using heuristics.

        Returns the de-duplicated candidates (order not guaranteed).
        """
        candidates = []

        # Pattern 1: CamelCase
        candidates.extend(re.findall(r'\b[A-Z][a-z]+(?:[A-Z][a-z]+)+\b', text))

        # Pattern 2: Acronyms
        candidates.extend(re.findall(r'\b[A-Z]{2,}\b', text))

        # Pattern 3: Trademarks
        candidates.extend(re.findall(r'(\w+(?:\s+\w+)?)\s*[®™]', text))

        # Pattern 4: Mid-sentence capitals
        candidates.extend(
            re.findall(r'(?<=\s)[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*(?=\s)', text)
        )

        return list(set(candidates))

    def calculate_confidence(self, brand: str, occurrences: int = 1) -> float:
        """Calculate confidence score (0.0 - 1.0) for detected brand."""
        score = 0.0

        # +0.4 if CamelCase
        if re.match(r'^[A-Z][a-z]+(?:[A-Z][a-z]+)+$', brand):
            score += 0.4

        # +0.3 if has trademark
        if any(char in brand for char in ['®', '™']):
            score += 0.3

        # +0.1 per 5 occurrences (max 0.3)
        score += min(occurrences / 5 * 0.1, 0.3)

        return min(score, 1.0)

    def protect_brands(self, text: str) -> Tuple[str, Dict[str, str]]:
        """Replace brand names with placeholders before translation.

        Brands are replaced longest-first so that a short brand contained in
        a longer one ("Elementor" in "Happy Elementor Addons") does not split
        the longer name. Placeholder numbers are the brand's index in
        ``known_brands``.

        Returns:
            ``(protected_text, {placeholder: brand})``.
        """
        placeholders = {}
        protected_text = text

        longest_first = sorted(
            enumerate(self.known_brands),
            key=lambda pair: len(pair[1]),
            reverse=True,
        )
        for i, brand in longest_first:
            # Guard against empty names: "" is "in" every string and
            # replacing it would corrupt the text.
            if brand and brand in protected_text:
                placeholder = f"__BRAND_{i}__"
                placeholders[placeholder] = brand
                protected_text = protected_text.replace(brand, placeholder)

        return protected_text, placeholders

    def restore_brands(self, text: str, placeholders: Dict[str, str]) -> str:
        """Restore brand names after translation."""
        restored_text = text
        for placeholder, brand in placeholders.items():
            restored_text = restored_text.replace(placeholder, brand)
        return restored_text

    def fix_translated_brands(self, msgid: str, msgstr: str) -> Tuple[str, List[str]]:
        """Fix brands that were literally translated.

        Returns:
            ``(fixed_msgstr, corrections)`` where each correction is a
            human-readable ``"wrong → right"`` string. Every correction is
            also logged to the database when one is attached.
        """
        corrections = []
        fixed_msgstr = msgstr

        # Fix known literal translations
        for correct_name, wrong_variations in self.LITERAL_TRANSLATIONS.items():
            for wrong in wrong_variations:
                if wrong in fixed_msgstr:
                    fixed_msgstr = fixed_msgstr.replace(wrong, correct_name)
                    corrections.append(f"{wrong} → {correct_name}")

                    if self.cache:
                        self.cache.log_correction(
                            original=wrong,
                            corrected=correct_name,
                            rule="literal_translation",
                            plugin_name="unknown"
                        )

        return fixed_msgstr, corrections

    def close(self):
        """Close the protector's own database connection, if it has one."""
        if self.cache:
            self.cache.close()
            self.cache = None


# =============================================================================
# QualityValidator
# =============================================================================

class QualityValidator:
    """Validates translation quality."""

    PTBR_TERMS = [
        'você', 'vocês', 'gerenciar', 'habilitar', 'desabilitar',
        'deletar', 'salvar', 'arquivo', 'tela', 'senha', 'celular',
        'usuário', 'configuração', 'cadastro', 'lixeira',
        'gerenciamento', 'visualizar', 'acessar', 'baixar', 'compartilhar'
    ]

    def validate_entry(self, entry: PoEntry) -> Tuple[bool, List[str]]:
        """Validate a complete entry.

        Returns:
            ``(is_valid, error_codes)``.
        """
        errors = []

        # 1. Check placeholders
        if not self._check_placeholders(entry.msgid, entry.msgstr):
            errors.append("PLACEHOLDER_MISMATCH")

        # 2. Check HTML tags
        if not self._check_html_tags(entry.msgid, entry.msgstr):
            errors.append("HTML_TAG_MISMATCH")

        # 3. Check for empty translations
        if entry.msgid and not entry.msgstr and not entry.msgstr_plural:
            errors.append("EMPTY_TRANSLATION")

        # 4. Check for PT-BR terms
        ptbr_terms = self._detect_ptbr(entry.msgstr)
        if ptbr_terms:
            errors.append(f"PTBR_TERMS: {', '.join(ptbr_terms)}")

        return len(errors) == 0, errors

    def _check_placeholders(self, msgid: str, msgstr: str) -> bool:
        """Check if placeholders are preserved (order-insensitive)."""
        if not msgstr:
            return True

        pattern = r'%(?:\d+\$)?[sdifuxX]|\{\{?\w+\}?\}|\[\w+\]'
        msgid_placeholders = sorted(re.findall(pattern, msgid))
        msgstr_placeholders = sorted(re.findall(pattern, msgstr))

        return msgid_placeholders == msgstr_placeholders

    def _check_html_tags(self, msgid: str, msgstr: str) -> bool:
        """Check if HTML tags are preserved (order-insensitive, exact text)."""
        if not msgstr:
            return True

        # NOTE(review): the previous "auto-fix" step replaced '' with '',
        # which is a no-op; it was removed without changing behaviour.
        msgid_tags = sorted(re.findall(r'<[^>]+>', msgid))
        msgstr_tags = sorted(re.findall(r'<[^>]+>', msgstr))

        return msgid_tags == msgstr_tags

    def _detect_ptbr(self, text: str) -> List[str]:
        """Detect PT-BR terms in text (whole words, case-insensitive)."""
        return [
            term for term in self.PTBR_TERMS
            if re.search(r'\b' + re.escape(term) + r'\b', text, re.IGNORECASE)
        ]


# =============================================================================
# TranslationEngine
# =============================================================================

class TranslationEngine:
    """Wrapper for LibreTranslate API with retry and rate limiting."""

    def __init__(self, api_url: str = "https://translate.descomplicar.pt"):
        """Initialize translation engine."""
        self.api_url = api_url.rstrip("/")
        self.translate_endpoint = f"{self.api_url}/translate"
        self.rate_limit = 0.3  # minimum seconds between successful calls
        self.last_call = 0
        self.stats = {"success": 0, "failed": 0, "cached": 0}

    def translate(self, text: str, source: str = "en", target: str = "pt") -> str:
        """Translate text with retry logic.

        Empty / whitespace-only input is returned unchanged. The request is
        attempted up to 3 times with exponential back-off (1s, 2s).

        Returns:
            The translated text, or ``""`` when every attempt failed.
        """
        if not text or text.isspace():
            return text

        # Rate limiting
        elapsed = time.time() - self.last_call
        if elapsed < self.rate_limit:
            time.sleep(self.rate_limit - elapsed)

        # The payload does not change between attempts: build it once.
        payload = json.dumps({
            "q": text,
            "source": source,
            "target": target,
            "format": "text"
        }).encode('utf-8')

        for attempt in range(3):
            try:
                req = Request(
                    self.translate_endpoint,
                    data=payload,
                    headers={"Content-Type": "application/json"}
                )
                with urlopen(req, timeout=30) as response:
                    result = json.loads(response.read().decode('utf-8'))
                translated = result.get("translatedText", "") or ""
            except Exception:
                # Deliberate best-effort boundary: network, HTTP, decoding
                # and malformed-payload errors are all treated as "retry".
                if attempt < 2:
                    time.sleep(2 ** attempt)
                    continue
                self.stats["failed"] += 1
                return ""

            self.last_call = time.time()
            self.stats["success"] += 1
            return translated

        return ""


# =============================================================================
# PoFileHandler
# =============================================================================

class PoFileHandler:
    """Parse and write .po files."""

    # Keyword at the start of a line, with optional plural index.
    # ``msgid_plural`` must be tried before ``msgid``.
    _KEYWORD_RE = re.compile(
        r'^(msgctxt|msgid_plural|msgid|msgstr)(?:\[(\d+)\])?(?=\s|")'
    )

    def parse(self, po_file: Path) -> List[PoEntry]:
        """Parse .po file into list of entries.

        The header entry (``msgid ""``) and comment-only blocks (for example
        obsolete ``#~`` entries) are returned too, so that ``save`` can write
        them back; callers must skip entries whose ``msgid`` is empty.
        """
        blocks: List[List[Tuple[int, str]]] = []
        block: List[Tuple[int, str]] = []
        seen_msgid = False
        seen_msgstr = False

        with open(po_file, 'r', encoding='utf-8') as f:
            for lineno, raw in enumerate(f):
                line = raw.rstrip('\r\n')

                # Blank line: regular entry separator.
                if not line.strip():
                    if block:
                        blocks.append(block)
                    block, seen_msgid, seen_msgstr = [], False, False
                    continue

                kw = self._KEYWORD_RE.match(line)
                keyword = kw.group(1) if kw else None

                # Entries are not always separated by a blank line: detect
                # the start of a new entry from the keyword sequence.
                starts_new = (
                    (line.startswith('#') and seen_msgstr)
                    or (keyword == 'msgctxt' and (seen_msgid or seen_msgstr))
                    or (keyword == 'msgid' and seen_msgid)
                )
                if starts_new and block:
                    blocks.append(block)
                    block, seen_msgid, seen_msgstr = [], False, False

                if keyword == 'msgid':
                    seen_msgid = True
                elif keyword == 'msgstr':
                    seen_msgstr = True
                block.append((lineno, line))

        if block:
            blocks.append(block)

        return [self._parse_block(b) for b in blocks]

    def _parse_block(self, block: List[Tuple[int, str]]) -> PoEntry:
        """Build one PoEntry from the ``(line_index, line)`` pairs of a block."""
        entry = PoEntry(line_start=block[0][0])
        # Field that continuation lines are appended to: an attribute name,
        # a plural index (int), or None.
        active = None

        for lineno, line in block:
            if line.startswith('#'):
                entry.comments.append(line)
                active = None
                continue

            kw = self._KEYWORD_RE.match(line)
            if kw:
                keyword, plural_idx = kw.groups()
                value = self._extract_string(line)
                if keyword == 'msgstr':
                    entry.msgstr_line = lineno
                    if plural_idx is not None:
                        active = int(plural_idx)
                        entry.msgstr_plural[active] = value
                    else:
                        active = 'msgstr'
                        entry.msgstr = value
                else:
                    if keyword == 'msgid':
                        entry.msgid_line = lineno
                    active = keyword
                    setattr(entry, keyword, value)
            elif line.lstrip().startswith('"'):
                continuation = self._extract_string(line)
                if isinstance(active, int):
                    entry.msgstr_plural[active] += continuation
                elif active is not None:
                    setattr(entry, active, getattr(entry, active) + continuation)

        return entry

    def save(self, entries: List[PoEntry], output_file: Path):
        """Save entries to .po file.

        Every string is written on a single line. Entries that carry only
        comments are written as comments, without ``msgid``/``msgstr``.
        """
        lines = []

        for entry in entries:
            lines.extend(entry.comments)

            comment_only = bool(entry.comments) and not any((
                entry.msgctxt, entry.msgid, entry.msgid_plural,
                entry.msgstr, entry.msgstr_plural,
            ))
            if not comment_only:
                if entry.msgctxt:
                    lines.append(f'msgctxt "{entry.msgctxt}"')
                lines.append(f'msgid "{entry.msgid}"')

                if entry.msgid_plural:
                    lines.append(f'msgid_plural "{entry.msgid_plural}"')

                if entry.msgstr_plural:
                    for idx, value in sorted(entry.msgstr_plural.items()):
                        lines.append(f'msgstr[{idx}] "{value}"')
                else:
                    lines.append(f'msgstr "{entry.msgstr}"')

            lines.append("")

        with open(output_file, 'w', encoding='utf-8') as f:
            f.write('\n'.join(lines))

    def _extract_string(self, line: str) -> str:
        """Extract the quoted string from a msgid/msgstr/continuation line.

        The match is greedy (first quote to LAST quote) so that escaped
        quotes inside the string do not truncate it.
        """
        match = re.search(r'"(.*)"', line)
        return match.group(1) if match else ""


# =============================================================================
# TranslationProcessor
# =============================================================================

class TranslationProcessor:
    """Main orchestrator for translation pipeline."""

    def __init__(self, db_path: str, api_url: str):
        """Initialize processor with all components.

        Raises:
            sqlite3.DatabaseError: if the database cannot be opened.
            RuntimeError: if the database health check fails.
        """
        self.db_path = db_path
        self.cache = CacheManager(db_path) if db_path != ":memory:" else None

        # Validate database health before processing
        if self.cache:
            health = self.cache.verify_health()
            if not health["healthy"]:
                errors_str = "; ".join(health["errors"])
                self.cache.close()
                raise RuntimeError(
                    f"Base de dados com problemas: {errors_str}. "
                    f"Executar --verify-db para diagnostico completo."
                )

        self.brand_protector = BrandProtector(db_path)
        self.translator = TranslationEngine(api_url)
        self.po_handler = PoFileHandler()
        self.validator = QualityValidator()

    def close(self):
        """Close every database connection owned by the processor."""
        if self.cache:
            self.cache.close()
            self.cache = None
        self.brand_protector.close()

    def process_file(self, po_file: Path, mode: str = "full") -> ProcessResult:
        """Process .po file through full pipeline.

        A ``.po.backup`` copy is taken first and restored if the pipeline
        raises. Each entry is processed on a copy, so an entry that fails
        validation (or raises) is written back exactly as it was parsed.

        Args:
            po_file: file to process in place.
            mode: ``"full"`` (translate + fix) or ``"brands-only"``.
        """
        # Backup original
        backup_path = po_file.with_suffix('.po.backup')
        if backup_path.exists():
            backup_path.unlink()
        shutil.copy2(po_file, backup_path)

        try:
            # Parse
            entries = self.po_handler.parse(po_file)

            # Process entries
            processed = []
            errors = []
            stats = {"translated": 0, "cached": 0, "brands_fixed": 0}
            cached_before = self.translator.stats["cached"]

            for entry in entries:
                # Header and comment-only blocks pass through untouched
                # (the header would otherwise trip the HTML-tag check on
                # "<email@address>").
                if not entry.msgid:
                    processed.append(entry)
                    continue

                was_untranslated = not entry.msgstr
                candidate = copy.deepcopy(entry)
                try:
                    result, brands_fixed = self._process_entry(
                        candidate, mode, po_file.stem, po_file=str(po_file)
                    )

                    # Validate
                    valid, validation_errors = self.validator.validate_entry(result)

                    if valid or not result.msgstr:
                        processed.append(result)
                        if result.msgstr and was_untranslated:
                            stats["translated"] += 1
                        stats["brands_fixed"] += brands_fixed
                    else:
                        errors.append({
                            'msgid': entry.msgid[:50],
                            'errors': validation_errors
                        })
                        processed.append(entry)

                except Exception as e:
                    errors.append({'msgid': entry.msgid[:50], 'exception': str(e)})
                    processed.append(entry)

            stats["cached"] = self.translator.stats["cached"] - cached_before

            # Save
            self.po_handler.save(processed, po_file)

            # Compile .mo (best effort - don't fail if compilation has errors)
            compile_success = self._compile_mo(po_file)
            if not compile_success:
                errors.append({'warning': 'msgfmt compilation had warnings or errors'})

            # Success - remove backup
            backup_path.unlink()

            return ProcessResult(
                success=True,
                total=sum(1 for e in entries if e.msgid),
                translated=stats["translated"],
                cached=stats["cached"],
                brands_fixed=stats["brands_fixed"],
                errors=errors
            )

        except Exception as e:
            # Rollback
            shutil.copy2(backup_path, po_file)
            return ProcessResult(success=False, error=str(e))

    def _process_entry(self, entry: PoEntry, mode: str, plugin_name: str,
                       po_file: str = "") -> Tuple[PoEntry, int]:
        """Process single entry through pipeline (mutates and returns *entry*).

        Args:
            entry: entry to process.
            mode: ``"full"`` or ``"brands-only"``.
            plugin_name: name recorded in the cache tables.
            po_file: file path recorded with backups; falls back to
                *plugin_name* when empty.

        Returns:
            ``(entry, brands_fixed)``.
        """
        brands_fixed = 0
        backup_source = po_file or plugin_name

        # Skip header entries
        if not entry.msgid:
            return entry, 0

        # Plural entries keep their text in msgstr_plural (msgstr is empty),
        # and save() ignores msgstr for them: never send them to the
        # translator, only repair the existing plural forms.
        is_plural = bool(entry.msgid_plural or entry.msgstr_plural)

        # Existing translations (and everything in brands-only mode): fix only
        if mode == "brands-only" or entry.msgstr or is_plural:
            # Process msgstr (singular)
            if entry.msgstr:
                original_msgstr = entry.msgstr
                fixed, corrections = self.brand_protector.fix_translated_brands(
                    entry.msgid, entry.msgstr
                )
                if corrections:
                    brands_fixed = len(corrections)
                fixed, _ = apply_ptbr_fixes(fixed)
                entry.msgstr = fixed

                # Store a backup if anything changed
                if self.cache and fixed != original_msgstr:
                    self.cache.backup_translation(
                        entry.msgid, original_msgstr, fixed,
                        plugin_name, backup_source
                    )

            # Process msgstr_plural (plural forms)
            for idx, value in list(entry.msgstr_plural.items()):
                fixed, corrections = self.brand_protector.fix_translated_brands(
                    entry.msgid, value
                )
                if corrections:
                    brands_fixed += len(corrections)
                fixed, _ = apply_ptbr_fixes(fixed)
                entry.msgstr_plural[idx] = fixed

            return entry, brands_fixed

        # Mode: full translation of an untranslated singular entry
        # Check cache
        if self.cache:
            cached = self.cache.get_cached_translation(entry.msgid)
            if cached:
                entry.msgstr = cached
                self.translator.stats["cached"] += 1
                return entry, 0

        # Translate
        protected, placeholders = self.brand_protector.protect_brands(entry.msgid)
        translated = self.translator.translate(protected)

        if translated:
            translated = self.brand_protector.restore_brands(translated, placeholders)
            translated, _ = apply_ptbr_fixes(translated)
            translated, corrections = self.brand_protector.fix_translated_brands(
                entry.msgid, translated
            )
            brands_fixed = len(corrections)

            entry.msgstr = translated

            if self.cache:
                self.cache.save_translation(
                    entry.msgid, translated, plugin_name, validated=False
                )
                # Store backup (empty original -> translated)
                self.cache.backup_translation(
                    entry.msgid, "", translated, plugin_name, backup_source
                )

        return entry, brands_fixed

    def _compile_mo(self, po_file: Path) -> bool:
        """Compile .mo file using msgfmt.

        Returns False when msgfmt is missing or exits with an error.
        """
        mo_file = po_file.with_suffix('.mo')
        try:
            subprocess.run(
                ['msgfmt', '-cv', '-o', str(mo_file), str(po_file)],
                capture_output=True,
                text=True,
                check=True
            )
            return True
        except (subprocess.CalledProcessError, FileNotFoundError):
            return False


# =============================================================================
# Seed Database
# =============================================================================

def seed_brands_db(cache: CacheManager):
    """Populate database with seed brands (existing names are left alone)."""
    print("🌱 Seeding brands database...")

    failed = 0
    for brand in SEED_BRANDS:
        try:
            cache.conn.execute(
                """INSERT OR IGNORE INTO brands (name, auto_detected, confidence_score)
                   VALUES (?, 0, 1.0)""",
                (brand,)
            )
        except sqlite3.Error as e:
            # Keep seeding the remaining brands, but do not hide the failure.
            failed += 1
            print(f"⚠️ Could not seed brand '{brand}': {e}")

    cache.conn.commit()
    print(f"✅ Seeded {len(SEED_BRANDS) - failed} brands")


# =============================================================================
# Main CLI
# =============================================================================

def main():
    """Main CLI entry point.

    Returns:
        Process exit code: 0 on success, 1 on any error (including when at
        least one file failed to process).
    """
    parser = argparse.ArgumentParser(
        description="Sistema eficiente de traduções WordPress PT-PT",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )

    parser.add_argument("files", nargs="*", help="Po files to process")
    parser.add_argument("--batch", type=Path, help="Process all .po files in directory")
    parser.add_argument("--brands-only", action="store_true", help="Only fix brands")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be done")
    parser.add_argument("--init-db", action="store_true", help="Initialize database")
    parser.add_argument("--verify-db", action="store_true", help="Verify database health")
    parser.add_argument("--restore-backup", type=str, metavar="MSGID_HASH",
                        help="Restore original translation from backup by msgid hash")
    parser.add_argument("--export-brands", type=Path, help="Export brands to JSON")
    parser.add_argument("--import-brands", type=Path, help="Import brands from JSON")
    parser.add_argument("--db-path", type=str,
                        default=str(Path.home() / ".wp-translate-ptpt" / "cache.db"))
    parser.add_argument("--api-url", type=str,
                        default="https://translate.descomplicar.pt")
    parser.add_argument("--version", action="version",
                        version=f"%(prog)s {__version__}")

    args = parser.parse_args()

    # Ensure db directory exists
    db_dir = Path(args.db_path).parent
    db_dir.mkdir(parents=True, exist_ok=True)

    # Verify database health
    if args.verify_db:
        try:
            cache = CacheManager(args.db_path)
        except Exception as e:
            print(f"\u274c Erro ao abrir BD: {e}")
            return 1

        health = cache.verify_health()
        print("=" * 60)
        print(f"\U0001f50d Verificacao BD: {args.db_path}")
        print("=" * 60)
        print(f"Integridade: {health['integrity']}")
        state_icon = '\u2705 Saudavel' if health['healthy'] else '\u274c Com problemas'
        print(f"Estado: {state_icon}")
        print()
        for table_name, info in health["tables"].items():
            status = "\u2705" if info["exists"] and info.get("columns_ok", False) else "\u274c"
            print(f"  {status} {table_name}: ", end="")
            if info["exists"]:
                print(f"{info['rows']} registos", end="")
                if not info.get("columns_ok", True):
                    print(f" (colunas em falta: {info.get('missing_columns', [])})", end="")
                print()
            else:
                print("NAO EXISTE")
        if health["errors"]:
            print(f"\nErros: {'; '.join(health['errors'])}")
        cache.close()
        return 0 if health["healthy"] else 1

    # Initialize database
    if args.init_db:
        cache = CacheManager(args.db_path)
        seed_brands_db(cache)
        cache.close()
        return 0

    # Show the most recent backup for a msgid hash.
    # NOTE(review): despite the option name this only displays the backup;
    # nothing is written back to a .po file.
    if args.restore_backup:
        try:
            cache = CacheManager(args.db_path)
        except Exception as e:
            print(f"\u274c Erro ao abrir BD: {e}")
            return 1

        cursor = cache.conn.execute(
            """SELECT msgid, original_msgstr, new_msgstr, plugin_name, po_file, timestamp
               FROM translation_backups
               WHERE msgid_hash = ?
               ORDER BY timestamp DESC LIMIT 1""",
            (args.restore_backup,)
        )
        row = cursor.fetchone()
        if not row:
            print(f"\u274c Nenhum backup encontrado para hash: {args.restore_backup}")
            cache.close()
            return 1

        print(f"\U0001f4e6 Backup encontrado:")
        print(f"  msgid: {(row[0] or '')[:80]}")
        print(f"  Original: {row[1][:80] if row[1] else '(vazio)'}")
        print(f"  Actual: {(row[2] or '')[:80]}")
        print(f"  Plugin: {row[3]}")
        print(f"  Ficheiro: {row[4]}")
        print(f"  Data: {row[5]}")
        cache.close()
        return 0

    # Export brands
    if args.export_brands:
        cache = CacheManager(args.db_path)
        cursor = cache.conn.execute("SELECT name FROM brands ORDER BY name")
        brands = [row[0] for row in cursor.fetchall()]

        # Explicit UTF-8: ensure_ascii=False writes non-ASCII characters
        # verbatim and the platform default encoding may not support them.
        with open(args.export_brands, 'w', encoding='utf-8') as f:
            json.dump(brands, f, indent=2, ensure_ascii=False)

        print(f"✅ Exported {len(brands)} brands to {args.export_brands}")
        cache.close()
        return 0

    # Import brands
    if args.import_brands:
        with open(args.import_brands, 'r', encoding='utf-8') as f:
            brands = json.load(f)

        if not isinstance(brands, list) or not all(isinstance(b, str) for b in brands):
            print("❌ Invalid brands file: expected a JSON list of strings")
            return 1

        cache = CacheManager(args.db_path)
        cache.conn.executemany(
            """INSERT OR IGNORE INTO brands (name, auto_detected, confidence_score)
               VALUES (?, 0, 1.0)""",
            [(brand,) for brand in brands]
        )
        cache.conn.commit()

        print(f"✅ Imported {len(brands)} brands")
        cache.close()
        return 0

    # Collect files
    files_to_process = []

    if args.batch:
        files_to_process = list(args.batch.rglob("*-pt_PT.po"))
    elif args.files:
        files_to_process = [Path(f) for f in args.files]
    else:
        parser.print_help()
        return 1

    if not files_to_process:
        print("❌ No .po files found")
        return 1

    # Process files
    try:
        processor = TranslationProcessor(args.db_path, args.api_url)
    except (sqlite3.DatabaseError, RuntimeError) as e:
        print(f"\u274c Erro na base de dados: {e}")
        print("Sugestao: executar --verify-db para diagnostico ou --init-db para reinicializar")
        return 1

    mode = "brands-only" if args.brands_only else "full"

    print("="*60)
    print(f"🌍 WP Translate PT-PT v{__version__}")
    print("="*60)
    print(f"Mode: {mode}")
    print(f"Files: {len(files_to_process)}")
    print(f"Dry run: {args.dry_run}")
    print("="*60)
    print()

    results = []
    start_time = time.time()

    try:
        for i, po_file in enumerate(files_to_process, 1):
            print(f"[{i}/{len(files_to_process)}] {po_file.name}...", end=" ", flush=True)

            if args.dry_run:
                print("(skipped)")
                continue

            result = processor.process_file(po_file, mode=mode)
            results.append(result)

            if result.success:
                print(f"✅ {result.brands_fixed} brands fixed")
            else:
                print(f"❌ {result.error}")
    finally:
        # Always release the database connections, even on Ctrl-C.
        processor.close()

    # Summary
    if results and not args.dry_run:
        elapsed = time.time() - start_time

        print("\n" + "="*60)
        print("📊 SUMMARY")
        print("="*60)

        total_success = sum(1 for r in results if r.success)
        total_translated = sum(r.translated for r in results)
        total_brands_fixed = sum(r.brands_fixed for r in results)
        total_errors = sum(len(r.errors) for r in results)

        print(f"Files processed: {total_success}/{len(results)}")
        print(f"Translated: {total_translated}")
        print(f"Brands fixed: {total_brands_fixed}")
        print(f"Errors: {total_errors}")
        print(f"Time: {elapsed:.1f}s")
        print("="*60)

    # Non-zero exit status when at least one file could not be processed.
    return 0 if all(r.success for r in results) else 1


if __name__ == "__main__":
    sys.exit(main())