Files
scripts/scraper/bizin_scraper_final.py
T
ealmeida 8e0dbbeca0 feat(bizin): scraper final com bypass Cloudflare + monitor de auto-reinício
- bizin_scraper_final.py: scraper híbrido curl_cffi + undetected-chromedriver
  com suporte a distritos e categorias, escrita segura (fsync) e enriquecimento externo
- monitor_scraper.sh: watchdog que reinicia o processo automaticamente em crash
- IMPLEMENTADO.md + README.md: actualizados para reflectir estado Abril 2026
- GEMINI.md: instruções técnicas de automação
- test_curl.py, test_curl_clean.py, test_playwright.py: scripts de teste/diagnóstico

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 17:16:48 +01:00

171 lines
6.5 KiB
Python

import csv
import re
import time
import random
import os
import logging
from pathlib import Path
from urllib.parse import urljoin, urlparse
from curl_cffi import requests as curl_requests
from bs4 import BeautifulSoup
import undetected_chromedriver as uc
# --- CONFIGURAÇÕES ---
BASE_URL = "https://pt.bizin.eu/por/"
OUTPUT_CSV = Path(__file__).parent / "output/bizin_empresas_final.csv"
CATS_DONE_FILE = Path(__file__).parent / "logs/cats_done.txt"
EMAIL_REGEX = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
# Logging configuration
LOG_FILE = Path(__file__).parent / "logs/bizin_final.log"
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
class BizinScraper:
def __init__(self):
self.driver = None
self.processed_urls = self._load_processed_urls()
self.cats_done = self._load_cats_done()
self.total_processed = 0
def _load_processed_urls(self):
if not OUTPUT_CSV.exists(): return set()
processed = set()
try:
with open(OUTPUT_CSV, mode='r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
if 'URL_Bizin' in row: processed.add(row['URL_Bizin'])
except: pass
return processed
def _load_cats_done(self):
if not CATS_DONE_FILE.exists(): return set()
with open(CATS_DONE_FILE, 'r') as f:
return set(line.strip() for line in f)
def save_cat_done(self, url):
with open(CATS_DONE_FILE, 'a') as f:
f.write(url + '\n')
self.cats_done.add(url)
def get_driver(self):
if not self.driver:
logger.info("Iniciando UC Driver...")
options = uc.ChromeOptions()
options.binary_location = "/usr/bin/google-chrome-beta"
options.add_argument('--disable-gpu')
options.add_argument('--no-sandbox')
options.add_argument('--blink-settings=imagesEnabled=false')
self.driver = uc.Chrome(options=options, version_main=148, headless=False)
self.driver.set_page_load_timeout(60)
return self.driver
def close_driver(self):
if self.driver:
try: self.driver.quit()
except: pass
self.driver = None
def fetch_page(self, url):
try:
driver = self.get_driver()
driver.get(url)
# Espera simples para Cloudflare
time.sleep(random.uniform(5, 8))
if "Um momento" in driver.title or "Just a moment" in driver.title:
logger.warning(f"Aguardando Cloudflare em {url}...")
time.sleep(20)
return driver.page_source
except Exception as e:
logger.error(f"Erro ao carregar {url}: {e}")
self.close_driver()
return None
def parse_details(self, html, url):
soup = BeautifulSoup(html, 'html.parser')
data = {"Nome": "N/A", "Morada": "N/A", "Distrito": "N/A", "Sector": "N/A", "CAE": "N/A", "NIF": "N/A", "Telefone": "N/A", "Fax": "N/A", "Email": "N/A", "Website": "N/A", "URL_Bizin": url}
try:
h1 = soup.find('h1')
if h1: data["Nome"] = h1.text.strip()
for row in soup.find_all(['tr', 'div', 'li']):
text = row.get_text(separator=' ', strip=True)
if 'Morada' in text: data["Morada"] = text.split(':')[-1].strip()
elif 'CAE' in text: data["CAE"] = text.split(':')[-1].strip()
elif 'NIF' in text: data["NIF"] = text.split(':')[-1].strip()
elif 'Sector' in text: data["Sector"] = text.split(':')[-1].strip()
elif 'Telefone' in text: data["Telefone"] = text.split(':')[-1].strip()
elif 'Email' in text: data["Email"] = text.split(':')[-1].strip()
elif 'Website' in text:
a = row.find('a', href=True)
if a: data["Website"] = a['href']
except: pass
return data
def scrape(self):
logger.info("🚀 Iniciando extração persistente...")
html_main = self.fetch_page(BASE_URL)
if not html_main: return
soup = BeautifulSoup(html_main, 'html.parser')
links = []
for a in soup.find_all('a', href=True):
href = urljoin(BASE_URL, a['href'])
if '/por/cat/' in href and len(href.split('-')) > 1 and href not in self.cats_done:
links.append(href)
logger.info(f"Faltam {len(links)} categorias.")
for cat_url in links:
logger.info(f"📂 Categoria: {cat_url}")
page = 1
while True:
paged_url = f"{cat_url}?p={page}" if page > 1 else cat_url
html_list = self.fetch_page(paged_url)
if not html_list: break
soup_list = BeautifulSoup(html_list, 'html.parser')
comp_links = []
for a in soup_list.find_all('a', href=True):
h = urljoin(BASE_URL, a['href'])
if '/por/' in h and len(h.split('-')) >= 3 and '/cat/' not in h and h not in self.processed_urls:
comp_links.append(h)
if not comp_links: break
for c_url in comp_links:
html_c = self.fetch_page(c_url)
if html_c:
det = self.parse_details(html_c, c_url)
self.save_csv(det)
self.processed_urls.add(c_url)
self.total_processed += 1
logger.info(f"✅ [{self.total_processed}] {det['Nome']}")
time.sleep(random.uniform(2, 4))
page += 1
if page > 100: break
# Reiniciar driver a cada página de listagem para evitar crash
self.close_driver()
self.save_cat_done(cat_url)
def save_csv(self, data):
exists = OUTPUT_CSV.exists()
with open(OUTPUT_CSV, 'a', newline='', encoding='utf-8') as f:
w = csv.DictWriter(f, fieldnames=data.keys())
if not exists: w.writeheader()
w.writerow(data)
f.flush()
os.fsync(f.fileno())
if __name__ == "__main__":
s = BizinScraper()
try: s.scrape()
finally: s.close_driver()