python-selenium-spider/run/__init__.py
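"""Selenium-based fanbox.cc spider.

Walks a creator's paginated post list, then downloads each post's file
attachments (clicked through the browser) and images (fetched with requests,
reusing the browser session's cookies).
"""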

from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
import time
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.common.by import By
import requests
import os
from concurrent.futures import ThreadPoolExecutor
# def main(driver:WebDriver):
# driver.set_window_size(1920, 1080)
# # driver.get("https://bot.sannysoft.com/")\
# driver.get("https://www.geetest.com/adaptive-captcha-demo")
# width = driver.execute_script("return document.documentElement.scrollWidth")
# height = driver.execute_script("return document.documentElement.scrollHeight")
# print(width, height)
# utils.screenshot(driver=driver, save_path="./tmp/geetest.png")
workers = os.cpu_count()
def find_link(driver: WebDriver, url: str):
    """Collect the post links listed on one page of the creator's post list."""
    driver.get(url)
    time.sleep(1)
    cards = driver.find_element(By.XPATH, "//div[contains(@class,'CreatorPostList__CardsWrapper')]")
    # Use a relative XPath (".//a") so the search stays inside the cards wrapper
    # instead of falling back to the whole document.
    items = cards.find_elements(By.XPATH, ".//a[contains(@class,'CardPostItem__Wrapper')]")
    links = []
    for item in items:
        print(item.get_attribute("href"))
        links.append(item.get_attribute("href"))
    return links
def has_next(driver: WebDriver):
    """Return True if the post list still shows a "next page" button."""
    has_next = False
    try:
        # find_element raises NoSuchElementException when the button is absent.
        next_button = driver.find_element(By.XPATH, "//pixiv-icon[@name='24/Next']")
        if next_button is not None:
            has_next = True
    except Exception as e:
        print(e)
        has_next = False
    return has_next
def download_file_link(driver: WebDriver, links):
    """Visit each post and click its file attachment link so the browser downloads it."""
    for link in links:
        print(link)
        driver.get(link)
        time.sleep(1)
        # Get all open window handles
        # all_windows = driver.window_handles
        # for window in all_windows:
        #     if window != original_window:
        #         driver.switch_to.window(window)
        try:
            el = driver.find_element(By.XPATH, "//div[contains(@class, 'FileContent__Wrapper')]")
            if el is not None:
                print(el)
                a = el.find_element(By.TAG_NAME, "a")
                if a is not None:
                    a.click()
        except Exception as e:
            print(e)
        # Give the triggered browser download some time to finish.
        time.sleep(5)
        # driver.back()
        # driver.switch_to(original_window)
def download_image(url: str, download_dir: str, proxies):
    """Download a single image with requests, reusing the browser session's cookies."""
    print(f"Start downloading: {url}")
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
        "Referer": "https://monpetit17.fanbox.cc/",
        "Cookie": "p_ab_id=0; p_ab_id_2=1; p_ab_d_id=1743506050; _gcl_au=1.1.1973053698.1715103653; cf_clearance=9nglHcFCr9D17iz8392jJQIJi4oN8TSpBsvlS4oPjvs-1715103654-1.0.1.1-DLbaQVWB8O8lyUZyJ5P8ToQD7Bx7dt5r_7KUz9aFqIcEM5GiAunuXwAQRu5BJ3c3zBOt242Oy13YvXz1omV5Dw; FANBOXSESSID=35206256_01xWtpm33E6tjKYTAdLoVaZq5xceRVNB; privacy_policy_agreement=6; privacy_policy_notification=0; _gid=GA1.2.1663028388.1715442061; __cf_bm=YRkVSwzNtFz96eLrGB3KhENnEVv7lkYdc898q2AF3G0-1715447846-1.0.1.1-yJ.D2R_c.jS8SErn4fAmIg6fShYSfc2h_m4vQrehVA5UpRV7rIsjnYCGTZLro7JW9nh1r0Hu853rOcvoy6hSrA; cf_clearance=QQWAR7NhgwYZjtKExO9v0IE2eHNkQweSMLRVugqL3mE-1715447848-1.0.1.1-JhfPisIW0GNy135ks_mIObi9.X.FmmorhRl_Fows5nrHEQuPBt2S7CY_lnB4vCipSp4Xq.QrKW5oscwSCMk_Hw; _gat_gtag_UA_1830249_145=1; _ga_D9TLP3EFER=GS1.1.1715442066.2.1.1715448098.22.0.0; _ga=GA1.1.530858074.1715103653"
    }
    save_path = os.path.join(download_dir, url.split("/")[-1])
    req = requests.get(url, proxies=proxies, headers=headers)
    with open(save_path, "wb") as f:
        f.write(req.content)
    print(f"Download finished {url}, saved to: {save_path}")
def download_images(driver: WebDriver, links, download_dir: str, proxies):
    """Open each image post, collect its image URLs, and download them in a thread pool."""
    print(f"Max concurrent downloads: {workers}")
    with ThreadPoolExecutor(max_workers=workers) as worker:
        img_list = []
        for link in links:
            part_img_list = []
            print(link)
            # https://monpetit17.fanbox.cc/posts/5183527
            # https://api.fanbox.cc/post.info?postId=5183527
            driver.get(link)
            print("Page opened")
            time.sleep(1)
            try:
                # Posts with a file attachment are handled by download_file_link, skip them here.
                el = driver.find_element(By.XPATH, "//div[contains(@class, 'FileContent__Wrapper')]")
                if el is not None:
                    continue
            except Exception:
                pass
            # html = driver.find_element(By.TAG_NAME, "html")
            # height = html.size['height']
            # for _ in range(500, height, 500):
            #     driver.execute_script("window.scrollBy(0, 500)")
            #     time.sleep(1.5)
            # Scroll down in steps so lazily loaded images are rendered.
            for _ in range(0, 10):
                driver.execute_script("window.scrollBy(0, 500)")
                time.sleep(0.2)
            img_link_elements = driver.find_elements(By.XPATH, "//a[contains(@class, 'PostImage__Anchor')]")
            sub_dir = link.split("/")[-1]
            real_download_dir = os.path.join(download_dir, sub_dir)
            if not os.path.exists(real_download_dir):
                os.makedirs(real_download_dir)
            for element in img_link_elements:
                href = element.get_attribute("href")
                print(href)
                part_img_list.append(href)
                worker.submit(download_image, href, real_download_dir, proxies)
            print("Got %d image URLs" % len(part_img_list))
            print(part_img_list)
            img_list += part_img_list
    print("%d pages, %d images in total" % (len(links), len(img_list)))
def main(driver: WebDriver, download_dir: str, proxies):
    driver.set_window_size(1920, 1080)
    # driver.get("https://bot.sannysoft.com/")
    # driver.get("https://www.geetest.com/adaptive-captcha-demo")
    # width = driver.execute_script("return document.documentElement.scrollWidth")
    # height = driver.execute_script("return document.documentElement.scrollHeight")
    # print(width, height)
    # utils.screenshot(driver=driver, save_path="./tmp/geetest.png")
    links = []
    start_page = 1
    url = "https://monpetit17.fanbox.cc/posts?page=%d" % start_page
    links += find_link(driver, url)
    # Keep paging while the post list still shows a "next page" button.
    while has_next(driver):
        start_page += 1
        url = "https://monpetit17.fanbox.cc/posts?page=%d" % start_page
        links += find_link(driver, url)
    # Re-queue posts whose download directory exists but is still empty.
    for sub_dir in os.listdir(download_dir):
        f = os.path.join(download_dir, sub_dir)
        if os.path.isfile(f):
            continue
        if len(os.listdir(f)) == 0:
            sub_page = "https://monpetit17.fanbox.cc/posts/%s" % sub_dir
            links.append(sub_page)
    print("Collected post links:", links)
    download_file_link(driver, links)
    download_images(driver, links, download_dir, proxies)
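

# Minimal usage sketch (not part of the original module): it assumes a local Chrome
# install managed by Selenium and an HTTP proxy at 127.0.0.1:7890 — the download
# directory and proxy address are placeholders, adjust them to your environment.
if __name__ == "__main__":
    download_dir = "./downloads"
    os.makedirs(download_dir, exist_ok=True)  # main() expects the directory to exist
    proxy_conf = {
        "http": "http://127.0.0.1:7890",
        "https": "http://127.0.0.1:7890",
    }
    chrome_driver = webdriver.Chrome()
    try:
        main(chrome_driver, download_dir, proxy_conf)
    finally:
        chrome_driver.quit()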