#!/usr/bin/env python3
"""
okf-normalize.py — Phase 1: normalize OKF frontmatter in every .md file of the Hub.

Adds/completes: type, title, description, timestamp.
OKF SPEC §4.1: type is the only mandatory field.

Usage: python3 okf-normalize.py [--dry-run] [--dir /path/to/Hub | --dir=/path/to/Hub]

Created: 28-06-2026
"""

import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path

HUB_DEFAULT = "/media/ealmeida/Dados/Hub"

# Directories excluded from the scan
EXCLUDE_DIRS = {
    ".stversions",
    "node_modules",
    ".git",
    ".obsidian",
    ".trash",
    "99-Arquivo",
}

# OKF reserved files — no mandatory frontmatter
OKF_RESERVED = {"index.md", "log.md"}

# A top-level YAML key at the start of a line (`name:`).
_KEY_LINE_RE = re.compile(r"^[A-Za-z_][\w\-]*:")
# Inline-markdown cleanup patterns used when deriving a description.
_BULLET_RE = re.compile(r"^[-*]\s+")
_BOLD_RE = re.compile(r"\*\*(.+?)\*\*")
_LINK_RE = re.compile(r"\[(.+?)\]\(.+?\)")
# Values that are safe to emit as an unquoted YAML plain scalar:
# start with a letter, then only word characters, spaces, dots, parentheses, hyphens.
_PLAIN_SCALAR_RE = re.compile(r"[^\W\d_][\w .()\-]*")
# Plain scalars that YAML parsers may resolve to booleans/null instead of strings.
_YAML_RESERVED_WORDS = frozenset(
    {"true", "false", "yes", "no", "on", "off", "null", "y", "n"}
)


# Hub taxonomy → OKF type
def infer_type(filepath: Path) -> str:
    """Infer the OKF ``type`` of a file from its name and path.

    File-name rules are checked first (prefixes such as ``PROC-``/``QR-``,
    ``index.md``, ``*-SPEC.md``, ``STATUS*``, ``CHANGELOG*``, ``README*``),
    then substrings of the lower-cased full path. Falls back to ``"Document"``.
    """
    name = filepath.name
    upper_name = name.upper()
    path_lower = str(filepath).lower()

    if name.startswith(("PROC-", "proc-")):
        return "Playbook"
    if name.startswith(("QR-", "qr-")):
        return "Reference"
    if name.lower() == "index.md":
        return "Index"
    if name.endswith("-SPEC.md") or name == "SPEC.md":
        return "Specification"
    if name.startswith("STATUS"):
        return "Status"
    if upper_name.startswith("CHANGELOG"):
        return "Changelog"
    if upper_name.startswith("README"):
        return "Reference"
    if any(word in path_lower for word in ("proposta", "orcamento", "budget")):
        return "Proposal"
    if "90-templates" in path_lower or "/template" in path_lower:
        return "Template"
    if "07-clientes" in path_lower:
        return "Client Profile"
    return "Document"


def get_git_timestamp(filepath: Path, hub: Path) -> str:
    """Return the last-modification timestamp of ``filepath`` as an ISO string.

    Tries ``git log -1 --format=%cI`` (run inside ``hub``) first. When git is
    unavailable, times out, the file is outside ``hub``, or git prints
    nothing, falls back to the file's mtime expressed in UTC.
    """
    try:
        rel = filepath.relative_to(hub)
        result = subprocess.run(
            ["git", "log", "-1", "--format=%cI", "--", str(rel)],
            cwd=str(hub),
            capture_output=True,
            text=True,
            timeout=5,
        )
        ts = result.stdout.strip()
        if ts:
            return ts
    except (ValueError, OSError, subprocess.SubprocessError):
        # Best effort only: ValueError covers relative_to()/decoding failures,
        # OSError a missing git binary, SubprocessError the timeout.
        pass
    # Fallback: file mtime
    mtime = filepath.stat().st_mtime
    return datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat()


def _looks_like_yaml_mapping(fm: str) -> bool:
    """Return True only if ``fm`` is a valid YAML mapping.

    This avoids treating a leading ``---`` ruler followed by body text as
    frontmatter (which corrupted MEMORY.md and similar files). PyYAML is used
    when installed; otherwise a conservative line-based heuristic is applied.
    """
    if not fm.strip():
        return False
    try:
        import yaml  # type: ignore

        data = yaml.safe_load(fm)
        return isinstance(data, dict)
    except ImportError:
        pass
    except Exception:
        # Any parse failure means the block is not usable frontmatter.
        return False

    # Fallback without PyYAML: the first non-empty line must be a `name:` key,
    # and no line may start with "#" at column 0 (a top-level markdown heading).
    first = None
    for ln in fm.split("\n"):
        if not ln.strip():
            continue
        if first is None:
            first = ln
        if ln[:1] == "#":
            return False
    if first is None:
        return False
    return bool(_KEY_LINE_RE.match(first))


def parse_frontmatter(content: str):
    """Split ``content`` into frontmatter and body.

    Returns ``(frontmatter_str, body_str, True)`` when the content starts with
    a ``---`` … ``---`` block that is a valid YAML mapping, otherwise
    ``(None, content, False)``. A leading ``---`` used as a ruler/separator
    (followed by body text) is NOT frontmatter.

    The closing ``---`` is also accepted at the very end of the content with
    no trailing newline; otherwise such files would get a second frontmatter
    block prepended.
    """
    if not content.startswith("---\n"):
        return None, content, False

    end = content.find("\n---\n", 4)
    if end != -1:
        body_start = end + 5
    elif content.endswith("\n---"):
        # Closing delimiter is the last line of the file (no final newline).
        end = len(content) - 4
        body_start = len(content)
    else:
        return None, content, False

    fm = content[4:end]
    if _looks_like_yaml_mapping(fm):
        return fm, content[body_start:], True
    return None, content, False


def first_useful_sentence(body: str) -> str:
    """Extract the first useful line of ``body`` to serve as a description.

    Headings, table rows, fence lines and everything inside fenced code blocks
    are skipped. List bullets, bold markers and link targets are stripped.
    Lines of 10 characters or fewer are ignored. The result is truncated to
    120 characters (with an ellipsis). Returns ``""`` when nothing qualifies.
    """
    in_fence = False
    for raw in body.split("\n"):
        line = raw.strip()
        if line.startswith("```"):
            # An odd number of fence markers opens/closes a block; an even
            # number (e.g. ```x``` on one line) leaves the state unchanged.
            if line.count("```") % 2 == 1:
                in_fence = not in_fence
            continue
        if in_fence or not line:
            continue
        if line.startswith(("#", "|")):
            continue
        if line.startswith(("-", "*")):
            # List item: use the content without the bullet
            line = _BULLET_RE.sub("", line)
        # Clean inline markdown
        line = _BOLD_RE.sub(r"\1", line)
        line = _LINK_RE.sub(r"\1", line)
        line = line.strip()
        if len(line) > 10:
            # Truncate at 120 chars
            return line[:120].rstrip(".") + ("…" if len(line) > 120 else "")
    return ""


def _yaml_scalar(value: str) -> str:
    """Return ``value`` as a YAML scalar, double-quoted when a plain scalar is unsafe."""
    if (
        _PLAIN_SCALAR_RE.fullmatch(value)
        and value == value.strip()
        and value.lower() not in _YAML_RESERVED_WORDS
    ):
        return value
    # A JSON string is a valid YAML double-quoted scalar.
    return json.dumps(value, ensure_ascii=False)


def _default_title(filepath: Path) -> str:
    """Derive a title from the file stem, ready to be placed after ``title: ``."""
    title = filepath.stem.replace("-", " ").replace("_", " ").title()
    return _yaml_scalar(title)


def _has_key(fm_lines: list, *keys: str) -> bool:
    """Return True if any line starts with one of ``keys`` followed by a colon."""
    prefixes = tuple(f"{key}:" for key in keys)
    return any(line.startswith(prefixes) for line in fm_lines)


def _index_of_key(fm_lines: list, key: str, default: int) -> int:
    """Return the index of the first line starting with ``key:``, or ``default``."""
    prefix = f"{key}:"
    return next(
        (i for i, line in enumerate(fm_lines) if line.startswith(prefix)), default
    )


def _build_minimal_frontmatter(filepath: Path, hub: Path, body: str) -> list:
    """Build the frontmatter entries for a file that has no frontmatter."""
    new_fm_lines = [
        f"type: {infer_type(filepath)}",
        f"title: {_default_title(filepath)}",
    ]
    description = first_useful_sentence(body)
    if description:
        new_fm_lines.append(f"description: >-\n {description}")
    new_fm_lines.append(f"timestamp: {get_git_timestamp(filepath, hub)}")
    return new_fm_lines


def _complete_frontmatter(fm_str: str, body: str, filepath: Path, hub: Path):
    """Add missing OKF fields to existing frontmatter.

    Returns ``(fm_lines, changes)``; ``changes`` is empty when nothing was missing.
    """
    fm_lines = fm_str.split("\n")
    changes = []

    if not _has_key(fm_lines, "type"):
        inferred_type = infer_type(filepath)
        fm_lines.insert(0, f"type: {inferred_type}")
        changes.append(f"+ type: {inferred_type}")

    if not _has_key(fm_lines, "title"):
        title = _default_title(filepath)
        # Insert right after type
        type_idx = _index_of_key(fm_lines, "type", 0)
        fm_lines.insert(type_idx + 1, f"title: {title}")
        changes.append(f"+ title: {title}")

    if not _has_key(fm_lines, "description"):
        desc = first_useful_sentence(body)
        if desc:
            title_idx = _index_of_key(fm_lines, "title", 1)
            fm_lines.insert(title_idx + 1, f"description: >-\n {desc}")
            changes.append(f"+ description: {desc[:60]}…")

    # An existing `date:` key counts as a timestamp.
    if not _has_key(fm_lines, "timestamp", "date"):
        ts = get_git_timestamp(filepath, hub)
        fm_lines.append(f"timestamp: {ts}")
        changes.append(f"+ timestamp: {ts}")

    return fm_lines, changes


def normalize_file(filepath: Path, hub: Path, dry_run: bool) -> dict:
    """Normalize one file and return a dict describing the action taken.

    The dict has keys ``file`` (path relative to ``hub``), ``action``
    (``add_frontmatter``, ``update_frontmatter``, ``already_ok`` or ``error``),
    ``changes`` (list of strings) and, on failure, ``error``. The file is only
    written when ``dry_run`` is false.
    """
    result = {"file": str(filepath.relative_to(hub)), "action": "skip", "changes": []}

    try:
        content = filepath.read_text(encoding="utf-8")
    except (OSError, UnicodeError) as e:
        result["action"] = "error"
        result["error"] = str(e)
        return result

    fm_str, body, has_fm = parse_frontmatter(content)

    if not has_fm:
        # Inject minimal frontmatter in front of the untouched content
        new_fm_lines = _build_minimal_frontmatter(filepath, hub, body)
        new_content = "---\n" + "\n".join(new_fm_lines) + "\n---\n" + content
        result["action"] = "add_frontmatter"
        result["changes"] = new_fm_lines
    else:
        # File already has frontmatter — complete the missing fields
        fm_lines, changes = _complete_frontmatter(fm_str, body, filepath, hub)
        if not changes:
            result["action"] = "already_ok"
            return result
        new_content = "---\n" + "\n".join(fm_lines) + "\n---\n" + body
        result["action"] = "update_frontmatter"
        result["changes"] = changes

    if not dry_run:
        try:
            filepath.write_text(new_content, encoding="utf-8")
        except (OSError, UnicodeError) as e:
            result["action"] = "error"
            result["error"] = str(e)

    return result


def scan_hub(hub: Path, dry_run: bool):
    """Recursively scan the Hub vault and normalize every eligible ``.md`` file.

    Skips directories listed in ``EXCLUDE_DIRS``, hidden directories and the
    OKF reserved files. Returns ``(stats, report)`` where ``stats`` counts the
    actions taken and ``report`` is the markdown report text.
    """
    stats = {"add": 0, "update": 0, "ok": 0, "skip": 0, "error": 0}
    report_lines = [
        f"# okf-normalize — {'DRY-RUN' if dry_run else 'EXECUÇÃO'} — {datetime.now().isoformat()[:16]}",
        f"Hub: {hub}",
        "",
    ]

    for root, dirs, files in os.walk(hub):
        root_path = Path(root)
        # Prune excluded/hidden directories in place; sorted so the walk
        # (and therefore the report) is deterministic.
        dirs[:] = sorted(
            d for d in dirs if d not in EXCLUDE_DIRS and not d.startswith(".")
        )

        for fname in sorted(files):
            if not fname.endswith(".md"):
                continue
            if fname.lower() in OKF_RESERVED:
                continue

            result = normalize_file(root_path / fname, hub, dry_run)
            action = result["action"]

            if action == "add_frontmatter":
                stats["add"] += 1
                report_lines.append(f"[ADD] {result['file']}")
                report_lines.extend(f" {c}" for c in result["changes"])
            elif action == "update_frontmatter":
                stats["update"] += 1
                report_lines.append(f"[UPD] {result['file']}")
                report_lines.extend(f" {c}" for c in result["changes"])
            elif action == "already_ok":
                stats["ok"] += 1
            elif action == "error":
                stats["error"] += 1
                report_lines.append(f"[ERR] {result['file']}: {result.get('error')}")
            else:
                stats["skip"] += 1

    report_lines += [
        "",
        "## Resultado",
        f"- Frontmatter adicionado: {stats['add']}",
        f"- Frontmatter actualizado: {stats['update']}",
        f"- Já conformes: {stats['ok']}",
        f"- Erros: {stats['error']}",
        f"- Ignorados: {stats['skip']}",
    ]

    return stats, "\n".join(report_lines)


def _parse_cli(argv: list):
    """Parse command-line arguments into ``(dry_run, hub)``.

    Accepts ``--dry-run`` and the Hub location as either ``--dir=PATH`` or
    ``--dir PATH``. Unknown arguments are ignored. Exits with status 1 when
    ``--dir`` is given without a value.
    """
    dry_run = "--dry-run" in argv
    hub = Path(HUB_DEFAULT)

    args = iter(argv)
    for arg in args:
        if arg.startswith("--dir="):
            hub = Path(arg[len("--dir="):])
        elif arg == "--dir":
            value = next(args, None)
            if value is None or value.startswith("--"):
                print("ERRO: --dir requer um caminho", file=sys.stderr)
                sys.exit(1)
            hub = Path(value)
    return dry_run, hub


def main():
    """Entry point: normalize the Hub, write the report and print the summary."""
    dry_run, hub = _parse_cli(sys.argv[1:])

    if not hub.is_dir():
        print(f"ERRO: Hub não encontrado em {hub}", file=sys.stderr)
        sys.exit(1)

    print(f"{'[DRY-RUN] ' if dry_run else ''}A normalizar OKF em {hub}…")
    stats, report = scan_hub(hub, dry_run)

    report_path = Path(__file__).parent / "okf-normalize-report.md"
    report_path.write_text(report, encoding="utf-8")

    # The summary is the last "## Resultado" section of the report.
    print(report.rsplit("## Resultado", 1)[1].strip())
    print(f"\nRelatório completo: {report_path}")


if __name__ == "__main__":
    main()