# -*- coding: utf-8 -*- """ emoji.py !!!声明: 由于表情包并不属于个人,并且其可能具有版权风险,你只有浏览权没有拥有权 另外访问腾讯API可能会给腾讯服务器造成压力 所以禁止任何人以任何方式修改或间接修改该文件,违者后果自负 """ from typing import Union import os import traceback import xml.etree.ElementTree as ET import sqlite3 import threading from PyQt5.QtGui import QPixmap import requests from app.log import log, logger root_path = './data/emoji/' if not os.path.exists('./data'): os.mkdir('./data') if not os.path.exists(root_path): os.mkdir(root_path) @log def get_image_format(header): # 定义图片格式的 magic numbers image_formats = { b'\xFF\xD8\xFF': 'jpeg', b'\x89\x50\x4E\x47\x0D\x0A\x1A\x0A': 'png', b'\x47\x49\x46': 'gif', b'\x42\x4D': 'bmp', # 添加其他图片格式的 magic numbers } # 判断文件的图片格式 for magic_number, image_format in image_formats.items(): if header.startswith(magic_number): return image_format # 如果无法识别格式,返回 None return None @log def parser_xml(xml_string): assert type(xml_string) == str # Parse the XML string try: root = ET.fromstring(xml_string) except: root = ET.fromstring(xml_string.replace("&", "&")) emoji = root.find('./emoji') # Accessing attributes of the 'emoji' element fromusername = emoji.get('fromusername') tousername = emoji.get('tousername') md5 = emoji.get('md5') cdnurl = emoji.get('cdnurl') encrypturl = emoji.get('encrypturl') thumburl = emoji.get('thumburl') externurl = emoji.get('externurl') androidmd5 = emoji.get('androidmd5') width = emoji.get('width') height = emoji.get('height') return { 'width': width, 'height': height, 'cdnurl': cdnurl, 'thumburl': thumburl if thumburl else cdnurl, 'md5': (md5 if md5 else androidmd5).lower(), } lock = threading.Lock() db_path = "./app/Database/Msg/Emotion.db" class Emotion: def __init__(self): self.DB = None self.cursor: sqlite3.Cursor = None self.open_flag = False self.init_database() def init_database(self): if not self.open_flag: if os.path.exists(db_path): self.DB = sqlite3.connect(db_path, check_same_thread=False) # '''创建游标''' self.cursor = self.DB.cursor() self.open_flag = True if lock.locked(): lock.release() def get_emoji_url(self, md5: str, thumb: bool) -> Union[str, bytes]: '''供下载用,返回可能是url可能是bytes''' if thumb: sql = ''' select case when thumburl is NULL or thumburl = '' then cdnurl else thumburl end as selected_url from CustomEmotion where md5 = ? ''' else: sql = ''' select CDNUrl from CustomEmotion where md5 = ? ''' try: lock.acquire(True) self.cursor.execute(sql, [md5]) return self.cursor.fetchone()[0] except: md5 = md5.upper() sql = f""" select {"Thumb" if thumb else "Data"} from EmotionItem where md5 = ? """ self.cursor.execute(sql, [md5]) try: return self.cursor.fetchone()[0] except: return "" finally: lock.release() def get_emoji_URL(self, md5: str, thumb: bool): '''只管url,另外的不管''' if thumb: sql = ''' select case when thumburl is NULL or thumburl = '' then cdnurl else thumburl end as selected_url from CustomEmotion where md5 = ? ''' else: sql = ''' select CDNUrl from CustomEmotion where md5 = ? ''' try: lock.acquire(True) self.cursor.execute(sql, [md5]) return self.cursor.fetchone()[0] except: return "" finally: lock.release() def close(self): if self.open_flag: try: lock.acquire(True) self.open_flag = False self.DB.close() finally: lock.release() def __del__(self): self.close() @log def download(url, output_dir, name, thumb=False): resp = requests.get(url) byte = resp.content image_format = get_image_format(byte[:8]) if image_format: if thumb: output_path = os.path.join(output_dir, 'th_' + name + '.' + image_format) else: output_path = os.path.join(output_dir, name + '.' + image_format) else: output_path = os.path.join(output_dir, name) with open(output_path, 'wb') as f: f.write(resp.content) return output_path def get_most_emoji(messages): dic = {} for msg in messages: str_content = msg[7] emoji_info = parser_xml(str_content) if emoji_info is None: continue md5 = emoji_info['md5'] if not md5: continue try: dic[md5][0] += 1 except: dic[md5] = [1, emoji_info] md5_nums = [(num[0], key, num[1]) for key, num in dic.items()] md5_nums.sort(key=lambda x: x[0], reverse=True) if not md5_nums: return '', 0 md5 = md5_nums[0][1] num = md5_nums[0][0] emoji_info = md5_nums[0][2] url = emoji_info['cdnurl'] if not url or url == "": url = Emotion().get_emoji_url(md5, False) return url, num def get_emoji(xml_string, thumb=True, output_path=root_path) -> str: """供下载用""" try: emoji_info = parser_xml(xml_string) md5 = emoji_info['md5'] image_format = ['.png', '.gif', '.jpeg'] for f in image_format: prefix = 'th_' if thumb else '' file_path = os.path.join(output_path, prefix + md5 + f) if os.path.exists(file_path): return file_path url = emoji_info['thumburl'] if thumb else emoji_info['cdnurl'] if not url or url == "": url = Emotion().get_emoji_url(md5, thumb) if type(url) == str and url != "": print("下载表情包ing:", url) emoji_path = download(url, output_path, md5, thumb) return emoji_path elif type(url) == bytes: image_format = get_image_format(url[:8]) if image_format: if thumb: output_path = os.path.join(output_path, 'th_' + md5 + '.' + image_format) else: output_path = os.path.join(output_path, md5 + '.' + image_format) else: output_path = os.path.join(output_path, md5) with open(output_path, 'wb') as f: f.write(url) print("表情包数据库加载", output_path) return output_path else: print("!!!未知表情包数据,信息:", xml_string, emoji_info, url) output_path = os.path.join(output_path, '404.png') if not os.path.exists(output_path): QPixmap(':/icons/icons/404.png').save(output_path) return output_path except: logger.error(traceback.format_exc()) output_path = os.path.join(output_path, "404.png") if not os.path.exists(output_path): QPixmap(':/icons/icons/404.png').save(output_path) return output_path def get_emoji_path(xml_string, thumb=True, output_path=root_path) -> str: try: emoji_info = parser_xml(xml_string) md5 = emoji_info['md5'] image_format = ['.png', '.gif', '.jpeg'] for f in image_format: prefix = 'th_' if thumb else '' file_path = os.path.join(output_path, prefix + md5 + f) return file_path except: logger.error(traceback.format_exc()) output_path = os.path.join(output_path, "404.png") return output_path def get_emoji_url(xml_string, thumb=True) -> str: """不管下载,只返回url""" try: emoji_info = parser_xml(xml_string) md5 = emoji_info['md5'] url = emoji_info["thumburl" if thumb else "cdnurl"] if not url or url == '': url = Emotion().get_emoji_URL(md5=md5, thumb=thumb) return url except: logger.error(traceback.format_exc()) output_path = os.path.join("./emoji/404.png") return output_path if __name__ == '__main__': # xml_string = ' ' # res1 = parser_xml(xml_string) # print(res1, res1['md5']) # download(res1['cdnurl'], "./data/emoji/", res1['md5']) # download(res1['thumburl'], "./data/emoji/", res1['md5'], True) # print(Emotion().get_emoji_url("144714f65c98844128ac3a1042445d9a", True)) # print(Emotion().get_emoji_url("144714f65c98844128ac3a1042445d9a", False)) print(parser_xml("")) # print(get_emoji(xml_string, True)) # print(get_emoji(xml_string, False)) # http://vweixinf.tc.qq.com/110/20403/stodownload?m=3a4d439aba02dce4834b2c54e9f15597&filekey=3043020101042f302d02016e0402534804203361346434333961626130326463653438333462326335346539663135353937020213f0040d00000004627466730000000131&hy=SH&storeid=323032313037323030373236313130303039653236646365316535316534383236386234306230303030303036653033303034666233&ef=3&bizid=1022