"""Resolve WeChat image/video message attachments to on-disk file paths.

The module reads the ``HardLinkImage.db`` / ``HardLinkVideo.db`` SQLite
databases to map the md5 found in a message's XML payload to a file stored
under ``FileStorage``. It also contains a decoder for the contact
``ExtraBuf`` blob.
"""
import binascii
import os.path
import sqlite3
import threading
import traceback
import xml.etree.ElementTree as ET

from app.log import log, logger
from app.util.protocbuf.msg_pb2 import MessageBytesExtra

# One lock per database; each guards the shared cursor of that database.
image_db_lock = threading.Lock()
video_db_lock = threading.Lock()
image_db_path = "./app/Database/Msg/HardLinkImage.db"
video_db_path = "./app/Database/Msg/HardLinkVideo.db"
root_path = "FileStorage/MsgAttach/"
video_root_path = "FileStorage/Video/"

# XPath of the element carrying the ``md5`` attribute, per message type.
_MD5_XPATH_BY_TYPE = {
    "img": ".//img",
    "video": ".//videomsg",
}

# 4-byte marker in the ExtraBuf blob -> internal field name.
_EXTRA_BUF_FIELDS = {
    b"\x46\xCF\x10\xC4": "个性签名",  # signature
    b"\xA4\xD9\x02\x4A": "国家",  # country
    b"\xE2\xEA\xA8\xD1": "省份",  # province
    b"\x1D\x02\x5B\xBF": "市",  # city
    # b"\x81\xAE\x19\xB4": Moments background url
    # b"\xF9\x17\xBC\xC0": company name
    # b"\x4E\xB9\x6D\x85": WeCom (enterprise WeChat) attribute
    # b"\x0E\x71\x9F\x13": remark image
    b"\x75\x93\x78\xAD": "手机号",  # telephone
    b"\x74\x75\x2C\x06": "性别",  # gender
}


@log
def get_md5_from_xml(content, type_="img"):
    """Extract the ``md5`` attribute from a message XML payload.

    Args:
        content: XML text of the message.
        type_: ``"img"`` to read ``<img md5=...>`` or ``"video"`` to read
            ``<videomsg md5=...>``.

    Returns:
        The md5 string, or ``None`` when the content is empty, is not
        well-formed XML, ``type_`` is unknown, or the element/attribute is
        missing.
    """
    xpath = _MD5_XPATH_BY_TYPE.get(type_)
    if xpath is None or not content:
        return None
    try:
        root = ET.fromstring(content)
    except ET.ParseError:
        return None
    node = root.find(xpath)
    # ``find`` returns None when the element is absent.
    if node is None:
        return None
    return node.get("md5")


def _empty_contact_info():
    """Return the default result of :func:`decodeExtraBuf`."""
    return {
        "region": ('', '', ''),
        "signature": '',
        "telephone": '',
        "gender": 0,
    }


def decodeExtraBuf(extra_buf_content: bytes):
    """Decode selected contact fields from an ``ExtraBuf`` blob.

    Each known field is located by its 4-byte marker. The byte after the
    marker is a type tag: ``0x04`` is followed by a 4-byte little-endian
    integer, ``0x18`` by a 4-byte little-endian length and that many bytes
    of UTF-16 text. Fields whose marker is not present, or whose type tag is
    not one of those two, keep their default value.

    Args:
        extra_buf_content: Raw blob; may be empty or ``None``.

    Returns:
        A dict with keys ``region`` (country, province, city), ``signature``,
        ``telephone`` and ``gender``. On a decoding error the error is logged
        and the all-default dict is returned.
    """
    if not extra_buf_content:
        return _empty_contact_info()

    # Defaults so that one absent field does not discard the others.
    res = {
        "个性签名": "",
        "国家": "",
        "省份": "",
        "市": "",
        "手机号": "",
        "性别": 0,
    }
    try:
        for marker, field_name in _EXTRA_BUF_FIELDS.items():
            pos = extra_buf_content.find(marker)
            if pos < 0:
                # Marker absent: skip instead of decoding at a stale offset.
                continue
            off = pos + 4
            type_tag = extra_buf_content[off:off + 1]
            off += 1
            if type_tag == b"\x04":
                # 4-byte integer, little-endian.
                res[field_name] = int.from_bytes(
                    extra_buf_content[off:off + 4], "little"
                )
            elif type_tag == b"\x18":
                # Length-prefixed UTF-16 string.
                length = int.from_bytes(
                    extra_buf_content[off:off + 4], "little"
                )
                off += 4
                raw = extra_buf_content[off:off + length]
                res[field_name] = raw.decode("utf-16").rstrip("\x00")
    except Exception:
        logger.error(f'联系人解析错误:\n{traceback.format_exc()}')
        return _empty_contact_info()

    return {
        "region": (res["国家"], res["省份"], res["市"]),
        "signature": res["个性签名"],
        "telephone": res["手机号"],
        "gender": res["性别"],
    }


def singleton(cls):
    """Class decorator that makes ``cls()`` always return one shared instance.

    The returned factory takes no arguments, so the decorated class must be
    constructible without arguments.
    """
    _instance = {}

    def inner():
        if cls not in _instance:
            _instance[cls] = cls()
        return _instance[cls]

    return inner


def _md5_to_bytes(md5):
    """Convert a hex md5 string to raw bytes; return None if it is not hex."""
    try:
        return binascii.unhexlify(md5)
    except ValueError:  # binascii.Error is a ValueError subclass
        return None


@singleton
class HardLink:
    """Accessor for the hard-link image and video databases."""

    def __init__(self):
        self.imageDB = None
        self.videoDB = None
        self.image_cursor = None
        self.video_cursor = None
        # True once at least one of the two databases has been opened.
        self.open_flag = False
        self.init_database()

    def init_database(self):
        """Open whichever database is not open yet and whose file exists.

        Safe to call repeatedly. The module-level locks are deliberately not
        touched here: this method is also called by the lookup methods while
        they hold a lock, and releasing a lock owned by the caller would make
        the caller's own release raise ``RuntimeError``.
        """
        if self.imageDB is None and os.path.exists(image_db_path):
            self.imageDB = sqlite3.connect(
                image_db_path, check_same_thread=False
            )
            self.image_cursor = self.imageDB.cursor()
            self.open_flag = True
        if self.videoDB is None and os.path.exists(video_db_path):
            self.videoDB = sqlite3.connect(
                video_db_path, check_same_thread=False
            )
            self.video_cursor = self.videoDB.cursor()
            self.open_flag = True

    def get_image_by_md5(self, md5: bytes):
        """Look up an image record by its raw md5.

        Returns:
            The row ``(Md5Hash, MD5, FileName, DirName1, DirName2)``, or
            ``None`` when ``md5`` is empty, no database is open, the image
            database is unavailable, or no row matches.
        """
        if not md5:
            return None
        if not self.open_flag:
            return None
        sql = """
            select Md5Hash,MD5,FileName,HardLinkImageID.Dir as DirName1,HardLinkImageID2.Dir as DirName2
            from HardLinkImageAttribute
            join HardLinkImageID on HardLinkImageAttribute.DirID1 = HardLinkImageID.DirID
            join HardLinkImageID as HardLinkImageID2 on HardLinkImageAttribute.DirID2 = HardLinkImageID2.DirID
            where MD5 = ?;
            """
        with image_db_lock:
            if self.image_cursor is None:
                # The image database may have appeared since the last attempt.
                self.init_database()
            if self.image_cursor is None:
                return None
            self.image_cursor.execute(sql, [md5])
            return self.image_cursor.fetchone()

    def get_video_by_md5(self, md5: bytes):
        """Look up a video record by its raw md5.

        Returns:
            The row ``(Md5Hash, MD5, FileName, DirName2)``, or ``None`` when
            ``md5`` is empty, no database is open, the video database is
            unavailable, the query fails with ``sqlite3.OperationalError``,
            or no row matches.
        """
        if not md5:
            return None
        if not self.open_flag:
            return None
        sql = """
            select Md5Hash,MD5,FileName,HardLinkVideoID2.Dir as DirName2
            from HardLinkVideoAttribute
            join HardLinkVideoID as HardLinkVideoID2 on HardLinkVideoAttribute.DirID2 = HardLinkVideoID2.DirID
            where MD5 = ?;
            """
        with video_db_lock:
            if self.video_cursor is None:
                # The video database may have appeared since the last attempt.
                self.init_database()
            if self.video_cursor is None:
                return None
            try:
                self.video_cursor.execute(sql, [md5])
            except sqlite3.OperationalError:
                return None
            return self.video_cursor.fetchone()

    @staticmethod
    def _path_from_bytes_extra(bytesExtra, thumb):
        """Return the attachment path stored in ``bytesExtra``, if any.

        Scans ``message2`` for the first entry whose ``field1`` is 3 (when
        ``thumb`` is true) or 4 (otherwise) and returns its ``field2`` with
        the leading backslash-separated component removed. Returns ``None``
        when ``bytesExtra`` is empty or holds no such entry.
        """
        if not bytesExtra:
            return None
        wanted = 3 if thumb else 4
        msg_bytes = MessageBytesExtra()
        msg_bytes.ParseFromString(bytesExtra)
        for entry in msg_bytes.message2:
            if entry.field1 != wanted:
                continue
            # Stored as wxid\FileStorage\... ; drop the leading wxid part.
            return "\\".join(entry.field2.split("\\")[1:])
        return None

    def get_image(self, content, bytesExtra, thumb=False):
        """Resolve the relative path of an image message's file.

        The path embedded in ``bytesExtra`` is preferred; otherwise the md5
        from the XML ``content`` is looked up in the image database.

        Args:
            content: XML text of the message.
            bytesExtra: Serialized ``MessageBytesExtra``; may be empty.
            thumb: Return the thumbnail path instead of the full image.

        Returns:
            The relative path, or ``None`` when it cannot be resolved.
        """
        embedded = self._path_from_bytes_extra(bytesExtra, thumb)
        if embedded is not None:
            return embedded
        md5 = get_md5_from_xml(content)
        if not md5:
            return None
        result = self.get_image_by_md5(_md5_to_bytes(md5))
        if not result:
            return None
        dir1 = result[3]
        dir2 = result[4]
        file_name = result[2]
        dir0 = "Thumb" if thumb else "Image"
        return os.path.join(root_path, dir1, dir0, dir2, file_name)

    def get_video(self, content, bytesExtra, thumb=False):
        """Resolve the relative path of a video message's file.

        The path embedded in ``bytesExtra`` is preferred; otherwise the md5
        from the XML ``content`` is looked up in the video database.

        Args:
            content: XML text of the message.
            bytesExtra: Serialized ``MessageBytesExtra``; may be empty.
            thumb: Return the ``.jpg`` thumbnail path instead of the video.

        Returns:
            The relative path; ``None`` when the XML yields no usable md5;
            ``''`` when the md5 is not found in the database.
        """
        embedded = self._path_from_bytes_extra(bytesExtra, thumb)
        if embedded is not None:
            return embedded
        md5 = get_md5_from_xml(content, type_="video")
        if not md5:
            return None
        md5_bytes = _md5_to_bytes(md5)
        if md5_bytes is None:
            return None
        result = self.get_video_by_md5(md5_bytes)
        if not result:
            return ''
        dir2 = result[3]
        if thumb:
            # The thumbnail shares the video's base name with a .jpg suffix.
            file_name = result[2].split(".")[0] + ".jpg"
        else:
            file_name = result[2]
        return os.path.join(video_root_path, dir2, file_name)

    def close(self):
        """Close whichever databases are open and reset the handles."""
        if not self.open_flag:
            return
        with image_db_lock, video_db_lock:
            self.open_flag = False
            # Either database may never have been opened.
            if self.imageDB is not None:
                self.imageDB.close()
                self.imageDB = None
                self.image_cursor = None
            if self.videoDB is not None:
                self.videoDB.close()
                self.videoDB = None
                self.video_cursor = None

    def __del__(self):
        # Attributes are missing if __init__ did not run to completion.
        if getattr(self, "open_flag", False):
            self.close()


# 6b02292eecea118f06be3a5b20075afc_t
if __name__ == "__main__":
    msg_root_path = "./Msg/"
    image_db_path = "./Msg/HardLinkImage.db"
    video_db_path = "./Msg/HardLinkVideo.db"
    hard_link_db = HardLink()
    hard_link_db.init_database()
    # content = '''\n\t\n\t\n\t\n\n'''
    # print(hard_link_db.get_image(content))
    # print(hard_link_db.get_image(content, thumb=False))
    # result = get_md5_from_xml(content)
    # print(result)
    content = """ """
    # bytesExtra is a required argument; pass an empty payload for the demo.
    print(hard_link_db.get_video(content, b""))
    print(hard_link_db.get_video(content, b"", thumb=True))