Files
scripts/okf-hub/okf-normalize.py
T

292 lines
9.3 KiB
Python

#!/usr/bin/env python3
"""
okf-normalize.py — Fase 1: Normaliza frontmatter OKF em todos os .md do Hub
Adiciona/completa: type, title, description, timestamp
OKF SPEC §4.1: type é o único campo obrigatório
Uso:
python3 okf-normalize.py [--dry-run] [--dir=/path/to/Hub]
Criado: 28-06-2026
"""
import os
import re
import sys
import subprocess
from datetime import datetime, timezone
from pathlib import Path
# Hub vault location used when no --dir option is given.
HUB_DEFAULT = "/media/ealmeida/Dados/Hub"

# Directory names that are pruned from the recursive scan.
EXCLUDE_DIRS = {
    ".git",
    ".obsidian",
    ".stversions",
    ".trash",
    "99-Arquivo",
    "node_modules",
}

# OKF reserved file names: these need no front matter and are never touched.
OKF_RESERVED = {"log.md", "index.md"}
# Hub taxonomy -> OKF type
def infer_type(filepath: Path) -> str:
    """Map a Hub file to an OKF ``type`` value using its name and path.

    File-name rules are checked first, then substring rules on the
    lower-cased full path. The first rule that matches wins; files matching
    no rule are classified as ``"Document"``.
    """
    fname = filepath.name
    upper_name = fname.upper()
    lowered_path = str(filepath).lower()

    def path_mentions(*needles: str) -> bool:
        return any(needle in lowered_path for needle in needles)

    # Ordered (condition, type) pairs: earlier entries take precedence.
    rules = (
        (fname.startswith(("PROC-", "proc-")), "Playbook"),
        (fname.startswith(("QR-", "qr-")), "Reference"),
        (fname.lower() == "index.md", "Index"),
        (fname.endswith("-SPEC.md") or fname == "SPEC.md", "Specification"),
        (fname.startswith("STATUS"), "Status"),
        (upper_name.startswith("CHANGELOG"), "Changelog"),
        (upper_name.startswith("README"), "Reference"),
        (path_mentions("proposta", "orcamento", "budget"), "Proposal"),
        (path_mentions("90-templates", "/template"), "Template"),
        (path_mentions("07-clientes"), "Client Profile"),
    )
    return next((label for matched, label in rules if matched), "Document")
def get_git_timestamp(filepath: Path, hub: Path) -> str:
    """Return an ISO-8601 timestamp for the last modification of a file.

    The committer date of the most recent commit touching the file is asked
    from ``git log`` (run with ``hub`` as working directory). When git gives
    no usable answer, the file's mtime is used instead, expressed in UTC.

    Args:
        filepath: File whose timestamp is wanted.
        hub: Root of the vault; the git query uses the path relative to it.

    Returns:
        The timestamp as an ISO-8601 string.

    Raises:
        OSError: If the mtime fallback is needed and the file cannot be
            stat'ed.
    """
    completed = None
    try:
        rel = filepath.relative_to(hub)
        completed = subprocess.run(
            ["git", "log", "-1", "--format=%cI", "--", str(rel)],
            cwd=str(hub),
            capture_output=True,
            text=True,
            timeout=5,
        )
    except (ValueError, OSError, subprocess.SubprocessError):
        # ValueError: filepath is not under hub (or git output is not
        # decodable); OSError: git missing or cwd unusable;
        # SubprocessError: the 5 s timeout expired.
        # All of these simply mean "no git answer" -> use the mtime below.
        completed = None

    if completed is not None and completed.returncode == 0:
        ts = completed.stdout.strip()
        if ts:
            return ts

    # Fallback: file mtime
    mtime = filepath.stat().st_mtime
    return datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat()
def parse_frontmatter(content: str):
    """Split a document into its front matter and body.

    A front matter block starts with ``---`` on the very first line and ends
    at the next line consisting of ``---``. An empty block (``---`` directly
    followed by ``---``) and a closing ``---`` that is the last line of the
    file without a trailing newline are both recognised.

    Args:
        content: Full text of the document.

    Returns:
        ``(frontmatter_str, body_str, True)`` when a block is found, where
        ``frontmatter_str`` excludes both delimiter lines; otherwise
        ``(None, content, False)``.
    """
    opener = "---\n"
    closer = "\n---\n"
    if not content.startswith(opener):
        return None, content, False

    fm_start = len(opener)
    # Start the search on the opener's own newline so that it can double as
    # the closer's leading newline when the block is empty.
    end = content.find(closer, fm_start - 1)
    if end != -1:
        return content[fm_start:end], content[end + len(closer):], True

    # Closing delimiter at end of file with no trailing newline: empty body.
    if content.endswith("\n---"):
        end = len(content) - len("\n---")
        return content[fm_start:end], "", True

    return None, content, False
def first_useful_sentence(body: str, max_len: int = 120) -> str:
    """Extract the first useful line of ``body`` for use as a description.

    Blank lines, headings, table rows, code fences and everything inside a
    fenced code block are skipped. List bullets, ``**bold**`` markers and
    ``[text](url)`` links are reduced to their plain text. The first
    remaining line longer than 10 characters is returned without trailing
    periods.

    Args:
        body: Markdown text to scan.
        max_len: Maximum number of characters kept from the line; longer
            lines are cut and marked with a trailing ellipsis.

    Returns:
        The cleaned line, or ``""`` when no line qualifies.
    """
    in_fence = False
    for raw in body.split("\n"):
        line = raw.strip()
        if line.startswith("```"):
            # A real fence carries no further backticks after the opening
            # run; a line such as ```x``` is inline code and must not toggle
            # the fence state (it is still skipped, as before).
            if "`" not in line.lstrip("`"):
                in_fence = not in_fence
            continue
        if in_fence or not line or line.startswith(("#", "|")):
            continue
        if line.startswith(("-", "*")):
            # List item: keep the content without the bullet
            line = re.sub(r"^[-*]\s+", "", line)
        # Strip inline markdown
        line = re.sub(r"\*\*(.+?)\*\*", r"\1", line)
        line = re.sub(r"\[(.+?)\]\(.+?\)", r"\1", line)
        line = line.strip()
        if len(line) > 10:
            if len(line) > max_len:
                # Make the cut visible instead of silently chopping the text.
                return line[:max_len].rstrip(" .") + "…"
            return line.rstrip(".")
    return ""
def normalize_file(filepath: Path, hub: Path, dry_run: bool) -> dict:
    """Normalise the front matter of one file and report what was done.

    A file without front matter gets a new block (type, title, optional
    description, timestamp) prepended. A file with front matter only gets
    the missing fields added; an existing ``date:`` line counts as a
    timestamp. Nothing is written when ``dry_run`` is true.

    Returns:
        A dict with ``file`` (path relative to ``hub``), ``action`` (one of
        ``add_frontmatter``, ``update_frontmatter``, ``already_ok``,
        ``error``), ``changes`` and, on failure, ``error``.
    """
    outcome = {"file": str(filepath.relative_to(hub)), "action": "skip", "changes": []}

    def mark_error(exc):
        outcome["action"] = "error"
        outcome["error"] = str(exc)
        return outcome

    def position_of(lines, key, default):
        # Index of the first line starting with ``key``, else ``default``.
        return next(
            (pos for pos, entry in enumerate(lines) if entry.startswith(key)),
            default,
        )

    try:
        text = filepath.read_text(encoding="utf-8")
    except Exception as exc:
        return mark_error(exc)

    front, body, found = parse_frontmatter(text)
    # Title derived from the file name, used whenever no title exists yet.
    derived_title = filepath.stem.replace("-", " ").replace("_", " ").title()

    if found:
        # Existing front matter: only fill in what is missing.
        entries = front.split("\n")
        # Presence is evaluated once, before any line is inserted.
        present = {
            field: any(entry.startswith(prefixes) for entry in entries)
            for field, prefixes in (
                ("type", ("type:",)),
                ("title", ("title:",)),
                ("description", ("description:",)),
                ("timestamp", ("timestamp:", "date:")),
            )
        }
        added = []

        if not present["type"]:
            doc_type = infer_type(filepath)
            entries.insert(0, f"type: {doc_type}")
            added.append(f"+ type: {doc_type}")

        if not present["title"]:
            # Place the title right after the type line.
            after_type = position_of(entries, "type:", 0) + 1
            entries.insert(after_type, f"title: {derived_title}")
            added.append(f"+ title: {derived_title}")

        if not present["description"]:
            summary = first_useful_sentence(body)
            if summary:
                after_title = position_of(entries, "title:", 1) + 1
                entries.insert(after_title, f"description: >-\n {summary}")
                added.append(f"+ description: {summary[:60]}")

        if not present["timestamp"]:
            stamp = get_git_timestamp(filepath, hub)
            entries.append(f"timestamp: {stamp}")
            added.append(f"+ timestamp: {stamp}")

        if not added:
            outcome["action"] = "already_ok"
            return outcome

        joined = "\n".join(entries)
        rewritten = f"---\n{joined}\n---\n{body}"
        outcome["action"] = "update_frontmatter"
        outcome["changes"] = added
    else:
        # No front matter at all: inject a minimal block above the content.
        doc_type = infer_type(filepath)
        summary = first_useful_sentence(body)
        stamp = get_git_timestamp(filepath, hub)
        header = [f"type: {doc_type}", f"title: {derived_title}"]
        if summary:
            header.append(f"description: >-\n {summary}")
        header.append(f"timestamp: {stamp}")
        joined = "\n".join(header)
        rewritten = f"---\n{joined}\n---\n{text}"
        outcome["action"] = "add_frontmatter"
        outcome["changes"] = header

    if dry_run:
        return outcome

    try:
        filepath.write_text(rewritten, encoding="utf-8")
    except Exception as exc:
        mark_error(exc)
    return outcome
def scan_hub(hub: Path, dry_run: bool):
    """Walk the Hub vault, normalise every eligible ``.md`` file and report.

    Directories listed in ``EXCLUDE_DIRS`` or starting with a dot are not
    entered. Files whose lower-cased name is in ``OKF_RESERVED`` are left
    alone. Directories and files are visited in sorted order so that two
    runs over the same tree produce the same report.

    Args:
        hub: Root directory of the vault.
        dry_run: Passed through to ``normalize_file``; also shown in the
            report header.

    Returns:
        ``(stats, report)`` where ``stats`` counts outcomes under the keys
        ``add``, ``update``, ``ok``, ``skip`` and ``error``, and ``report``
        is the Markdown report text.
    """
    stats = {"add": 0, "update": 0, "ok": 0, "skip": 0, "error": 0}
    mode = "DRY-RUN" if dry_run else "EXECUÇÃO"
    report_lines = [
        # Mode and timestamp are separated by a space; they used to be fused.
        f"# okf-normalize — {mode} {datetime.now().isoformat()[:16]}",
        f"Hub: {hub}",
        "",
    ]
    for root, dirs, files in os.walk(hub):
        root_path = Path(root)
        # Prune excluded directories in place so os.walk does not descend
        # into them; sorting fixes the traversal order.
        dirs[:] = sorted(
            d for d in dirs
            if d not in EXCLUDE_DIRS and not d.startswith(".")
        )
        for fname in sorted(files):
            if not fname.endswith(".md"):
                continue
            if fname.lower() in OKF_RESERVED:
                continue
            filepath = root_path / fname
            result = normalize_file(filepath, hub, dry_run)
            action = result["action"]
            if action == "add_frontmatter":
                stats["add"] += 1
                report_lines.append(f"[ADD] {result['file']}")
                report_lines.extend(f" {c}" for c in result["changes"])
            elif action == "update_frontmatter":
                stats["update"] += 1
                report_lines.append(f"[UPD] {result['file']}")
                report_lines.extend(f" {c}" for c in result["changes"])
            elif action == "already_ok":
                stats["ok"] += 1
            elif action == "error":
                stats["error"] += 1
                report_lines.append(f"[ERR] {result['file']}: {result.get('error')}")
            else:
                stats["skip"] += 1
    report_lines += [
        "",
        "## Resultado",
        f"- Frontmatter adicionado: {stats['add']}",
        f"- Frontmatter actualizado: {stats['update']}",
        f"- Já conformes: {stats['ok']}",
        f"- Erros: {stats['error']}",
        f"- Ignorados: {stats['skip']}",
    ]
    return stats, "\n".join(report_lines)
def main():
    """Command-line entry point.

    Options (read from ``sys.argv``):
        --dry-run          Report what would change without rewriting files.
        --dir=PATH         Hub root to process.
        --dir PATH         Same, space-separated form.

    The Hub defaults to ``HUB_DEFAULT``. The full report is written below
    the Hub root (also in dry-run mode) and its summary section is printed.
    Exits with status 1 when the Hub directory does not exist and with
    status 2 when ``--dir`` is given without a path.
    """
    args = sys.argv[1:]
    dry_run = "--dry-run" in args
    hub = Path(HUB_DEFAULT)
    for pos, arg in enumerate(args):
        if arg.startswith("--dir="):
            hub = Path(arg[len("--dir="):])
        elif arg == "--dir":
            # Space-separated form. Ignoring it would make the script
            # rewrite files in the default Hub instead of the requested one.
            if pos + 1 >= len(args):
                print("ERRO: --dir requer um caminho", file=sys.stderr)
                sys.exit(2)
            hub = Path(args[pos + 1])
    if not hub.is_dir():
        print(f"ERRO: Hub não encontrado em {hub}", file=sys.stderr)
        sys.exit(1)
    print(f"{'[DRY-RUN] ' if dry_run else ''}A normalizar OKF em {hub}")
    _stats, report = scan_hub(hub, dry_run)
    report_path = hub / "04-Stack/02.04-Sistemas/MemoriaCentral/scripts/okf-normalize-report.md"
    # The report directory may not exist in a Hub other than the default one;
    # without this the run would crash after all files were already rewritten.
    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_path.write_text(report, encoding="utf-8")
    # Print the summary section straight from memory.
    print(report.rpartition("## Resultado")[2].strip())
    print(f"\nRelatório completo: {report_path}")


if __name__ == "__main__":
    main()