python-selenium-spider/run/__init__.py

import os
import time
from concurrent.futures import ThreadPoolExecutor

import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.support.ui import WebDriverWait
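
# Fanbox post crawler: uses Selenium to walk a creator's post list page by
# page, collect every post link, then downloads each post's images (and any
# attached file) concurrently with requests and a ThreadPoolExecutor.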
# def main(driver: WebDriver):
#     driver.set_window_size(1920, 1080)
#     # driver.get("https://bot.sannysoft.com/")
#     driver.get("https://www.geetest.com/adaptive-captcha-demo")
#     width = driver.execute_script("return document.documentElement.scrollWidth")
#     height = driver.execute_script("return document.documentElement.scrollHeight")
#     print(width, height)
#     utils.screenshot(driver=driver, save_path="./tmp/geetest.png")

workers = os.cpu_count()


def find_link(driver: WebDriver, url: str):
    # Open one page of the creator's post list and collect the post links on it.
    driver.get(url)
    time.sleep(1)
    cards = driver.find_element(By.XPATH, "//div[contains(@class,'CreatorPostList__CardsWrapper')]")
    # Use a relative XPath (".//a") so the lookup stays scoped to the cards wrapper.
    items = cards.find_elements(By.XPATH, ".//a[contains(@class,'CardPostItem__Wrapper')]")
    links = []
    for item in items:
        print(item.get_attribute("href"))
        links.append(item.get_attribute("href"))
    return links


def has_next(driver: WebDriver):
    # find_element raises when the next-page arrow (<pixiv-icon name="24/Next">)
    # is absent, so treat that as "no more pages".
    try:
        driver.find_element(By.XPATH, "//pixiv-icon[@name='24/Next']")
        return True
    except Exception as e:
        print(e)
        return False


def download_file_link(driver: WebDriver):
    # Some posts ship their content as an attached file instead of inline images;
    # click the anchor inside the file block to trigger the browser download.
    try:
        el = driver.find_element(By.XPATH, "//div[contains(@class, 'FileContent__Wrapper')]")
        print(el)
        a = el.find_element(By.TAG_NAME, "a")
        a.click()
        time.sleep(1)
    except Exception:
        # No file block on this post; nothing to do.
        pass


def download_image(url: str, download_dir: str, proxies):
    print(f"Start downloading: {url}")
    # Request headers copied from a logged-in browser session (User-Agent,
    # Referer, and the fanbox session cookie).
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
        "Referer": "https://www.fanbox.cc/",
        "Cookie": "p_ab_id=0; p_ab_id_2=1; p_ab_d_id=1743506050; _gcl_au=1.1.1973053698.1715103653; cf_clearance=9nglHcFCr9D17iz8392jJQIJi4oN8TSpBsvlS4oPjvs-1715103654-1.0.1.1-DLbaQVWB8O8lyUZyJ5P8ToQD7Bx7dt5r_7KUz9aFqIcEM5GiAunuXwAQRu5BJ3c3zBOt242Oy13YvXz1omV5Dw; FANBOXSESSID=35206256_01xWtpm33E6tjKYTAdLoVaZq5xceRVNB; privacy_policy_agreement=6; privacy_policy_notification=0; _gid=GA1.2.1663028388.1715442061; __cf_bm=YRkVSwzNtFz96eLrGB3KhENnEVv7lkYdc898q2AF3G0-1715447846-1.0.1.1-yJ.D2R_c.jS8SErn4fAmIg6fShYSfc2h_m4vQrehVA5UpRV7rIsjnYCGTZLro7JW9nh1r0Hu853rOcvoy6hSrA; cf_clearance=QQWAR7NhgwYZjtKExO9v0IE2eHNkQweSMLRVugqL3mE-1715447848-1.0.1.1-JhfPisIW0GNy135ks_mIObi9.X.FmmorhRl_Fows5nrHEQuPBt2S7CY_lnB4vCipSp4Xq.QrKW5oscwSCMk_Hw; _gat_gtag_UA_1830249_145=1; _ga_D9TLP3EFER=GS1.1.1715442066.2.1.1715448098.22.0.0; _ga=GA1.1.530858074.1715103653"
    }
    save_path = os.path.join(download_dir, url.split("/")[-1])
    req = requests.get(url, proxies=proxies, headers=headers)
    with open(save_path, "wb") as f:
        f.write(req.content)
    print(f"Finished downloading {url}, saved to: {save_path}")


def download_images(driver: WebDriver, link, download_dir: str, proxies):
    print(f"Max concurrent downloads: {workers}")
    with ThreadPoolExecutor(max_workers=workers) as worker:
        part_img_list = []
        # Posts with a FileContent block are handled by download_file_link(),
        # so skip them here; find_element raises when the block is absent.
        try:
            el = driver.find_element(By.XPATH, "//div[contains(@class, 'FileContent__Wrapper')]")
            if el is not None:
                return
        except Exception:
            pass
        # Scroll down step by step so lazy-loaded images get rendered.
        for _ in range(0, 15):
            driver.execute_script("window.scrollBy(0, 500)")
            time.sleep(0.2)
        img_link_elements = driver.find_elements(By.XPATH, "//a[contains(@class, 'PostImage__Anchor')]")
        # Save each post into its own sub-directory named after the post id.
        sub_dir = link.split("/")[-1]
        real_download_dir = os.path.join(download_dir, sub_dir)
        if not os.path.exists(real_download_dir):
            os.makedirs(real_download_dir)
        for element in img_link_elements:
            href = element.get_attribute("href")
            print(href)
            part_img_list.append(href)
            worker.submit(download_image, href, real_download_dir, proxies)
        print("Collected %d image URLs" % len(part_img_list))
        print(part_img_list)


def main(driver: WebDriver, download_dir: str, proxies):
    driver.set_window_size(1920, 1080)
    # driver.get("https://bot.sannysoft.com/")
    # driver.get("https://www.geetest.com/adaptive-captcha-demo")
    # width = driver.execute_script("return document.documentElement.scrollWidth")
    # height = driver.execute_script("return document.documentElement.scrollHeight")
    # print(width, height)
    # utils.screenshot(driver=driver, save_path="./tmp/geetest.png")
    links = []
    base_url = "https://www.fanbox.cc/@laserflip/posts"
    page_url = base_url + "?page=%d"
    post_url = base_url + "/%s"

    # Walk the post list page by page and collect every post link.
    start_page = 1
    url = page_url % start_page
    links += find_link(driver, url)
    while has_next(driver):
        start_page += 1
        url = page_url % start_page
        links += find_link(driver, url)

    # Visit each post and download its attached file and/or images.
    for link in links:
        driver.get(link)
        time.sleep(1)
        download_file_link(driver)
        download_images(driver, link, download_dir, proxies)

    # Retry posts whose download directory ended up empty.
    links = []
    for sub_dir in os.listdir(download_dir):
        f = os.path.join(download_dir, sub_dir)
        if os.path.isfile(f):
            continue
        if len(os.listdir(f)) == 0:
            sub_page = post_url % sub_dir
            links.append(sub_page)
    print("Collected sub-page links:", links)
    for link in links:
        driver.get(link)
        time.sleep(1)
        download_images(driver, link, download_dir, proxies)
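

# Illustrative entry point (not part of the original module): a minimal sketch of
# how main() could be wired up. The Chrome options, proxy address, and download
# directory below are assumptions; adjust them to your environment.
if __name__ == "__main__":
    # Assumed local HTTP proxy; requests and Chrome are pointed at the same one.
    proxy = "http://127.0.0.1:7890"
    proxies = {"http": proxy, "https": proxy}
    options = webdriver.ChromeOptions()
    options.add_argument(f"--proxy-server={proxy}")
    driver = webdriver.Chrome(options=options)
    try:
        main(driver, download_dir="./downloads", proxies=proxies)
    finally:
        driver.quit()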