init: scripts diversos (crawlers, conversores, scrapers)

This commit is contained in:
2026-03-05 20:38:36 +00:00
commit 6ac6f4be2a
925 changed files with 850330 additions and 0 deletions
+150
View File
@@ -0,0 +1,150 @@
"""
ai_chat_processor.py
Author: Descomplicar® Crescimento Digital
Link: https://descomplicar.pt
Copyright: 2025 Descomplicar®
"""
"""
Processador para chatbot AI integrado com Chatwoot.
"""
import json
import requests
from typing import Dict, List, Optional
from .db_processor import DBProcessor
from .embedding_processor import EmbeddingProcessor
class AIChatProcessor:
def __init__(self, chatwoot_api_key: str, chatwoot_account_id: str, chatwoot_base_url: str):
"""
Inicializa o processador de chat AI.
Args:
chatwoot_api_key: Chave API do Chatwoot
chatwoot_account_id: ID da conta Chatwoot
chatwoot_base_url: URL base da API Chatwoot (ex: https://app.chatwoot.com)
"""
self.chatwoot_api_key = chatwoot_api_key
self.chatwoot_account_id = chatwoot_account_id
self.chatwoot_base_url = chatwoot_base_url.rstrip('/')
# Inicializa processadores
self.db = DBProcessor()
self.embedding_processor = EmbeddingProcessor()
# Headers para API Chatwoot
self.headers = {
'api_access_token': chatwoot_api_key,
'Content-Type': 'application/json'
}
def _search_knowledge_base(self, query: str, limit: int = 3) -> List[Dict]:
"""
Pesquisa na base de conhecimento usando embeddings.
Args:
query: Pergunta do usuário
limit: Número máximo de resultados
Returns:
Lista de resultados mais relevantes
"""
# Gera embedding da pergunta
query_embedding = self.embedding_processor.generate_embedding(query)
# Busca resultados similares
results = self.db.search_similar_chunks(
query_embedding,
limit=limit
)
return results
def _format_response(self, query: str, results: List[Dict]) -> str:
"""
Formata resposta com base nos resultados da pesquisa.
Args:
query: Pergunta original
results: Resultados da pesquisa
Returns:
Resposta formatada
"""
if not results:
return "Desculpe, não encontrei informações relevantes sobre isso na base de conhecimento."
# Formata resposta
response = "Com base na nossa base de conhecimento:\n\n"
for i, result in enumerate(results, 1):
relevance = result.get('relevance', 0) * 100
content = result.get('content', '').strip()
source = result.get('source', 'Desconhecida')
response += f"{i}. Relevância: {relevance:.1f}%\n"
response += f"Fonte: {source}\n"
response += f"Conteúdo: {content}\n\n"
return response
def process_message(self, conversation_id: str, message: str) -> None:
"""
Processa mensagem recebida e envia resposta via Chatwoot.
Args:
conversation_id: ID da conversa no Chatwoot
message: Mensagem recebida
"""
# Pesquisa na base de conhecimento
results = self._search_knowledge_base(message)
# Formata resposta
response = self._format_response(message, results)
# Envia resposta via API Chatwoot
endpoint = f"{self.chatwoot_base_url}/api/v1/accounts/{self.chatwoot_account_id}/conversations/{conversation_id}/messages"
payload = {
'content': response,
'message_type': 'outgoing',
'private': False
}
try:
r = requests.post(
endpoint,
headers=self.headers,
data=json.dumps(payload)
)
r.raise_for_status()
except requests.exceptions.RequestException as e:
print(f"Erro ao enviar mensagem para Chatwoot: {e}")
def handle_webhook(self, data: Dict) -> Optional[str]:
"""
Processa webhook recebido do Chatwoot.
Args:
data: Dados do webhook
Returns:
ID da conversa se mensagem deve ser processada
"""
# Verifica se é mensagem de entrada
if data.get('message_type') != 'incoming':
return None
# Extrai dados relevantes
conversation_id = data.get('conversation', {}).get('id')
message = data.get('content')
if not all([conversation_id, message]):
return None
# Processa mensagem
self.process_message(conversation_id, message)
return conversation_id
+130
View File
@@ -0,0 +1,130 @@
"""
Base Processor - Processador base para documentos
Descomplicar - Agência de Aceleração Digital
https://www.descomplicar.pt
"""
import os
import json
from typing import Dict, List, Optional
from datetime import datetime
from abc import ABC, abstractmethod
from pathlib import Path
class BaseProcessor(ABC):
"""Classe base para processamento de documentos."""
def __init__(self, input_file: str):
"""
Inicializa o processador base.
Args:
input_file (str): Caminho para o ficheiro de entrada
"""
self.input_file = input_file
self.content = ""
self.metadata = {
"título": "",
"autor_original": "",
"data_original": "",
"tipo_documento": "",
"fonte": "",
"licença": "",
"última_atualização": datetime.now().strftime("%d-%m-%Y"),
"categoria_principal": "",
"tags": [],
"nível_técnico": "",
"idioma_original": "",
"tradutor": "Descomplicar AI",
"revisão": "1.0"
}
self.chapters = []
@abstractmethod
def read_content(self) -> str:
"""Lê o conteúdo do ficheiro."""
pass
@abstractmethod
def process_content(self) -> None:
"""Processa o conteúdo do ficheiro."""
pass
def extract_metadata(self) -> Dict:
"""Extrai metadados do documento."""
return self.metadata
def structure_content(self) -> List[Dict]:
"""Estrutura o conteúdo em capítulos."""
chapters = []
current_chapter = {"title": "", "content": "", "faqs": []}
lines = self.content.split("\n")
for line in lines:
if line.strip().startswith("#"):
if current_chapter["title"]:
chapters.append(current_chapter.copy())
current_chapter = {
"title": line.strip("# "),
"content": "",
"faqs": []
}
else:
current_chapter["content"] += line + "\n"
if current_chapter["title"]:
chapters.append(current_chapter)
self.chapters = chapters
return chapters
def generate_markdown(self) -> str:
"""Gera o documento final em Markdown."""
md_content = []
# Metadados
md_content.append("---")
for key, value in self.metadata.items():
if isinstance(value, list):
value = ", ".join(value)
md_content.append(f"{key}: {value}")
md_content.append("---\n")
# Conteúdo por capítulo
for chapter in self.chapters:
md_content.append(f"# {chapter['title']}\n")
md_content.append(chapter['content'])
if chapter['faqs']:
md_content.append("\n## FAQs\n")
for faq in chapter['faqs']:
md_content.append(f"Q: {faq['question']}")
md_content.append(f"A: {faq['answer']}\n")
md_content.append("\n---\n")
return "\n".join(md_content)
def save_output(self, output_file: str) -> None:
"""Salva o conteúdo processado."""
markdown_content = self.generate_markdown()
with open(output_file, 'w', encoding='utf-8') as f:
f.write(markdown_content)
def save_markdown(self, output_file: str) -> None:
"""
Guarda o conteúdo processado em formato Markdown.
Args:
output_file (str): Caminho para o ficheiro de saída
"""
# Criar diretório de saída se não existir
output_path = Path(output_file)
output_path.parent.mkdir(parents=True, exist_ok=True)
# Gerar conteúdo em Markdown
markdown_content = self.generate_markdown()
# Guardar ficheiro
with open(output_file, 'w', encoding='utf-8') as f:
f.write(markdown_content)
+132
View File
@@ -0,0 +1,132 @@
"""
Database Processor - Processador para interação com o PostgreSQL
Descomplicar - Agência de Aceleração Digital
https://www.descomplicar.pt
"""
import psycopg2
from psycopg2.extras import Json, execute_values
import json
from datetime import datetime
from typing import Dict, Any, Optional, List
import os
from dotenv import load_dotenv
class DBProcessor:
"""Processador para interação com o PostgreSQL."""
def __init__(self):
"""Inicializa a conexão com o banco de dados."""
# Configurações do banco de dados
self.db_config = {
'dbname': 'superbot_kb',
'user': 'superbot_user',
'password': 'KufQ4La5jaAk',
'host': 'easy.descomplicar.pt',
'port': '5433'
}
# Inicializar conexão
self.conn = None
self.cur = None
def connect(self) -> None:
"""Estabelece conexão com o banco de dados."""
try:
self.conn = psycopg2.connect(**self.db_config)
self.cur = self.conn.cursor()
except Exception as e:
raise Exception(f"Erro ao conectar ao banco de dados: {str(e)}")
def disconnect(self) -> None:
"""Fecha a conexão com o banco de dados."""
if self.cur:
self.cur.close()
if self.conn:
self.conn.close()
def init_schema(self) -> None:
"""Inicializa o schema do banco de dados."""
try:
self.connect()
# Criar extensão para vetores se não existir
self.cur.execute("""
CREATE EXTENSION IF NOT EXISTS vector;
""")
# Criar tabela de documentos
self.cur.execute("""
CREATE TABLE IF NOT EXISTS documents (
id SERIAL PRIMARY KEY,
title VARCHAR(255) NOT NULL,
content TEXT NOT NULL,
metadata JSONB NOT NULL,
file_path VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_documents_title ON documents(title);
CREATE INDEX IF NOT EXISTS idx_documents_metadata ON documents USING GIN(metadata);
""")
# Criar tabela de embeddings
self.cur.execute("""
CREATE TABLE IF NOT EXISTS embeddings (
id SERIAL PRIMARY KEY,
document_id INTEGER REFERENCES documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
chunk_text TEXT NOT NULL,
embedding vector(1536),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(document_id, chunk_index)
);
CREATE INDEX IF NOT EXISTS idx_embeddings_document_id ON embeddings(document_id);
CREATE INDEX IF NOT EXISTS idx_embeddings_embedding ON embeddings USING ivfflat (embedding vector_cosine_ops);
""")
self.conn.commit()
except Exception as e:
if self.conn:
self.conn.rollback()
raise Exception(f"Erro ao inicializar schema: {str(e)}")
finally:
self.disconnect()
def save_document(self, title: str, content: str, metadata: Dict[str, Any], file_path: str) -> int:
"""
Salva um documento no banco de dados.
Args:
title (str): Título do documento
content (str): Conteúdo do documento
metadata (Dict[str, Any]): Metadados do documento
file_path (str): Caminho do arquivo original
Returns:
int: ID do documento salvo
"""
try:
self.connect()
# Inserir documento
self.cur.execute("""
INSERT INTO documents (title, content, metadata, file_path)
VALUES (%s, %s, %s, %s)
RETURNING id;
""", (title, content, Json(metadata), file_path))
document_id = self.cur.fetchone()[0]
self.conn.commit()
return document_id
except Exception as e:
if self.conn:
self.conn.rollback()
raise Exception(f"Erro ao salvar documento: {str(e)}")
finally:
self.disconnect()
+252
View File
@@ -0,0 +1,252 @@
"""
Embedding Processor - Processador para geração e gestão de embeddings
Descomplicar - Agência de Aceleração Digital
https://www.descomplicar.pt
"""
import requests
import numpy as np
from typing import List, Dict, Any, Optional
import os
from datetime import datetime
import psycopg2
from psycopg2.extras import Json, execute_values
import json
import tiktoken
class EmbeddingProcessor:
"""Processador para geração e gestão de embeddings."""
def __init__(self):
"""Inicializa o processador de embeddings."""
self.api_key = "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"
self.model = "text-embedding-ada-002"
self.encoding = tiktoken.encoding_for_model(self.model)
self.max_tokens = 8000 # Deixar margem de segurança
# Configurações do banco de dados
self.db_config = {
'dbname': 'superbot_kb',
'user': 'superbot_user',
'password': 'KufQ4La5jaAk',
'host': 'easy.descomplicar.pt',
'port': '5433'
}
# Inicializar conexão
self.conn = None
self.cur = None
def connect(self) -> None:
"""Estabelece conexão com o banco de dados."""
try:
self.conn = psycopg2.connect(**self.db_config)
self.cur = self.conn.cursor()
except Exception as e:
raise Exception(f"Erro ao conectar ao banco de dados: {str(e)}")
def disconnect(self) -> None:
"""Fecha a conexão com o banco de dados."""
if self.cur:
self.cur.close()
if self.conn:
self.conn.close()
def split_text(self, text: str) -> List[str]:
"""
Divide o texto em chunks menores respeitando o limite de tokens.
Args:
text (str): Texto para dividir
Returns:
List[str]: Lista de chunks de texto
"""
tokens = self.encoding.encode(text)
chunks = []
current_chunk = []
current_size = 0
for token in tokens:
if current_size + 1 > self.max_tokens:
# Converter tokens atuais para texto
chunk_text = self.encoding.decode(current_chunk)
chunks.append(chunk_text)
current_chunk = [token]
current_size = 1
else:
current_chunk.append(token)
current_size += 1
# Adicionar último chunk
if current_chunk:
chunk_text = self.encoding.decode(current_chunk)
chunks.append(chunk_text)
return chunks
def generate_embedding(self, text: str) -> List[float]:
"""
Gera embedding para um texto usando a API da OpenAI.
Args:
text (str): Texto para gerar embedding
Returns:
List[float]: Vetor de embedding
"""
try:
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}"
}
data = {
"model": self.model,
"input": text
}
response = requests.post(
"https://api.openai.com/v1/embeddings",
headers=headers,
json=data
)
if response.status_code != 200:
raise Exception(f"API Error: {response.status_code} - {response.text}")
return response.json()["data"][0]["embedding"]
except Exception as e:
raise Exception(f"Erro ao gerar embedding: {str(e)}")
def update_document_embeddings(self, document_id: int) -> None:
"""
Atualiza os embeddings de um documento.
Args:
document_id (int): ID do documento
"""
try:
# Conectar ao banco de dados
self.connect()
# Buscar conteúdo do documento
self.cur.execute("""
SELECT content FROM documents
WHERE id = %s
""", (document_id,))
result = self.cur.fetchone()
if not result:
raise Exception(f"Documento não encontrado: {document_id}")
content = result[0]
# Dividir em chunks
chunks = self.split_text(content)
print(f"Documento dividido em {len(chunks)} chunks")
# Gerar embeddings para cada chunk
embeddings_data = []
for i, chunk in enumerate(chunks, 1):
print(f"Processando chunk {i}/{len(chunks)}...")
embedding = self.generate_embedding(chunk)
embeddings_data.append({
'document_id': document_id,
'chunk_index': i-1,
'chunk_text': chunk,
'embedding': embedding
})
# Salvar embeddings
self.cur.execute("""
DELETE FROM embeddings
WHERE document_id = %s
""", (document_id,))
execute_values(
self.cur,
"""
INSERT INTO embeddings (document_id, chunk_index, chunk_text, embedding)
VALUES %s
""",
[(
d['document_id'],
d['chunk_index'],
d['chunk_text'],
d['embedding']
) for d in embeddings_data]
)
self.conn.commit()
except Exception as e:
if self.conn:
self.conn.rollback()
raise Exception(f"Erro ao atualizar embeddings: {str(e)}")
finally:
self.disconnect()
def search_similar(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
"""
Busca documentos similares à query.
Args:
query (str): Texto para buscar
limit (int): Número máximo de resultados
Returns:
List[Dict[str, Any]]: Lista de documentos similares
"""
try:
# Gerar embedding para a query
query_embedding = self.generate_embedding(query)
# Conectar ao banco de dados
self.connect()
# Buscar chunks mais similares
self.cur.execute("""
WITH similarity AS (
SELECT
d.id as document_id,
d.title,
d.metadata,
e.chunk_text,
1 - (e.embedding <=> %s) as similarity
FROM embeddings e
JOIN documents d ON e.document_id = d.id
ORDER BY similarity DESC
LIMIT 20
)
SELECT
document_id,
title,
metadata,
array_agg(chunk_text ORDER BY similarity DESC) as chunks,
max(similarity) as doc_similarity
FROM similarity
GROUP BY document_id, title, metadata
ORDER BY doc_similarity DESC
LIMIT %s
""", (query_embedding, limit))
results = []
for row in self.cur.fetchall():
results.append({
'document_id': row[0],
'title': row[1],
'metadata': row[2],
'relevant_chunks': row[3],
'doc_similarity': row[4]
})
return results
except Exception as e:
raise Exception(f"Erro ao buscar documentos: {str(e)}")
finally:
self.disconnect()
+81
View File
@@ -0,0 +1,81 @@
"""
lead_distributor.py
Author: Descomplicar® Crescimento Digital
Link: https://descomplicar.pt
Copyright: 2025 Descomplicar®
"""
"""
Processador para distribuição inteligente de leads baseado em taxa de conversão.
"""
from typing import Dict, List
class LeadDistributor:
def __init__(self):
self.vendedores: Dict[str, float] = {} # {id_vendedor: taxa_conversao}
self.leads_distribuidos: Dict[str, int] = {} # {id_vendedor: qtd_leads}
def adicionar_vendedor(self, id_vendedor: str, taxa_conversao: float) -> None:
"""Adiciona ou atualiza vendedor com sua taxa de conversão."""
self.vendedores[id_vendedor] = taxa_conversao
if id_vendedor not in self.leads_distribuidos:
self.leads_distribuidos[id_vendedor] = 0
def calcular_quota_leads(self, id_vendedor: str) -> float:
"""Calcula a quota de leads baseada na taxa de conversão."""
taxa = self.vendedores.get(id_vendedor, 0)
# Baixa conversão: recebe menos leads
if taxa < 0.3: # 30% de conversão como limite inferior
return 0.8 # 80% da quota normal
# Alta conversão: recebe mais leads
if taxa > 0.5: # 50% de conversão como limite superior
return 1.2 # 120% da quota normal
# Conversão média: quota normal
return 1.0
def distribuir_leads(self, leads: List[dict]) -> Dict[str, List[dict]]:
"""
Distribui leads entre vendedores baseado em suas taxas de conversão.
Args:
leads: Lista de dicionários contendo informações dos leads
Returns:
Dicionário com leads distribuídos por vendedor
"""
if not self.vendedores:
raise ValueError("Nenhum vendedor registrado no sistema")
distribuicao: Dict[str, List[dict]] = {id_v: [] for id_v in self.vendedores}
# Calcula quotas totais para distribuição proporcional
quotas = {id_v: self.calcular_quota_leads(id_v) for id_v in self.vendedores}
total_quotas = sum(quotas.values())
# Distribui leads proporcionalmente às quotas
for lead in leads:
# Encontra vendedor com menos leads relativos à sua quota
vendedor_escolhido = min(
self.vendedores.keys(),
key=lambda v: len(distribuicao[v]) / quotas[v]
)
distribuicao[vendedor_escolhido].append(lead)
self.leads_distribuidos[vendedor_escolhido] += 1
return distribuicao
def obter_estatisticas(self) -> Dict[str, dict]:
"""Retorna estatísticas de distribuição de leads por vendedor."""
return {
id_v: {
"taxa_conversao": self.vendedores[id_v],
"leads_recebidos": self.leads_distribuidos[id_v],
"quota_atual": self.calcular_quota_leads(id_v)
}
for id_v in self.vendedores
}
+142
View File
@@ -0,0 +1,142 @@
"""
PDF Processor - Módulo para processamento de documentos PDF
Descomplicar - Agência de Aceleração Digital
https://www.descomplicar.pt
"""
from PyPDF2 import PdfReader
from typing import List, Dict
from datetime import datetime
from openai import OpenAI
from .base_processor import BaseProcessor
class PDFProcessor(BaseProcessor):
"""Processador específico para documentos PDF."""
def __init__(self, filepath: str):
"""
Inicializa o processador PDF.
Args:
filepath (str): Caminho para o arquivo PDF
"""
super().__init__(filepath)
self.filepath = filepath
self.pdf_reader = None
self.num_pages = 0
self.content = ""
# Configurar cliente OpenAI para traduções
self.translator = OpenAI(
api_key="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"
)
# Atualizar metadata
self.metadata.update({
"tipo_documento": "pdf",
"fonte": filepath,
"data_original": datetime.now().strftime("%d-%m-%Y")
})
def read_content(self) -> str:
"""
Lê o conteúdo do PDF.
Returns:
str: Conteúdo do PDF
"""
try:
# Abrir PDF
self.pdf_reader = PdfReader(self.filepath)
self.num_pages = len(self.pdf_reader.pages)
# Extrair texto por página
content = []
for page in self.pdf_reader.pages:
text = page.extract_text()
if text.strip():
content.append(text)
# Juntar todo o conteúdo
self.content = "\n\n".join(content)
return self.content
except Exception as e:
print(f"Erro ao ler PDF: {str(e)}")
return ""
def process_content(self) -> None:
"""Processa o conteúdo do PDF."""
try:
if not self.content:
self.read_content()
# Traduzir conteúdo se necessário
response = self.translator.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "És um tradutor profissional. Traduz o texto para português de Portugal, mantendo termos técnicos em inglês quando apropriado."},
{"role": "user", "content": self.content}
]
)
translated_text = response.choices[0].message.content
self.content = translated_text
# Gerar FAQs do conteúdo
self.faqs = self.generate_faqs(self.content)
# Atualizar metadata
self.metadata.update({
"num_paginas": self.num_pages,
"num_faqs": len(self.faqs)
})
except Exception as e:
print(f"Erro ao processar PDF: {str(e)}")
def generate_faqs(self, content: str, num_faqs: int = 3) -> List[Dict[str, str]]:
"""
Gera FAQs a partir do conteúdo.
Args:
content (str): Texto para gerar FAQs
num_faqs (int): Número de FAQs a gerar
Returns:
List[Dict[str, str]]: Lista de FAQs geradas
"""
try:
# Limitar tamanho do conteúdo
max_chars = 2000
if len(content) > max_chars:
content = content[:max_chars] + "..."
# Gerar FAQs usando OpenAI
response = self.translator.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "És um especialista em documentação técnica. Gera FAQs relevantes em português de Portugal."},
{"role": "user", "content": f"Gera {num_faqs} FAQs técnicas e diretas sobre:\n\n{content}"}
]
)
# Processar resposta
raw_faqs = response.choices[0].message.content.strip().split('\n\n')
faqs = []
for raw_faq in raw_faqs[:num_faqs]:
if raw_faq.startswith('P:'):
parts = raw_faq.split('\nR:')
if len(parts) == 2:
question = parts[0].replace('P:', '').strip()
answer = parts[1].strip()
faqs.append({
'question': question,
'answer': answer
})
return faqs
except Exception as e:
print(f"Erro ao gerar FAQs: {str(e)}")
return []
+399
View File
@@ -0,0 +1,399 @@
"""
Text Corrector - Processador para correção e formatação de texto
Descomplicar - Agência de Aceleração Digital
https://www.descomplicar.pt
"""
import httpx
import json
from typing import Dict, Any, Optional
import os
from pathlib import Path
import time
import re
import concurrent.futures
class TextCorrector:
"""Processador para correção e formatação de texto usando OpenRouter."""
def __init__(self):
"""Inicializa o corretor de texto."""
self.api_key = "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"
self.api_url = "https://openrouter.ai/api/v1/chat/completions"
self.model = "mistralai/mistral-7b-instruct" # ID correto do modelo
self.max_chunk_size = 12000 # Mistral suporta contextos maiores
# Prompt otimizado para o Mistral
self.system_prompt_correction = """Você é um revisor especializado em melhorar documentação técnica em Markdown.
Corrija e formate o texto mantendo estas regras:
1. FORMATAÇÃO MARKDOWN:
- Use # para títulos e subtítulos
- Use ** para negrito em termos importantes
- Use ` para código inline
- Use ``` para blocos de código
- Use > para citações
- Use - ou * para listas
- Use --- para separadores
- Preserve todas as URLs existentes
2. ESTRUTURA:
- Organize em seções com títulos claros
- Use parágrafos para separar ideias
- Mantenha listas e enumerações
- Preserve a hierarquia de títulos
3. CONTEÚDO:
- Corrija erros de pontuação e gramática
- Mantenha todos os termos técnicos inalterados
- Preserve comandos e códigos exatamente como estão
- Mantenha URLs e referências intactas
Retorne o texto mantendo toda a formatação Markdown."""
# Prompt de tradução otimizado
self.system_prompt_translation = """Traduza para português de Portugal mantendo a formatação Markdown.
1. FORMATAÇÃO:
- Preserve todos os elementos Markdown (# ** ` [] () etc)
- Mantenha a estrutura de títulos e seções
- Preserve blocos de código sem traduzir
2. TRADUÇÃO:
- Use português europeu (não brasileiro)
- Mantenha termos técnicos em inglês
- Preserve comandos e códigos inalterados
- Mantenha URLs intactas
Retorne o texto traduzido com toda a formatação Markdown."""
# Prompt PT-PT otimizado
self.system_prompt_ptpt = """Converta para português de Portugal mantendo a formatação Markdown.
1. FORMATAÇÃO:
- Preserve todos os elementos Markdown (# ** ` [] () etc)
- Mantenha a estrutura de títulos e seções
- Preserve blocos de código sem alterar
2. CONVERSÃO:
- Substitua brasileirismos por termos portugueses
- Use terminologia técnica portuguesa
- Mantenha comandos e códigos inalterados
- Preserve URLs e referências
Retorne o texto convertido com toda a formatação Markdown."""
# Compilar expressões regulares para melhor performance
self.space_pattern = re.compile(r'\s+')
self.punct_pattern = re.compile(r'([.,!?;:])\s*')
self.newline_pattern = re.compile(r'\n{3,}')
self.url_pattern = re.compile(r'https?://\S+')
# Cache para resultados
self._cache = {}
def is_english(self, text: str) -> bool:
"""
Verifica se o texto está em inglês.
Usa uma heurística baseada em palavras e expressões comuns.
"""
# Palavras muito comuns em inglês que raramente aparecem em português
english_words = {
'the', 'this', 'these', 'those', 'which', 'what', 'where', 'when', 'why',
'how', 'who', 'whom', 'whose', 'that', 'there', 'here', 'they', 'them',
'their', 'his', 'her', 'its', 'our', 'your', 'my', 'we', 'you', 'he',
'she', 'it', 'they', 'am', 'is', 'are', 'was', 'were', 'been', 'being'
}
# Palavras comuns em português (pt-BR e pt-PT)
portuguese_words = {
'de', 'da', 'do', 'das', 'dos', 'em', 'no', 'na', 'nos', 'nas',
'um', 'uma', 'uns', 'umas', 'que', 'qual', 'quais', 'quando',
'onde', 'como', 'por', 'para', 'pelo', 'pela', 'pelos', 'pelas',
'este', 'esta', 'estes', 'estas', 'esse', 'essa', 'esses', 'essas',
'isto', 'isso', 'aquilo', 'também', 'mas', 'ou', 'porque', 'pois',
'', '', 'ainda', 'mesmo', 'assim', 'então', 'agora', 'depois'
}
# Converter texto para minúsculas e dividir em palavras
words = set(re.findall(r'\b\w+\b', text.lower()))
# Contar ocorrências de palavras em cada idioma
english_count = len(words.intersection(english_words))
portuguese_count = len(words.intersection(portuguese_words))
# Se tiver significativamente mais palavras em inglês, considera como inglês
# Ajustado para evitar falsos positivos com português
return english_count > portuguese_count * 1.5 and english_count > 5
def is_brazilian(self, text: str) -> bool:
"""
Verifica se o texto está em português do Brasil.
"""
# Palavras e expressões típicas do português brasileiro
br_words = {
'você', 'vocês', 'pra', '', '', 'cara', 'beleza', 'legal',
'valeu', 'bacana', 'gente', 'galera', 'pessoal', 'aqui', 'agora',
'então', 'tipo', 'tava', 'tavam', 'vamo', 'vamos', 'cadê',
'', 'daí', 'meu', 'mano', 'brother', 'mina', 'cara', 'véi',
'massa', 'maneiro', 'show', 'tranquilo', 'suave', 'firmeza'
}
# Converter texto para minúsculas e dividir em palavras
words = set(re.findall(r'\b\w+\b', text.lower()))
# Se encontrar algumas palavras típicas do BR, considera como pt-BR
br_count = len(words.intersection(br_words))
return br_count >= 3 # Se encontrar 3 ou mais palavras típicas do BR
def correct_text(self, text: str, max_retries: int = 3) -> str:
"""
Corrige e formata um texto usando a API da OpenRouter.
Args:
text (str): Texto para corrigir
max_retries (int): Número máximo de tentativas em caso de erro
Returns:
str: Texto corrigido e formatado
"""
# Verificar cache
if text in self._cache:
return self._cache[text]
# Verificar idioma
is_eng = self.is_english(text)
is_br = self.is_brazilian(text)
if is_eng:
print("Texto em inglês detectado, será traduzido para português...")
elif is_br:
print("Texto em português do Brasil detectado, será convertido para português de Portugal...")
else:
print("Processando texto em português de Portugal...")
# Dividir texto em chunks se necessário
if len(text) > self.max_chunk_size:
print("Texto muito grande, dividindo em partes...")
chunks = self.split_text(text)
processed_chunks = []
for i, chunk in enumerate(chunks, 1):
print(f"Processando parte {i} de {len(chunks)}...")
if is_eng:
# Se for inglês: corrige -> traduz
corrected = self._process_chunk(chunk, max_retries, self.system_prompt_correction)
processed = self._process_chunk(corrected, max_retries, self.system_prompt_translation)
elif is_br:
# Se for pt-BR: corrige -> converte para pt-PT
corrected = self._process_chunk(chunk, max_retries, self.system_prompt_correction)
processed = self._process_chunk(corrected, max_retries, self.system_prompt_ptpt)
else:
# Se for pt-PT: apenas corrige
processed = self._process_chunk(chunk, max_retries, self.system_prompt_correction)
processed_chunks.append(processed)
# Aplicar correções adicionais
result = "\n\n".join(processed_chunks)
result = self.correct_text_additions(result)
# Armazenar no cache
self._cache[text] = result
return result
# Processar texto completo
if is_eng:
corrected = self._process_chunk(text, max_retries, self.system_prompt_correction)
result = self._process_chunk(corrected, max_retries, self.system_prompt_translation)
elif is_br:
corrected = self._process_chunk(text, max_retries, self.system_prompt_correction)
result = self._process_chunk(corrected, max_retries, self.system_prompt_ptpt)
else:
result = self._process_chunk(text, max_retries, self.system_prompt_correction)
# Aplicar correções adicionais
result = self.correct_text_additions(result)
# Armazenar no cache
self._cache[text] = result
return result
def correct_text_additions(self, text: str) -> str:
"""Aplica correções adicionais ao texto."""
# Preservar URLs
urls = self.url_pattern.findall(text)
for i, url in enumerate(urls):
text = text.replace(url, f"__URL_{i}__")
# Aplicar correções
text = text.strip()
text = self.space_pattern.sub(' ', text)
text = self.punct_pattern.sub(r'\1 ', text)
text = self.newline_pattern.sub('\n\n', text)
# Restaurar URLs
for i, url in enumerate(urls):
text = text.replace(f"__URL_{i}__", url)
return text
def split_text(self, text: str) -> list[str]:
"""Divide o texto em chunks menores."""
words = text.split()
chunks = []
current_chunk = []
current_length = 0
for word in words:
word_length = len(word) + 1 # +1 para o espaço
if current_length + word_length > self.max_chunk_size:
chunks.append(' '.join(current_chunk))
current_chunk = [word]
current_length = word_length
else:
current_chunk.append(word)
current_length += word_length
if current_chunk:
chunks.append(' '.join(current_chunk))
return chunks
def _process_chunk(self, text: str, max_retries: int, system_prompt: str) -> str:
"""Processa um chunk de texto."""
headers = {
"HTTP-Referer": "https://www.descomplicar.pt/",
"X-Title": "SuperBot KB",
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
data = {
"model": self.model,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"Corrija e formate este texto:\n\n{text}"}
]
}
for attempt in range(max_retries):
try:
with httpx.Client(timeout=120.0) as client:
response = client.post(
self.api_url,
headers=headers,
json=data
)
if response.status_code == 200:
result = response.json()
# Tentar diferentes formatos de resposta
if 'choices' in result and len(result['choices']) > 0:
if 'message' in result['choices'][0]:
return result['choices'][0]['message']['content']
elif 'text' in result['choices'][0]:
return result['choices'][0]['text']
elif 'response' in result:
return result['response']
print(f"Formato de resposta inesperado: {result}")
raise Exception("Formato de resposta inválido")
elif response.status_code == 401:
print(f"Erro de autenticação. Verifique a chave da API.")
raise Exception("Erro de autenticação na OpenRouter")
else:
print(f"Erro {response.status_code}: {response.text}")
if attempt < max_retries - 1:
time.sleep(5)
continue
except Exception as e:
print(f"Erro ao processar texto: {str(e)}")
if attempt < max_retries - 1:
time.sleep(5)
continue
raise Exception("Falha ao processar texto após várias tentativas")
def process_file(self, input_file: str, output_file: str) -> None:
"""
Processa um arquivo de texto, corrigindo e formatando seu conteúdo.
Args:
input_file (str): Caminho do arquivo de entrada
output_file (str): Caminho do arquivo de saída
"""
try:
# Ler arquivo de entrada
with open(input_file, 'r', encoding='utf-8') as f:
content = f.read()
# Corrigir texto
print(f"Processando texto de {input_file}...")
corrected_text = self.correct_text(content)
# Salvar resultado
with open(output_file, 'w', encoding='utf-8') as f:
f.write(corrected_text)
print(f"Texto processado salvo em {output_file}")
except Exception as e:
raise Exception(f"Erro ao processar arquivo: {str(e)}")
def process_directory(self, input_dir: str, output_dir: str) -> None:
"""
Processa todos os arquivos .txt em um diretório.
Args:
input_dir (str): Diretório com os arquivos originais
output_dir (str): Diretório para salvar os arquivos corrigidos
"""
from pathlib import Path
input_path = Path(input_dir)
output_path = Path(output_dir)
output_path.mkdir(exist_ok=True)
def process_file(file: Path) -> None:
try:
# Ler arquivo
text = file.read_text(encoding='utf-8')
# Corrigir texto
corrected = self.correct_text(text)
# Salvar resultado
output_file = output_path / f"corrigido_{file.name}"
output_file.write_text(corrected, encoding='utf-8')
except Exception as e:
print(f"Erro ao processar {file.name}: {str(e)}")
# Processar arquivos em paralelo
with concurrent.futures.ThreadPoolExecutor() as executor:
files = list(input_path.glob('*.txt'))
executor.map(process_file, files)
if __name__ == "__main__":
# Criar uma instância do corretor
corrector = TextCorrector()
# Solicitar texto do usuário
print("Digite ou cole o texto a ser corrigido (Ctrl+D para terminar):")
try:
texto = ""
while True:
linha = input()
texto += linha + "\n"
except EOFError:
pass
# Corrigir e mostrar o resultado
texto_corrigido = corrector.correct_text(texto)
print("\nTexto Corrigido:")
print("="*50)
print(texto_corrigido)
+97
View File
@@ -0,0 +1,97 @@
"""
Text Processor - Processador para ficheiros TXT
Descomplicar - Agência de Aceleração Digital
https://www.descomplicar.pt
"""
import os
from datetime import datetime
from typing import Dict, Any
import re
class TXTProcessor:
"""Processador para ficheiros TXT."""
def __init__(self, file_path: str):
"""
Inicializa o processador.
Args:
file_path (str): Caminho para o ficheiro TXT
"""
self.file_path = file_path
self.content = ""
self.metadata = {}
def process_content(self) -> None:
"""Processa o conteúdo do ficheiro TXT."""
try:
# Ler o ficheiro
with open(self.file_path, 'r', encoding='utf-8') as f:
content = f.read()
# Extrair metadados do início do ficheiro se existirem
metadata_match = re.match(r'^---\n(.*?)\n---\n(.*)', content, re.DOTALL)
if metadata_match:
# Processar metadados em formato YAML
metadata_text = metadata_match.group(1)
self.content = metadata_match.group(2).strip()
# Extrair metadados
for line in metadata_text.split('\n'):
if ':' in line:
key, value = line.split(':', 1)
self.metadata[key.strip()] = value.strip()
else:
# Se não houver metadados, usar todo o conteúdo
self.content = content.strip()
# Gerar metadados automáticos
self._generate_metadata()
except Exception as e:
raise Exception(f"Erro ao processar ficheiro TXT: {str(e)}")
def _generate_metadata(self) -> None:
"""Gera metadados automáticos."""
# Nome do ficheiro como título se não especificado
if 'título' not in self.metadata:
self.metadata['título'] = os.path.splitext(os.path.basename(self.file_path))[0]
# Metadados padrão
self.metadata.update({
'autor_original': self.metadata.get('autor_original', 'Descomplicar AI'),
'data_original': self.metadata.get('data_original', datetime.now().strftime('%d-%m-%Y')),
'tipo_documento': self.metadata.get('tipo_documento', 'Documento de Texto'),
'fonte': self.metadata.get('fonte', 'Descomplicar - Agência de Aceleração Digital'),
'licença': self.metadata.get('licença', 'Copyright Descomplicar'),
'última_atualização': datetime.now().strftime('%d-%m-%Y'),
'categoria_principal': self.metadata.get('categoria_principal', 'Documentação Técnica'),
'tags': self.metadata.get('tags', []),
'nível_técnico': self.metadata.get('nível_técnico', 'Básico'),
'idioma_original': self.metadata.get('idioma_original', 'Português'),
'tradutor': self.metadata.get('tradutor', 'Descomplicar AI'),
'revisão': self.metadata.get('revisão', '1.0')
})
def save_markdown(self, output_file: str) -> None:
"""
Salva o conteúdo processado em formato markdown.
Args:
output_file (str): Caminho para o ficheiro de saída
"""
try:
with open(output_file, 'w', encoding='utf-8') as f:
# Adicionar metadados
f.write('---\n')
for key, value in self.metadata.items():
f.write(f'{key}: {value}\n')
f.write('---\n\n')
# Adicionar conteúdo
f.write(self.content)
except Exception as e:
raise Exception(f"Erro ao salvar markdown: {str(e)}")
+213
View File
@@ -0,0 +1,213 @@
"""
Web Processor - Módulo para processamento de conteúdo web
Descomplicar - Agência de Aceleração Digital
https://www.descomplicar.pt
"""
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from typing import List, Dict
from datetime import datetime
from openai import OpenAI
from .base_processor import BaseProcessor
import time
class WebProcessor(BaseProcessor):
"""Processador específico para conteúdo web."""
def __init__(self, url: str):
"""
Inicializa o processador web.
Args:
url (str): URL do conteúdo web
"""
super().__init__(url)
self.url = url
self.base_domain = urlparse(url).netloc
self.soup = None
self.links = []
self.images = []
# Configurar cliente OpenAI para traduções
self.translator = OpenAI(
api_key="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"
)
# Atualizar metadata
self.metadata.update({
"tipo_documento": "web",
"fonte": url,
"data_original": datetime.now().strftime("%d-%m-%Y")
})
def translate_batch(self, texts: List[str]) -> List[str]:
"""
Traduz um lote de textos de uma vez.
Args:
texts (List[str]): Lista de textos para traduzir
Returns:
List[str]: Lista de textos traduzidos
"""
if not texts:
return []
# Juntar textos com marcadores
combined_text = "\n---SPLIT---\n".join(texts)
try:
response = self.translator.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "Traduza o seguinte texto para português de Portugal. Mantenha termos técnicos em inglês quando apropriado. Mantenha os marcadores ---SPLIT--- para separar os textos:"},
{"role": "user", "content": combined_text}
]
)
# Separar textos traduzidos
translated = response.choices[0].message.content.split("\n---SPLIT---\n")
return [t.strip() for t in translated]
except Exception as e:
print(f"Erro ao traduzir textos: {str(e)}")
return texts
def read_content(self) -> str:
"""
Lê o conteúdo da URL.
Returns:
str: Conteúdo da página web
"""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(self.url, headers=headers, timeout=30)
response.raise_for_status()
return response.text
except Exception as e:
print(f"Erro ao ler URL {self.url}: {str(e)}")
return ""
def process_content(self):
"""Processa o conteúdo web."""
start_time = time.time()
# Definir timeout de 5 minutos
timeout = 300
html = self.read_content()
if not html:
return
self.soup = BeautifulSoup(html, 'html.parser')
# Extrair título
title = self.soup.title.string if self.soup.title else "Sem título"
self.metadata["título"] = title.strip()
# Extrair texto principal
self.content = self._extract_main_content()
# Extrair links importantes
self._extract_links()
# Dividir em secções
self._split_into_sections()
# Verificar timeout
if time.time() - start_time > timeout:
print("Tempo limite excedido. Interrompendo processamento.")
return
def _extract_main_content(self) -> str:
"""
Extrai o conteúdo principal da página.
Returns:
str: Texto principal da página
"""
# Remover elementos indesejados
for elem in self.soup.select('script, style, nav, footer, header, .sidebar, .menu, .ads'):
elem.decompose()
# Tentar encontrar o conteúdo principal
main_content = None
for selector in ['article', 'main', '.content', '.main-content', '#content', '#main']:
main_content = self.soup.select_one(selector)
if main_content:
break
# Se não encontrar conteúdo principal, usar body
if not main_content:
main_content = self.soup.body
# Extrair texto
if main_content:
# Extrair apenas parágrafos e cabeçalhos relevantes
elements = main_content.find_all(['p', 'h1', 'h2', 'h3', 'li'])
texts = [elem.get_text(strip=True) for elem in elements]
texts = [t for t in texts if len(t) > 20] # Filtrar textos muito curtos
# Traduzir em lotes de 5 textos
batch_size = 5
translated_texts = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
translated_batch = self.translate_batch(batch)
translated_texts.extend(translated_batch)
return '\n\n'.join(translated_texts)
return ""
def _extract_links(self):
"""Extrai links importantes da página."""
main_content = self.soup.select_one('article, main, .content, #content')
if not main_content:
return
for link in main_content.find_all('a', href=True):
href = link.get('href')
text = link.get_text(strip=True)
if href and text and len(text) > 5: # Ignorar links muito curtos
absolute_url = urljoin(self.url, href)
if urlparse(absolute_url).netloc == self.base_domain:
self.links.append({
'text': text,
'url': absolute_url
})
def _split_into_sections(self):
"""Divide o conteúdo em secções baseado em cabeçalhos."""
if not self.content:
return
# Dividir por linhas vazias para encontrar parágrafos
paragraphs = [p.strip() for p in self.content.split('\n\n') if p.strip()]
current_section = {
'title': self.metadata['título'],
'content': '',
'faqs': []
}
for p in paragraphs:
# Se o parágrafo parece um título (curto e termina sem pontuação)
if len(p) < 100 and not p[-1] in '.!?':
if current_section['content']:
self.chapters.append(current_section)
current_section = {
'title': p,
'content': '',
'faqs': []
}
else:
current_section['content'] += p + '\n\n'
if current_section['content']:
self.chapters.append(current_section)
+243
View File
@@ -0,0 +1,243 @@
"""
youtube_processor.py
Author: Descomplicar® Crescimento Digital
Link: https://descomplicar.pt
Copyright: 2025 Descomplicar®
"""
"""
YouTube Processor
Processa vídeos, playlists e canais do YouTube extraindo suas transcrições.
"""
from youtube_transcript_api import YouTubeTranscriptApi
import yt_dlp
import re
from datetime import datetime
import os
from typing import Dict, List, Optional, Tuple, Callable
from .base_processor import BaseProcessor
from .text_corrector import TextCorrector
class YouTubeProcessor(BaseProcessor):
"""Processador para conteúdo do YouTube."""
def __init__(self, url: str, progress_callback: Callable[[str, int], None] = None):
"""
Inicializa o processador com a URL do YouTube.
Args:
url: URL do vídeo/playlist do YouTube
progress_callback: Função de callback para atualizar o progresso
Recebe (fase: str, progresso: int)
"""
super().__init__(url)
self.url = url
self.info = None
self.transcripts = []
self.metadata = {}
self.text_corrector = TextCorrector()
self.progress_callback = progress_callback or (lambda fase, prog: None)
# Define as fases do processamento
self.fases = {
'extract_id': 'Extraindo ID do vídeo',
'get_info': 'Obtendo informações',
'process_video': 'Processando vídeo',
'get_transcript': 'Obtendo transcrição',
'correct_text': 'Corrigindo texto',
'update_metadata': 'Atualizando metadados'
}
def extract_video_id(self, url: str) -> str:
"""Extrai o ID do vídeo da URL do YouTube."""
patterns = [
r'(?:v=|\/)([0-9A-Za-z_-]{11}).*',
r'(?:youtu\.be\/)([0-9A-Za-z_-]{11})',
r'(?:embed\/)([0-9A-Za-z_-]{11})'
]
for pattern in patterns:
match = re.search(pattern, url)
if match:
return match.group(1)
return None
def get_video_info(self, url: str) -> Dict:
"""Obtém informações do vídeo usando yt-dlp."""
ydl_opts = {
'quiet': True,
'no_warnings': True,
'extract_flat': True
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
try:
return ydl.extract_info(url, download=False)
except Exception as e:
print(f"Erro ao obter informações do vídeo: {e}")
return None
def process_video(self, video_id: str, language: str = None) -> Optional[Dict]:
"""Processa um único vídeo do YouTube."""
try:
self.progress_callback('get_transcript', 0)
transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
self.progress_callback('get_transcript', 20)
# Tenta obter a transcrição na língua desejada
if language:
try:
transcript = transcript_list.find_transcript([language])
except:
transcript = transcript_list.find_transcript(['en', 'pt'])
transcript = transcript.translate(language)
else:
transcript = transcript_list.find_transcript(['pt', 'en'])
self.progress_callback('get_transcript', 40)
transcript_data = transcript.fetch()
self.progress_callback('get_transcript', 60)
full_text = ""
for entry in transcript_data:
text = entry.get('text', '').strip()
if text:
full_text += text + " "
self.progress_callback('correct_text', 0)
corrected_text = self.text_corrector.correct_text(full_text)
self.progress_callback('correct_text', 100)
return {
'video_id': video_id,
'language': transcript.language,
'content': corrected_text
}
except Exception as e:
print(f"Erro ao processar transcrição: {str(e)}")
return None
def process_content(self, progress_callback=None) -> None:
"""
Processa o conteúdo do YouTube.
Args:
progress_callback: Função opcional para reportar progresso.
Se fornecido, substitui o callback definido no construtor.
"""
# Usa o callback fornecido ou o definido no construtor
callback = progress_callback or self.progress_callback
# Extrai ID do vídeo
callback('extract_id', 0)
video_id = self.extract_video_id(self.url)
if not video_id:
raise ValueError("URL do YouTube inválida")
callback('extract_id', 100)
# Obtém informações do vídeo
callback('get_info', 0)
self.info = self.get_video_info(self.url)
if not self.info:
raise ValueError("Não foi possível obter informações do vídeo")
callback('get_info', 100)
# Processa vídeo único
if '_type' not in self.info or self.info['_type'] == 'video':
callback('process_video', 0)
transcript = self.process_video(video_id)
if transcript:
self.transcripts.append(transcript)
callback('process_video', 100)
# Processa playlist
elif self.info['_type'] == 'playlist':
total_videos = len(self.info['entries'])
for i, entry in enumerate(self.info['entries'], 1):
progress = int((i-1) * 100 / total_videos)
callback('process_video', progress)
video_id = entry['id']
transcript = self.process_video(video_id)
if transcript:
self.transcripts.append(transcript)
callback('process_video', 100)
# Atualiza metadados
callback('update_metadata', 0)
self.metadata = {
'title': self.info.get('title', ''),
'uploader': self.info.get('uploader', ''),
'upload_date': self.info.get('upload_date', ''),
'videos': []
}
for transcript in self.transcripts:
video_url = f"https://www.youtube.com/watch?v={transcript['video_id']}"
video_info = {
'url': video_url,
'language': transcript['language']
}
self.metadata['videos'].append(video_info)
callback('update_metadata', 100)
def read_content(self) -> str:
"""
Método abstrato implementado para ler o conteúdo do YouTube.
Retorna o conteúdo combinado de todas as transcrições.
"""
if not self.transcripts:
self.process_content()
if not self.transcripts:
return "Nenhuma transcrição encontrada."
# Combina todas as transcrições em um único texto
combined_text = []
for transcript in self.transcripts:
video_url = f"https://www.youtube.com/watch?v={transcript['video_id']}"
combined_text.extend([
f"\n## Vídeo: {video_url}",
f"Idioma original: {transcript['language']}\n",
transcript['content'],
"\n---\n"
])
return '\n'.join(combined_text)
def to_markdown(self) -> str:
"""Converte as transcrições para formato markdown."""
if not self.info or not self.transcripts:
return "Nenhum conteúdo processado."
md_lines = [
f"# {self.metadata['title']}",
f"\nCanal: {self.metadata['uploader']}",
f"Data: {self.metadata['upload_date']}\n",
"## Transcrições\n"
]
for transcript in self.transcripts:
video_url = f"https://www.youtube.com/watch?v={transcript['video_id']}"
md_lines.extend([
f"### Vídeo: {video_url}",
f"Idioma: {transcript['language']}\n",
transcript['content'],
"\n---\n"
])
return '\n'.join(md_lines)
def to_json(self) -> Dict:
"""Retorna os dados em formato JSON."""
return {
'metadata': self.metadata,
'transcripts': self.transcripts
}