333 lines
11 KiB
Python
333 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
okf-normalize.py — Fase 1: Normaliza frontmatter OKF em todos os .md do Hub
|
|
Adiciona/completa: type, title, description, timestamp
|
|
OKF SPEC §4.1: type é o único campo obrigatório
|
|
|
|
Uso:
|
|
  python3 okf-normalize.py [--dry-run] [--dir=/path/to/Hub]
|
|
|
|
Criado: 28-06-2026
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import subprocess
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
# Vault root used by main() when no --dir= argument overrides it.
HUB_DEFAULT = "/media/ealmeida/Dados/Hub"

# Directories excluded from the scan
EXCLUDE_DIRS = {
    ".stversions",
    "node_modules",
    ".git",
    ".obsidian",
    ".trash",
    "99-Arquivo",
    # dependencies / builds / caches (relevant for Dev) — dot-dirs are already skipped
    "venv",
    "vendor",
    "dist",
    "build",
    "site-packages",
    "target",
    "__pycache__",
    "3rdparty",
}

# OKF reserved files — frontmatter is not mandatory for them
OKF_RESERVED = {"index.md", "log.md"}
|
|
|
|
# Hub taxonomy → OKF type
def infer_type(filepath: Path) -> str:
    """Infer the OKF ``type`` of a Markdown file from its name and location.

    Rules are evaluated in order and the first match wins: file-name
    conventions come first (PROC-, QR-, index, SPEC, STATUS, CHANGELOG,
    README), then substrings of the whole lower-cased path (proposals,
    templates, clients).

    Args:
        filepath: Path of the file. Only its textual form is inspected; the
            file itself is never opened.

    Returns:
        The OKF type label, or ``"Document"`` when no rule matches.
    """
    name = filepath.name
    # as_posix() normalises separators to "/", so the "/template" rule also
    # matches Windows-style paths; on POSIX it is identical to str(filepath).
    location = filepath.as_posix().lower()

    if name.startswith(("PROC-", "proc-")):
        return "Playbook"
    if name.startswith(("QR-", "qr-")):
        return "Reference"
    if name.lower() == "index.md":
        return "Index"
    if name.endswith("-SPEC.md") or name == "SPEC.md":
        return "Specification"
    if name.startswith("STATUS"):
        return "Status"

    upper_name = name.upper()
    if upper_name.startswith("CHANGELOG"):
        return "Changelog"
    if upper_name.startswith("README"):
        return "Reference"

    # Path-based rules look at the full path, not just the file name.
    if any(word in location for word in ("proposta", "orcamento", "budget")):
        return "Proposal"
    if "90-templates" in location or "/template" in location:
        return "Template"
    if "07-clientes" in location:
        return "Client Profile"
    return "Document"
|
|
|
|
|
|
def get_git_timestamp(filepath: Path, hub: Path) -> str:
    """Return an ISO-8601 timestamp for the last change to *filepath*.

    The committer date of the most recent commit touching the file is asked
    from ``git log`` (run inside *hub*, 5 second timeout). When git yields
    nothing, or anything goes wrong while asking, the file's mtime expressed
    in UTC is returned instead.
    """
    git_cmd = ["git", "log", "-1", "--format=%cI", "--"]
    committed = ""
    try:
        git_cmd.append(str(filepath.relative_to(hub)))
        proc = subprocess.run(
            git_cmd,
            cwd=str(hub),
            capture_output=True,
            text=True,
            timeout=5,
        )
        committed = proc.stdout.strip()
    except Exception:
        # Best effort only: any failure simply means "use the mtime".
        committed = ""

    if committed:
        return committed

    modified = datetime.fromtimestamp(filepath.stat().st_mtime, tz=timezone.utc)
    return modified.isoformat()
|
|
|
|
|
|
def _looks_like_yaml_mapping(fm: str) -> bool:
|
|
"""True só se o bloco for um mapa YAML válido (evita tratar régua '---' + corpo
|
|
como frontmatter — causa de corrupção em MEMORY.md e afins)."""
|
|
if not fm.strip():
|
|
return False
|
|
try:
|
|
import yaml # type: ignore
|
|
data = yaml.safe_load(fm)
|
|
return isinstance(data, dict)
|
|
except ImportError:
|
|
pass
|
|
except Exception:
|
|
return False
|
|
# Fallback sem PyYAML: a 1ª linha não-vazia tem de ser uma chave `nome:`;
|
|
# nenhuma linha de topo (não-indentada, não-vazia) pode ser heading markdown.
|
|
first = None
|
|
for ln in fm.split("\n"):
|
|
if not ln.strip():
|
|
continue
|
|
if first is None:
|
|
first = ln
|
|
if ln[:1] == "#": # heading markdown ao nível de topo
|
|
return False
|
|
if first is None:
|
|
return False
|
|
return bool(re.match(r"^[A-Za-z_][\w\-]*:", first))
|
|
|
|
|
|
def parse_frontmatter(content: str):
    """Split *content* into its YAML frontmatter and body.

    Only a leading ``---`` … ``---`` block that is a YAML mapping counts as
    frontmatter. A leading ``---`` used as a ruler/separator followed by body
    text does not.

    The closing fence is recognised both when followed by a newline and when
    it is the very last line of a file without a trailing newline; otherwise
    a frontmatter-only file would be reported as having none.

    Returns:
        ``(frontmatter, body, True)`` when frontmatter is present — the
        fences themselves are in neither string — or
        ``(None, content, False)`` otherwise.
    """
    opening = "---\n"
    closing = "\n---"
    if not content.startswith(opening):
        return None, content, False

    start = len(opening)
    end = content.find(closing + "\n", start)
    if end != -1:
        body_start = end + len(closing) + 1
    elif content.endswith(closing) and len(content) - len(closing) >= start:
        # Closing fence is the final line and the file has no trailing newline.
        end = len(content) - len(closing)
        body_start = len(content)
    else:
        return None, content, False

    fm = content[start:end]
    if _looks_like_yaml_mapping(fm):
        return fm, content[body_start:], True
    return None, content, False
|
|
|
|
|
|
def first_useful_sentence(body: str) -> str:
    """Extract the first useful line of *body* to serve as a description.

    Skipped: blank lines, headings, table rows, horizontal rules and
    everything inside fenced code blocks (``` or ~~~), fences included.
    List bullets, ``**bold**`` markers and ``[text](url)`` links are reduced
    to their plain text.

    Returns:
        The first remaining line longer than 10 characters, cut to 120
        characters (with a trailing ``…`` when cut) and without trailing
        periods, or ``""`` when no line qualifies.
    """
    fence = None  # marker of the code fence currently open, if any
    for raw in body.split("\n"):
        line = raw.strip()
        marker = line[:3]

        if fence is not None:
            # Inside a fenced block: skip everything until the matching fence.
            if marker == fence:
                fence = None
            continue
        if marker in ("```", "~~~"):
            # A one-line ```code``` span opens nothing; a real fence does.
            if marker not in line[3:]:
                fence = marker
            continue

        if not line or line.startswith(("#", "|")):
            continue
        if re.fullmatch(r"(?:[-*_][ \t]*){3,}", line):
            continue  # horizontal rule, e.g. "-----" or "* * *"

        if line.startswith(("-", "*")):
            # List item: keep the content without the bullet.
            line = re.sub(r"^[-*]\s+", "", line)

        # Strip inline markdown.
        line = re.sub(r"\*\*(.+?)\*\*", r"\1", line)
        line = re.sub(r"\[(.+?)\]\(.+?\)", r"\1", line)
        line = line.strip()

        if len(line) > 10:
            return line[:120].rstrip(".") + ("…" if len(line) > 120 else "")
    return ""
|
|
|
|
|
|
# Titles matching this pattern are safe as unquoted (plain) YAML scalars.
_YAML_PLAIN_TITLE = re.compile(r"[^\W\d_][\w .()+-]*")
# Plain words that YAML parsers may read as booleans/null instead of text.
_YAML_RESERVED_WORDS = frozenset(
    {"true", "false", "yes", "no", "on", "off", "null", "y", "n"}
)


def _yaml_title(filepath: Path) -> str:
    """Derive a title from the file stem, quoted when YAML would misread it."""
    title = filepath.stem.replace("-", " ").replace("_", " ").title()
    plain_ok = (
        _YAML_PLAIN_TITLE.fullmatch(title) is not None
        and title == title.strip()
        and title.lower() not in _YAML_RESERVED_WORDS
    )
    if plain_ok:
        return title
    # Double-quoted style: only backslash, quote and newline need escaping.
    escaped = (
        title.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
    )
    return f'"{escaped}"'


def _entry_end(fm_lines: list, key_idx: int) -> int:
    """Return the index just past the top-level entry starting at *key_idx*.

    Indented lines after the key (block scalars, nested values) belong to
    that entry, so a new key must be inserted after them, not between them.
    """
    end = key_idx + 1
    for probe in range(key_idx + 1, len(fm_lines)):
        line = fm_lines[probe]
        if not line.strip():
            continue  # blank: part of the entry only if indented text follows
        if not line[0].isspace():
            break  # next top-level line
        end = probe + 1
    return end


def normalize_file(filepath: Path, hub: Path, dry_run: bool) -> dict:
    """Normalise the frontmatter of one file and report the action taken.

    A file without frontmatter gets a new block (type, title, optional
    description, timestamp) prepended. A file with frontmatter only has its
    missing fields added; existing lines are left untouched. An existing
    ``date:`` counts as a timestamp.

    Args:
        filepath: Markdown file to process; must be located under *hub*.
        hub: Vault root, used for the relative path in the result and for the
            git timestamp lookup.
        dry_run: When true the result is computed but nothing is written.

    Returns:
        A dict with ``file`` (path relative to *hub*), ``action`` (one of
        ``add_frontmatter``, ``update_frontmatter``, ``already_ok``,
        ``error``), ``changes`` and, on failure, ``error``.
    """
    result = {"file": str(filepath.relative_to(hub)), "action": "skip", "changes": []}

    try:
        content = filepath.read_text(encoding="utf-8")
    except Exception as e:
        result["action"] = "error"
        result["error"] = str(e)
        return result

    fm_str, body, has_fm = parse_frontmatter(content)

    if not has_fm:
        # Inject a minimal frontmatter block in front of the untouched content.
        inferred_type = infer_type(filepath)
        title = _yaml_title(filepath)
        description = first_useful_sentence(body)
        timestamp = get_git_timestamp(filepath, hub)

        new_fm_lines = [f"type: {inferred_type}", f"title: {title}"]
        if description:
            new_fm_lines.append(f"description: >-\n {description}")
        new_fm_lines.append(f"timestamp: {timestamp}")

        new_content = "---\n" + "\n".join(new_fm_lines) + "\n---\n" + content
        result["action"] = "add_frontmatter"
        result["changes"] = new_fm_lines
    else:
        # Frontmatter already present — only fill in the missing fields.
        fm_lines = fm_str.split("\n")
        changes = []

        # Only top-level keys count: indented lines never start with the key.
        has_type = any(line.startswith("type:") for line in fm_lines)
        has_title = any(line.startswith("title:") for line in fm_lines)
        has_description = any(line.startswith("description:") for line in fm_lines)
        has_timestamp = any(
            line.startswith(("timestamp:", "date:")) for line in fm_lines
        )

        if not has_type:
            inferred_type = infer_type(filepath)
            fm_lines.insert(0, f"type: {inferred_type}")
            changes.append(f"+ type: {inferred_type}")

        if not has_title:
            title = _yaml_title(filepath)
            # Insert after the whole `type` entry, continuation lines included.
            type_idx = next(
                (i for i, l in enumerate(fm_lines) if l.startswith("type:")), 0
            )
            fm_lines.insert(_entry_end(fm_lines, type_idx), f"title: {title}")
            changes.append(f"+ title: {title}")

        if not has_description:
            desc = first_useful_sentence(body)
            if desc:
                desc_entry = f"description: >-\n {desc}"
                # A multi-line title must not be split by the new key.
                title_idx = next(
                    (i for i, l in enumerate(fm_lines) if l.startswith("title:")), 1
                )
                fm_lines.insert(_entry_end(fm_lines, title_idx), desc_entry)
                changes.append(f"+ description: {desc[:60]}…")

        if not has_timestamp:
            ts = get_git_timestamp(filepath, hub)
            fm_lines.append(f"timestamp: {ts}")
            changes.append(f"+ timestamp: {ts}")

        if not changes:
            result["action"] = "already_ok"
            return result

        new_fm = "\n".join(fm_lines)
        new_content = "---\n" + new_fm + "\n---\n" + body
        result["action"] = "update_frontmatter"
        result["changes"] = changes

    if not dry_run:
        try:
            filepath.write_text(new_content, encoding="utf-8")
        except Exception as e:
            result["action"] = "error"
            result["error"] = str(e)

    return result
|
|
|
|
|
|
def scan_hub(hub: Path, dry_run: bool):
    """Walk the Hub vault recursively and normalise every eligible ``.md`` file.

    Directories listed in ``EXCLUDE_DIRS`` or starting with a dot are not
    descended into, and OKF reserved file names are left alone.

    Returns:
        ``(stats, report)``: per-outcome counters and the Markdown report
        text, which ends with a ``## Resultado`` summary section.
    """
    mode = "DRY-RUN" if dry_run else "EXECUÇÃO"
    started = datetime.now().isoformat()[:16]
    report_lines = [f"# okf-normalize — {mode} — {started}", f"Hub: {hub}", ""]
    stats = dict.fromkeys(("add", "update", "ok", "skip", "error"), 0)

    # Actions that modify a file: action -> (stats key, report tag).
    changed_actions = {
        "add_frontmatter": ("add", "[ADD]"),
        "update_frontmatter": ("update", "[UPD]"),
    }

    for root, dirs, files in os.walk(hub):
        # Prune in place so os.walk never enters the excluded directories.
        dirs[:] = [d for d in dirs if not (d in EXCLUDE_DIRS or d.startswith("."))]
        folder = Path(root)

        eligible = (
            f for f in files if f.endswith(".md") and f.lower() not in OKF_RESERVED
        )
        for fname in eligible:
            outcome = normalize_file(folder / fname, hub, dry_run)
            action = outcome["action"]

            if action in changed_actions:
                counter, tag = changed_actions[action]
                stats[counter] += 1
                report_lines.append(f"{tag} {outcome['file']}")
                report_lines.extend(f" {c}" for c in outcome["changes"])
            elif action == "already_ok":
                stats["ok"] += 1
            elif action == "error":
                stats["error"] += 1
                report_lines.append(f"[ERR] {outcome['file']}: {outcome.get('error')}")
            else:
                stats["skip"] += 1

    report_lines.extend(
        [
            "",
            "## Resultado",
            f"- Frontmatter adicionado: {stats['add']}",
            f"- Frontmatter actualizado: {stats['update']}",
            f"- Já conformes: {stats['ok']}",
            f"- Erros: {stats['error']}",
            f"- Ignorados: {stats['skip']}",
        ]
    )
    return stats, "\n".join(report_lines)
|
|
|
|
|
|
def main():
    """Command-line entry point: normalise the vault and write the report.

    Options:
        --dry-run        compute and report the changes without writing files.
        --dir PATH       vault root; ``--dir=PATH`` is accepted as well.
                         Defaults to ``HUB_DEFAULT``.

    Unrecognised arguments are ignored. The full report is written next to
    this script as ``okf-normalize-report.md`` and its summary section is
    printed. Exits with status 1 when the vault directory does not exist.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Normaliza frontmatter OKF em todos os .md do Hub",
        allow_abbrev=False,  # only the exact option names are recognised
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="mostra as alterações sem escrever nos ficheiros",
    )
    parser.add_argument(
        "--dir",
        default=HUB_DEFAULT,
        metavar="PATH",
        help="caminho do Hub (por omissão: %(default)s)",
    )
    # parse_known_args keeps the tolerance for stray arguments. Parsing
    # `--dir PATH` matters: ignoring it would silently rewrite files under
    # HUB_DEFAULT instead of the requested directory.
    args, _ignored = parser.parse_known_args()
    dry_run = args.dry_run
    hub = Path(args.dir)

    if not hub.is_dir():
        print(f"ERRO: Hub não encontrado em {hub}", file=sys.stderr)
        sys.exit(1)

    print(f"{'[DRY-RUN] ' if dry_run else ''}A normalizar OKF em {hub}…")
    _stats, report = scan_hub(hub, dry_run)

    report_path = Path(__file__).parent / "okf-normalize-report.md"
    report_path.write_text(report, encoding="utf-8")

    # The summary is everything after the "## Resultado" heading.
    print(report.split("## Resultado")[1].strip())
    print(f"\nRelatório completo: {report_path}")


if __name__ == "__main__":
    main()
|