Batch crawl fanbox

This commit is contained in:
shikong 2024-05-12 01:41:09 +08:00
parent 05532dd278
commit 7b25d0ed7d
4 changed files with 180 additions and 13 deletions

.idea/.gitignore (vendored, new file)

@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml

main.py

@@ -3,15 +3,26 @@ from selenium.webdriver.support.ui import WebDriverWait
 import time
 import run
 
+download_dir = r'E:\Repository\python-selenium-spider\download'
+proxies = {
+    "http": "socks5://127.0.0.1:1080",
+    "https": "socks5://127.0.0.1:1080",
+}
+
 if __name__ == '__main__':
     options = webdriver.ChromeOptions()
     options.add_argument('lang=zh-CN')
     options.add_argument(
         'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36')
-    options.add_argument(r'--user-data-dir=E:\Repository\skcks.cn\python-selenium-spider\tmp\UserData')
+    options.add_argument(r'--user-data-dir=E:\Repository\python-selenium-spider\tmp\UserData')
     options.add_argument('--disable-gpu')  # without this option, element locating sometimes fails
     # options.add_argument('--headless')  # headless (no browser window) option
+    options.add_experimental_option("prefs", {
+        "download.default_directory": download_dir
+    })
     service = webdriver.ChromeService(r"./driver/chromedriver.exe")
     driver = webdriver.Chrome(service=service, options=options)
     # driver.maximize_window()

@@ -29,7 +40,8 @@ if __name__ == '__main__':
             "source": f.read()
         })
-        run.main(driver=driver)
-        time.sleep(5)
+        run.main(driver=driver, download_dir=download_dir, proxies=proxies)
+        time.sleep(86400)  # keep the browser session alive for up to a day while downloads run
     finally:
-        driver.quit()
+        pass
+        # driver.quit()
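Only the tail of the call preceding run.main survives in the second hunk; a "source": f.read() payload like that is typically passed to Chrome DevTools Protocol so a script runs before each page's own scripts. A minimal sketch of that pattern, with the file name assumed (it is not shown in this diff):

    # assumption: the file read in the hunk above is an anti-detection script
    # such as stealth.min.js; the actual path is not visible here
    with open("./stealth.min.js", "r", encoding="utf-8") as f:
        driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": f.read()
        })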

requirements.txt

@@ -1,2 +1,2 @@
-Pillow==10.2.0
-selenium==4.18.1
+Pillow==10.3.0
+selenium==4.20.0
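run.py below starts importing requests and routing it through the socks5:// proxies defined in main.py; requests only understands SOCKS proxies when the PySocks extra is installed. Unless that arrives transitively, the pin list would also need an entry along these lines (version hypothetical):

    requests[socks]==2.31.0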

run.py

@@ -2,15 +2,162 @@ from selenium import webdriver
 from selenium.webdriver.support.ui import WebDriverWait
 import time
 from selenium.webdriver.remote.webdriver import WebDriver
-import utils
+from selenium.webdriver.common.by import By
+import requests
+import os
+from concurrent.futures import ThreadPoolExecutor
+
+# def main(driver:WebDriver):
+#     driver.set_window_size(1920, 1080)
+#     # driver.get("https://bot.sannysoft.com/")\
+#     driver.get("https://www.geetest.com/adaptive-captcha-demo")
+#     width = driver.execute_script("return document.documentElement.scrollWidth")
+#     height = driver.execute_script("return document.documentElement.scrollHeight")
+#     print(width, height)
+#     utils.screenshot(driver=driver, save_path="./tmp/geetest.png")
-def main(driver:WebDriver):
+
+workers = os.cpu_count()  # cap concurrent image downloads at the CPU count
+
+def find_link(driver: WebDriver, url: str):
+    driver.get(url)
+    time.sleep(1)
+    cards = driver.find_element(By.XPATH, "//div[contains(@class,'CreatorPostList__CardsWrapper')]")
+    items = cards.find_elements(By.XPATH, "//a[contains(@class,'CardPostItem__Wrapper')]")
+    links = []
+    for item in items:
+        print(item.get_attribute("href"))
+        links.append(item.get_attribute("href"))
+    return links
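One caveat in find_link: an XPath that begins with // is evaluated from the document root even when called on an element, so the lookup on cards does not actually narrow the search to the wrapper. A scoped variant (a sketch, not part of this commit) would prefix the path with a dot:

    # ".//" keeps the search inside the cards element's subtree
    items = cards.find_elements(By.XPATH, ".//a[contains(@class,'CardPostItem__Wrapper')]")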
+
+def has_next(driver: WebDriver):
+    has_next = False
+    try:
+        next = driver.find_element(By.XPATH, "//pixiv-icon[@name='24/Next']")
+        if next is not None:
+            has_next = True
+    except Exception as e:
+        print(e)
+        has_next = False
+    return has_next
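The try/except above works, but Selenium offers a quieter idiom: find_elements returns an empty list instead of raising NoSuchElementException, so the same check can be written without exception-driven control flow (a sketch):

    def has_next(driver: WebDriver) -> bool:
        # empty list -> no pagination arrow -> last page reached
        return len(driver.find_elements(By.XPATH, "//pixiv-icon[@name='24/Next']")) > 0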
+
+def download_file_link(driver: WebDriver, links):
+    for link in links:
+        print(link)
+        driver.get(link)
+        time.sleep(1)
+        # get all open window handles
+        # all_windows = driver.window_handles
+        # for window in all_windows:
+        #     if window != original_window:
+        #         driver.switch_to.window(window)
+        try:
+            el = driver.find_element(By.XPATH, "//div[contains(@class, 'FileContent__Wrapper')]")
+            if el is not None:
+                print(el)
+                a = el.find_element(By.TAG_NAME, "a")
+                if a is not None:
+                    a.click()  # let Chrome save the file into download.default_directory
+        except Exception as e:
+            print(e)
+        time.sleep(5)
+        # driver.back()
+        # driver.switch_to(original_window)
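The fixed time.sleep(5) after the click is a guess at how long Chrome needs to finish saving. Chrome marks in-progress downloads with a .crdownload suffix, so a polling helper (a sketch, not part of this commit) could wait only as long as necessary:

    import glob

    def wait_for_downloads(download_dir: str, timeout: float = 60.0) -> bool:
        # returns True once no *.crdownload files remain in download_dir
        deadline = time.time() + timeout
        while time.time() < deadline:
            if not glob.glob(os.path.join(download_dir, "*.crdownload")):
                return True
            time.sleep(0.5)
        return False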
+
+def download_image(url: str, download_dir: str, proxies):
+    print(f"starting download: {url}")
+    headers = {
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
+        "Referer": "https://monpetit17.fanbox.cc/",
+        # hardcoded session cookie copied from a logged-in browser session
+        "Cookie": "p_ab_id=0; p_ab_id_2=1; p_ab_d_id=1743506050; _gcl_au=1.1.1973053698.1715103653; cf_clearance=9nglHcFCr9D17iz8392jJQIJi4oN8TSpBsvlS4oPjvs-1715103654-1.0.1.1-DLbaQVWB8O8lyUZyJ5P8ToQD7Bx7dt5r_7KUz9aFqIcEM5GiAunuXwAQRu5BJ3c3zBOt242Oy13YvXz1omV5Dw; FANBOXSESSID=35206256_01xWtpm33E6tjKYTAdLoVaZq5xceRVNB; privacy_policy_agreement=6; privacy_policy_notification=0; _gid=GA1.2.1663028388.1715442061; __cf_bm=YRkVSwzNtFz96eLrGB3KhENnEVv7lkYdc898q2AF3G0-1715447846-1.0.1.1-yJ.D2R_c.jS8SErn4fAmIg6fShYSfc2h_m4vQrehVA5UpRV7rIsjnYCGTZLro7JW9nh1r0Hu853rOcvoy6hSrA; cf_clearance=QQWAR7NhgwYZjtKExO9v0IE2eHNkQweSMLRVugqL3mE-1715447848-1.0.1.1-JhfPisIW0GNy135ks_mIObi9.X.FmmorhRl_Fows5nrHEQuPBt2S7CY_lnB4vCipSp4Xq.QrKW5oscwSCMk_Hw; _gat_gtag_UA_1830249_145=1; _ga_D9TLP3EFER=GS1.1.1715442066.2.1.1715448098.22.0.0; _ga=GA1.1.530858074.1715103653"
+    }
+    save_path = os.path.join(download_dir, url.split("/")[-1])
+    req = requests.get(url, proxies=proxies, headers=headers)
+    with open(save_path, "wb") as f:
+        f.write(req.content)
+    print(f"download finished {url}, saved to: {save_path}")
+
+def download_images(driver: WebDriver, links, download_dir: str, proxies):
+    print(f"max concurrent downloads: {workers}")
+    with ThreadPoolExecutor(max_workers=workers) as worker:
+        img_list = []
+        for link in links:
+            part_img_list = []
+            print(link)
+            # https://monpetit17.fanbox.cc/posts/5183527
+            # https://api.fanbox.cc/post.info?postId=5183527
+            driver.get(link)
+            print("page opened")
+            time.sleep(1)
+            try:
+                el = driver.find_element(By.XPATH, "//div[contains(@class, 'FileContent__Wrapper')]")
+                if el is not None:
+                    # posts that expose a file download are skipped here
+                    continue
+            except Exception as e:
+                pass
+            # html = driver.find_element(By.TAG_NAME, "html")
+            # height = html.size['height']
+            # for _ in range(500, height, 500):
+            #     driver.execute_script("window.scrollBy(0, 500)")
+            #     time.sleep(1.5)
+            for _ in range(0, 10):
+                # scroll step by step so lazily loaded images enter the DOM
+                driver.execute_script("window.scrollBy(0, 500)")
+                time.sleep(0.2)
+            img_link_elements = driver.find_elements(By.XPATH, "//a[contains(@class, 'PostImage__Anchor')]")
+            sub_dir = link.split("/")[-1]
+            real_download_dir = os.path.join(download_dir, sub_dir)
+            if not os.path.exists(real_download_dir):
+                os.makedirs(real_download_dir)
+            for element in img_link_elements:
+                href = element.get_attribute("href")
+                print(href)
+                part_img_list.append(href)
+                worker.submit(download_image, href, real_download_dir, proxies)
+            print("collected %d image URLs" % len(part_img_list))
+            print(part_img_list)
+            img_list += part_img_list
+    print("%d pages, %d images in total" % (len(links), len(img_list)))
+
+def main(driver: WebDriver, download_dir: str, proxies):
     driver.set_window_size(1920, 1080)
     # driver.get("https://bot.sannysoft.com/")\
-    driver.get("https://www.geetest.com/adaptive-captcha-demo")
-    width = driver.execute_script("return document.documentElement.scrollWidth")
-    height = driver.execute_script("return document.documentElement.scrollHeight")
-    print(width, height)
-    utils.screenshot(driver=driver, save_path="./tmp/geetest.png")
+    # driver.get("https://www.geetest.com/adaptive-captcha-demo")
+    # width = driver.execute_script("return document.documentElement.scrollWidth")
+    # height = driver.execute_script("return document.documentElement.scrollHeight")
+    # print(width, height)
+    # utils.screenshot(driver=driver, save_path="./tmp/geetest.png")
+    links = []
+    start_page = 1
+    url = "https://monpetit17.fanbox.cc/posts?page=%d" % start_page
+    links += find_link(driver, url)
+    while has_next(driver):
+        # keep following the pagination arrow until the last listing page
+        start_page += 1
+        url = "https://monpetit17.fanbox.cc/posts?page=%d" % start_page
+        links += find_link(driver, url)
+    print("collected post page links", links)
+    # download_file_link(driver, links)
+    download_images(driver, links, download_dir, proxies)
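The creator subdomain monpetit17 is hardcoded both here and in the download headers. A hypothetical helper would make the same crawl reusable for other creators:

    # hypothetical: page_url("monpetit17", 1) -> "https://monpetit17.fanbox.cc/posts?page=1"
    def page_url(creator: str, page: int) -> str:
        return f"https://{creator}.fanbox.cc/posts?page={page}"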