import csv import html import os import sys import time import traceback from re import findall from PyQt5.QtCore import pyqtSignal, QThread from PyQt5.QtWidgets import QFileDialog from docx.oxml.ns import qn from . import msg_db from .package_msg import PackageMsg from ..DataBase import hard_link_db from ..DataBase import media_msg_db from ..log import logger from ..person import MePC from ..util import path import shutil from ..util.compress_content import parser_reply from ..util.emoji import get_emoji_url from ..util.image import get_image_path, get_image, get_image_abs_path from ..util.file import get_file import docx from docx import shared from docx.enum.table import WD_ALIGN_VERTICAL from docx.enum.text import WD_COLOR_INDEX, WD_PARAGRAPH_ALIGNMENT os.makedirs('./data/聊天记录', exist_ok=True) def set_global_font(doc, font_name): # 创建一个新样式 style = doc.styles['Normal'] # 设置字体名称 style.font.name = font_name # 遍历文档中的所有段落,将样式应用到每个段落 for paragraph in doc.paragraphs: for run in paragraph.runs: run.font.name = font_name def makedirs(path): os.makedirs(path, exist_ok=True) os.makedirs(os.path.join(path, 'image'), exist_ok=True) os.makedirs(os.path.join(path, 'emoji'), exist_ok=True) os.makedirs(os.path.join(path, 'video'), exist_ok=True) os.makedirs(os.path.join(path, 'voice'), exist_ok=True) os.makedirs(os.path.join(path, 'file'), exist_ok=True) os.makedirs(os.path.join(path, 'avatar'), exist_ok=True) file = './app/resources/data/file.png' if not os.path.exists(file): resource_dir = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__))) file = os.path.join(resource_dir, 'app', 'resources', 'data', 'file.png') shutil.copy(file, path + '/file/file.png') def escape_js_and_html(input_str): # 转义HTML特殊字符 html_escaped = html.escape(input_str, quote=False) # 手动处理JavaScript转义字符 js_escaped = ( html_escaped .replace("\\", "\\\\") .replace("'", r"\'") .replace('"', r'\"') .replace("\n", r'\n') .replace("\r", r'\r') .replace("\t", r'\t') ) return js_escaped class Output(QThread): """ 发送信息线程 """ progressSignal = pyqtSignal(int) rangeSignal = pyqtSignal(int) okSignal = pyqtSignal(int) i = 1 CSV = 0 DOCX = 1 HTML = 2 CSV_ALL = 3 CONTACT_CSV = 4 TXT = 5 def __init__(self, contact, type_=DOCX, message_types={}, parent=None): super().__init__(parent) self.Child0 = None self.last_timestamp = 0 self.message_types = message_types self.sec = 2 # 默认1000秒 self.contact = contact self.ta_username = contact.wxid if contact else '' self.msg_id = 0 self.output_type = type_ self.total_num = 1 self.num = 0 def progress(self, value): self.progressSignal.emit(value) def to_csv_all(self): origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/" os.makedirs(origin_docx_path, exist_ok=True) filename = QFileDialog.getSaveFileName(None, "save file", os.path.join(os.getcwd(), 'messages.csv'), "csv files (*.csv);;all files(*.*)") if not filename[0]: return filename = filename[0] # columns = ["用户名", "消息内容", "发送时间", "发送状态", "消息类型", "isSend", "msgId"] columns = ['localId', 'TalkerId', 'Type', 'SubType', 'IsSender', 'CreateTime', 'Status', 'StrContent', 'StrTime', 'Remark', 'NickName', 'Sender'] packagemsg = PackageMsg() messages = packagemsg.get_package_message_all() # 写入CSV文件 with open(filename, mode='w', newline='', encoding='utf-8-sig') as file: writer = csv.writer(file) writer.writerow(columns) # 写入数据 writer.writerows(messages) self.okSignal.emit(1) def contact_to_csv(self): filename = QFileDialog.getSaveFileName(None, "save file", os.path.join(os.getcwd(), 'contacts.csv'), "csv files (*.csv);;all files(*.*)") if not filename[0]: return filename = filename[0] # columns = ["用户名", "消息内容", "发送时间", "发送状态", "消息类型", "isSend", "msgId"] columns = ['UserName', 'Alias', 'Type', 'Remark', 'NickName', 'PYInitial', 'RemarkPYInitial', 'smallHeadImgUrl', 'bigHeadImgUrl'] contacts = micro_msg_db.get_contact() # 写入CSV文件 with open(filename, mode='w', newline='', encoding='utf-8-sig') as file: writer = csv.writer(file) writer.writerow(columns) # 写入数据 writer.writerows(contacts) self.okSignal.emit(1) def run(self): if self.output_type == self.DOCX: self.Child = ChildThread(self.contact, type_=self.output_type, message_types=self.message_types) self.Child.progressSignal.connect(self.progress) self.Child.rangeSignal.connect(self.rangeSignal) self.Child.okSignal.connect(self.okSignal) self.Child.start() elif self.output_type == self.CSV_ALL: self.to_csv_all() elif self.output_type == self.CONTACT_CSV: self.contact_to_csv() elif self.output_type == self.CSV or self.output_type == self.TXT or self.output_type == self.DOCX: self.Child = ChildThread(self.contact, type_=self.output_type, message_types=self.message_types) self.Child.progressSignal.connect(self.progress) self.Child.rangeSignal.connect(self.rangeSignal) self.Child.okSignal.connect(self.okSignal) self.Child.start() elif self.output_type == self.HTML: self.Child = ChildThread(self.contact, type_=self.output_type, message_types=self.message_types) self.Child.progressSignal.connect(self.progressSignal) self.Child.rangeSignal.connect(self.rangeSignal) self.Child.okSignal.connect(self.count_finish_num) self.Child.start() if self.message_types.get(34): # 语音消息单独的线程 self.total_num += 1 self.output_media = OutputMedia(self.contact) self.output_media.okSingal.connect(self.count_finish_num) self.output_media.progressSignal.connect(self.progressSignal) self.output_media.start() if self.message_types.get(47): # emoji消息单独的线程 self.total_num += 1 self.output_emoji = OutputEmoji(self.contact) self.output_emoji.okSingal.connect(self.count_finish_num) self.output_emoji.progressSignal.connect(self.progressSignal) self.output_emoji.start() if self.message_types.get(3): # 图片消息单独的线程 self.total_num += 1 self.output_image = OutputImage(self.contact) self.output_image.okSingal.connect(self.count_finish_num) self.output_image.progressSignal.connect(self.progressSignal) self.output_image.start() def count_finish_num(self, num): self.num += 1 if self.num == self.total_num: self.okSignal.emit(1) def cancel(self): self.requestInterruption() def modify_audio_metadata(audiofile, new_artist): # 修改音频元数据中的“创作者”标签 return audiofile = load(audiofile) # 检查文件是否有标签 if audiofile.tag is None: audiofile.initTag() # 修改艺术家名称 audiofile.tag.artist = new_artist audiofile.tag.save() class ChildThread(QThread): """ 子线程,用于导出部分聊天记录 """ progressSignal = pyqtSignal(int) rangeSignal = pyqtSignal(int) okSignal = pyqtSignal(int) i = 1 CSV = 0 DOCX = 1 HTML = 2 def __init__(self, contact, type_=DOCX, message_types={}, parent=None): super().__init__(parent) self.contact = contact self.message_types = message_types self.last_timestamp = 0 self.sec = 2 # 默认1000秒 self.msg_id = 0 self.output_type = type_ def is_5_min(self, timestamp) -> bool: if abs(timestamp - self.last_timestamp) > 300: self.last_timestamp = timestamp return True return False def get_avatar_path(self, is_send, message,is_absolute_path=False) -> str: if self.contact.is_chatroom: avatar = message[12].smallHeadImgUrl else: avatar = MePC().smallHeadImgUrl if is_send else self.contact.smallHeadImgUrl if is_absolute_path: avatar = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"+avatar return avatar def get_display_name(self, is_send, message) -> str: if self.contact.is_chatroom: if is_send: display_name = MePC().name else: display_name = message[12].remark else: display_name = MePC().name if is_send else self.contact.remark return escape_js_and_html(display_name) def text(self, doc, message): type_ = message[2] str_content = message[7] str_time = message[8] is_send = message[4] timestamp = message[5] is_chatroom = 1 if self.contact.is_chatroom else 0 display_name = self.get_display_name(is_send,message) if self.output_type == Output.HTML: avatar = self.get_avatar_path(is_send, message) str_content = escape_js_and_html(str_content) doc.write( f'''{{ type:{1}, text: '{str_content}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},''' ) elif self.output_type == Output.TXT: name = display_name doc.write( f'''{str_time} {name}\n{str_content}\n\n''' ) elif self.output_type == Output.DOCX: avatar = self.get_avatar_path(is_send,message,True) content_cell = self.create_table(doc, is_send, avatar) content_cell.paragraphs[0].add_run(str_content) content_cell.paragraphs[0].font_size = shared.Inches(0.5) if is_send: p = content_cell.paragraphs[0] p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT doc.add_paragraph() def image(self, doc, message): origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}" type_ = message[2] str_content = message[7] str_time = message[8] is_send = message[4] BytesExtra = message[10] timestamp = message[5] is_chatroom = 1 if self.contact.is_chatroom else 0 avatar = self.get_avatar_path(is_send, message) display_name = self.get_display_name(is_send,message) if self.output_type == Output.HTML: str_content = escape_js_and_html(str_content) image_path = hard_link_db.get_image(str_content, BytesExtra, thumb=False) if not os.path.exists(os.path.join(MePC().wx_dir, image_path)): image_thumb_path = hard_link_db.get_image(str_content, BytesExtra, thumb=True) if not os.path.exists(os.path.join(MePC().wx_dir, image_thumb_path)): return image_path = image_thumb_path image_path = get_image_path(image_path, base_path=f'/data/聊天记录/{self.contact.remark}/image') doc.write( f'''{{ type:{type_}, text: '{image_path}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},''' ) elif self.output_type == Output.TXT: doc.write( f'''{str_time} {display_name}\n[图片]\n\n''' ) elif self.output_type == Output.DOCX: avatar = self.get_avatar_path(is_send,message,True) content = self.create_table(doc, is_send, avatar) run = content.paragraphs[0].add_run() str_content = escape_js_and_html(str_content) image_path = hard_link_db.get_image(str_content, BytesExtra, thumb=True) if not os.path.exists(os.path.join(MePC().wx_dir, image_path)): image_thumb_path = hard_link_db.get_image(str_content, BytesExtra, thumb=False) if not os.path.exists(os.path.join(MePC().wx_dir, image_thumb_path)): return image_path = image_thumb_path image_path = get_image_abs_path(image_path, base_path=f'/data/聊天记录/{self.contact.remark}/image') try: run.add_picture(image_path, height=shared.Inches(2)) doc.add_paragraph() except Exception: print("Error!image") def audio(self, doc, message): origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}" str_content = message[7] str_time = message[8] is_send = message[4] msgSvrId = message[9] timestamp = message[5] is_chatroom = 1 if self.contact.is_chatroom else 0 avatar = self.get_avatar_path(is_send, message) display_name = self.get_display_name(is_send, message) if self.output_type == Output.HTML: try: audio_path = media_msg_db.get_audio_path(msgSvrId, output_path=origin_docx_path + "/voice") audio_path = "./voice/" + os.path.basename(audio_path) voice_to_text = escape_js_and_html(media_msg_db.get_audio_text(str_content)) except: logger.error(traceback.format_exc()) return doc.write( f'''{{ type:34, text:'{audio_path}',is_send:{is_send},avatar_path:'{avatar}',voice_to_text:'{voice_to_text}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},''' ) elif self.output_type == Output.TXT: doc.write( f'''{str_time} {display_name}\n[语音]\n\n''' ) elif self.output_type == Output.DOCX: avatar = self.get_avatar_path(is_send,message,True) content_cell = self.create_table(doc, is_send, avatar) content_cell.paragraphs[0].add_run('【表情包】') content_cell.paragraphs[0].font_size = shared.Inches(0.5) if is_send: p = content_cell.paragraphs[0] p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT doc.add_paragraph() def emoji(self, doc, message): str_content = message[7] str_time = message[8] is_send = message[4] timestamp = message[5] is_chatroom = 1 if self.contact.is_chatroom else 0 avatar = self.get_avatar_path(is_send, message) display_name = self.get_display_name(is_send, message) if self.output_type == Output.HTML: emoji_path = get_emoji_url(str_content, thumb=True) doc.write( f'''{{ type:{3}, text: '{emoji_path}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},''' ) elif self.output_type == Output.TXT: doc.write( f'''{str_time} {display_name}\n[表情包]\n\n''' ) elif self.output_type == Output.DOCX: avatar = self.get_avatar_path(is_send,message,True) content_cell = self.create_table(doc, is_send, avatar) content_cell.paragraphs[0].add_run('【表情包】') content_cell.paragraphs[0].font_size = shared.Inches(0.5) if is_send: p = content_cell.paragraphs[0] p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT doc.add_paragraph() def file(self, doc, message): origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}" bytesExtra = message[10] str_time = message[8] is_send = message[4] timestamp = message[5] is_chatroom = 1 if self.contact.is_chatroom else 0 avatar = self.get_avatar_path(is_send, message) display_name = self.get_display_name(is_send, message) if self.output_type == Output.HTML: link = get_file(bytesExtra, thumb=True, output_path=origin_docx_path + '/file') file_name = '' file_path = './file/file.png' if link != "": file_name = os.path.basename(link) link = './file/' + file_name doc.write( f'''{{ type:49, text: '{file_path}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}',link: '{link}',sub_type:6,file_name: '{file_name}'}},''' ) elif self.output_type == Output.TXT: doc.write( f'''{str_time} {display_name}\n[文件]\n\n''' ) elif self.output_type == Output.DOCX: avatar = self.get_avatar_path(is_send,message,True) content_cell = self.create_table(doc, is_send, avatar) content_cell.paragraphs[0].add_run('【文件】') content_cell.paragraphs[0].font_size = shared.Inches(0.5) if is_send: p = content_cell.paragraphs[0] p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT doc.add_paragraph() def refermsg(self, doc, message): """ 处理回复消息 @param doc: @param message: @return: """ str_time = message[8] is_send = message[4] content = parser_reply(message[11]) refer_msg = content.get('refer') timestamp = message[5] is_chatroom = 1 if self.contact.is_chatroom else 0 avatar = self.get_avatar_path(is_send, message) display_name = self.get_display_name(is_send, message) if self.output_type == Output.HTML: contentText = escape_js_and_html(content.get('title')) if refer_msg: referText = f"{escape_js_and_html(refer_msg.get('displayname'))}:{escape_js_and_html(refer_msg.get('content'))}" doc.write( f'''{{ type:49, text: '{contentText}',is_send:{is_send},sub_type:{content.get('type')},refer_text: '{referText}',avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},''' ) else: doc.write( f'''{{ type:49, text: '{contentText}',is_send:{is_send},sub_type:{content.get('type')},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},''' ) elif self.output_type == Output.TXT: if refer_msg: doc.write( f'''{str_time} {display_name}\n{content.get('title')}\n引用:{refer_msg.get('displayname')}:{refer_msg.get('content')}\n\n''' ) else: doc.write( f'''{str_time} {display_name}\n{content.get('title')}\n引用:未知\n\n''' ) elif self.output_type == Output.DOCX: avatar = self.get_avatar_path(is_send,message,True) content_cell = self.create_table(doc, is_send, avatar) content_cell.paragraphs[0].add_run(content.get('title')) content_cell.paragraphs[0].font_size = shared.Inches(0.5) reply_p = content_cell.add_paragraph() reply_content = f"{refer_msg.get('displayname')}:{refer_msg.get('content')}" if refer_msg else '未知引用' run = content_cell.paragraphs[1].add_run(reply_content) '''设置被回复内容格式''' run.font.color.rgb = shared.RGBColor(121, 121, 121) run.font_size = shared.Inches(0.3) run.font.highlight_color = WD_COLOR_INDEX.GRAY_25 if is_send: p = content_cell.paragraphs[0] p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT reply_p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT doc.add_paragraph() def system_msg(self, doc, message): str_content = message[7] is_send = message[4] str_time = message[8] timestamp = message[5] is_chatroom = 1 if self.contact.is_chatroom else 0 str_content = str_content.replace('重新编辑]]>', "") res = findall('({0,1}(img|revo|_wc_cus|a).*?>)', str_content) for xmlstr, b in res: str_content = str_content.replace(xmlstr, "") if self.output_type == Output.HTML: str_content = escape_js_and_html(str_content) doc.write( f'''{{ type:0, text: '{str_content}',is_send:{is_send},avatar_path:'',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:''}},''' ) elif self.output_type == Output.TXT: doc.write( f'''{str_time} {str_content}\n\n''' ) elif self.output_type == Output.DOCX: doc.add_paragraph(str_content).alignment = WD_PARAGRAPH_ALIGNMENT.CENTER def video(self, doc, message): origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}" type_ = message[2] str_content = message[7] str_time = message[8] is_send = message[4] BytesExtra = message[10] timestamp = message[5] is_chatroom = 1 if self.contact.is_chatroom else 0 avatar = self.get_avatar_path(is_send, message) display_name = self.get_display_name(is_send, message) if self.output_type == Output.HTML: video_path = hard_link_db.get_video(str_content, BytesExtra, thumb=False) image_path = hard_link_db.get_video(str_content, BytesExtra, thumb=True) if video_path is None and image_path is not None: image_path = path.get_relative_path(image_path, base_path=f'/data/聊天记录/{self.contact.remark}/image') try: # todo 网络图片问题 print(origin_docx_path + image_path[1:]) os.utime(origin_docx_path + image_path[1:], (timestamp, timestamp)) doc.write( f'''{{ type:3, text: '{image_path}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},''' ) except: doc.write( f'''{{ type:1, text: '视频丢失',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},''' ) return if video_path is None and image_path is None: return video_path = f'{MePC().wx_dir}/{video_path}' if os.path.exists(video_path): new_path = origin_docx_path + '/video/' + os.path.basename(video_path) if not os.path.exists(new_path): shutil.copy(video_path, os.path.join(origin_docx_path, 'video')) os.utime(new_path, (timestamp, timestamp)) video_path = f'./video/{os.path.basename(video_path)}' doc.write( f'''{{ type:{type_}, text: '{video_path}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},''' ) elif self.output_type == Output.TXT: doc.write( f'''{str_time} {display_name}\n[视频]\n\n''' ) elif self.output_type == Output.DOCX: avatar = self.get_avatar_path(is_send,message,True) content_cell = self.create_table(doc, is_send, avatar) content_cell.paragraphs[0].add_run('【视频】') content_cell.paragraphs[0].font_size = shared.Inches(0.5) if is_send: p = content_cell.paragraphs[0] p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT doc.add_paragraph() def create_table(self, doc, is_send, avatar_path): ''' #! 创建一个1*2表格 #! isSend = 1 (0,0)存聊天内容,(0,1)存头像 #! isSend = 0 (0,0)存头像,(0,1)存聊天内容 #! 返回聊天内容的坐标 ''' table = doc.add_table(rows=1, cols=2, style='Normal Table') table.cell(0, 1).height = shared.Inches(0.5) table.cell(0, 0).height = shared.Inches(0.5) if is_send: '''表格右对齐''' table.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT avatar = table.cell(0, 1).paragraphs[0].add_run() '''插入头像,设置头像宽度''' avatar.add_picture(avatar_path, width=shared.Inches(0.5)) '''设置单元格宽度跟头像一致''' table.cell(0, 1).width = shared.Inches(0.5) content_cell = table.cell(0, 0) '''聊天内容右对齐''' content_cell.paragraphs[0].paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT else: avatar = table.cell(0, 0).paragraphs[0].add_run() avatar.add_picture(avatar_path, width=shared.Inches(0.5)) '''设置单元格宽度''' table.cell(0, 0).width = shared.Inches(0.5) content_cell = table.cell(0, 1) '''聊天内容垂直居中对齐''' content_cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER return content_cell def to_csv(self): origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}" os.makedirs(origin_docx_path, exist_ok=True) filename = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}/{self.contact.remark}_utf8.csv" columns = ['localId', 'TalkerId', 'Type', 'SubType', 'IsSender', 'CreateTime', 'Status', 'StrContent', 'StrTime', 'Remark', 'NickName', 'Sender'] if self.contact.is_chatroom: packagemsg = PackageMsg() messages = packagemsg.get_package_message_by_wxid(self.contact.wxid) else: messages = msg_db.get_messages(self.contact.wxid) # 写入CSV文件 with open(filename, mode='w', newline='', encoding='utf-8-sig') as file: writer = csv.writer(file) writer.writerow(columns) # 写入数据 writer.writerows(messages) self.okSignal.emit('ok') def to_html_(self): origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}" makedirs(origin_docx_path) if self.contact.is_chatroom: packagemsg = PackageMsg() messages = packagemsg.get_package_message_by_wxid(self.contact.wxid) else: messages = msg_db.get_messages(self.contact.wxid) filename = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}/{self.contact.remark}.html" file_path = './app/resources/data/template.html' if not os.path.exists(file_path): resource_dir = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__))) file_path = os.path.join(resource_dir, 'app', 'resources', 'data', 'template.html') with open(file_path, "r", encoding="utf-8") as f: content = f.read() html_head, html_end = content.split('/*注意看这是分割线*/') f = open(filename, 'w', encoding='utf-8') f.write(html_head.replace("