# -*- coding: utf-8 -*-
"""
emoji.py
!!!声明:
由于表情包并不属于个人,并且其可能具有版权风险,你只有浏览权没有拥有权
另外访问腾讯API可能会给腾讯服务器造成压力
所以禁止任何人以任何方式修改或间接修改该文件,违者后果自负
"""
import os
import re
import traceback
import xml.etree.ElementTree as ET
import sqlite3
import threading
from PyQt5.QtGui import QPixmap
import requests
from app.log import log, logger
lock = threading.Lock()
db_path = "./app/Database/Msg/Emotion.db"
root_path = "./data/emoji/"
if not os.path.exists("./data"):
os.mkdir("./data")
if not os.path.exists(root_path):
os.mkdir(root_path)
@log
def get_image_format(header):
# 定义图片格式的 magic numbers
image_formats = {
b"\xFF\xD8\xFF": "jpeg",
b"\x89\x50\x4E\x47\x0D\x0A\x1A\x0A": "png",
b"\x47\x49\x46": "gif",
b"\x42\x4D": "bmp",
# 添加其他图片格式的 magic numbers
}
# 判断文件的图片格式
for magic_number, image_format in image_formats.items():
if header.startswith(magic_number):
return image_format
# 如果无法识别格式,返回 None
return None
@log
def parser_xml(xml_string):
assert type(xml_string) == str
# Parse the XML string
try:
root = ET.fromstring(xml_string)
except:
res = re.search('.*', xml_string)
if res:
xml_string = res.group()
root = ET.fromstring(xml_string.replace("&", "&"))
emoji = root.find("./emoji")
# Accessing attributes of the 'emoji' element
fromusername = emoji.get("fromusername")
tousername = emoji.get("tousername")
md5 = emoji.get("md5")
cdnurl = emoji.get("cdnurl")
encrypturl = emoji.get("encrypturl")
thumburl = emoji.get("thumburl")
externurl = emoji.get("externurl")
androidmd5 = emoji.get("androidmd5")
width = emoji.get("width")
height = emoji.get("height")
return {
"width": width,
"height": height,
"cdnurl": cdnurl,
"thumburl": thumburl if thumburl else cdnurl,
"md5": (md5 if md5 else androidmd5).lower(),
}
def singleton(cls):
_instance = {}
def inner():
if cls not in _instance:
_instance[cls] = cls()
return _instance[cls]
return inner
# 一定要保证只有一个实例对象
@singleton
class Emotion:
def __init__(self):
self.DB = None
self.cursor: sqlite3.Cursor = None
self.open_flag = False
self.init_database()
def init_database(self):
if not self.open_flag:
if os.path.exists(db_path):
self.DB = sqlite3.connect(db_path, check_same_thread=False)
# '''创建游标'''
self.cursor = self.DB.cursor()
self.open_flag = True
if lock.locked():
lock.release()
def get_emoji_url(self, md5: str, thumb: bool) -> str | bytes:
"""供下载用,返回可能是url可能是bytes"""
if thumb:
sql = """
select
case
when thumburl is NULL or thumburl = '' then cdnurl
else thumburl
end as selected_url
from CustomEmotion
where md5 = ?
"""
else:
sql = """
select CDNUrl
from CustomEmotion
where md5 = ?
"""
try:
lock.acquire(True)
self.cursor.execute(sql, [md5])
return self.cursor.fetchone()[0]
except:
md5 = md5.upper()
sql = f"""
select {"Thumb" if thumb else "Data"}
from EmotionItem
where md5 = ?
"""
self.cursor.execute(sql, [md5])
res = self.cursor.fetchone()
return res[0] if res else ""
finally:
lock.release()
def get_emoji_URL(self, md5: str, thumb: bool):
"""只管url,另外的不管"""
if thumb:
sql = """
select
case
when thumburl is NULL or thumburl = '' then cdnurl
else thumburl
end as selected_url
from CustomEmotion
where md5 = ?
"""
else:
sql = """
select CDNUrl
from CustomEmotion
where md5 = ?
"""
try:
lock.acquire(True)
self.cursor.execute(sql, [md5])
return self.cursor.fetchone()[0]
except:
return ""
finally:
lock.release()
def close(self):
if self.open_flag:
try:
lock.acquire(True)
self.open_flag = False
self.DB.close()
finally:
lock.release()
def __del__(self):
self.close()
@log
def download(url, output_dir, name, thumb=False):
resp = requests.get(url)
byte = resp.content
image_format = get_image_format(byte[:8])
if image_format:
if thumb:
output_path = os.path.join(output_dir, "th_" + name + "." + image_format)
else:
output_path = os.path.join(output_dir, name + "." + image_format)
else:
output_path = os.path.join(output_dir, name)
with open(output_path, "wb") as f:
f.write(resp.content)
return output_path
def get_most_emoji(messages):
dic = {}
for msg in messages:
str_content = msg[7]
emoji_info = parser_xml(str_content)
if emoji_info is None:
continue
md5 = emoji_info["md5"]
if not md5:
continue
try:
dic[md5][0] += 1
except:
dic[md5] = [1, emoji_info]
md5_nums = [(num[0], key, num[1]) for key, num in dic.items()]
md5_nums.sort(key=lambda x: x[0], reverse=True)
if not md5_nums:
return "", 0
md5 = md5_nums[0][1]
num = md5_nums[0][0]
emoji_info = md5_nums[0][2]
url = emoji_info["cdnurl"]
if not url or url == "":
url = Emotion().get_emoji_url(md5, False)
return url, num
def get_emoji(xml_string, thumb=True, output_path=root_path) -> str:
"""供下载用"""
try:
emoji_info = parser_xml(xml_string)
md5 = emoji_info["md5"]
image_format = [".png", ".gif", ".jpeg"]
for f in image_format:
prefix = "th_" if thumb else ""
file_path = os.path.join(output_path, prefix + md5 + f)
if os.path.exists(file_path):
return file_path
url = emoji_info["thumburl"] if thumb else emoji_info["cdnurl"]
if not url or url == "":
url = Emotion().get_emoji_url(md5, thumb)
if type(url) == str and url != "":
print("下载表情包ing:", url)
emoji_path = download(url, output_path, md5, thumb)
return emoji_path
elif type(url) == bytes:
image_format = get_image_format(url[:8])
if image_format:
if thumb:
output_path = os.path.join(
output_path, "th_" + md5 + "." + image_format
)
else:
output_path = os.path.join(output_path, md5 + "." + image_format)
else:
output_path = os.path.join(output_path, md5)
with open(output_path, "wb") as f:
f.write(url)
print("表情包数据库加载", output_path)
return output_path
else:
print("!!!未知表情包数据,信息:", xml_string, emoji_info, url)
output_path = os.path.join(output_path, "404.png")
if not os.path.exists(output_path):
QPixmap(":/icons/icons/404.png").save(output_path)
return output_path
except:
logger.error(traceback.format_exc())
output_path = os.path.join(output_path, "404.png")
if not os.path.exists(output_path):
QPixmap(":/icons/icons/404.png").save(output_path)
return output_path
def get_emoji_path(xml_string, thumb=True, output_path=root_path) -> str:
try:
emoji_info = parser_xml(xml_string)
md5 = emoji_info["md5"]
image_format = [".png", ".gif", ".jpeg"]
for f in image_format:
prefix = "th_" if thumb else ""
file_path = os.path.join(output_path, prefix + md5 + f)
return file_path
except:
logger.error(traceback.format_exc())
output_path = os.path.join(output_path, "404.png")
return output_path
def get_emoji_url(xml_string, thumb=True) -> str:
"""不管下载,只返回url"""
try:
emoji_info = parser_xml(xml_string)
md5 = emoji_info["md5"]
url = emoji_info["thumburl" if thumb else "cdnurl"]
if not url or url == "":
url = Emotion().get_emoji_URL(md5=md5, thumb=thumb)
return url
except:
logger.error(traceback.format_exc())
output_path = os.path.join("./emoji/404.png")
return output_path
if __name__ == "__main__":
# xml_string = ' '
# res1 = parser_xml(xml_string)
# print(res1, res1['md5'])
# download(res1['cdnurl'], "./data/emoji/", res1['md5'])
# download(res1['thumburl'], "./data/emoji/", res1['md5'], True)
# print(Emotion().get_emoji_url("144714f65c98844128ac3a1042445d9a", True))
# print(Emotion().get_emoji_url("144714f65c98844128ac3a1042445d9a", False))
print(parser_xml(""))
# print(get_emoji(xml_string, True))
# print(get_emoji(xml_string, False))
# http://vweixinf.tc.qq.com/110/20403/stodownload?m=3a4d439aba02dce4834b2c54e9f15597&filekey=3043020101042f302d02016e0402534804203361346434333961626130326463653438333462326335346539663135353937020213f0040d00000004627466730000000131&hy=SH&storeid=323032313037323030373236313130303039653236646365316535316534383236386234306230303030303036653033303034666233&ef=3&bizid=1022