# -*- coding: utf-8 -*-
"""
emoji.py

Helpers for turning emoji (sticker) message XML into a local image file:
parse the XML, look the emoji up in the local Emotion database when the XML
carries no URL, and download or extract the image into ``./data/emoji/``.

!!! Notice (translated from the original header):
Emoji images do not belong to any individual and may carry copyright risk;
you only have the right to view them, not to own them.
In addition, accessing the Tencent API may put load on Tencent's servers.
Therefore nobody is allowed to modify, or indirectly modify, this file in any
way; violators bear the consequences themselves.
"""

import os
import re
import sqlite3
import threading
import traceback
import xml.etree.ElementTree as ET

import requests

from app.log import log, logger

root_path = './data/emoji/'
# makedirs also creates './data' and does not fail if the directory exists.
os.makedirs(root_path, exist_ok=True)

# Seconds to wait for the remote server before a download is abandoned.
DOWNLOAD_TIMEOUT = 30

# Magic numbers (leading bytes) of the image formats we can recognise.
_IMAGE_FORMATS = {
    b'\xFF\xD8\xFF': 'jpeg',
    b'\x89\x50\x4E\x47\x0D\x0A\x1A\x0A': 'png',
    b'\x47\x49\x46': 'gif',
    b'\x42\x4D': 'bmp',
    # Add the magic numbers of further image formats here.
}

# File suffixes probed when looking for an already-saved emoji; one per
# format that get_image_format() can report.
_CACHED_EXTENSIONS = ('.png', '.gif', '.jpeg', '.bmp')

# An '&' that does not start a predefined XML entity or a numeric character
# reference, i.e. one that must be escaped for the XML to be well-formed.
_BARE_AMPERSAND = re.compile(
    r'&(?!(?:amp|lt|gt|quot|apos|#\d+|#x[0-9A-Fa-f]+);)'
)


@log
def get_image_format(header):
    """Return the image format name for the leading bytes of a file.

    :param header: bytes-like prefix of the file content (callers in this
        module pass the first 8 bytes).
    :return: ``'jpeg'``, ``'png'``, ``'gif'`` or ``'bmp'``; ``None`` when the
        prefix matches none of the known magic numbers.
    """
    for magic_number, image_format in _IMAGE_FORMATS.items():
        if header.startswith(magic_number):
            return image_format
    # Unrecognised format.
    return None


@log
def parser_xml(xml_string):
    """Extract the emoji attributes from a message XML string.

    The ``<emoji>`` element is looked up as a direct child of the XML root.

    :param xml_string: XML text of the message.
    :return: dict with keys ``width``, ``height``, ``cdnurl``, ``thumburl``
        (falls back to ``cdnurl`` when empty) and ``md5`` (lower-cased; falls
        back to the ``androidmd5`` attribute when ``md5`` is empty).
    :raises TypeError: if ``xml_string`` is not a ``str``.
    :raises ValueError: if there is no ``<emoji>`` element, or neither md5
        attribute is present.
    :raises xml.etree.ElementTree.ParseError: if the XML is still malformed
        after bare ampersands have been escaped.

    NOTE(review): this function is wrapped by the project's ``log``
    decorator, whose behaviour is not visible here; callers in this file
    check the result for ``None``, so it presumably swallows exceptions.
    """
    if not isinstance(xml_string, str):
        raise TypeError(
            f'xml_string must be str, got {type(xml_string).__name__}'
        )
    try:
        root = ET.fromstring(xml_string)
    except ET.ParseError:
        # Unescaped '&' (common inside URL attributes) makes the document
        # ill-formed; escape only those and leave valid entities untouched.
        root = ET.fromstring(_BARE_AMPERSAND.sub('&amp;', xml_string))

    emoji = root.find('./emoji')
    if emoji is None:
        raise ValueError('message XML contains no <emoji> element')

    md5 = emoji.get('md5')
    cdnurl = emoji.get('cdnurl')
    thumburl = emoji.get('thumburl')
    digest = md5 if md5 else emoji.get('androidmd5')
    if digest is None:
        raise ValueError('<emoji> element has neither md5 nor androidmd5')

    return {
        'width': emoji.get('width'),
        'height': emoji.get('height'),
        'cdnurl': cdnurl,
        'thumburl': thumburl if thumburl else cdnurl,
        'md5': digest.lower(),
    }


lock = threading.Lock()
db_path = "./app/Database/Msg/Emotion.db"


class Emotion:
    """Read-only accessor for the emoji tables of the Emotion database.

    The connection is opened from ``db_path`` on construction if that file
    exists; otherwise the instance stays closed and lookups return ``""``.
    All cursor use is serialised through the module-level ``lock``.
    """

    def __init__(self):
        self.DB = None
        self.cursor: sqlite3.Cursor = None
        self.open_flag = False
        self.init_database()

    def init_database(self):
        """Open the database connection and cursor if not already open."""
        if not self.open_flag:
            if os.path.exists(db_path):
                self.DB = sqlite3.connect(db_path, check_same_thread=False)
                # Create the cursor.
                self.cursor = self.DB.cursor()
                self.open_flag = True
                # The module lock is deliberately NOT released here: this
                # method never acquires it, and releasing a lock held by
                # another thread would break that thread's critical section.

    def get_emoji_url(self, md5: str, thumb: bool):
        """Look up the URL (or raw image data) of an emoji by md5.

        ``CustomEmotion`` is queried first; if it has no matching row or the
        query fails, ``EmotionItem`` is queried with the upper-cased md5.

        :param md5: md5 of the emoji.
        :param thumb: ``True`` for the thumbnail, ``False`` for the full
            image.
        :return: the value of the first matching row (a URL from
            ``CustomEmotion``, or the ``Thumb``/``Data`` column from
            ``EmotionItem`` -- callers in this file handle both ``str`` and
            ``bytes``); ``""`` when nothing matches or no database is open.
        """
        if not self.open_flag:
            return ""

        if thumb:
            sql = '''
                select
                    case
                        when thumburl is NULL or thumburl = '' then cdnurl
                        else thumburl
                    end as selected_url
                from CustomEmotion
                where md5 = ?
            '''
        else:
            sql = '''
                select CDNUrl
                from CustomEmotion
                where md5 = ?
            '''

        with lock:
            try:
                self.cursor.execute(sql, [md5])
                row = self.cursor.fetchone()
                if row is not None:
                    return row[0]
            except sqlite3.Error:
                # e.g. the table is missing; fall through to EmotionItem.
                pass

            # The column name comes from a fixed two-value choice, never
            # from caller input, so interpolating it is safe.
            column = "Thumb" if thumb else "Data"
            fallback_sql = f"""
                select {column}
                from EmotionItem
                where md5 = ?
            """
            try:
                self.cursor.execute(fallback_sql, [md5.upper()])
                row = self.cursor.fetchone()
            except sqlite3.Error:
                logger.error(traceback.format_exc())
                return ""
            return row[0] if row is not None else ""

    def close(self):
        """Close the database connection if it is open."""
        # getattr: __del__ may run on an instance whose __init__ failed
        # before open_flag was assigned.
        if getattr(self, 'open_flag', False):
            with lock:
                self.open_flag = False
                self.DB.close()

    def __del__(self):
        self.close()


def _emoji_file_path(output_dir, name, image_format, thumb):
    """Build the on-disk path for an emoji image.

    Recognised formats get ``<name>.<format>``, prefixed with ``th_`` for
    thumbnails; unrecognised data is stored under the bare ``name``.
    """
    if not image_format:
        return os.path.join(output_dir, name)
    prefix = 'th_' if thumb else ''
    return os.path.join(output_dir, prefix + name + '.' + image_format)


@log
def download(url, output_dir, name, thumb=False):
    """Download an emoji image and save it under ``output_dir``.

    :param url: address of the image; a falsy value short-circuits to the
        404 placeholder icon path.
    :param output_dir: directory the file is written into.
    :param name: base file name (callers in this file pass the md5).
    :param thumb: when ``True`` the file name gets a ``th_`` prefix.
    :return: path of the written file, or ``':/icons/icons/404.png'`` when
        ``url`` is empty.
    :raises requests.RequestException: on network failure, timeout, or an
        HTTP error status (so error pages are never saved as images).
    """
    if not url:
        return ':/icons/icons/404.png'
    # Without a timeout requests waits indefinitely on a stalled server.
    resp = requests.get(url, timeout=DOWNLOAD_TIMEOUT)
    resp.raise_for_status()
    data = resp.content
    output_path = _emoji_file_path(
        output_dir, name, get_image_format(data[:8]), thumb
    )
    with open(output_path, 'wb') as f:
        f.write(data)
    return output_path


def get_most_emoji(messages):
    """Find the most frequently used emoji among ``messages``.

    :param messages: iterable of message rows; index 7 of each row is read
        as the emoji XML string.
    :return: ``(url, count)`` of the emoji with the highest count (the first
        one encountered wins ties); ``('', 0)`` when no message yields an
        md5. ``url`` comes from the XML ``cdnurl`` or, if that is empty,
        from the Emotion database.
    """
    counts = {}
    for msg in messages:
        emoji_info = parser_xml(msg[7])
        # presumably None when the @log wrapper swallowed a parse failure
        # -- TODO confirm against app.log
        if emoji_info is None:
            continue
        md5 = emoji_info['md5']
        if not md5:
            continue
        if md5 in counts:
            counts[md5][0] += 1
        else:
            counts[md5] = [1, emoji_info]

    if not counts:
        # Same tuple shape as the normal return so callers can unpack it.
        return '', 0

    # max() returns the first maximal item, and dicts preserve insertion
    # order, so ties resolve to the emoji seen first.
    md5, (num, emoji_info) = max(
        counts.items(), key=lambda item: item[1][0]
    )
    url = emoji_info['cdnurl']
    if not url:
        url = Emotion().get_emoji_url(md5, False)
    return url, num


def get_emoji(xml_string, thumb=True, output_path=root_path) -> str:
    """Return the local file path of the emoji described by ``xml_string``.

    An already-saved file is reused; otherwise the image is downloaded from
    the URL in the XML, or fetched from the Emotion database (as a URL or as
    raw bytes) when the XML has no URL.

    :param xml_string: emoji message XML.
    :param thumb: ``True`` for the thumbnail, ``False`` for the full image.
    :param output_path: directory where emoji files are stored.
    :return: path of the image file; ``""`` when the emoji cannot be
        resolved or any error occurs (the traceback is logged).
    """
    try:
        emoji_info = parser_xml(xml_string)
        md5 = emoji_info['md5']

        prefix = 'th_' if thumb else ''
        for ext in _CACHED_EXTENSIONS:
            file_path = os.path.join(output_path, prefix + md5 + ext)
            if os.path.exists(file_path):
                print('表情包已存在')
                return file_path

        url = emoji_info['thumburl'] if thumb else emoji_info['cdnurl']
        if not url:
            url = Emotion().get_emoji_url(md5, thumb)

        if isinstance(url, str) and url != "":
            print("下载表情包ing:", url)
            return download(url, output_path, md5, thumb)

        if isinstance(url, bytes):
            # The database returned the image data itself rather than a URL.
            target = _emoji_file_path(
                output_path, md5, get_image_format(url[:8]), thumb
            )
            with open(target, 'wb') as f:
                f.write(url)
            print("表情包数据库加载", target)
            return target

        print("!!!未知表情包数据,信息:", xml_string, emoji_info, url)
        return ""
    except Exception:
        logger.error(traceback.format_exc())
        return ""


if __name__ == '__main__':
    # xml_string = ' '
    # res1 = parser_xml(xml_string)
    # print(res1, res1['md5'])
    # download(res1['cdnurl'], "./data/emoji/", res1['md5'])
    # download(res1['thumburl'], "./data/emoji/", res1['md5'], True)
    # print(Emotion().get_emoji_url("144714f65c98844128ac3a1042445d9a", True))
    # print(Emotion().get_emoji_url("144714f65c98844128ac3a1042445d9a", False))
    print(parser_xml(""))
    # print(get_emoji(xml_string, True))
    # print(get_emoji(xml_string, False))
    # http://vweixinf.tc.qq.com/110/20403/stodownload?m=3a4d439aba02dce4834b2c54e9f15597&filekey=3043020101042f302d02016e0402534804203361346434333961626130326463653438333462326335346539663135353937020213f0040d00000004627466730000000131&hy=SH&storeid=323032313037323030373236313130303039653236646365316535316534383236386234306230303030303036653033303034666233&ef=3&bizid=1022