import csv import html import os from PyQt5.QtCore import pyqtSignal, QThread from . import msg_db from .package_msg import PackageMsg from ..DataBase import hard_link_db from ..person_pc import MePC from ..util import path os.makedirs('./data/聊天记录', exist_ok=True) def makedirs(path): os.makedirs(path,exist_ok=True) os.makedirs(os.path.join(path, 'image'), exist_ok=True) os.makedirs(os.path.join(path, 'emoji'), exist_ok=True) os.makedirs(os.path.join(path, 'video'), exist_ok=True) os.makedirs(os.path.join(path, 'voice'), exist_ok=True) os.makedirs(os.path.join(path, 'file'), exist_ok=True) def escape_js_and_html(input_str): # 转义HTML特殊字符 html_escaped = html.escape(input_str, quote=False) # 手动处理JavaScript转义字符 js_escaped = ( html_escaped .replace("\\", "\\\\") .replace("'", r"\'") .replace('"', r'\"') .replace("\n", r'\n') .replace("\r", r'\r') .replace("\t", r'\t') ) return js_escaped class Output(QThread): """ 发送信息线程 """ progressSignal = pyqtSignal(int) rangeSignal = pyqtSignal(int) okSignal = pyqtSignal(int) i = 1 CSV = 0 DOCX = 1 HTML = 2 CSV_ALL = 3 def __init__(self, contact, parent=None, type_=DOCX): super().__init__(parent) self.Child0 = None self.last_timestamp = 0 self.sec = 2 # 默认1000秒 self.contact = contact self.ta_username = contact.wxid if contact else '' self.msg_id = 0 self.output_type = type_ self.total_num = 0 self.num = 0 def progress(self, value): self.progressSignal.emit(value) def to_csv_all(self): origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/" os.makedirs(origin_docx_path, exist_ok=True) filename = f"{os.path.abspath('.')}/data/聊天记录/messages.csv" # columns = ["用户名", "消息内容", "发送时间", "发送状态", "消息类型", "isSend", "msgId"] columns = ['localId', 'TalkerId', 'Type', 'SubType', 'IsSender', 'CreateTime', 'Status', 'StrContent', 'StrTime', 'Remark', 'NickName', 'Sender'] # messages = msg_db.get_messages_all() packagemsg = PackageMsg() messages = packagemsg.get_package_message_all() # 写入CSV文件 with open(filename, mode='w', newline='', encoding='utf-8') as file: writer = csv.writer(file) writer.writerow(columns) # 写入数据 writer.writerows(messages) self.okSignal.emit(1) def run(self): if self.output_type == self.DOCX: return elif self.output_type == self.CSV_ALL: self.to_csv_all() else: self.Child = ChildThread(self.contact, type_=self.output_type) self.Child.progressSignal.connect(self.progress) self.Child.rangeSignal.connect(self.rangeSignal) self.Child.okSignal.connect(self.okSignal) self.Child.start() def cancel(self): self.requestInterruption() class ChildThread(QThread): """ 子线程,用于导出部分聊天记录 """ progressSignal = pyqtSignal(int) rangeSignal = pyqtSignal(int) okSignal = pyqtSignal(int) i = 1 CSV = 0 DOCX = 1 HTML = 2 def __init__(self, contact, parent=None, type_=DOCX): super().__init__(parent) self.contact = contact self.last_timestamp = 0 self.sec = 2 # 默认1000秒 self.msg_id = 0 self.output_type = type_ def is_5_min(self, timestamp): if abs(timestamp - self.last_timestamp) > 300: self.last_timestamp = timestamp return True return False def text(self, doc, isSend, message, status): return def image(self, doc, isSend, Type, content, imgPath): return def emoji(self, doc, isSend, content, imgPath): return def wx_file(self, doc, isSend, content, status): return def retract_message(self, doc, isSend, content, status): return def reply(self, doc, isSend, content, status): return def pat_a_pat(self, doc, isSend, content, status): return def video(self, doc, isSend, content, status, img_path): return def to_csv(self): origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}" os.makedirs(origin_docx_path, exist_ok=True) filename = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}/{self.contact.remark}.csv" # columns = ["用户名", "消息内容", "发送时间", "发送状态", "消息类型", "isSend", "msgId"] columns = ['localId', 'TalkerId', 'Type', 'SubType', 'IsSender', 'CreateTime', 'Status', 'StrContent', 'StrTime'] messages = msg_db.get_messages(self.contact.wxid) # 写入CSV文件 with open(filename, mode='w', newline='', encoding='utf-8') as file: writer = csv.writer(file) writer.writerow(columns) # 写入数据 writer.writerows(messages) self.okSignal.emit('ok') def to_csv_all(self): origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/" os.makedirs(origin_docx_path, exist_ok=True) filename = f"{os.path.abspath('.')}/data/聊天记录/messages.csv" # columns = ["用户名", "消息内容", "发送时间", "发送状态", "消息类型", "isSend", "msgId"] columns = ['localId', 'TalkerId', 'Type', 'SubType', 'IsSender', 'CreateTime', 'Status', 'StrContent', 'StrTime', 'Remark', 'NickName', 'Sender'] # messages = msg_db.get_messages_all() packagemsg = PackageMsg() messages = packagemsg.get_package_message_all() # 写入CSV文件 with open(filename, mode='w', newline='', encoding='utf-8') as file: writer = csv.writer(file) writer.writerow(columns) # 写入数据 writer.writerows(messages) self.okSignal.emit(1) def to_html(self): origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}" os.makedirs(origin_docx_path, exist_ok=True) messages = msg_db.get_messages(self.contact.wxid) filename = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}/{self.contact.remark}.html" f = open(filename, 'w', encoding='utf-8') html_head = '''