"""Resumable scraper for the company directory published under ``BASE_URL``.

The script walks every category linked from the directory's start page,
follows each category's paginated listing, opens every company page found
there and appends one CSV row per company.  Progress is persisted in two
places so an interrupted run can be resumed:

* the output CSV itself (its ``URL_Bizin`` column lists the company pages
  that were already saved), and
* a text file holding one finished category URL per line.
"""

import csv
import re
import time
import random
import os
import logging
from pathlib import Path
from urllib.parse import urljoin, urlparse

from curl_cffi import requests as curl_requests
from bs4 import BeautifulSoup
import undetected_chromedriver as uc

# --- CONFIGURATION ---
BASE_URL = "https://pt.bizin.eu/por/"
OUTPUT_CSV = Path(__file__).parent / "output/bizin_empresas_final.csv"
CATS_DONE_FILE = Path(__file__).parent / "logs/cats_done.txt"
EMAIL_REGEX = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'

# Browser used by undetected_chromedriver.
CHROME_BINARY = "/usr/bin/google-chrome-beta"
CHROME_VERSION_MAIN = 148

# Upper bound on listing pages visited per category (guards against a
# listing that never runs out of "new" links).
MAX_PAGES_PER_CATEGORY = 100

# Page titles shown while the Cloudflare interstitial is still active.
CHALLENGE_TITLES = ("Um momento", "Just a moment")

# Labels looked for in the text of each candidate element, in order of
# precedence; the first label found decides which field the element fills.
# Every label is also the key of the field it fills.
# NOTE(review): "Distrito" and "Fax" assume the page uses these exact
# labels -- confirm against a real company page.  They are checked last so
# they never change which field an element filled before they were added.
FIELD_LABELS = (
    "Morada", "CAE", "NIF", "Sector", "Telefone", "Email", "Website",
    "Distrito", "Fax",
)

_EMAIL_RE = re.compile(EMAIL_REGEX)

# Logging configuration
LOG_FILE = Path(__file__).parent / "logs/bizin_final.log"
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        # Explicit UTF-8: the log messages contain non-ASCII characters.
        logging.FileHandler(LOG_FILE, encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)


class BizinScraper:
    """Scrape company pages category by category, resuming earlier runs.

    Attributes:
        driver: the live ``uc.Chrome`` instance, or ``None`` when no browser
            is currently open.
        processed_urls: company page URLs that already have a CSV row.
        cats_done: category URLs that were completely scraped.
        total_processed: number of companies saved during this run.
    """

    def __init__(self):
        self.driver = None
        self.processed_urls = self._load_processed_urls()
        self.cats_done = self._load_cats_done()
        self.total_processed = 0

    def _load_processed_urls(self):
        """Return the set of ``URL_Bizin`` values already present in the CSV.

        A missing file yields an empty set.  An unreadable or malformed file
        is logged and whatever was read before the error is kept, so a
        damaged CSV never prevents the scraper from starting.
        """
        processed = set()
        if not OUTPUT_CSV.exists():
            return processed
        try:
            with open(OUTPUT_CSV, mode='r', newline='', encoding='utf-8') as f:
                for row in csv.DictReader(f):
                    url = row.get('URL_Bizin')
                    if url:
                        processed.add(url)
        except (OSError, csv.Error, UnicodeDecodeError) as e:
            logger.warning("Não foi possível ler %s: %s", OUTPUT_CSV, e)
        return processed

    def _load_cats_done(self):
        """Return the set of category URLs recorded as finished."""
        if not CATS_DONE_FILE.exists():
            return set()
        with open(CATS_DONE_FILE, 'r', encoding='utf-8') as f:
            stripped = (line.strip() for line in f)
            # Blank lines carry no URL.
            return {line for line in stripped if line}

    def save_cat_done(self, url):
        """Record ``url`` as a finished category, on disk and in memory."""
        with open(CATS_DONE_FILE, 'a', encoding='utf-8') as f:
            f.write(url + '\n')
        self.cats_done.add(url)

    def get_driver(self):
        """Return the shared browser, starting it on first use.

        Raises whatever ``uc.Chrome`` raises when the browser cannot be
        started.
        """
        if not self.driver:
            logger.info("Iniciando UC Driver...")
            options = uc.ChromeOptions()
            options.binary_location = CHROME_BINARY
            options.add_argument('--disable-gpu')
            options.add_argument('--no-sandbox')
            # Skip image downloads.
            options.add_argument('--blink-settings=imagesEnabled=false')
            self.driver = uc.Chrome(
                options=options,
                version_main=CHROME_VERSION_MAIN,
                headless=False,
            )
            self.driver.set_page_load_timeout(60)
        return self.driver

    def close_driver(self):
        """Quit the browser if one is open; never raises on quit errors."""
        if self.driver:
            try:
                self.driver.quit()
            except Exception as e:
                # Best effort: the browser may already be gone.
                logger.debug("Erro ao fechar o driver: %s", e)
            self.driver = None

    @staticmethod
    def _is_challenge(title):
        """Return True when ``title`` is that of the Cloudflare interstitial."""
        title = title or ""
        return any(marker in title for marker in CHALLENGE_TITLES)

    def fetch_page(self, url):
        """Load ``url`` in the browser and return its HTML.

        Returns:
            The page source, or ``None`` when loading raised or when the
            Cloudflare interstitial was still displayed after the extra
            wait.  On an exception the browser is closed so the next call
            starts a fresh one.
        """
        try:
            driver = self.get_driver()
            driver.get(url)
            # Simple wait to let the Cloudflare check complete.
            time.sleep(random.uniform(5, 8))
            if self._is_challenge(driver.title):
                logger.warning("Aguardando Cloudflare em %s...", url)
                time.sleep(20)
                if self._is_challenge(driver.title):
                    # Returning the interstitial's HTML would make callers
                    # parse and save it as if it were real content.
                    logger.error(
                        "Cloudflare continua a bloquear %s; página ignorada.",
                        url,
                    )
                    return None
            return driver.page_source
        except Exception as e:
            logger.error("Erro ao carregar %s: %s", url, e)
            self.close_driver()
            return None

    def parse_details(self, html, url):
        """Extract one company record from the HTML of a company page.

        Every ``tr``, ``div`` and ``li`` element is inspected in document
        order.  The first label from ``FIELD_LABELS`` found in an element's
        text decides which field it fills, and a later element overwrites
        an earlier one.  Text fields take whatever follows the last ``:``
        in the element's text; ``Email`` prefers the first e-mail address
        found in the text; ``Website`` takes the ``href`` of the element's
        first link.

        Returns:
            A dict with the keys Nome, Morada, Distrito, Sector, CAE, NIF,
            Telefone, Fax, Email, Website and URL_Bizin.  Fields that were
            not found hold ``"N/A"``.  Parsing errors are logged and the
            partially filled record is still returned.
        """
        soup = BeautifulSoup(html, 'html.parser')
        data = {
            "Nome": "N/A", "Morada": "N/A", "Distrito": "N/A",
            "Sector": "N/A", "CAE": "N/A", "NIF": "N/A",
            "Telefone": "N/A", "Fax": "N/A", "Email": "N/A",
            "Website": "N/A", "URL_Bizin": url,
        }
        try:
            h1 = soup.find('h1')
            if h1:
                data["Nome"] = h1.text.strip()
            for row in soup.find_all(['tr', 'div', 'li']):
                text = row.get_text(separator=' ', strip=True)
                label = next((lb for lb in FIELD_LABELS if lb in text), None)
                if label is None:
                    continue
                if label == "Website":
                    a = row.find('a', href=True)
                    if a:
                        data["Website"] = a['href']
                    continue
                value = text.split(':')[-1].strip()
                if label == "Email":
                    found = _EMAIL_RE.search(text)
                    if found:
                        value = found.group(0)
                data[label] = value
        except Exception:
            # Keep whatever was extracted so far, but leave a trace.
            logger.exception("Erro ao analisar %s", url)
        return data

    def _category_links(self, html):
        """Return the unfinished category URLs linked from the start page.

        Order of first appearance is kept and duplicates are dropped.
        """
        soup = BeautifulSoup(html, 'html.parser')
        hrefs = (urljoin(BASE_URL, a['href']) for a in soup.find_all('a', href=True))
        wanted = (
            h for h in hrefs
            if '/por/cat/' in h
            and len(h.split('-')) > 1
            and h not in self.cats_done
        )
        return list(dict.fromkeys(wanted))

    @staticmethod
    def _company_links(html):
        """Return the company page URLs linked from a listing page.

        Order of first appearance is kept and duplicates are dropped, so a
        company linked twice on one page is fetched only once.
        """
        soup = BeautifulSoup(html, 'html.parser')
        hrefs = (urljoin(BASE_URL, a['href']) for a in soup.find_all('a', href=True))
        wanted = (
            h for h in hrefs
            if '/por/' in h and len(h.split('-')) >= 3 and '/cat/' not in h
        )
        return list(dict.fromkeys(wanted))

    def _scrape_category(self, cat_url):
        """Scrape every listing page of one category.

        Pagination ends when a listing page shows no company link that was
        not already seen on an earlier page of this category, or after
        ``MAX_PAGES_PER_CATEGORY`` pages.  Companies saved by a previous run
        are skipped but do not end the pagination, so a resumed run still
        reaches the pages it had not visited yet.

        Returns:
            True when the whole category was walked and every company page
            was fetched; False when any listing or company page could not
            be loaded, in which case the category must not be marked done.
        """
        seen = set()
        complete = True
        for page in range(1, MAX_PAGES_PER_CATEGORY + 1):
            paged_url = f"{cat_url}?p={page}" if page > 1 else cat_url
            html_list = self.fetch_page(paged_url)
            if not html_list:
                return False

            fresh = [h for h in self._company_links(html_list) if h not in seen]
            if not fresh:
                break
            seen.update(fresh)

            for c_url in fresh:
                if c_url in self.processed_urls:
                    continue
                html_c = self.fetch_page(c_url)
                if html_c:
                    det = self.parse_details(html_c, c_url)
                    self.save_csv(det)
                    self.processed_urls.add(c_url)
                    self.total_processed += 1
                    logger.info("✅ [%d] %s", self.total_processed, det['Nome'])
                else:
                    complete = False
                time.sleep(random.uniform(2, 4))

            # Restart the driver after every listing page to avoid crashes.
            self.close_driver()
        return complete

    def scrape(self):
        """Scrape all unfinished categories linked from ``BASE_URL``.

        A category is recorded as done only when it was walked completely;
        otherwise it is left for the next run.
        """
        logger.info("🚀 Iniciando extração persistente...")
        html_main = self.fetch_page(BASE_URL)
        if not html_main:
            return

        links = self._category_links(html_main)
        logger.info("Faltam %d categorias.", len(links))

        for cat_url in links:
            logger.info("📂 Categoria: %s", cat_url)
            if self._scrape_category(cat_url):
                self.save_cat_done(cat_url)
            else:
                logger.warning(
                    "Categoria incompleta, será retomada na próxima execução: %s",
                    cat_url,
                )

    def save_csv(self, data):
        """Append ``data`` as one row to the output CSV and sync it to disk.

        The output directory is created when missing and the header row is
        written when the file is new or empty.
        """
        OUTPUT_CSV.parent.mkdir(parents=True, exist_ok=True)
        needs_header = not OUTPUT_CSV.exists() or OUTPUT_CSV.stat().st_size == 0
        with open(OUTPUT_CSV, 'a', newline='', encoding='utf-8') as f:
            w = csv.DictWriter(f, fieldnames=list(data))
            if needs_header:
                w.writeheader()
            w.writerow(data)
            # Force the row to disk so a crash loses at most the current row.
            f.flush()
            os.fsync(f.fileno())


if __name__ == "__main__":
    s = BizinScraper()
    try:
        s.scrape()
    finally:
        s.close_driver()