# python-selenium-spider/run/__init__.py
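"""Selenium spider for a fanbox.cc creator's post list.

Flow, as implemented below:
  1. find_link() collects the post URLs on one listing page.
  2. has_next() checks for the "next page" arrow so main() keeps paging.
  3. download_file_link() clicks a post's file-attachment link, if present.
  4. download_images() gathers the image anchors on a post page and downloads
     them through a thread pool sized to os.cpu_count().
  5. main() drives the crawl, then retries posts whose download directory is empty.
"""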

from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
import time
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.common.by import By
import requests
import os
from concurrent.futures import ThreadPoolExecutor

# def main(driver: WebDriver):
#     driver.set_window_size(1920, 1080)
#     # driver.get("https://bot.sannysoft.com/")
#     driver.get("https://www.geetest.com/adaptive-captcha-demo")
#     width = driver.execute_script("return document.documentElement.scrollWidth")
#     height = driver.execute_script("return document.documentElement.scrollHeight")
#     print(width, height)
#     utils.screenshot(driver=driver, save_path="./tmp/geetest.png")

# Size the download thread pool to the machine's CPU count.
workers = os.cpu_count()


def find_link(driver: WebDriver, url: str):
    """Open one listing page and return the post URLs found on it."""
    driver.get(url)
    time.sleep(1)
    cards = driver.find_element(By.XPATH, "//div[contains(@class,'CreatorPostList__CardsWrapper')]")
    # ".//" keeps the query scoped to the cards wrapper instead of the whole document.
    items = cards.find_elements(By.XPATH, ".//a[contains(@class,'CardPostItem__Wrapper')]")
    links = []
    for item in items:
        print(item.get_attribute("href"))
        links.append(item.get_attribute("href"))
    return links


def has_next(driver: WebDriver):
    """Return True if the listing page shows a "next page" arrow."""
    has_next = False
    try:
        next_button = driver.find_element(By.XPATH, "//pixiv-icon[@name='24/Next']")
        if next_button is not None:
            has_next = True
    except Exception as e:
        print(e)
        has_next = False
    return has_next


def download_file_link(driver: WebDriver):
    """If the post has a file-attachment block, click its download link."""
    try:
        el = driver.find_element(By.XPATH, "//div[contains(@class, 'FileContent__Wrapper')]")
        if el is not None:
            print(el)
            a = el.find_element(By.TAG_NAME, "a")
            if a is not None:
                a.click()
                time.sleep(1)
    except Exception:
        # No file attachment on this post.
        pass


def download_image(url: str, download_dir: str, proxies):
    """Download a single image into download_dir, reusing the hardcoded fanbox session cookie."""
    print(f"Start downloading: {url}")
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
        "Referer": "https://www.fanbox.cc/",
        "Cookie": "p_ab_id=0; p_ab_id_2=1; p_ab_d_id=1743506050; _gcl_au=1.1.1973053698.1715103653; cf_clearance=9nglHcFCr9D17iz8392jJQIJi4oN8TSpBsvlS4oPjvs-1715103654-1.0.1.1-DLbaQVWB8O8lyUZyJ5P8ToQD7Bx7dt5r_7KUz9aFqIcEM5GiAunuXwAQRu5BJ3c3zBOt242Oy13YvXz1omV5Dw; FANBOXSESSID=35206256_01xWtpm33E6tjKYTAdLoVaZq5xceRVNB; privacy_policy_agreement=6; privacy_policy_notification=0; _gid=GA1.2.1663028388.1715442061; __cf_bm=YRkVSwzNtFz96eLrGB3KhENnEVv7lkYdc898q2AF3G0-1715447846-1.0.1.1-yJ.D2R_c.jS8SErn4fAmIg6fShYSfc2h_m4vQrehVA5UpRV7rIsjnYCGTZLro7JW9nh1r0Hu853rOcvoy6hSrA; cf_clearance=QQWAR7NhgwYZjtKExO9v0IE2eHNkQweSMLRVugqL3mE-1715447848-1.0.1.1-JhfPisIW0GNy135ks_mIObi9.X.FmmorhRl_Fows5nrHEQuPBt2S7CY_lnB4vCipSp4Xq.QrKW5oscwSCMk_Hw; _gat_gtag_UA_1830249_145=1; _ga_D9TLP3EFER=GS1.1.1715442066.2.1.1715448098.22.0.0; _ga=GA1.1.530858074.1715103653"
    }
    save_path = os.path.join(download_dir, url.split("/")[-1])
    req = requests.get(url, proxies=proxies, headers=headers)
    with open(save_path, "wb") as f:
        f.write(req.content)
    print(f"Download finished {url}, saved to: {save_path}")


def download_images(driver: WebDriver, link, download_dir: str, proxies):
    """Collect the image anchors on a post page and download them concurrently."""
    print(f"Max concurrent downloads: {workers}")
    with ThreadPoolExecutor(max_workers=workers) as worker:
        part_img_list = []
        try:
            # Posts with a file attachment are handled by download_file_link(); skip them here.
            el = driver.find_element(By.XPATH, "//div[contains(@class, 'FileContent__Wrapper')]")
            if el is not None:
                return
        except Exception:
            pass
        # Scroll in steps so lazily loaded images are attached to the DOM.
        for _ in range(0, 15):
            driver.execute_script("window.scrollBy(0, 500)")
            time.sleep(0.2)
        img_link_elements = driver.find_elements(By.XPATH, "//a[contains(@class, 'PostImage__Anchor')]")
        sub_dir = link.split("/")[-1]
        real_download_dir = os.path.join(download_dir, sub_dir)
        if not os.path.exists(real_download_dir):
            os.makedirs(real_download_dir)
        for element in img_link_elements:
            href = element.get_attribute("href")
            print(href)
            part_img_list.append(href)
            worker.submit(download_image, href, real_download_dir, proxies)
        print("Collected %d image URLs" % len(part_img_list))
        print(part_img_list)


def main(driver: WebDriver, download_dir: str, proxies):
    driver.set_window_size(1920, 1080)
    # driver.get("https://bot.sannysoft.com/")
    # driver.get("https://www.geetest.com/adaptive-captcha-demo")
    # width = driver.execute_script("return document.documentElement.scrollWidth")
    # height = driver.execute_script("return document.documentElement.scrollHeight")
    # print(width, height)
    # utils.screenshot(driver=driver, save_path="./tmp/geetest.png")
    links = []
    base_url = "https://www.fanbox.cc/@laserflip/posts"
    page_url = base_url + "?page=%d"
    post_url = base_url + "/%s"
    # Walk the listing pages until no "next" arrow is shown, collecting post links.
    start_page = 1
    url = page_url % start_page
    links += find_link(driver, url)
    while has_next(driver):
        start_page += 1
        url = page_url % start_page
        links += find_link(driver, url)
    # Visit every post and download its file attachment and images.
    for link in links:
        driver.get(link)
        time.sleep(1)
        download_file_link(driver)
        download_images(driver, link, download_dir, proxies)
    # Retry pass: re-visit posts whose download directory is still empty.
    links = []
    for sub_dir in os.listdir(download_dir):
        f = os.path.join(download_dir, sub_dir)
        if os.path.isfile(f):
            continue
        if len(os.listdir(f)) == 0:
            sub_page = post_url % sub_dir
            links.append(sub_page)
    print("Collected sub-page links", links)
    for link in links:
        driver.get(link)
        time.sleep(1)
        download_images(driver, link, download_dir, proxies)
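

# Illustrative wiring only: the original module exposes main() without an entry point,
# so the proxy address, Chrome options, and download directory below are assumptions,
# not part of the original code.
if __name__ == "__main__":
    proxy = "http://127.0.0.1:7890"  # assumed local proxy; adjust or drop as needed
    chrome_options = webdriver.ChromeOptions()
    chrome_options.add_argument(f"--proxy-server={proxy}")
    chrome = webdriver.Chrome(options=chrome_options)
    try:
        main(chrome, download_dir="./download", proxies={"http": proxy, "https": proxy})
    finally:
        chrome.quit()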