"""
reddit_scraper.py

Author: Descomplicar® Crescimento Digital
Link: https://descomplicar.pt
Copyright: 2025 Descomplicar®
"""

import os
import json
import logging
import time
from pathlib import Path
from typing import List, Dict, Optional
from datetime import datetime

import praw
from dotenv import load_dotenv

# Load environment variables (a local .env file is read if present).
load_dotenv()

# Configure logging: every record goes both to 'reddit_scraper.log' and to
# the console stream handler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('reddit_scraper.log'),
        logging.StreamHandler()
    ]
)


class RedditScraper:
    """Scrape subreddit posts through PRAW and export them as Markdown/JSON.

    Credentials are read from the environment (see ``__init__``); output
    files are written into ``output_dir``.
    """

    # Listing names accepted by scrape_subreddit(); anything else falls
    # back to "hot".
    VALID_SORTS = ("hot", "new", "top", "rising")
    # Maximum number of top-level comments stored per post.
    TOP_COMMENTS_LIMIT = 5
    # Maximum number of characters of a comment body shown in the Markdown
    # export.
    COMMENT_PREVIEW_CHARS = 200

    def __init__(self, output_dir: str = "output_md"):
        """Initialise the scraper with official Reddit API credentials.

        Requires the environment variables:
            - REDDIT_CLIENT_ID
            - REDDIT_CLIENT_SECRET
            - REDDIT_USER_AGENT (optional, defaults to "ScraperBot/1.0")

        Args:
            output_dir: Directory where exported files are written. It is
                created if it does not exist.

        Raises:
            ValueError: If REDDIT_CLIENT_ID or REDDIT_CLIENT_SECRET is
                missing or empty.
        """
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)

        # Validate credentials
        self.client_id = os.getenv("REDDIT_CLIENT_ID")
        self.client_secret = os.getenv("REDDIT_CLIENT_SECRET")
        self.user_agent = os.getenv("REDDIT_USER_AGENT", "ScraperBot/1.0")

        if not self.client_id or not self.client_secret:
            raise ValueError(
                "Credenciais Reddit não encontradas. "
                "Define REDDIT_CLIENT_ID e REDDIT_CLIENT_SECRET no .env"
            )

        # Initialise the Reddit client
        self.reddit = praw.Reddit(
            client_id=self.client_id,
            client_secret=self.client_secret,
            user_agent=self.user_agent
        )

        logging.info("Reddit API inicializada com sucesso")

    def scrape_subreddit(
        self,
        subreddit_name: str,
        limit: int = 100,
        time_filter: str = "all",
        sort_by: str = "hot"
    ) -> Optional[Dict]:
        """Extract posts from a subreddit.

        Args:
            subreddit_name: Subreddit name (without the ``r/`` prefix).
            limit: Maximum number of posts (default: 100).
            time_filter: Time window - all, year, month, week, day. Only
                passed to the API when ``sort_by`` is "top".
            sort_by: hot, new, top, rising. Any other value logs a warning
                and falls back to "hot".

        Returns:
            Dict with metadata and the list of posts, or ``None`` if the
            scrape failed (the error is logged, not raised).
        """
        try:
            logging.info(f"Iniciando scraping de r/{subreddit_name}")

            subreddit = self.reddit.subreddit(subreddit_name)

            # Normalise the sort so the returned metadata reports the
            # listing that was actually fetched, not the invalid request.
            if sort_by not in self.VALID_SORTS:
                logging.warning(f"Sort inválido '{sort_by}', usando 'hot'")
                sort_by = "hot"

            # Choose the listing; only "top" takes a time filter here.
            if sort_by == "new":
                posts = subreddit.new(limit=limit)
            elif sort_by == "top":
                posts = subreddit.top(time_filter=time_filter, limit=limit)
            elif sort_by == "rising":
                posts = subreddit.rising(limit=limit)
            else:
                posts = subreddit.hot(limit=limit)

            # Extract data; a post that fails to serialise is logged and
            # skipped so one bad post does not abort the whole scrape.
            posts_data = []
            for post in posts:
                try:
                    posts_data.append(self._serialize_post(post))
                except Exception as e:
                    logging.warning(f"Erro ao processar post: {str(e)}")
                    continue

            result = {
                "subreddit": subreddit_name,
                # NOTE(review): naive timestamp in the machine's local time.
                "scraped_at": datetime.now().isoformat(),
                "total_posts": len(posts_data),
                "sort_by": sort_by,
                "time_filter": time_filter,
                "posts": posts_data
            }

            logging.info(f"Extraídos {len(posts_data)} posts de r/{subreddit_name}")
            return result

        except Exception as e:
            logging.error(f"Erro ao scrape r/{subreddit_name}: {str(e)}")
            return None

    def _serialize_post(self, post) -> Dict:
        """Convert one submission object into a plain dict."""
        post_data = {
            "title": post.title,
            "author": str(post.author),
            "score": post.score,
            "url": post.url,
            "permalink": f"https://reddit.com{post.permalink}",
            "created_utc": post.created_utc,
            # NOTE(review): fromtimestamp() without tz yields a naive
            # local-time value even though the source field is named
            # created_utc; kept as-is to preserve the output format.
            "created_date": datetime.fromtimestamp(post.created_utc).isoformat(),
            "num_comments": post.num_comments,
            "selftext": post.selftext,
            "is_self": post.is_self,
            "link_flair_text": post.link_flair_text,
            "upvote_ratio": post.upvote_ratio
        }

        # The "top_comments" key is only present when the post reports at
        # least one comment.
        if post.num_comments > 0:
            post_data["top_comments"] = self._extract_top_comments(post)

        return post_data

    def _extract_top_comments(self, post) -> List[Dict]:
        """Return up to TOP_COMMENTS_LIMIT leading top-level comments as dicts."""
        # Remove "load more comments" placeholders
        post.comments.replace_more(limit=0)

        return [
            {
                "author": str(comment.author),
                "body": comment.body,
                "score": comment.score,
                "created_utc": comment.created_utc
            }
            for comment in post.comments[:self.TOP_COMMENTS_LIMIT]
            if hasattr(comment, 'body')
        ]

    def save_to_markdown(self, data: Dict, filename: Optional[str] = None):
        """Save scraped data as a Markdown file inside ``output_dir``.

        Args:
            data: Result dict as produced by ``scrape_subreddit``. Nothing
                is written when it is empty or ``None``.
            filename: Output file name. Defaults to
                ``reddit_<subreddit>_<unix time>.md``.
        """
        if not data:
            return

        if not filename:
            filename = f"reddit_{data['subreddit']}_{int(time.time())}.md"

        filepath = Path(self.output_dir) / filename
        preview_chars = self.COMMENT_PREVIEW_CHARS

        with open(filepath, 'w', encoding='utf-8') as f:
            # Header
            f.write(f"# r/{data['subreddit']}\n\n")
            f.write(f"**Extraído em**: {data['scraped_at']}\n")
            f.write(f"**Total de posts**: {data['total_posts']}\n")
            f.write(f"**Ordenação**: {data['sort_by']}\n\n")
            f.write("---\n\n")

            # Posts
            for i, post in enumerate(data['posts'], 1):
                f.write(f"## {i}. {post['title']}\n\n")
                f.write(f"**Autor**: u/{post['author']}\n")
                f.write(f"**Score**: {post['score']} | **Comentários**: {post['num_comments']}\n")
                f.write(f"**Data**: {post['created_date']}\n")
                f.write(f"**Link**: [{post['permalink']}]({post['permalink']})\n\n")

                if post['selftext']:
                    f.write("### Conteúdo\n\n")
                    f.write(post['selftext'])
                    f.write("\n\n")

                if post.get('top_comments'):
                    f.write("### Top Comentários\n\n")
                    for j, comment in enumerate(post['top_comments'], 1):
                        body = comment['body']
                        preview = body[:preview_chars]
                        # Only mark the excerpt as truncated when text was
                        # actually cut off.
                        if len(body) > preview_chars:
                            preview += "..."
                        f.write(f"{j}. **u/{comment['author']}** (score: {comment['score']})\n")
                        f.write(f" {preview}\n\n")

                f.write("---\n\n")

        logging.info(f"Markdown guardado em: {filepath}")

    def save_to_json(self, data: Dict, filename: Optional[str] = None):
        """Save scraped data as a JSON file inside ``output_dir``.

        Args:
            data: Result dict as produced by ``scrape_subreddit``. Nothing
                is written when it is empty or ``None``.
            filename: Output file name. Defaults to
                ``reddit_<subreddit>_<unix time>.json``.
        """
        if not data:
            return

        if not filename:
            filename = f"reddit_{data['subreddit']}_{int(time.time())}.json"

        filepath = Path(self.output_dir) / filename

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

        logging.info(f"JSON guardado em: {filepath}")

    def scrape_multiple_subreddits(
        self,
        subreddit_names: List[str],
        limit_per_sub: int = 100,
        **kwargs
    ):
        """Scrape several subreddits and export each one.

        Args:
            subreddit_names: List of subreddit names.
            limit_per_sub: Posts per subreddit.
            **kwargs: Extra arguments forwarded to ``scrape_subreddit()``.

        Returns:
            List with the result dict of every subreddit that was scraped
            successfully.
        """
        results = []

        for subreddit in subreddit_names:
            try:
                logging.info(f"Processando r/{subreddit}...")

                data = self.scrape_subreddit(subreddit, limit=limit_per_sub, **kwargs)

                if data:
                    # Save in both formats
                    self.save_to_markdown(data)
                    self.save_to_json(data)
                    results.append(data)

                # Pause between subreddits (rate limiting)
                time.sleep(2)

            except Exception as e:
                logging.error(f"Erro ao processar r/{subreddit}: {str(e)}")
                continue

        logging.info(f"Scraping concluído: {len(results)}/{len(subreddit_names)} subreddits processados")
        return results


def main():
    """Usage example."""
    # Create the scraper
    scraper = RedditScraper(output_dir="output_md")

    # Subreddits to process
    subreddits = ["Autoupholstery", "upholstery"]

    # Scrape multiple subreddits
    scraper.scrape_multiple_subreddits(
        subreddits,
        limit_per_sub=50,
        sort_by="top",
        time_filter="year"
    )

    logging.info("Processo concluído!")


if __name__ == "__main__":
    main()