# Mirror of https://github.com/LC044/WeChatMsg (synced 2024-11-16 06:51:19 +08:00)
import html
import xml.etree.ElementTree as ET
from urllib.parse import urlparse

import lz4.block
import requests
from bs4 import BeautifulSoup


def decompress_CompressContent(data):
    """
    Decompress the CompressContent blob of a message record (raw LZ4 block format).
    :param data: compressed bytes read from the database
    :return: the decompressed string, or '' if the input is invalid or decompression fails
    """
    if data is None or not isinstance(data, bytes):
        return ''
    try:
        # The uncompressed size is not stored with the blob, so allocate a generous
        # output buffer (1024x the compressed length) for lz4.block.decompress.
        dst = lz4.block.decompress(data, uncompressed_size=len(data) << 10)
        decoded_string = dst.decode().replace('\x00', '')  # Remove any null characters
    except Exception:
        print("Decompression failed: potentially corrupt input or insufficient buffer size.")
        return ''
    return decoded_string
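
# Usage sketch (hypothetical name: `compressed_blob` stands for the bytes of a
# CompressContent column read from a decrypted WeChat message database):
#
#   xml_text = decompress_CompressContent(compressed_blob)
#   if xml_text:
#       print(xml_text[:200])  # the payload is typically an <msg><appmsg>... XML document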


def escape_js_and_html(input_str):
    # Escape HTML special characters (quotes are handled separately below)
    html_escaped = html.escape(input_str, quote=False)

    # Manually escape characters that are significant inside JavaScript string literals
    js_escaped = (
        html_escaped
        .replace("\\", "\\\\")
        .replace("'", r"\'")
        .replace('"', r'\"')
        .replace("\n", r'\n')
        .replace("\r", r'\r')
        .replace("\t", r'\t')
    )

    return js_escaped
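
# Usage sketch: make message text safe to embed inside the HTML/JS of an exported chat page.
#
#   escape_js_and_html("<b>bold</b>")   # -> "&lt;b&gt;bold&lt;/b&gt;"
#   escape_js_and_html("line1\nline2")  # -> r"line1\nline2" (newline becomes a literal \n)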


def parser_reply(data: bytes):
    """Parse a quoted-reply (type 57) message out of its compressed XML payload."""
    xml_content = decompress_CompressContent(data)
    if not xml_content:
        return {
            'type': 57,
            'title': "发生错误",  # "an error occurred"
            'refer': {
                'type': '1',
                'content': '引用错误',  # "quote error"
                'displayname': '用户名',  # "username"
            },
            "is_error": True
        }
    try:
        root = ET.XML(xml_content)
        appmsg = root.find('appmsg')
        msg_type = int(appmsg.find('type').text)
        title = appmsg.find('title').text
        refermsg = appmsg.find('refermsg')
        refermsg_content = refermsg.find('content').text
        refermsg_type = int(refermsg.find('type').text)
        refermsg_displayname = refermsg.find('displayname').text
        return {
            'type': msg_type,
            'title': title,
            # Only plain-text quotes (type 1) are kept; other quote types are dropped.
            'refer': None if refermsg_type != 1 else {
                'type': refermsg_type,
                'content': refermsg_content.lstrip("\n"),
                'displayname': refermsg_displayname,
            },
            "is_error": False
        }
    except Exception:
        return {
            'type': 57,
            'title': "发生错误",  # "an error occurred"
            'refer': {
                'type': '1',
                'content': '引用错误',  # "quote error"
                'displayname': '用户名',  # "username"
            },
            "is_error": True
        }
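
# Usage sketch (hypothetical `compressed_blob` as above; the record must actually be a
# quoted-reply message for the payload to have this shape):
#
#   reply = parser_reply(compressed_blob)
#   if not reply['is_error'] and reply['refer']:
#       print(reply['title'], '<-', reply['refer']['displayname'], ':', reply['refer']['content'])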


def music_share(data: bytes):
    """Parse a music-share (type 3) message: song title, artist, page link and audio stream URL."""
    xml_content = decompress_CompressContent(data)
    if not xml_content:
        return {
            'type': 3,
            'title': "发生错误",  # "an error occurred"
            "is_error": True
        }
    try:
        root = ET.XML(xml_content)
        appmsg = root.find('appmsg')
        msg_type = int(appmsg.find('type').text)
        title = appmsg.find('title').text
        if len(title) >= 39:
            title = title[:38] + '...'  # truncate overly long song titles
        artist = appmsg.find('des').text
        link_url = appmsg.find('url').text  # link to the song's web page
        audio_url = get_audio_url(appmsg.find('dataurl').text)  # playable audio stream URL
        website_name = get_website_name(link_url)
        return {
            'type': msg_type,
            'title': title,
            'artist': artist,
            'link_url': link_url,
            'audio_url': audio_url,
            'website_name': website_name,
            "is_error": False
        }
    except Exception as e:
        print(f"Music Share Error: {e}")
        return {
            'type': 3,
            'title': "发生错误",  # "an error occurred"
            "is_error": True
        }
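
# Usage sketch (hypothetical `compressed_blob`; note that this call makes live HTTP
# requests via get_audio_url and get_website_name):
#
#   song = music_share(compressed_blob)
#   if not song['is_error']:
#       print(f"{song['title']} - {song['artist']} ({song['website_name']})")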


def get_website_name(url):
    """Fetch the sharing site's page title, following at most one redirect manually."""
    parsed_url = urlparse(url)
    domain = f"{parsed_url.scheme}://{parsed_url.netloc}"
    website_name = ''
    try:
        response = requests.get(domain, allow_redirects=False)
        if response.status_code == 200:
            soup = BeautifulSoup(response.content, 'html.parser')
            website_name = soup.title.string.strip()
        elif response.status_code == 302:
            # Follow a single redirect manually and read the title there.
            domain = response.headers['Location']
            response = requests.get(domain, allow_redirects=False)
            soup = BeautifulSoup(response.content, 'html.parser')
            website_name = soup.title.string.strip()
        else:
            # Fall back to the full share URL instead of the bare domain.
            response = requests.get(url, allow_redirects=False)
            if response.status_code == 200:
                soup = BeautifulSoup(response.content, 'html.parser')
                website_name = soup.title.string.strip()
        index = website_name.find("-")
        if index != -1:  # if a "-" is found, keep only the part after it
            website_name = website_name[index + 1:].strip()
    except Exception as e:
        print(f"Get Website Info Error: {e}")
    return website_name
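
# Usage sketch (performs a live HTTP request; the result is whatever follows "-" in the
# target page's <title>, or '' on any failure):
#
#   name = get_website_name(link_url)  # link_url as extracted by music_share above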


def get_audio_url(url):
    """Resolve the playable audio URL behind a share's dataurl by reading its 302 redirect."""
    path = ''
    try:
        response = requests.get(url, allow_redirects=False)
        # Check the response status code: a 302 carries the real audio URL in Location.
        if response.status_code == 302:
            path = response.headers['Location']
        elif response.status_code == 200:
            print('The music file has expired, url: ' + url)
        else:
            print('Failed to get the music file address, url: ' + url + ', status code: ' + str(response.status_code))
    except Exception as e:
        print(f"Get Audio Url Error: {e}")
    return path
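

if __name__ == '__main__':
    # Minimal self-test sketch. Assumption: real CompressContent blobs come from a decrypted
    # WeChat database; here one is faked by LZ4-compressing a small XML snippet without the
    # python-lz4 size header, which matches what decompress_CompressContent expects.
    sample_xml = '<msg><appmsg><type>57</type><title>hello</title></appmsg></msg>'
    fake_blob = lz4.block.compress(sample_xml.encode(), store_size=False)
    print(decompress_CompressContent(fake_blob))
    print(escape_js_and_html('<b>line1</b>\n"quoted"'))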