#!/usr/bin/env python3
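"""
Selenium-based downloader with recursion and subfoldering.

Loads pages in headless Chrome, optionally saves the rendered HTML and any
non-HTML resources linked from it (--all), and can recurse into linked HTML
pages up to a given depth (--recurse).

Example invocation (the script filename is illustrative):

    python3 selenium_downloader.py https://example.com -a -r 1 -o downloads

Requires: selenium, requests, webdriver-manager; selenium-stealth is optional.
"""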
import os
import time
import argparse
import requests
from urllib.parse import urlparse, urljoin
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager

# Optionally import selenium-stealth for additional anti-detection measures.
try:
    from selenium_stealth import stealth
except ImportError:
    stealth = None

# Global sets to track visited URLs and downloaded files.
visited_pages = set()
downloaded_files = set()

def get_local_path(url, output_dir):
    """
    Generate a local file path from a URL, creating subfolders based on the
    netloc and URL path. If the URL path ends with "/" (or has no basename),
    the content is saved as index.html. Filename collisions are avoided by
    appending a counter.
    """
    parsed = urlparse(url)
    netloc = parsed.netloc
    path = parsed.path

    if path.endswith("/") or not os.path.basename(path):
        filename = "index.html"
        folder = os.path.join(output_dir, netloc, path.lstrip("/"))
    else:
        filename = os.path.basename(path)
        folder = os.path.join(output_dir, netloc, os.path.dirname(path.lstrip("/")))

    os.makedirs(folder, exist_ok=True)
    local_file = os.path.join(folder, filename)
    counter = 1
    base, ext = os.path.splitext(local_file)
    while os.path.exists(local_file):
        local_file = f"{base}_{counter}{ext}"
        counter += 1
    return local_file

def download_file(url, output_dir, session):
    """
    Download a file from the given URL using requests and write it to a local file.
    """
    if url in downloaded_files:
        return
    downloaded_files.add(url)
    print(f"Downloading file: {url}")
    local_path = get_local_path(url, output_dir)
    try:
        with session.get(url, stream=True, timeout=10) as r:
            r.raise_for_status()
            with open(local_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        print(f"Saved to: {local_path}")
    except Exception as e:
        print(f"Failed to download {url}: {e}")

def crawl(url, depth, args, driver, session, output_dir):
    """
    Crawl an HTML page with Selenium, optionally downloading page content and
    recursively processing the links found on it.
    """
    if url in visited_pages:
        return
    visited_pages.add(url)
    print(f"Crawling: {url} (depth {depth})")
    try:
        driver.get(url)
        # Simple fixed wait for the page to render - adjust as needed.
        time.sleep(2)
    except Exception as e:
        print(f"Error loading {url}: {e}")
        return

    # Keep the requests session's cookies in sync with the browser so that
    # downloads made with requests share the same session state.
    for cookie in driver.get_cookies():
        session.cookies.set(cookie["name"], cookie["value"], domain=cookie.get("domain"))

    # Optionally save the HTML page itself if --all is specified.
    if args.all:
        local_path = get_local_path(url, output_dir)
        try:
            with open(local_path, "w", encoding="utf-8") as f:
                f.write(driver.page_source)
            print(f"Page saved to: {local_path}")
        except Exception as e:
            print(f"Failed to save {url}: {e}")

    # Collect hrefs from all <a> tags up front: recursing below navigates the
    # driver to other pages, which would leave these WebElements stale.
    try:
        anchors = driver.find_elements(By.TAG_NAME, "a")
        hrefs = []
        for a in anchors:
            href = a.get_attribute("href")
            if href:
                hrefs.append(href)
    except Exception as e:
        print(f"Error finding links on {url}: {e}")
        return

    for href in hrefs:
        # Join relative URLs with the current page URL.
        href = urljoin(url, href)
        # Skip anything already visited or downloaded.
        if href in visited_pages or href in downloaded_files:
            continue

        # Try to determine the content type via a HEAD request.
        try:
            head = session.head(href, allow_redirects=True, timeout=5)
            content_type = head.headers.get("Content-Type", "")
        except Exception:
            content_type = ""

        if "text/html" in content_type.lower():
            # Recurse into HTML pages if depth allows.
            if depth > 0:
                crawl(href, depth - 1, args, driver, session, output_dir)
        else:
            # Download non-HTML resources if --all is enabled.
            if args.all:
                download_file(href, output_dir, session)

def main():
    parser = argparse.ArgumentParser(description="Selenium-based downloader with recursion and subfoldering")
    parser.add_argument("url", help="URL to fetch")
    parser.add_argument("-a", "--all", action="store_true", help="Download all links on the page")
    parser.add_argument("-r", "--recurse", type=int, default=0,
                        help="Recursively crawl linked pages up to the provided depth")
    parser.add_argument("-o", "--output", default="downloads", help="Output directory")
    args = parser.parse_args()

    # Configure Chrome options for headless, automation-masked browsing.
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--disable-blink-features=AutomationControlled")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    user_agent = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) "
                  "Chrome/115.0.0.0 Safari/537.36")
    chrome_options.add_argument(f"--user-agent={user_agent}")

    # Point at the custom Chrome binary (macOS "Chrome for Testing") only if it exists.
    chrome_binary = ("/Applications/Google Chrome for Testing.app"
                     "/Contents/MacOS/Google Chrome for Testing")
    if os.path.exists(chrome_binary):
        chrome_options.binary_location = chrome_binary

    # Use chromedriver from a known location (e.g., a Brew installation),
    # falling back to webdriver-manager if it is not installed there.
    chromedriver_path = "/usr/local/bin/chromedriver"
    if not os.path.exists(chromedriver_path):
        chromedriver_path = ChromeDriverManager().install()
    service = Service(chromedriver_path)
    driver = webdriver.Chrome(service=service, options=chrome_options)

    # Optionally apply stealth configurations.
    if stealth:
        stealth(driver,
                languages=["en-US", "en"],
                vendor="Google Inc.",
                platform="Win32",
                webgl_vendor="Intel Inc.",
                renderer="Intel Iris OpenGL Engine",
                fix_hairline=True,
                )

    # Create a requests session and seed it with any cookies the driver already
    # has; crawl() re-syncs cookies after each page load.
    session = requests.Session()
    for cookie in driver.get_cookies():
        session.cookies.set(cookie["name"], cookie["value"], domain=cookie.get("domain"))

    # Ensure the output directory exists.
    os.makedirs(args.output, exist_ok=True)

    # Start crawling / downloading; always shut the browser down afterwards.
    try:
        crawl(args.url, args.recurse, args, driver, session, args.output)
    finally:
        driver.quit()


if __name__ == "__main__":
    main()