import csv
import html
import os
import shutil
from re import findall

from PyQt5.QtCore import pyqtSignal, QThread
from PyQt5.QtWidgets import QFileDialog

from . import msg_db, micro_msg_db
from .package_msg import PackageMsg
from ..DataBase import hard_link_db
from ..DataBase import media_msg_db
from ..person_pc import MePC
from ..util import path
from ..util.compress_content import parser_reply
from ..util.emoji import get_emoji

# Make sure the export root exists as soon as the module is imported.
os.makedirs('./data/聊天记录', exist_ok=True)


def makedirs(path):
    """Create the export directory *path* and its media sub-directories.

    The sub-directories are ``image``, ``emoji``, ``video``, ``voice`` and
    ``file``. Directories that already exist are left untouched.
    """
    os.makedirs(path, exist_ok=True)
    for sub_dir in ('image', 'emoji', 'video', 'voice', 'file'):
        os.makedirs(os.path.join(path, sub_dir), exist_ok=True)


def escape_js_and_html(input_str):
    """Escape *input_str* for embedding in a quoted JS string inside HTML.

    HTML special characters (``&``, ``<``, ``>``) are escaped first, then
    backslashes, quotes, newlines, carriage returns and tabs are turned into
    their JavaScript escape sequences.
    """
    # Escape HTML special characters (quotes are handled by the JS step).
    html_escaped = html.escape(input_str, quote=False)
    # Escape JavaScript string characters by hand. The backslash must be
    # replaced first so the escapes added afterwards are not doubled.
    js_escaped = (
        html_escaped
        .replace("\\", "\\\\")
        .replace("'", r"\'")
        .replace('"', r'\"')
        .replace("\n", r'\n')
        .replace("\r", r'\r')
        .replace("\t", r'\t')
    )
    return js_escaped


class Output(QThread):
    """Export thread that dispatches on the requested output type.

    ``CSV_ALL`` and ``CONTACT_CSV`` are written directly by this thread; every
    other type except ``DOCX`` (a no-op here) is delegated to a
    ``ChildThread`` whose signals are forwarded through this object's signals.
    """
    progressSignal = pyqtSignal(int)
    rangeSignal = pyqtSignal(int)
    okSignal = pyqtSignal(int)
    i = 1
    # Output type identifiers.
    CSV = 0
    DOCX = 1
    HTML = 2
    CSV_ALL = 3
    CONTACT_CSV = 4
    TXT = 5

    def __init__(self, contact, type_=DOCX, message_types=None, parent=None):
        """Store the export parameters.

        @param contact: contact to export; may be falsy for whole-database
            exports, in which case ``ta_username`` is the empty string.
        @param type_: one of the output type constants of this class.
        @param message_types: mapping from message type to a truthy flag
            selecting which message types are exported; ``None`` means an
            empty mapping.
        @param parent: optional Qt parent object.
        """
        super().__init__(parent)
        self.Child0 = None  # kept for backward compatibility
        self.Child = None  # worker thread created by run()
        self.last_timestamp = 0
        # A fresh dict per instance instead of a shared mutable default.
        self.message_types = {} if message_types is None else message_types
        # NOTE(review): the original comment said "default 1000 seconds" but
        # the value is 2; the attribute is not read in the visible code.
        self.sec = 2
        self.contact = contact
        self.ta_username = contact.wxid if contact else ''
        self.msg_id = 0
        self.output_type = type_
        self.total_num = 0
        self.num = 0

    def progress(self, value):
        """Forward a progress value from the worker thread."""
        self.progressSignal.emit(value)

    def to_csv_all(self):
        """Ask for a target file and write all packaged messages to it as CSV.

        Returns without emitting anything when the dialog is cancelled;
        otherwise emits ``okSignal(1)`` once the file is written.
        """
        origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/"
        os.makedirs(origin_docx_path, exist_ok=True)
        filename = QFileDialog.getSaveFileName(None, "save file", os.path.join(os.getcwd(), 'messages.csv'),
                                               "csv files (*.csv);;all files(*.*)")
        if not filename[0]:
            return
        filename = filename[0]
        columns = ['localId', 'TalkerId', 'Type', 'SubType',
                   'IsSender', 'CreateTime', 'Status', 'StrContent',
                   'StrTime', 'Remark', 'NickName', 'Sender']

        packagemsg = PackageMsg()
        messages = packagemsg.get_package_message_all()
        # Write the CSV file: header row first, then the data rows.
        with open(filename, mode='w', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            writer.writerow(columns)
            writer.writerows(messages)
        self.okSignal.emit(1)

    def contact_to_csv(self):
        """Ask for a target file and write the contact list to it as CSV.

        Returns without emitting anything when the dialog is cancelled;
        otherwise emits ``okSignal(1)`` once the file is written.
        """
        filename = QFileDialog.getSaveFileName(None, "save file", os.path.join(os.getcwd(), 'contacts.csv'),
                                               "csv files (*.csv);;all files(*.*)")
        if not filename[0]:
            return
        filename = filename[0]
        columns = ['UserName', 'Alias', 'Type', 'Remark', 'NickName', 'PYInitial', 'RemarkPYInitial', 'smallHeadImgUrl',
                   'bigHeadImgUrl']
        contacts = micro_msg_db.get_contact()
        # Write the CSV file: header row first, then the data rows.
        with open(filename, mode='w', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            writer.writerow(columns)
            writer.writerows(contacts)
        self.okSignal.emit(1)

    def run(self):
        """Run the export selected by ``output_type``."""
        if self.output_type == self.DOCX:
            return
        elif self.output_type == self.CSV_ALL:
            self.to_csv_all()
        elif self.output_type == self.CONTACT_CSV:
            self.contact_to_csv()
        else:
            # Per-contact exports run in a worker whose signals are relayed.
            self.Child = ChildThread(self.contact, type_=self.output_type, message_types=self.message_types)
            self.Child.progressSignal.connect(self.progress)
            self.Child.rangeSignal.connect(self.rangeSignal)
            self.Child.okSignal.connect(self.okSignal)
            self.Child.start()

    def cancel(self):
        """Request interruption of this thread."""
        self.requestInterruption()


class ChildThread(QThread):
    """Worker thread that exports the chat history of a single contact.

    Message rows are indexed positionally. The indices used here are:
    2 type, 3 sub type, 4 is-sender flag, 5 creation timestamp, 7 content
    string, 8 formatted time string, 9 server message id, 10 extra bytes and
    11 the payload handed to ``parser_reply``.
    """
    progressSignal = pyqtSignal(int)
    rangeSignal = pyqtSignal(int)
    okSignal = pyqtSignal(int)
    i = 1
    CSV = 0
    DOCX = 1
    HTML = 2

    def __init__(self, contact, type_=DOCX, message_types=None, parent=None):
        """Store the export parameters.

        @param contact: contact whose messages are exported.
        @param type_: one of the ``Output`` output type constants.
        @param message_types: mapping from message type to a truthy flag
            selecting which message types are exported; ``None`` means an
            empty mapping.
        @param parent: optional Qt parent object.
        """
        super().__init__(parent)
        self.contact = contact
        # A fresh dict per instance instead of a shared mutable default.
        self.message_types = {} if message_types is None else message_types
        self.last_timestamp = 0
        # NOTE(review): the original comment said "default 1000 seconds" but
        # the value is 2; the attribute is not read in the visible code.
        self.sec = 2
        self.msg_id = 0
        self.output_type = type_

    def is_5_min(self, timestamp):
        """Return True when *timestamp* is more than 300 away from the last mark.

        On True the mark is moved to *timestamp*, so the next call measures
        from here. Presumably timestamps are in seconds, i.e. a 5-minute gap
        - verify against the database schema.
        """
        if abs(timestamp - self.last_timestamp) > 300:
            self.last_timestamp = timestamp
            return True
        return False

    def _write_time_divider(self, doc, timestamp, str_time):
        """Write an HTML time-divider entry when ``is_5_min(timestamp)`` holds."""
        if self.is_5_min(timestamp):
            doc.write(
                f'''{{ type:0, text: '{str_time}',is_send:0,avatar_path:''}},'''
            )

    def text(self, doc, message):
        """Write a text message to *doc* in HTML or TXT form."""
        str_content = message[7]
        str_time = message[8]
        is_send = message[4]
        avatar = 'myhead.png' if is_send else 'tahead.png'
        timestamp = message[5]
        if self.output_type == Output.HTML:
            str_content = escape_js_and_html(str_content)
            self._write_time_divider(doc, timestamp, str_time)
            # Replace bracketed emoji codes such as "[微笑]" using the table.
            emojiText = findall(r"(\[.+?\])", str_content)
            for emoji_text in emojiText:
                if emoji_text in emoji:
                    str_content = str_content.replace(emoji_text, emoji[emoji_text])
            doc.write(
                f'''{{ type:{1}, text: '{str_content}',is_send:{is_send},avatar_path:'{avatar}'}},'''
            )
        elif self.output_type == Output.TXT:
            name = '你' if is_send else self.contact.remark
            doc.write(
                f'''{str_time} {name}\n{str_content}\n\n'''
            )

    def image(self, doc, message):
        """Write an image message to *doc*.

        For HTML the full image is preferred and the thumbnail is the
        fallback; nothing is written when neither file exists. For TXT a
        placeholder line is written.
        """
        type_ = message[2]
        str_content = message[7]
        str_time = message[8]
        is_send = message[4]
        avatar = 'myhead.png' if is_send else 'tahead.png'
        timestamp = message[5]
        BytesExtra = message[10]
        if self.output_type == Output.HTML:
            str_content = escape_js_and_html(str_content)
            image_path = hard_link_db.get_image(str_content, BytesExtra, thumb=False)
            image_thumb_path = hard_link_db.get_image(str_content, BytesExtra, thumb=True)
            wx_dir = MePC().wx_dir
            if not os.path.exists(os.path.join(wx_dir, image_path)):
                image_path = None
            if not os.path.exists(os.path.join(wx_dir, image_thumb_path)):
                image_thumb_path = None
            if image_path is None and image_thumb_path is not None:
                image_path = image_thumb_path
            if image_path is None and image_thumb_path is None:
                return
            image_path = path.get_relative_path(image_path, base_path=f'/data/聊天记录/{self.contact.remark}/image')
            image_path = image_path.replace('\\', '/')
            self._write_time_divider(doc, timestamp, str_time)
            doc.write(
                f'''{{ type:{type_}, text: '{image_path}',is_send:{is_send},avatar_path:'{avatar}'}},'''
            )
        elif self.output_type == Output.TXT:
            name = '你' if is_send else self.contact.remark
            doc.write(
                f'''{str_time} {name}\n[图片]\n\n'''
            )

    def audio(self, doc, message):
        """Write a voice message to *doc* (HTML only).

        The audio is extracted into the contact's ``voice`` directory. The
        message is skipped when extraction or transcription lookup fails.
        """
        origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
        str_content = message[7]
        str_time = message[8]
        is_send = message[4]
        avatar = 'myhead.png' if is_send else 'tahead.png'
        timestamp = message[5]
        msgSvrId = message[9]
        if self.output_type == Output.HTML:
            try:
                audio_path = media_msg_db.get_audio(msgSvrId, output_path=origin_docx_path + "/voice")
                audio_path = audio_path.replace('\\', '/')
                voice_to_text = media_msg_db.get_audio_text(str_content)
            except Exception:
                # Best effort: an unreadable voice message is skipped, but
                # SystemExit / KeyboardInterrupt are no longer swallowed.
                return
            self._write_time_divider(doc, timestamp, str_time)
            doc.write(
                f'''{{ type:34, text:'{audio_path}',is_send:{is_send},avatar_path:'{avatar}',voice_to_text:'{voice_to_text}'}},'''
            )

    def emoji(self, doc, message):
        """Write a sticker message to *doc* in HTML or TXT form."""
        origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
        str_content = message[7]
        str_time = message[8]
        is_send = message[4]
        avatar = 'myhead.png' if is_send else 'tahead.png'
        timestamp = message[5]
        if self.output_type == Output.HTML:
            emoji_path = get_emoji(str_content, thumb=True, output_path=origin_docx_path + '/emoji')
            # Reference the sticker relative to the exported HTML file.
            emoji_path = './emoji/' + os.path.basename(emoji_path)
            self._write_time_divider(doc, timestamp, str_time)
            doc.write(
                f'''{{ type:{3}, text: '{emoji_path}',is_send:{is_send},avatar_path:'{avatar}'}},'''
            )
        elif self.output_type == Output.TXT:
            name = '你' if is_send else self.contact.remark
            doc.write(
                f'''{str_time} {name}\n[表情包]\n\n'''
            )

    def wx_file(self, doc, isSend, content, status):
        """Placeholder for file messages; writes nothing."""
        return

    def retract_message(self, doc, isSend, content, status):
        """Placeholder for retracted messages; writes nothing."""
        return

    def refermsg(self, doc, message):
        """Write a reply (quoted) message to *doc*.

        @param doc: writable text file object receiving the output.
        @param message: message row; index 11 is parsed with ``parser_reply``.
        @return: None
        """
        str_time = message[8]
        is_send = message[4]
        avatar = 'myhead.png' if is_send else 'tahead.png'
        content = parser_reply(message[11])
        refer_msg = content.get('refer')
        if self.output_type == Output.HTML:
            # The reply text itself, followed by the quoted message.
            doc.write(
                f'''{{ type:1, text: '{content.get('title')}',is_send:{is_send},avatar_path:'{avatar}'}},'''
            )
            doc.write(
                f'''{{ type:{49},sub_type:{content.get('type')}, text: '{refer_msg.get('displayname')}:{refer_msg.get('content')}',is_send:{is_send},avatar_path:''}},'''
            )
        elif self.output_type == Output.TXT:
            name = '你' if is_send else self.contact.remark
            doc.write(
                f'''{str_time} {name}\n{content.get('title')}\n引用:{refer_msg.get('displayname')}:{refer_msg.get('content')}\n\n'''
            )

    def system_msg(self, doc, message):
        """Write a system message to *doc* in HTML or TXT form.

        The content is HTML/JS-escaped only for HTML output; TXT output
        receives the raw text, matching ``text()``.
        """
        str_content = message[7]
        is_send = message[4]
        str_time = message[8]
        # NOTE(review): both strip arguments are empty strings in this copy,
        # which makes the calls no-ops - confirm the intended characters.
        str_content = str_content.lstrip('').rstrip('')
        if self.output_type == Output.HTML:
            str_content = escape_js_and_html(str_content)
            doc.write(
                f'''{{ type:0, text: '{str_content}',is_send:{is_send},avatar_path:''}},'''
            )
        elif self.output_type == Output.TXT:
            name = '你' if is_send else self.contact.remark
            doc.write(
                f'''{str_time} {name}\n{str_content}\n\n'''
            )

    def video(self, doc, message):
        """Write a video message to *doc*.

        For HTML: when only a thumbnail is available it is written as an
        image entry; when the video file exists it is copied into the
        contact's ``video`` directory and referenced relatively; when neither
        is available nothing is written. For TXT a placeholder line is
        written.
        """
        origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
        type_ = message[2]
        str_content = message[7]
        str_time = message[8]
        is_send = message[4]
        BytesExtra = message[10]
        avatar = 'myhead.png' if is_send else 'tahead.png'
        timestamp = message[5]
        if self.output_type == Output.HTML:
            video_path = hard_link_db.get_video(str_content, BytesExtra, thumb=False)
            image_path = hard_link_db.get_video(str_content, BytesExtra, thumb=True)
            if video_path is None and image_path is not None:
                # Thumbnail only: fall back to showing it as an image.
                image_path = path.get_relative_path(image_path, base_path=f'/data/聊天记录/{self.contact.remark}/image')
                image_path = image_path.replace('\\', '/')
                self._write_time_divider(doc, timestamp, str_time)
                doc.write(
                    f'''{{ type:3, text: '{image_path}',is_send:{is_send},avatar_path:'{avatar}'}},'''
                )
                return
            if video_path is None and image_path is None:
                return
            video_path = f'{MePC().wx_dir}/{video_path}'
            if os.path.exists(video_path):
                new_path = origin_docx_path + '/video/' + os.path.basename(video_path)
                if not os.path.exists(new_path):
                    shutil.copy(video_path, os.path.join(origin_docx_path, 'video'))
                # Reference the copy relative to the exported HTML file.
                video_path = f'./video/{os.path.basename(video_path)}'
            video_path = video_path.replace('\\', '/')
            self._write_time_divider(doc, timestamp, str_time)
            doc.write(
                f'''{{ type:{type_}, text: '{video_path}',is_send:{is_send},avatar_path:'{avatar}'}},'''
            )
        elif self.output_type == Output.TXT:
            name = '你' if is_send else self.contact.remark
            doc.write(
                f'''{str_time} {name}\n[视频]\n\n'''
            )

    def _write_messages(self, doc, messages):
        """Write every selected message to *doc*, emitting progress per row.

        Types with a dedicated handler are written only when
        ``message_types`` holds a truthy flag for them. Reply messages
        (type 49, sub type 57) are always written. Handlers without a branch
        for the current output type write nothing.
        """
        handlers = {
            1: self.text,
            3: self.image,
            34: self.audio,
            43: self.video,
            47: self.emoji,
            10000: self.system_msg,
        }
        total_steps = len(messages)
        for index, message in enumerate(messages):
            type_ = message[2]
            sub_type = message[3]
            self.progressSignal.emit(int((index + 1) / total_steps * 100))
            handler = handlers.get(type_)
            if handler is not None:
                if self.message_types.get(type_):
                    handler(doc, message)
            elif type_ == 49 and sub_type == 57:
                self.refermsg(doc, message)

    def to_csv(self):
        """Write the contact's messages to ``<remark>_utf8.csv`` and emit ``okSignal(1)``."""
        origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
        os.makedirs(origin_docx_path, exist_ok=True)
        filename = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}/{self.contact.remark}_utf8.csv"
        columns = ['localId', 'TalkerId', 'Type', 'SubType',
                   'IsSender', 'CreateTime', 'Status', 'StrContent',
                   'StrTime']
        messages = msg_db.get_messages(self.contact.wxid)
        # Write the CSV file: header row first, then the data rows.
        with open(filename, mode='w', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            writer.writerow(columns)
            writer.writerows(messages)
        # okSignal is declared as pyqtSignal(int), so emit an int.
        self.okSignal.emit(1)

    def to_html_(self):
        """Write the contact's messages to ``<remark>.html`` and emit ``okSignal(1)``.

        Both avatars are saved next to the HTML file as ``myhead.png`` and
        ``tahead.png``.
        """
        origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
        makedirs(origin_docx_path)
        messages = msg_db.get_messages(self.contact.wxid)
        filename = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}/{self.contact.remark}.html"
        # The context manager closes the file even if a handler raises.
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(html_head)
            MePC().avatar.save(os.path.join(origin_docx_path, 'myhead.png'))
            self.contact.avatar.save(os.path.join(origin_docx_path, 'tahead.png'))
            self.rangeSignal.emit(len(messages))
            self._write_messages(f, messages)
            # NOTE(review): html_end is not defined in the visible part of
            # this file; presumably it follows html_head - confirm.
            f.write(html_end)
        self.okSignal.emit(1)

    def to_txt(self):
        """Write the contact's messages to ``<remark>.txt`` and emit ``okSignal(1)``."""
        origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
        os.makedirs(origin_docx_path, exist_ok=True)
        filename = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}/{self.contact.remark}.txt"
        messages = msg_db.get_messages(self.contact.wxid)
        with open(filename, mode='w', newline='', encoding='utf-8') as f:
            self._write_messages(f, messages)
        self.okSignal.emit(1)

    def run(self):
        """Run the export selected by ``output_type``.

        ``Output.CSV_ALL`` is handled by ``Output`` itself and has no
        implementation on this class, so it is not dispatched here.
        """
        if self.output_type == Output.DOCX:
            return
        elif self.output_type == Output.CSV:
            self.to_csv()
        elif self.output_type == Output.HTML:
            self.to_html_()
        elif self.output_type == Output.TXT:
            self.to_txt()

    def cancel(self):
        """Request interruption of this thread."""
        self.requestInterruption()


# Replacement table for bracketed emoji codes in HTML text messages.
# NOTE(review): every value is an empty string in this copy, so matched codes
# are simply removed from the text - confirm the intended replacement markup.
emoji = {
    '[微笑]': '', '[撇嘴]': '', '[色]': '', '[发呆]': '', '[得意]': '',
    '[流泪]': '', '[害羞]': '', '[闭嘴]': '', '[睡]': '', '[大哭]': '',
    '[尴尬]': '', '[发怒]': '', '[调皮]': '', '[呲牙]': '', '[惊讶]': '',
    '[难过]': '', '[抓狂]': '', '[吐]': '', '[偷笑]': '', '[愉快]': '',
    '[白眼]': '', '[傲慢]': '', '[困]': '', '[惊恐]': '', '[憨笑]': '',
    '[悠闲]': '', '[咒骂]': '', '[疑问]': '', '[嘘]': '', '[晕]': '',
    '[衰]': '', '[骷髅]': '', '[敲打]': '', '[再见]': '', '[擦汗]': '',
    '[抠鼻]': '', '[鼓掌]': '', '[坏笑]': '', '[右哼哼]': '', '[鄙视]': '',
    '[委屈]': '', '[快哭了]': '', '[阴险]': '', '[亲亲]': '', '[可怜]': '',
    '[笑脸]': '', '[生病]': '', '[脸红]': '', '[破涕为笑]': '', '[恐惧]': '',
    '[失望]': '', '[无语]': '', '[嘿哈]': '', '[捂脸]': '', '[奸笑]': '',
    '[机智]': '', '[皱眉]': '', '[耶]': '', '[吃瓜]': '', '[加油]': '',
    '[汗]': '', '[天啊]': '', '[Emm]': '', '[社会社会]': '', '[旺柴]': '',
    '[好的]': '', '[打脸]': '', '[哇]': '', '[翻白眼]': '', '[666]': '',
    '[让我看看]': '', '[叹气]': '', '[苦涩]': '', '[裂开]': '', '[嘴唇]': '',
    '[爱心]': '', '[心碎]': '', '[拥抱]': '', '[强]': '', '[弱]': '',
    '[握手]': '', '[胜利]': '', '[抱拳]': '', '[勾引]': '', '[拳头]': '',
    '[OK]': '', '[合十]': '', '[啤酒]': '', '[咖啡]': '', '[蛋糕]': '',
    '[玫瑰]': '', '[凋谢]': '', '[菜刀]': '', '[炸弹]': '', '[便便]': '',
    '[月亮]': '', '[太阳]': '', '[庆 祝]': '', '[礼物]': '', '[红包]': '',
    '[發]': '', '[福]': '', '[烟花]': '', '[爆竹]': '', '[猪头]': '',
    '[跳跳]': '', '[发抖]': '', '[转圈]': ''}

# Leading part of the exported HTML document, written before the messages.
html_head = ''' Chat Records
昨天 12:35
你已添加了凡繁烦,现在可以开始聊天了。
您好,我在武汉,你可以直接送过来吗,我有时间的话,可以自己过去拿
!!!
123
hello
你好呀
昨天 13:15
'''