import csv import html import os from PyQt5.QtCore import pyqtSignal, QThread from . import msg from ..DataBase import hard_link from ..person_pc import MePC from ..util import get_abs_path if not os.path.exists('./data/聊天记录'): os.mkdir('./data/聊天记录') def escape_js_and_html(input_str): # 转义HTML特殊字符 html_escaped = html.escape(input_str, quote=False) # 手动处理JavaScript转义字符 js_escaped = ( html_escaped .replace("\\", "\\\\") .replace("'", r"\'") .replace('"', r'\"') .replace("\n", r'\n') .replace("\r", r'\r') .replace("\t", r'\t') ) return js_escaped class Output(QThread): """ 发送信息线程 """ progressSignal = pyqtSignal(int) rangeSignal = pyqtSignal(int) okSignal = pyqtSignal(int) i = 1 CSV = 0 DOCX = 1 HTML = 2 CSV_ALL = 3 def __init__(self, contact, parent=None, type_=DOCX): super().__init__(parent) self.Child0 = None self.last_timestamp = 0 self.sec = 2 # 默认1000秒 self.contact = contact self.ta_username = contact.wxid if contact else '' self.msg_id = 0 self.output_type = type_ self.total_num = 0 self.num = 0 def progress(self, value): self.progressSignal.emit(value) def to_csv_all(self): origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/" if not os.path.exists(origin_docx_path): os.mkdir(origin_docx_path) filename = f"{os.path.abspath('.')}/data/聊天记录/messages.csv" # columns = ["用户名", "消息内容", "发送时间", "发送状态", "消息类型", "isSend", "msgId"] columns = ['localId', 'TalkerId', 'Type', 'SubType', 'IsSender', 'CreateTime', 'Status', 'StrContent', 'StrTime'] messages = msg.get_messages_all() # 写入CSV文件 with open(filename, mode='w', newline='', encoding='utf-8') as file: writer = csv.writer(file) writer.writerow(columns) # 写入数据 writer.writerows(messages) self.okSignal.emit(1) def run(self): if self.output_type == self.DOCX: return elif self.output_type == self.CSV_ALL: self.to_csv_all() else: self.Child0 = ChildThread(self.contact, type_=self.output_type) self.Child0.progressSignal.connect(self.progress) self.Child0.rangeSignal.connect(self.rangeSignal) self.Child0.okSignal.connect(self.okSignal) self.Child0.start() def cancel(self): self.requestInterruption() class ChildThread(QThread): """ 子线程,用于导出部分聊天记录 """ progressSignal = pyqtSignal(int) rangeSignal = pyqtSignal(int) okSignal = pyqtSignal(int) i = 1 CSV = 0 DOCX = 1 HTML = 2 def __init__(self, contact, parent=None, type_=DOCX): super().__init__(parent) self.contact = contact self.last_timestamp = 0 self.sec = 2 # 默认1000秒 self.msg_id = 0 self.output_type = type_ def is_5_min(self, timestamp): if abs(timestamp - self.last_timestamp) > 300: self.last_timestamp = timestamp return True return False def text(self, doc, isSend, message, status): return def image(self, doc, isSend, Type, content, imgPath): return def emoji(self, doc, isSend, content, imgPath): return def wx_file(self, doc, isSend, content, status): return def retract_message(self, doc, isSend, content, status): return def reply(self, doc, isSend, content, status): return def pat_a_pat(self, doc, isSend, content, status): return def video(self, doc, isSend, content, status, img_path): return def to_csv(self): origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}" if not os.path.exists(origin_docx_path): os.mkdir(origin_docx_path) filename = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}/{self.contact.remark}.csv" # columns = ["用户名", "消息内容", "发送时间", "发送状态", "消息类型", "isSend", "msgId"] columns = ['localId', 'TalkerId', 'Type', 'SubType', 'IsSender', 'CreateTime', 'Status', 'StrContent', 'StrTime'] messages = msg.get_messages(self.contact.wxid) # 写入CSV文件 with open(filename, mode='w', newline='', encoding='utf-8') as file: writer = csv.writer(file) writer.writerow(columns) # 写入数据 writer.writerows(messages) self.okSignal.emit('ok') def to_csv_all(self): origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/" if not os.path.exists(origin_docx_path): os.mkdir(origin_docx_path) filename = f"{os.path.abspath('.')}/data/聊天记录/messages.csv" # columns = ["用户名", "消息内容", "发送时间", "发送状态", "消息类型", "isSend", "msgId"] columns = ['localId', 'TalkerId', 'Type', 'SubType', 'IsSender', 'CreateTime', 'Status', 'StrContent', 'StrTime'] messages = msg.get_messages_all() # 写入CSV文件 with open(filename, mode='w', newline='', encoding='utf-8') as file: writer = csv.writer(file) writer.writerow(columns) # 写入数据 writer.writerows(messages) self.okSignal.emit(1) def to_html(self): origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}" if not os.path.exists(origin_docx_path): os.mkdir(origin_docx_path) messages = msg.get_messages(self.contact.wxid) filename = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}/{self.contact.remark}.html" f = open(filename, 'w', encoding='utf-8') html_head = ''' Title
''' f.write(html_head) MePC().avatar.save(os.path.join(origin_docx_path, 'myhead.png')) self.contact.avatar.save(os.path.join(origin_docx_path, 'tahead.png')) self.rangeSignal.emit(len(messages)) for index, message in enumerate(messages): type_ = message[2] str_content = message[7] str_time = message[8] # print(type_, type(type_)) is_send = message[4] avatar = MePC().avatar_path if is_send else self.contact.avatar_path timestamp = message[5] self.progressSignal.emit(index) if type_ == 1: if self.is_5_min(timestamp): f.write( f'''
{str_time}
''' ) if is_send: f.write( f'''
{str_content}
''' ) else: f.write( f'''
{str_content}
''' ) html_end = '''
''' f.write(html_end) f.close() self.okSignal.emit(1) def to_html_(self): origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}" if not os.path.exists(origin_docx_path): os.mkdir(origin_docx_path) messages = msg.get_messages(self.contact.wxid) filename = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}/{self.contact.remark}.html" f = open(filename, 'w', encoding='utf-8') html_head = ''' Chat Records
昨天 12:35
你已添加了凡繁烦,现在可以开始聊天了。
您好,我在武汉,你可以直接送过来吗,我有时间的话,可以自己过去拿
!!!
123
hello
你好呀
昨天 13:15
''' f.write(html_end) f.close() self.okSignal.emit(1) def run(self): if self.output_type == Output.DOCX: return elif self.output_type == Output.CSV: self.to_csv() elif self.output_type == Output.HTML: self.to_html_() elif self.output_type == Output.CSV_ALL: self.to_csv_all() def cancel(self): self.requestInterruption()