Files
scripts/kb-processor/processar_e_traduzir.py

281 lines
9.0 KiB
Python
Executable File

"""
processar_e_traduzir.py
Author: Descomplicar® Crescimento Digital
Link: https://descomplicar.pt
Copyright: 2025 Descomplicar®
"""
import os
import re
import time
import requests
from pathlib import Path
from datetime import datetime
# Configurações da API
BASE_URL = "https://openrouter.ai/api/v1/chat/completions"
API_KEY = "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"
# Diretórios
input_dir = Path("/home/ealmeida/Scripts/SuperBot-KB/input")
output_dir = Path("/home/ealmeida/Scripts/SuperBot-KB/output/yt")
def get_channel_name(file_path):
"""Extrai o nome do canal do caminho do arquivo"""
return file_path.parent.name
def extract_playlist_name(content):
"""Extrai o nome da lista de reprodução do conteúdo"""
match = re.search(r'^# (.+?)(?:\n|$)', content, re.MULTILINE)
return match.group(1) if match else "Lista de Reprodução Desconhecida"
def translate_title(title):
"""Traduz o título do vídeo para português de Portugal"""
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
data = {
"model": "mistralai/ministral-8b",
"messages": [
{
"role": "user",
"content": f"Traduza este título para português de Portugal, mantendo termos técnicos em inglês quando apropriado e não adicione nada além da tradução:\n\n{title}"
}
]
}
try:
response = requests.post(BASE_URL, json=data, headers=headers)
if response.status_code == 200:
response_data = response.json()
if 'choices' in response_data:
return response_data['choices'][0]['message']['content'].strip()
except Exception as e:
print(f"Erro ao traduzir título: {str(e)}")
return title # Retorna título original se falhar
def extract_videos_with_transcripts(content):
"""Extrai apenas os vídeos que têm transcrição"""
videos = []
current_video = {}
current_transcript = []
in_transcript = False
lines = content.split('\n')
for line in lines:
if line.startswith('### '): # Novo vídeo
if current_video and current_transcript:
current_video['transcript'] = '\n'.join(current_transcript)
videos.append(current_video)
title = line[4:]
current_video = {
'original_title': title,
'title': translate_title(title)
}
current_transcript = []
in_transcript = False
time.sleep(2) # Delay entre traduções
elif line.strip().startswith('Idioma:'): # Início da transcrição
in_transcript = True
elif in_transcript and line.strip(): # Linha da transcrição
current_transcript.append(line)
# Adicionar o último vídeo se tiver transcrição
if current_video and current_transcript:
current_video['transcript'] = '\n'.join(current_transcript)
videos.append(current_video)
return videos
def get_processing_prompt(video):
"""Gera o prompt para processamento do conteúdo"""
return f"""Transforme esta transcrição em um documento técnico estruturado, seguindo estas diretrizes:
1. Idioma: Português de Portugal (pt-PT)
2. Estilo: Técnico e profissional, sem marcas de oralidade
3. Estrutura necessária:
## {video['title']}
### Conteúdo
[Transforme a transcrição em texto técnico coeso, removendo marcas de oralidade e referências pessoais. Mantenha informação técnica relevante.]
### FAQs
[Gere exatamente 5 perguntas e respostas técnicas relevantes baseadas no conteúdo]
### Tags
[Liste 5-8 tags relevantes para o conteúdo]
Transcrição original:
{video['transcript']}"""
def process_content(texto, max_retries=3, retry_delay=60):
"""Processa o conteúdo usando a API"""
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
data = {
"model": "mistralai/ministral-8b",
"messages": [
{
"role": "user",
"content": texto
}
]
}
for attempt in range(max_retries):
try:
print(f"Tentativa {attempt + 1} de {max_retries}")
response = requests.post(BASE_URL, json=data, headers=headers)
print(f"Status: {response.status_code}")
if response.status_code == 200:
response_data = response.json()
if 'choices' in response_data:
return response_data['choices'][0]['message']['content'].strip()
else:
print(f"Estrutura de resposta inesperada: {response_data}")
elif response.status_code == 429: # Rate limit
wait_time = retry_delay * (attempt + 1)
print(f"Rate limit atingido. Aguardando {wait_time} segundos...")
time.sleep(wait_time)
continue
else:
print(f"Erro na API: {response.status_code}")
print(f"Resposta: {response.text}")
except Exception as e:
print(f"Erro ao processar texto: {str(e)}")
time.sleep(retry_delay)
print("Todas as tentativas falharam.")
return None
def process_file(file_path):
"""Processa um arquivo de transcrições"""
print(f"\nProcessando arquivo: {file_path}")
try:
# Ler arquivo
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# Extrair informações básicas
channel_name = get_channel_name(file_path)
playlist_name = extract_playlist_name(content)
print(f"Extraindo vídeos com transcrição de: {playlist_name}")
videos = extract_videos_with_transcripts(content)
if not videos:
print("Nenhum vídeo com transcrição encontrado")
return False
print(f"Encontrados {len(videos)} vídeos com transcrição")
# Criar conteúdo inicial do arquivo MD
md_content = f"""# Ficha Técnica
## Nome do Canal
{channel_name}
## Nome da Lista de Reprodução
{playlist_name}
---
"""
# Processar cada vídeo
for i, video in enumerate(videos, 1):
print(f"\nProcessando vídeo {i}/{len(videos)}: {video['title']}")
processed_content = process_content(get_processing_prompt(video))
if processed_content:
md_content += processed_content + "\n---\n\n"
print(f"Vídeo {i} processado com sucesso")
else:
print(f"Falha ao processar vídeo {i}: {video['title']}")
time.sleep(5) # Delay entre vídeos
# Criar diretório de saída
output_path = output_dir / channel_name
output_path.mkdir(parents=True, exist_ok=True)
# Gerar nome do arquivo
safe_playlist_name = re.sub(r'[<>:"/\\|?*]', '_', playlist_name)
output_file = output_path / f"{safe_playlist_name}.md"
# Salvar arquivo processado
with open(output_file, 'w', encoding='utf-8') as f:
f.write(md_content)
print(f"\nArquivo processado salvo em: {output_file}")
return True
except Exception as e:
print(f"Erro ao processar arquivo {file_path}: {str(e)}")
return False
def main():
start_time = datetime.now()
print(f"Iniciando processamento em {start_time}")
print(f"Diretório de entrada: {input_dir}")
print(f"Diretório de saída: {output_dir}")
# Criar diretório de saída
output_dir.mkdir(parents=True, exist_ok=True)
# Lista para arquivos que falharam
failed_files = []
processed_files = []
# Processar arquivos
total_files = len([f for f in input_dir.rglob('*.txt')])
print(f"\nEncontrados {total_files} arquivos para processar")
for i, file_path in enumerate(input_dir.rglob('*.txt'), 1):
print(f"\nProcessando arquivo {i}/{total_files}: {file_path}")
if process_file(file_path):
processed_files.append(file_path)
else:
failed_files.append(file_path)
time.sleep(5) # Delay entre arquivos
# Relatório final
end_time = datetime.now()
duration = end_time - start_time
print("\n=== Relatório Final ===")
print(f"Processamento iniciado em: {start_time}")
print(f"Processamento finalizado em: {end_time}")
print(f"Duração total: {duration}")
print(f"\nArquivos processados com sucesso: {len(processed_files)}")
print(f"Arquivos com falha: {len(failed_files)}")
if failed_files:
print("\nArquivos que falharam:")
for file in failed_files:
print(f"- {file}")
if processed_files:
print("\nArquivos processados com sucesso:")
for file in processed_files:
print(f"- {file}")
if __name__ == "__main__":
main()