拆分output模块

This commit is contained in:
shuaikangzhou 2024-01-02 22:33:46 +08:00
parent d568f34f47
commit 8a1f6ad59e
7 changed files with 1142 additions and 1140 deletions

View File

@ -0,0 +1,30 @@
import csv
import os
from app.DataBase import msg_db
from app.DataBase.output import ExporterBase
from app.DataBase.package_msg import PackageMsg
class CSVExporter(ExporterBase):
def to_csv(self):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
os.makedirs(origin_docx_path, exist_ok=True)
filename = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}/{self.contact.remark}_utf8.csv"
columns = ['localId', 'TalkerId', 'Type', 'SubType',
'IsSender', 'CreateTime', 'Status', 'StrContent',
'StrTime', 'Remark', 'NickName', 'Sender']
if self.contact.is_chatroom:
packagemsg = PackageMsg()
messages = packagemsg.get_package_message_by_wxid(self.contact.wxid)
else:
messages = msg_db.get_messages(self.contact.wxid)
# 写入CSV文件
with open(filename, mode='w', newline='', encoding='utf-8-sig') as file:
writer = csv.writer(file)
writer.writerow(columns)
# 写入数据
writer.writerows(messages)
self.okSignal.emit('ok')
def run(self):
self.to_csv()

View File

@ -0,0 +1,347 @@
import os
import shutil
import sys
import time
import traceback
from re import findall
import docx
from PyQt5.QtCore import pyqtSignal, QThread
from docx import shared
from docx.enum.table import WD_ALIGN_VERTICAL
from docx.enum.text import WD_COLOR_INDEX, WD_PARAGRAPH_ALIGNMENT
from docx.oxml.ns import qn
from app.DataBase import msg_db, hard_link_db, media_msg_db
from app.DataBase.output import ExporterBase, escape_js_and_html
from app.DataBase.package_msg import PackageMsg
from app.log import logger
from app.person import Me
from app.util import path
from app.util.compress_content import parser_reply, share_card, music_share
from app.util.emoji import get_emoji_url
from app.util.file import get_file
from app.util.image import get_image_path, get_image, get_image_abs_path
from app.util.music import get_music_path
class DocxExporter(ExporterBase):
def text(self, doc, message):
type_ = message[2]
str_content = message[7]
str_time = message[8]
is_send = message[4]
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
display_name = self.get_display_name(is_send, message)
avatar = self.get_avatar_path(is_send, message, True)
content_cell = self.create_table(doc, is_send, avatar)
content_cell.paragraphs[0].add_run(str_content)
content_cell.paragraphs[0].font_size = shared.Inches(0.5)
if is_send:
p = content_cell.paragraphs[0]
p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
doc.add_paragraph()
def image(self, doc, message):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
type_ = message[2]
str_content = message[7]
str_time = message[8]
is_send = message[4]
BytesExtra = message[10]
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
avatar = self.get_avatar_path(is_send, message, True)
content = self.create_table(doc, is_send, avatar)
run = content.paragraphs[0].add_run()
str_content = escape_js_and_html(str_content)
image_path = hard_link_db.get_image(str_content, BytesExtra, thumb=True)
if not os.path.exists(os.path.join(Me().wx_dir, image_path)):
image_thumb_path = hard_link_db.get_image(str_content, BytesExtra, thumb=False)
if not os.path.exists(os.path.join(Me().wx_dir, image_thumb_path)):
return
image_path = image_thumb_path
image_path = get_image_abs_path(image_path, base_path=f'/data/聊天记录/{self.contact.remark}/image')
try:
run.add_picture(image_path, height=shared.Inches(2))
doc.add_paragraph()
except Exception:
print("Error!image")
def audio(self, doc, message):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
str_content = message[7]
str_time = message[8]
is_send = message[4]
msgSvrId = message[9]
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
avatar = self.get_avatar_path(is_send, message, True)
content_cell = self.create_table(doc, is_send, avatar)
content_cell.paragraphs[0].add_run('【表情包】')
content_cell.paragraphs[0].font_size = shared.Inches(0.5)
if is_send:
p = content_cell.paragraphs[0]
p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
doc.add_paragraph()
def emoji(self, doc, message):
str_content = message[7]
str_time = message[8]
is_send = message[4]
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
avatar = self.get_avatar_path(is_send, message, True)
content_cell = self.create_table(doc, is_send, avatar)
content_cell.paragraphs[0].add_run('【表情包】')
content_cell.paragraphs[0].font_size = shared.Inches(0.5)
if is_send:
p = content_cell.paragraphs[0]
p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
doc.add_paragraph()
def file(self, doc, message):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
bytesExtra = message[10]
str_time = message[8]
is_send = message[4]
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
avatar = self.get_avatar_path(is_send, message, True)
content_cell = self.create_table(doc, is_send, avatar)
content_cell.paragraphs[0].add_run('【文件】')
content_cell.paragraphs[0].font_size = shared.Inches(0.5)
if is_send:
p = content_cell.paragraphs[0]
p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
doc.add_paragraph()
def refermsg(self, doc, message):
"""
处理回复消息
@param doc:
@param message:
@return:
"""
str_time = message[8]
is_send = message[4]
content = parser_reply(message[11])
refer_msg = content.get('refer')
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
avatar = self.get_avatar_path(is_send, message, True)
content_cell = self.create_table(doc, is_send, avatar)
content_cell.paragraphs[0].add_run(content.get('title'))
content_cell.paragraphs[0].font_size = shared.Inches(0.5)
reply_p = content_cell.add_paragraph()
reply_content = f"{refer_msg.get('displayname')}:{refer_msg.get('content')}" if refer_msg else '未知引用'
run = content_cell.paragraphs[1].add_run(reply_content)
'''设置被回复内容格式'''
run.font.color.rgb = shared.RGBColor(121, 121, 121)
run.font_size = shared.Inches(0.3)
run.font.highlight_color = WD_COLOR_INDEX.GRAY_25
if is_send:
p = content_cell.paragraphs[0]
p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
reply_p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
doc.add_paragraph()
def system_msg(self, doc, message):
str_content = message[7]
is_send = message[4]
str_time = message[8]
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
str_content = str_content.replace('<![CDATA[', "").replace(
' <a href="weixin://revoke_edit_click">重新编辑</a>]]>', "")
res = findall('(</{0,1}(img|revo|_wc_cus|a).*?>)', str_content)
for xmlstr, b in res:
str_content = str_content.replace(xmlstr, "")
doc.add_paragraph(str_content).alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
def video(self, doc, message):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
type_ = message[2]
str_content = message[7]
str_time = message[8]
is_send = message[4]
BytesExtra = message[10]
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
avatar = self.get_avatar_path(is_send, message, True)
content_cell = self.create_table(doc, is_send, avatar)
content_cell.paragraphs[0].add_run('【视频】')
content_cell.paragraphs[0].font_size = shared.Inches(0.5)
if is_send:
p = content_cell.paragraphs[0]
p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
doc.add_paragraph()
def create_table(self, doc, is_send, avatar_path):
'''
#! 创建一个1*2表格
#! isSend = 1 (0,0)存聊天内容,(0,1)存头像
#! isSend = 0 (0,0)存头像,(0,1)存聊天内容
#! 返回聊天内容的坐标
'''
table = doc.add_table(rows=1, cols=2, style='Normal Table')
table.cell(0, 1).height = shared.Inches(0.5)
table.cell(0, 0).height = shared.Inches(0.5)
if is_send:
'''表格右对齐'''
table.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
avatar = table.cell(0, 1).paragraphs[0].add_run()
'''插入头像,设置头像宽度'''
avatar.add_picture(avatar_path, width=shared.Inches(0.5))
'''设置单元格宽度跟头像一致'''
table.cell(0, 1).width = shared.Inches(0.5)
content_cell = table.cell(0, 0)
'''聊天内容右对齐'''
content_cell.paragraphs[0].paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
else:
avatar = table.cell(0, 0).paragraphs[0].add_run()
avatar.add_picture(avatar_path, width=shared.Inches(0.5))
'''设置单元格宽度'''
table.cell(0, 0).width = shared.Inches(0.5)
content_cell = table.cell(0, 1)
'''聊天内容垂直居中对齐'''
content_cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER
return content_cell
def music_share(self, doc, message):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
is_send = message[4]
timestamp = message[5]
content = music_share(message[11])
music_path = ''
if content.get('audio_url') != '':
music_path = get_music_path(content.get('audio_url'), content.get('title'),
output_path=origin_docx_path + '/music')
if music_path != '':
music_path = f'./music/{os.path.basename(music_path)}'
music_path = music_path.replace('\\', '/')
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
def share_card(self, doc, message):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
is_send = message[4]
timestamp = message[5]
bytesExtra = message[10]
compress_content_ = message[11]
card_data = share_card(bytesExtra, compress_content_)
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
thumbnail = ''
if card_data.get('thumbnail'):
thumbnail = os.path.join(Me().wx_dir, card_data.get('thumbnail'))
if os.path.exists(thumbnail):
shutil.copy(thumbnail, os.path.join(origin_docx_path, 'image', os.path.basename(thumbnail)))
thumbnail = './image/' + os.path.basename(thumbnail)
else:
thumbnail = ''
app_logo = ''
if card_data.get('app_logo'):
app_logo = os.path.join(Me().wx_dir, card_data.get('app_logo'))
if os.path.exists(app_logo):
shutil.copy(app_logo, os.path.join(origin_docx_path, 'image', os.path.basename(app_logo)))
app_logo = './image/' + os.path.basename(app_logo)
else:
app_logo = ''
def merge_docx(self, conRemark, n):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{conRemark}"
all_file_path = []
for i in range(n):
file_name = f"{conRemark}{i}.docx"
all_file_path.append(origin_docx_path + '/' + file_name)
filename = f"{conRemark}.docx"
# print(all_file_path)
doc = docx.Document()
doc.save(origin_docx_path + '/' + filename)
master = docx.Document(origin_docx_path + '/' + filename)
middle_new_docx = Composer(master)
num = 0
for word in all_file_path:
word_document = docx.Document(word)
word_document.add_page_break()
if num != 0:
middle_new_docx.append(word_document)
num = num + 1
os.remove(word)
middle_new_docx.save(origin_docx_path + '/' + filename)
def export(self):
print('导出docx')
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
filename = os.path.join(origin_docx_path, f"{self.contact.remark}.docx")
doc = docx.Document()
doc.styles['Normal'].font.name = u'Cambria'
doc.styles['Normal']._element.rPr.rFonts.set(qn('w:eastAsia'), u'宋体')
if self.contact.is_chatroom:
packagemsg = PackageMsg()
messages = packagemsg.get_package_message_by_wxid(self.contact.wxid)
else:
messages = msg_db.get_messages(self.contact.wxid)
Me().save_avatar(os.path.join(f"{origin_docx_path}/avatar/{Me().wxid}.png"))
if self.contact.is_chatroom:
for message in messages:
if message[4]: # is_send
continue
try:
chatroom_avatar_path = f"{origin_docx_path}/avatar/{message[12].wxid}.png"
message[12].save_avatar(chatroom_avatar_path)
except:
print(message)
pass
else:
self.contact.save_avatar(os.path.join(f"{origin_docx_path}/avatar/{self.contact.wxid}.png"))
self.rangeSignal.emit(len(messages))
for index, message in enumerate(messages):
type_ = message[2]
sub_type = message[3]
timestamp = message[5]
self.progressSignal.emit(1)
if self.is_5_min(timestamp):
str_time = message[8]
doc.add_paragraph(str_time).alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
if type_ == 1 and self.message_types.get(type_):
self.text(doc, message)
elif type_ == 3 and self.message_types.get(type_):
self.image(doc, message)
elif type_ == 34 and self.message_types.get(type_):
self.audio(doc, message)
elif type_ == 43 and self.message_types.get(type_):
self.video(doc, message)
elif type_ == 47 and self.message_types.get(type_):
self.emoji(doc, message)
elif type_ == 10000 and self.message_types.get(type_):
self.system_msg(doc, message)
elif type_ == 49 and sub_type == 57 and self.message_types.get(1):
self.refermsg(doc, message)
elif type_ == 49 and sub_type == 6 and self.message_types.get(4906):
self.file(doc, message)
try:
doc.save(filename)
except PermissionError:
filename = filename[:-5] + f'{time.time()}' + '.docx'
doc.save(filename)
self.okSignal.emit(1)

View File

@ -0,0 +1,464 @@
import os
import shutil
import sys
import traceback
from re import findall
from PyQt5.QtCore import pyqtSignal, QThread
from app.DataBase import msg_db, hard_link_db, media_msg_db
from app.DataBase.output import ExporterBase, escape_js_and_html
from app.DataBase.package_msg import PackageMsg
from app.log import logger
from app.person import Me
from app.util import path
from app.util.compress_content import parser_reply, share_card, music_share
from app.util.emoji import get_emoji_url
from app.util.file import get_file
from app.util.image import get_image_path, get_image
from app.util.music import get_music_path
class HtmlExporter(ExporterBase):
def text(self, doc, message):
type_ = message[2]
str_content = message[7]
str_time = message[8]
is_send = message[4]
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
display_name = self.get_display_name(is_send, message)
avatar = self.get_avatar_path(is_send, message)
str_content = escape_js_and_html(str_content)
doc.write(
f'''{{ type:{1}, text: '{str_content}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
)
def image(self, doc, message):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
type_ = message[2]
str_content = message[7]
str_time = message[8]
is_send = message[4]
BytesExtra = message[10]
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
str_content = escape_js_and_html(str_content)
image_path = hard_link_db.get_image(str_content, BytesExtra, thumb=False)
if not os.path.exists(os.path.join(Me().wx_dir, image_path)):
image_thumb_path = hard_link_db.get_image(str_content, BytesExtra, thumb=True)
if not os.path.exists(os.path.join(Me().wx_dir, image_thumb_path)):
return
image_path = image_thumb_path
image_path = get_image_path(image_path, base_path=f'/data/聊天记录/{self.contact.remark}/image')
doc.write(
f'''{{ type:{type_}, text: '{image_path}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
)
def audio(self, doc, message):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
str_content = message[7]
str_time = message[8]
is_send = message[4]
msgSvrId = message[9]
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
try:
audio_path = media_msg_db.get_audio_path(msgSvrId, output_path=origin_docx_path + "/voice")
audio_path = "./voice/" + os.path.basename(audio_path)
voice_to_text = escape_js_and_html(media_msg_db.get_audio_text(str_content))
except:
logger.error(traceback.format_exc())
return
doc.write(
f'''{{ type:34, text:'{audio_path}',is_send:{is_send},avatar_path:'{avatar}',voice_to_text:'{voice_to_text}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
)
def emoji(self, doc, message):
str_content = message[7]
str_time = message[8]
is_send = message[4]
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
emoji_path = get_emoji_url(str_content, thumb=True)
doc.write(
f'''{{ type:{3}, text: '{emoji_path}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
)
def file(self, doc, message):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
bytesExtra = message[10]
str_time = message[8]
is_send = message[4]
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
link = get_file(bytesExtra, thumb=True, output_path=origin_docx_path + '/file')
file_name = ''
file_path = './icon/file.png'
if link != "":
file_name = os.path.basename(link)
link = './file/' + file_name
doc.write(
f'''{{ type:49, text: '{file_path}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}',link: '{link}',sub_type:6,file_name: '{file_name}'}},'''
)
def refermsg(self, doc, message):
"""
处理回复消息
@param doc:
@param message:
@return:
"""
str_time = message[8]
is_send = message[4]
content = parser_reply(message[11])
refer_msg = content.get('refer')
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
contentText = escape_js_and_html(content.get('title'))
if refer_msg:
referText = f"{escape_js_and_html(refer_msg.get('displayname'))}{escape_js_and_html(refer_msg.get('content'))}"
doc.write(
f'''{{ type:49, text: '{contentText}',is_send:{is_send},sub_type:{content.get('type')},refer_text: '{referText}',avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
)
else:
doc.write(
f'''{{ type:49, text: '{contentText}',is_send:{is_send},sub_type:{content.get('type')},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
)
def system_msg(self, doc, message):
str_content = message[7]
is_send = message[4]
str_time = message[8]
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
str_content = str_content.replace('<![CDATA[', "").replace(
' <a href="weixin://revoke_edit_click">重新编辑</a>]]>', "")
res = findall('(</{0,1}(img|revo|_wc_cus|a).*?>)', str_content)
for xmlstr, b in res:
str_content = str_content.replace(xmlstr, "")
str_content = escape_js_and_html(str_content)
doc.write(
f'''{{ type:0, text: '{str_content}',is_send:{is_send},avatar_path:'',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:''}},'''
)
def video(self, doc, message):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
type_ = message[2]
str_content = message[7]
str_time = message[8]
is_send = message[4]
BytesExtra = message[10]
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
video_path = hard_link_db.get_video(str_content, BytesExtra, thumb=False)
image_path = hard_link_db.get_video(str_content, BytesExtra, thumb=True)
if video_path is None and image_path is not None:
image_path = path.get_relative_path(image_path, base_path=f'/data/聊天记录/{self.contact.remark}/image')
try:
# todo 网络图片问题
print(origin_docx_path + image_path[1:])
os.utime(origin_docx_path + image_path[1:], (timestamp, timestamp))
doc.write(
f'''{{ type:3, text: '{image_path}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
)
except:
doc.write(
f'''{{ type:1, text: '视频丢失',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
)
return
if video_path is None and image_path is None:
return
video_path = f'{Me().wx_dir}/{video_path}'
if os.path.exists(video_path):
new_path = origin_docx_path + '/video/' + os.path.basename(video_path)
if not os.path.exists(new_path):
shutil.copy(video_path, os.path.join(origin_docx_path, 'video'))
os.utime(new_path, (timestamp, timestamp))
video_path = f'./video/{os.path.basename(video_path)}'
doc.write(
f'''{{ type:{type_}, text: '{video_path}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
)
def music_share(self, doc, message):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
is_send = message[4]
timestamp = message[5]
content = music_share(message[11])
music_path = ''
if content.get('audio_url') != '':
music_path = get_music_path(content.get('audio_url'), content.get('title'),
output_path=origin_docx_path + '/music')
if music_path != '':
music_path = f'./music/{os.path.basename(music_path)}'
music_path = music_path.replace('\\', '/')
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
if content.get('is_error') == False:
doc.write(
f'''{{ type:49, text:'{music_path}',is_send:{is_send},avatar_path:'{avatar}',link_url:'{content.get('link_url')}',
timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}',sub_type:3,title:'{content.get('title')}',
artist:'{content.get('artist')}', website_name:'{content.get('website_name')}'}},'''
)
def share_card(self, doc, message):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
is_send = message[4]
timestamp = message[5]
bytesExtra = message[10]
compress_content_ = message[11]
card_data = share_card(bytesExtra, compress_content_)
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
thumbnail = ''
if card_data.get('thumbnail'):
thumbnail = os.path.join(Me().wx_dir, card_data.get('thumbnail'))
if os.path.exists(thumbnail):
shutil.copy(thumbnail, os.path.join(origin_docx_path, 'image', os.path.basename(thumbnail)))
thumbnail = './image/' + os.path.basename(thumbnail)
else:
thumbnail = ''
app_logo = ''
if card_data.get('app_logo'):
app_logo = os.path.join(Me().wx_dir, card_data.get('app_logo'))
if os.path.exists(app_logo):
shutil.copy(app_logo, os.path.join(origin_docx_path, 'image', os.path.basename(app_logo)))
app_logo = './image/' + os.path.basename(app_logo)
else:
app_logo = ''
doc.write(
f'''{{ type:49,sub_type:5, text:'',is_send:{is_send},avatar_path:'{avatar}',url:'{card_data.get('url')}',
timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}',title:'{card_data.get('title')}',
description:'{card_data.get('description')}',thumbnail:'{thumbnail}',app_logo:'{app_logo}',
app_name:'{card_data.get('app_name')}'
}},\n'''
)
def export(self):
if self.contact.is_chatroom:
packagemsg = PackageMsg()
messages = packagemsg.get_package_message_by_wxid(self.contact.wxid)
else:
messages = msg_db.get_messages(self.contact.wxid)
filename = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}/{self.contact.remark}.html"
file_path = './app/resources/data/template.html'
if not os.path.exists(file_path):
resource_dir = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__)))
file_path = os.path.join(resource_dir, 'app', 'resources', 'data', 'template.html')
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
html_head, html_end = content.split('/*注意看这是分割线*/')
f = open(filename, 'w', encoding='utf-8')
f.write(html_head.replace("<title>Chat Records</title>", f"<title>{self.contact.remark}</title>"))
self.rangeSignal.emit(len(messages))
for index, message in enumerate(messages):
type_ = message[2]
sub_type = message[3]
timestamp = message[5]
if (type_ == 3 and self.message_types.get(3)) or (type_ == 34 and self.message_types.get(34)) or (
type_ == 47 and self.message_types.get(47)):
pass
else:
self.progressSignal.emit(1)
if self.is_5_min(timestamp):
str_time = message[8]
f.write(
f'''{{ type:0, text: '{str_time}',is_send:0,avatar_path:'',timestamp:{timestamp}}},'''
)
if type_ == 1 and self.message_types.get(type_):
self.text(f, message)
elif type_ == 3 and self.message_types.get(type_):
self.image(f, message)
elif type_ == 34 and self.message_types.get(type_):
self.audio(f, message)
elif type_ == 43 and self.message_types.get(type_):
self.video(f, message)
elif type_ == 47 and self.message_types.get(type_):
self.emoji(f, message)
elif type_ == 10000 and self.message_types.get(type_):
self.system_msg(f, message)
elif type_ == 49 and sub_type == 57 and self.message_types.get(1):
self.refermsg(f, message)
elif type_ == 49 and sub_type == 6 and self.message_types.get(4906):
self.file(f, message)
elif type_ == 49 and sub_type == 3 and self.message_types.get(4903):
self.music_share(f, message)
elif type_ == 49 and sub_type == 5 and self.message_types.get(4905):
self.share_card(f, message)
f.write(html_end)
f.close()
self.count_finish_num(1)
def count_finish_num(self, num):
"""
记录子线程完成个数
@param num:
@return:
"""
self.num += 1
print('子线程完成',self.num,'/',self.total_num)
if self.num == self.total_num:
# 所有子线程都完成之后就发送完成信号
self.okSignal.emit(1)
class OutputMedia(QThread):
"""
导出语音消息
"""
okSingal = pyqtSignal(int)
progressSignal = pyqtSignal(int)
def __init__(self, contact):
super().__init__()
self.contact = contact
def run(self):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
messages = msg_db.get_messages_by_type(self.contact.wxid, 34)
for message in messages:
is_send = message[4]
msgSvrId = message[9]
try:
audio_path = media_msg_db.get_audio(msgSvrId, output_path=origin_docx_path + "/voice")
except:
logger.error(traceback.format_exc())
finally:
self.progressSignal.emit(1)
self.okSingal.emit(34)
class OutputEmoji(QThread):
"""
导出表情包
"""
okSingal = pyqtSignal(int)
progressSignal = pyqtSignal(int)
def __init__(self, contact):
super().__init__()
self.contact = contact
def run(self):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
messages = msg_db.get_messages_by_type(self.contact.wxid, 47)
for message in messages:
str_content = message[7]
try:
pass
# emoji_path = get_emoji(str_content, thumb=True, output_path=origin_docx_path + '/emoji')
except:
logger.error(traceback.format_exc())
finally:
self.progressSignal.emit(1)
self.okSingal.emit(47)
class OutputImage(QThread):
"""
导出图片
"""
okSingal = pyqtSignal(int)
progressSignal = pyqtSignal(int)
def __init__(self, contact):
super().__init__()
self.contact = contact
self.child_thread_num = 2
self.child_threads = [0] * (self.child_thread_num + 1)
self.num = 0
def count1(self, num):
self.num += 1
print('图片导出完成一个')
if self.num == self.child_thread_num:
self.okSingal.emit(47)
print('图片导出完成')
def run(self):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
messages = msg_db.get_messages_by_type(self.contact.wxid, 3)
for message in messages:
str_content = message[7]
BytesExtra = message[10]
timestamp = message[5]
try:
image_path = hard_link_db.get_image(str_content, BytesExtra, thumb=False)
if not os.path.exists(os.path.join(Me().wx_dir, image_path)):
image_thumb_path = hard_link_db.get_image(str_content, BytesExtra, thumb=True)
if not os.path.exists(os.path.join(Me().wx_dir, image_thumb_path)):
continue
image_path = image_thumb_path
image_path = get_image(image_path, base_path=f'/data/聊天记录/{self.contact.remark}/image')
try:
os.utime(origin_docx_path + image_path[1:], (timestamp, timestamp))
except:
pass
except:
logger.error(traceback.format_exc())
finally:
self.progressSignal.emit(1)
self.okSingal.emit(47)
# sublist_length = len(messages) // self.child_thread_num
# index = 0
# for i in range(0, len(messages), sublist_length):
# child_messages = messages[i:i + sublist_length]
# self.child_threads[index] = OutputImageChild(self.contact, child_messages)
# self.child_threads[index].okSingal.connect(self.count1)
# self.child_threads[index].progressSignal.connect(self.progressSignal)
# self.child_threads[index].start()
# print('开启一个新线程')
# index += 1
class OutputImageChild(QThread):
okSingal = pyqtSignal(int)
progressSignal = pyqtSignal(int)
def __init__(self, contact, messages):
super().__init__()
self.contact = contact
self.messages = messages
def run(self):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
for message in self.messages:
str_content = message[7]
BytesExtra = message[10]
timestamp = message[5]
try:
image_path = hard_link_db.get_image(str_content, BytesExtra, thumb=False)
if not os.path.exists(os.path.join(Me().wx_dir, image_path)):
image_thumb_path = hard_link_db.get_image(str_content, BytesExtra, thumb=True)
if not os.path.exists(os.path.join(Me().wx_dir, image_thumb_path)):
continue
image_path = image_thumb_path
image_path = get_image(image_path, base_path=f'/data/聊天记录/{self.contact.remark}/image')
try:
os.utime(origin_docx_path + image_path[1:], (timestamp, timestamp))
except:
pass
except:
logger.error(traceback.format_exc())
finally:
self.progressSignal.emit(1)
self.okSingal.emit(47)
print('图片子线程完成')

View File

@ -0,0 +1,148 @@
import os
from app.DataBase import msg_db
from app.DataBase.output import ExporterBase
from app.DataBase.package_msg import PackageMsg
from app.util.compress_content import parser_reply, share_card
class TxtExporter(ExporterBase):
def text(self, doc, message):
str_content = message[7]
str_time = message[8]
is_send = message[4]
display_name = self.get_display_name(is_send, message)
name = display_name
doc.write(
f'''{str_time} {name}\n{str_content}\n\n'''
)
def image(self, doc, message):
str_time = message[8]
is_send = message[4]
display_name = self.get_display_name(is_send, message)
doc.write(
f'''{str_time} {display_name}\n[图片]\n\n'''
)
def audio(self, doc, message):
str_time = message[8]
is_send = message[4]
display_name = self.get_display_name(is_send, message)
doc.write(
f'''{str_time} {display_name}\n[语音]\n\n'''
)
def emoji(self, doc, message):
str_time = message[8]
is_send = message[4]
display_name = self.get_display_name(is_send, message)
doc.write(
f'''{str_time} {display_name}\n[表情包]\n\n'''
)
def file(self, doc, message):
str_time = message[8]
is_send = message[4]
display_name = self.get_display_name(is_send, message)
doc.write(
f'''{str_time} {display_name}\n[文件]\n\n'''
)
def refermsg(self, doc, message):
"""
处理回复消息
@param doc:
@param message:
@return:
"""
str_time = message[8]
is_send = message[4]
content = parser_reply(message[11])
refer_msg = content.get('refer')
display_name = self.get_display_name(is_send, message)
if refer_msg:
doc.write(
f'''{str_time} {display_name}\n{content.get('title')}\n引用:{refer_msg.get('displayname')}:{refer_msg.get('content')}\n\n'''
)
else:
doc.write(
f'''{str_time} {display_name}\n{content.get('title')}\n引用:未知\n\n'''
)
def system_msg(self, doc, message):
str_content = message[7]
str_time = message[8]
str_content = str_content.replace('<![CDATA[', "").replace(
' <a href="weixin://revoke_edit_click">重新编辑</a>]]>', "")
doc.write(
f'''{str_time} {str_content}\n\n'''
)
def video(self, doc, message):
str_time = message[8]
is_send = message[4]
display_name = self.get_display_name(is_send, message)
doc.write(
f'''{str_time} {display_name}\n[视频]\n\n'''
)
def music_share(self, doc, message):
is_send = message[4]
str_time = message[8]
display_name = self.get_display_name(is_send, message)
doc.write(
f'''{str_time} {display_name}\n[音乐分享]\n\n'''
)
def share_card(self, doc, message):
is_send = message[4]
bytesExtra = message[10]
compress_content_ = message[11]
str_time = message[8]
card_data = share_card(bytesExtra, compress_content_)
display_name = self.get_display_name(is_send, message)
doc.write(
f'''{str_time} {display_name}
[链接]:title:{card_data.get('title')}
description:{card_data.get('description')}
url:{card_data.get('url')}
name:{card_data.get('app_name')}
\n\n'''
)
def export(self):
# 实现导出为txt的逻辑
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
os.makedirs(origin_docx_path, exist_ok=True)
filename = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}/{self.contact.remark}.txt"
if self.contact.is_chatroom:
packagemsg = PackageMsg()
messages = packagemsg.get_package_message_by_wxid(self.contact.wxid)
else:
messages = msg_db.get_messages(self.contact.wxid)
total_steps = len(messages)
with open(filename, mode='w', newline='', encoding='utf-8') as f:
for index, message in enumerate(messages):
type_ = message[2]
sub_type = message[3]
self.progressSignal.emit(int((index + 1) / total_steps * 100))
if type_ == 1 and self.message_types.get(type_):
self.text(f, message)
elif type_ == 3 and self.message_types.get(type_):
self.image(f, message)
elif type_ == 34 and self.message_types.get(type_):
self.audio(f, message)
elif type_ == 43 and self.message_types.get(type_):
self.video(f, message)
elif type_ == 47 and self.message_types.get(type_):
self.emoji(f, message)
elif type_ == 10000 and self.message_types.get(type_):
self.system_msg(f, message)
elif type_ == 49 and sub_type == 57 and self.message_types.get(1):
self.refermsg(f, message)
elif type_ == 49 and sub_type == 6 and self.message_types.get(4906):
self.file(f, message)
elif type_ == 49 and sub_type == 3 and self.message_types.get(4903):
self.music_share(f, message)
elif type_ == 49 and sub_type == 5 and self.message_types.get(4905):
self.share_card(f, message)
self.okSignal.emit(1)

View File

@ -1,51 +1,92 @@
import csv
import html
import os
import re
import shutil
import sys
import time
import traceback
from re import findall
import docx
import numpy as np
import pandas as pd
import xmltodict
from PyQt5.QtCore import *
from PyQt5.QtCore import pyqtSignal, QThread
from PyQt5.QtWidgets import QFileDialog
from docx import shared
from docx.enum.table import WD_ALIGN_VERTICAL
from docx.enum.text import WD_COLOR_INDEX, WD_PARAGRAPH_ALIGNMENT
from docxcompose.composer import Composer
from docx.oxml.ns import qn
from app import person
from app.DataBase import data
from app.log import log
from .package_msg import PackageMsg
from ..DataBase import media_msg_db, hard_link_db, micro_msg_db, msg_db
from ..log import logger
from ..person import Me, Contact
from ..util import path
from ..util.compress_content import parser_reply, music_share, share_card
from ..util.emoji import get_emoji_url
from ..util.file import get_file
from ..util.music import get_music_path
from ..util.image import get_image_path, get_image, get_image_abs_path
os.makedirs('./data/聊天记录', exist_ok=True)
# import data
def set_global_font(doc, font_name):
# 创建一个新样式
style = doc.styles['Normal']
# 设置字体名称
style.font.name = font_name
# 遍历文档中的所有段落,将样式应用到每个段落
for paragraph in doc.paragraphs:
for run in paragraph.runs:
run.font.name = font_name
def IS_5_min(last_m, now_m):
"""
#! 判断两次聊天时间是不是大于五分钟
#! 若大于五分钟则显示时间
#! 否则不显示
"""
'''两次聊天记录时间差,单位是秒'''
dt = now_m - last_m
return abs(dt // 1000) >= 300
def makedirs(path):
os.makedirs(path, exist_ok=True)
os.makedirs(os.path.join(path, 'image'), exist_ok=True)
os.makedirs(os.path.join(path, 'emoji'), exist_ok=True)
os.makedirs(os.path.join(path, 'video'), exist_ok=True)
os.makedirs(os.path.join(path, 'voice'), exist_ok=True)
os.makedirs(os.path.join(path, 'file'), exist_ok=True)
os.makedirs(os.path.join(path, 'avatar'), exist_ok=True)
os.makedirs(os.path.join(path, 'music'), exist_ok=True)
os.makedirs(os.path.join(path, 'icon'), exist_ok=True)
file = './app/resources/data/file.png'
if not os.path.exists(file):
resource_dir = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__)))
file = os.path.join(resource_dir, 'app', 'resources', 'data', 'file.png')
shutil.copy(file, path + '/icon/file.png')
play_file = './app/resources/data/play.png'
if not os.path.exists(play_file):
resource_dir = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__)))
play_file = os.path.join(resource_dir, 'app', 'resources', 'data', 'play.png')
shutil.copy(play_file, path + '/icon/play.png')
pause_file = './app/resources/data/pause.png'
if not os.path.exists(pause_file):
resource_dir = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__)))
pause_file = os.path.join(resource_dir, 'app', 'resources', 'data', 'pause.png')
shutil.copy(pause_file, path + '/icon/pause.png')
def time_format(timestamp):
'''
#! 将字符串类型的时间戳转换成日期
#! 返回格式化的时间字符串
#! %Y-%m-%d %H:%M:%S
'''
timestamp = timestamp / 1000
time_tuple = time.localtime(timestamp)
return time.strftime("%Y-%m-%d %H:%M:%S", time_tuple)
def escape_js_and_html(input_str):
# 转义HTML特殊字符
html_escaped = html.escape(input_str, quote=False)
# 手动处理JavaScript转义字符
js_escaped = (
html_escaped
.replace("\\", "\\\\")
.replace("'", r"\'")
.replace('"', r'\"')
.replace("\n", r'\n')
.replace("\r", r'\r')
.replace("\t", r'\t')
)
return js_escaped
class Output(QThread):
"""
发送信息线程
"""
class ExporterBase(QThread):
progressSignal = pyqtSignal(int)
rangeSignal = pyqtSignal(int)
okSignal = pyqtSignal(int)
@ -53,401 +94,82 @@ class Output(QThread):
CSV = 0
DOCX = 1
HTML = 2
CSV_ALL = 3
CONTACT_CSV = 4
TXT = 5
def __init__(self, Me: person.Me, ta_u, parent=None, type_=DOCX):
def __init__(self, contact, type_=DOCX, message_types={}, parent=None):
super().__init__(parent)
self.Me = Me
self.sec = 2 # 默认1000秒
self.ta_username = ta_u
self.my_avatar = self.Me.avatar_path
self.ta_avatar = data.get_avator(ta_u)
self.msg_id = 0
self.output_type = type_
self.total_num = 0
@log
def merge_docx(self, conRemark, n):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{conRemark}"
all_file_path = []
for i in range(n):
file_name = f"{conRemark}{i}.docx"
all_file_path.append(origin_docx_path + '/' + file_name)
filename = f"{conRemark}.docx"
# print(all_file_path)
doc = docx.Document()
doc.save(origin_docx_path + '/' + filename)
master = docx.Document(origin_docx_path + '/' + filename)
middle_new_docx = Composer(master)
num = 0
for word in all_file_path:
word_document = docx.Document(word)
word_document.add_page_break()
if num != 0:
middle_new_docx.append(word_document)
num = num + 1
os.remove(word)
middle_new_docx.save(origin_docx_path + '/' + filename)
def progress(self, value):
self.i += 1
# 处理完成之后将多个文件合并
if self.i == self.total_num:
QThread.sleep(1)
conRemark = data.get_conRemark(self.ta_username)
self.progressSignal.emit(self.total_num - 1)
self.merge_docx(conRemark, self.n)
print('ok')
self.progressSignal.emit(self.total_num)
self.okSignal.emit(1)
self.progressSignal.emit(self.i)
@log
def to_csv(self, conRemark, path):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{conRemark}"
messages = data.get_all_message(self.ta_username)
# print(messages)
self.Child0 = ChildThread(self.Me, self.ta_username, messages, conRemark, 0, type_=ChildThread.CSV)
self.Child0.progressSignal.connect(self.progress)
self.Child0.start()
print("成功导出CSV文件", origin_docx_path)
self.message_types = message_types # 导出的消息类型
self.contact: Contact = contact # 联系人
self.output_type = type_ # 导出文件类型
self.total_num = 1 # 总的消息数量
self.num = 0 # 当前处理的消息数量
self.last_timestamp = 0
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
makedirs(origin_docx_path)
def run(self):
conRemark = data.get_conRemark(self.ta_username)
data.mkdir(f"{os.path.abspath('.')}/data/聊天记录/{conRemark}")
if self.output_type == self.DOCX:
self.Child = {}
if 1:
messages = data.get_all_message(self.ta_username)
self.total_num = len(messages)
self.rangeSignal.emit(self.total_num)
l = len(messages)
self.n = 10
for i in range(self.n):
q = i * (l // self.n)
p = (i + 1) * (l // self.n)
if i == self.n - 1:
p = l
len_data = messages[q:p]
# self.to_docx(len_data, i, conRemark)
self.Child[i] = ChildThread(self.Me, self.ta_username, len_data, conRemark, i)
self.Child[i].progressSignal.connect(self.progress)
self.Child[i].start()
elif self.output_type == self.CSV:
# print("线程导出csv")
# self.to_csv(self.ta_username, "path")
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.ta_username}"
messages = data.get_all_message(self.ta_username)
# print(messages)
self.Child0 = ChildThread(self.Me, self.ta_username, messages, conRemark, 0, type_=ChildThread.CSV)
self.Child0.progressSignal.connect(self.progress)
self.Child0.run()
self.okSignal.emit(1)
self.export()
def export(self):
raise NotImplementedError("export method must be implemented in subclasses")
def cancel(self):
self.requestInterruption()
class ChildThread(QThread):
"""
子线程用于导出部分聊天记录
"""
progressSignal = pyqtSignal(int)
rangeSignal = pyqtSignal(int)
i = 1
CSV = 0
DOCX = 1
HTML = 2
def is_5_min(self, timestamp) -> bool:
if abs(timestamp - self.last_timestamp) > 300:
self.last_timestamp = timestamp
return True
return False
def __init__(self, Me: person.Me, ta_u, message, conRemark, num, parent=None, type_=DOCX):
super().__init__(parent)
self.Me = Me
self.sec = 2 # 默认1000秒
self.ta_username = ta_u
self.num = num
self.my_avatar = self.Me.avatar_path
self.ta_avatar = data.get_avator(ta_u)
self.conRemark = conRemark
self.message = message
self.msg_id = 0
self.output_type = type_
def create_table(self, doc, isSend):
'''
#! 创建一个1*2表格
#! isSend = 1 (0,0)存聊天内容,(0,1)存头像
#! isSend = 0 (0,0)存头像,(0,1)存聊天内容
#! 返回聊天内容的坐标
'''
table = doc.add_table(rows=1, cols=2, style='Normal Table')
table.cell(0, 1).height = shared.Inches(0.5)
table.cell(0, 0).height = shared.Inches(0.5)
text_size = 1
if isSend:
'''表格右对齐'''
table.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
avatar = table.cell(0, 1).paragraphs[0].add_run()
'''插入头像,设置头像宽度'''
avatar.add_picture(self.my_avatar, width=shared.Inches(0.5))
'''设置单元格宽度跟头像一致'''
table.cell(0, 1).width = shared.Inches(0.5)
content_cell = table.cell(0, 0)
'''聊天内容右对齐'''
content_cell.paragraphs[0].paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
def get_avatar_path(self, is_send, message, is_absolute_path=False) -> str:
if self.contact.is_chatroom:
avatar = message[12].smallHeadImgUrl
else:
avatar = table.cell(0, 0).paragraphs[0].add_run()
avatar.add_picture(self.ta_avatar, width=shared.Inches(0.5))
'''设置单元格宽度'''
table.cell(0, 0).width = shared.Inches(0.5)
content_cell = table.cell(0, 1)
'''聊天内容垂直居中对齐'''
content_cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER
return content_cell
def text(self, doc, isSend, message, status):
if status == 5:
message += '(未发出) '
content_cell = self.create_table(doc, isSend)
content_cell.paragraphs[0].add_run(message)
content_cell.paragraphs[0].font_size = shared.Inches(0.5)
# self.self_text.emit(message)
if isSend:
p = content_cell.paragraphs[0]
p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
doc.add_paragraph()
def image(self, doc, isSend, Type, content, imgPath):
'''
#! 插入聊天图片
#! isSend = 1 只有缩略图
#! isSend = 0 有原图
:param doc:
:param isSend:
:param Type:
:param content:
:param imgPath:
:return:
'''
content = self.create_table(doc, isSend)
run = content.paragraphs[0].add_run()
if Type == 3:
imgPath = imgPath.split('th_')[1]
imgPath = f'./app/data/image2/{imgPath[0:2]}/{imgPath[2:4]}/th_{imgPath}'
imgPath = data.clearImagePath(imgPath)
try:
run.add_picture(f'{imgPath}', height=shared.Inches(2))
doc.add_paragraph()
except Exception:
print("Error!image")
# run.add_picture(f'{Path}/{imgPath}', height=shared.Inches(2))
def emoji(self, doc, isSend, content, imgPath):
'''
#! 添加表情包
:param isSend:
:param content:
:param imgPath:
:return:
'''
imgPath = data.get_emoji(imgPath)
if 1:
is_Exist = os.path.exists(imgPath)
self.image(doc, isSend, Type=47, content=content, imgPath=imgPath)
def wx_file(self, doc, isSend, content, status):
'''
#! 添加微信文件
:param isSend:
:param content:
:param status:
:return:
'''
pattern = re.compile(r"<title>(.*?)<")
r = pattern.search(content).group()
filename = r.lstrip('<title>').rstrip('<')
self.text(doc, isSend, filename, status)
def retract_message(self, doc, isSend, content, status):
'''
#! 显示撤回消息
:param isSend:
:param content:
:param status:
:return:
'''
paragraph = doc.add_paragraph(content)
paragraph.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
def reply(self, doc, isSend, content, status):
'''
#! 添加回复信息
:param isSend:
:param content:
:param status:
:return:
'''
pattern1 = re.compile(r"<title>(?P<title>(.*?))</title>")
title = pattern1.search(content).groupdict()['title']
pattern2 = re.compile(r"<displayname>(?P<displayname>(.*?))</displayname>")
displayname = pattern2.search(content).groupdict()['displayname']
'''匹配回复的回复'''
pattern3 = re.compile(r"\n?title&gt;(?P<content>(.*?))\n?&lt;/title&gt")
if not pattern3.search(content):
if isSend == 0:
'''匹配对方的回复'''
pattern3 = re.compile(r"<content>(?P<content>(.*?))</content>")
avatar = Me().smallHeadImgUrl if is_send else self.contact.smallHeadImgUrl
if is_absolute_path:
if self.contact.is_chatroom:
avatar = message[12].avatar_path
else:
'''匹配自己的回复'''
pattern3 = re.compile(r"</msgsource>\n?<content>(?P<content>(.*?))\n?</content>")
avatar = Me().avatar_path if is_send else self.contact.avatar_path
return avatar
'''这部分代码完全可以用if代替'''
try:
'''试错'''
text = pattern3.search(content).groupdict()['content']
except Exception:
try:
'''试错'''
text = pattern3.search(content).groupdict()['content']
except Exception:
'''试错'''
pattern3 = re.compile(r"\n?<content>(?P<content>(.*?))\n?</content>")
'''试错'''
if pattern3.search(content):
text = pattern3.search(content).groupdict()['content']
def get_display_name(self, is_send, message) -> str:
if self.contact.is_chatroom:
if is_send:
display_name = Me().name
else:
text = '图片'
if status == 5:
message = '(未发出) ' + ''
content_cell = self.create_table(doc, isSend)
content_cell.paragraphs[0].add_run(title)
content_cell.paragraphs[0].font_size = shared.Inches(0.5)
reply_p = content_cell.add_paragraph()
run = content_cell.paragraphs[1].add_run(displayname + ':' + text)
'''设置被回复内容格式'''
run.font.color.rgb = shared.RGBColor(121, 121, 121)
run.font_size = shared.Inches(0.3)
run.font.highlight_color = WD_COLOR_INDEX.GRAY_25
display_name = message[12].remark
else:
display_name = Me().name if is_send else self.contact.remark
return escape_js_and_html(display_name)
if isSend:
p = content_cell.paragraphs[0]
p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
reply_p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
doc.add_paragraph()
def text(self, doc, message):
return
def pat_a_pat(self, doc, isSend, content, status):
"""
#! 添加拍一拍信息
todo 把wxid转化成昵称
:param isSend:
:param content:
:param status:
:return:
"""
try:
pat_data = xmltodict.parse(content)
pat_data = pat_data['msg']['appmsg']['patMsg']['records']['record']
fromUser = pat_data['fromUser']
pattedUser = pat_data['pattedUser']
template = pat_data['template']
template = ''.join(template.split('${pattedusername@textstatusicon}'))
template = ''.join(template.split('${fromusername@textstatusicon}'))
template = template.replace(f'${{{fromUser}}}', data.get_conRemark(fromUser))
template = template.replace(f'${{{pattedUser}}}', data.get_conRemark(pattedUser))
print(template)
except Exception as e:
print(e)
template = '糟糕!出错了。'
p = doc.add_paragraph()
run = p.add_run(template)
p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
'''设置拍一拍文字格式'''
run.font.color.rgb = shared.RGBColor(121, 121, 121)
run.font_size = shared.Inches(0.3)
# run.font.highlight_color=WD_COLOR_INDEX.GRAY_25
def image(self, doc, message):
return
def video(self, doc, isSend, content, status, img_path):
print(content, img_path)
def audio(self, doc, message):
return
def to_docx(self, messages, i, conRemark):
'''创建联系人目录'''
def emoji(self, doc, message):
return
filename = f"{os.path.abspath('.')}/data/聊天记录/{conRemark}/{conRemark}{i}.docx"
doc = docx.Document()
last_timestamp = 1601968667000
def file(self, doc, message):
return
for message in messages:
self.progressSignal.emit(self.i)
self.i += 1
msgId = message[0]
ta_username = message[7]
Type = int(message[2])
isSend = message[4]
content = message[8]
imgPath = message[9]
now_timestamp = message[6]
status = message[3]
createTime = time_format(now_timestamp)
# print(createTime, isSend, content)
if IS_5_min(last_timestamp, now_timestamp):
doc.add_paragraph(createTime).alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
last_timestamp = now_timestamp
if Type == 1:
try:
self.text(doc, isSend, content, status)
except Exception as e:
print(e)
elif Type == 3:
self.image(doc, isSend, 3, content, imgPath)
elif Type == 47:
self.emoji(doc, isSend, content, imgPath)
elif Type == 1090519089:
self.wx_file(doc, isSend, content, status)
elif Type == 268445456:
self.retract_message(doc, isSend, content, status)
elif Type == 822083633:
self.reply(doc, isSend, content, status)
elif Type == 922746929:
self.pat_a_pat(doc, isSend, content, status)
elif Type == 43:
# print(createTime)
self.video(doc, isSend, content, status, imgPath)
# doc.add_paragraph(str(i))
print(filename)
doc.save(filename)
def refermsg(self, doc, message):
return
def to_csv(self, messages, i, conRemark):
'''创建联系人目录'''
# print('123', messages)
filename = f"{os.path.abspath('.')}/data/聊天记录/{conRemark}/{conRemark}.csv"
last_timestamp = 1601968667000
columns = ["用户名", "消息内容", "发送时间", "发送状态", "消息类型", "isSend", "msgId"]
df = pd.DataFrame()
df["用户名"] = np.array(list(map(lambda x: x[7], messages)))
df["消息内容"] = np.array(list(map(lambda x: x[8], messages)))
df["发送时间"] = np.array(list(map(lambda x: time_format(x[6]), messages)))
df["发送状态"] = np.array(list(map(lambda x: x[3], messages)))
df["消息类型"] = np.array(list(map(lambda x: x[2], messages)))
df["isSend"] = np.array(list(map(lambda x: x[4], messages)))
df["msgId"] = np.array(list(map(lambda x: x[0], messages)))
df.to_csv(filename)
# df.to_csv('data.csv')
print(df)
self.progressSignal.emit(self.num)
def system_msg(self, doc, message):
return
def to_html(self, messages, i, conRemark):
pass
def video(self, doc, message):
return
def run(self):
if self.output_type == self.DOCX:
# print("导出docx")
self.to_docx(self.message, self.num, self.conRemark)
elif self.output_type == self.CSV:
print("导出csv001")
# print('00', self.message[0])
self.to_csv(self.message, self.num, self.conRemark)
def music_share(self, doc, message):
return
if __name__ == '__main__':
# wxid_0o18ef858vnu22
# wxid_fdkbu92el15h22
me = data.Me_Person('wxid_fdkbu92el15h22')
t = Output(Me=me, ta_u='wxid_0o18ef858vnu22', type_=Output.CSV)
t.run()
def share_card(self, doc, message):
return

View File

@ -1,91 +1,23 @@
import csv
import html
import os
import shutil
import sys
import time
import traceback
from re import findall
import docx
from PyQt5.QtCore import pyqtSignal, QThread
from PyQt5.QtWidgets import QFileDialog
from docx import shared
from docx.enum.table import WD_ALIGN_VERTICAL
from docx.enum.text import WD_COLOR_INDEX, WD_PARAGRAPH_ALIGNMENT
from docx.oxml.ns import qn
from app.DataBase.exporter_csv import CSVExporter
from app.DataBase.exporter_docx import DocxExporter
from app.DataBase.exporter_html import HtmlExporter
from app.DataBase.exporter_txt import TxtExporter
from .package_msg import PackageMsg
from ..DataBase import media_msg_db, hard_link_db, micro_msg_db, msg_db
from ..log import logger
from ..person import Me
from ..util import path
from ..util.compress_content import parser_reply, music_share, share_card
from ..util.emoji import get_emoji_url
from ..util.file import get_file
from ..util.music import get_music_path
from ..util.image import get_image_path, get_image, get_image_abs_path
from ..util.image import get_image
os.makedirs('./data/聊天记录', exist_ok=True)
def set_global_font(doc, font_name):
# 创建一个新样式
style = doc.styles['Normal']
# 设置字体名称
style.font.name = font_name
# 遍历文档中的所有段落,将样式应用到每个段落
for paragraph in doc.paragraphs:
for run in paragraph.runs:
run.font.name = font_name
def makedirs(path):
os.makedirs(path, exist_ok=True)
os.makedirs(os.path.join(path, 'image'), exist_ok=True)
os.makedirs(os.path.join(path, 'emoji'), exist_ok=True)
os.makedirs(os.path.join(path, 'video'), exist_ok=True)
os.makedirs(os.path.join(path, 'voice'), exist_ok=True)
os.makedirs(os.path.join(path, 'file'), exist_ok=True)
os.makedirs(os.path.join(path, 'avatar'), exist_ok=True)
os.makedirs(os.path.join(path, 'music'), exist_ok=True)
os.makedirs(os.path.join(path, 'icon'), exist_ok=True)
file = './app/resources/data/file.png'
if not os.path.exists(file):
resource_dir = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__)))
file = os.path.join(resource_dir, 'app', 'resources', 'data', 'file.png')
shutil.copy(file, path + '/icon/file.png')
play_file = './app/resources/data/play.png'
if not os.path.exists(play_file):
resource_dir = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__)))
play_file = os.path.join(resource_dir, 'app', 'resources', 'data', 'play.png')
shutil.copy(play_file, path + '/icon/play.png')
pause_file = './app/resources/data/pause.png'
if not os.path.exists(pause_file):
resource_dir = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__)))
pause_file = os.path.join(resource_dir, 'app', 'resources', 'data', 'pause.png')
shutil.copy(pause_file, path + '/icon/pause.png')
def escape_js_and_html(input_str):
# 转义HTML特殊字符
html_escaped = html.escape(input_str, quote=False)
# 手动处理JavaScript转义字符
js_escaped = (
html_escaped
.replace("\\", "\\\\")
.replace("'", r"\'")
.replace('"', r'\"')
.replace("\n", r'\n')
.replace("\r", r'\r')
.replace("\t", r'\t')
)
return js_escaped
class Output(QThread):
"""
发送信息线程
@ -182,7 +114,7 @@ class Output(QThread):
def run(self):
if self.output_type == self.DOCX:
self.Child = ChildThread(self.contact, type_=self.output_type, message_types=self.message_types)
self.Child = DocxExporter(self.contact, type_=self.output_type, message_types=self.message_types)
self.Child.progressSignal.connect(self.progress)
self.Child.rangeSignal.connect(self.rangeSignal)
self.Child.okSignal.connect(self.okSignal)
@ -191,15 +123,21 @@ class Output(QThread):
self.to_csv_all()
elif self.output_type == self.CONTACT_CSV:
self.contact_to_csv()
elif self.output_type == self.CSV or self.output_type == self.TXT or self.output_type == self.DOCX:
self.Child = ChildThread(self.contact, type_=self.output_type, message_types=self.message_types)
elif self.output_type == self.TXT:
self.Child = TxtExporter(self.contact, type_=self.output_type, message_types=self.message_types)
self.Child.progressSignal.connect(self.progress)
self.Child.rangeSignal.connect(self.rangeSignal)
self.Child.okSignal.connect(self.okSignal)
self.Child.start()
elif self.output_type == self.CSV:
self.Child = CSVExporter(self.contact, type_=self.output_type, message_types=self.message_types)
self.Child.progressSignal.connect(self.progress)
self.Child.rangeSignal.connect(self.rangeSignal)
self.Child.okSignal.connect(self.okSignal)
self.Child.start()
elif self.output_type == self.HTML:
self.Child = ChildThread(self.contact, type_=self.output_type, message_types=self.message_types)
self.Child.progressSignal.connect(self.progressSignal)
self.Child = HtmlExporter(self.contact, type_=self.output_type, message_types=self.message_types)
self.Child.progressSignal.connect(self.progress)
self.Child.rangeSignal.connect(self.rangeSignal)
self.Child.okSignal.connect(self.count_finish_num)
self.Child.start()
@ -210,7 +148,6 @@ class Output(QThread):
self.output_media.okSingal.connect(self.count_finish_num)
self.output_media.progressSignal.connect(self.progressSignal)
self.output_media.start()
if self.message_types.get(47):
# emoji消息单独的线程
self.total_num += 1
@ -241,643 +178,6 @@ class Output(QThread):
self.requestInterruption()
def modify_audio_metadata(audiofile, new_artist): # 修改音频元数据中的“创作者”标签
return
audiofile = load(audiofile)
# 检查文件是否有标签
if audiofile.tag is None:
audiofile.initTag()
# 修改艺术家名称
audiofile.tag.artist = new_artist
audiofile.tag.save()
class ChildThread(QThread):
"""
子线程用于导出部分聊天记录
"""
progressSignal = pyqtSignal(int)
rangeSignal = pyqtSignal(int)
okSignal = pyqtSignal(int)
i = 1
CSV = 0
DOCX = 1
HTML = 2
def __init__(self, contact, type_=DOCX, message_types={}, parent=None):
super().__init__(parent)
self.contact = contact
self.message_types = message_types
self.last_timestamp = 0
self.sec = 2 # 默认1000秒
self.msg_id = 0
self.output_type = type_
def is_5_min(self, timestamp) -> bool:
if abs(timestamp - self.last_timestamp) > 300:
self.last_timestamp = timestamp
return True
return False
def get_avatar_path(self, is_send, message, is_absolute_path=False) -> str:
if self.contact.is_chatroom:
avatar = message[12].smallHeadImgUrl
else:
avatar = Me().smallHeadImgUrl if is_send else self.contact.smallHeadImgUrl
if is_absolute_path:
if self.contact.is_chatroom:
avatar = message[12].avatar_path
else:
avatar = Me().avatar_path if is_send else self.contact.avatar_path
return avatar
def get_display_name(self, is_send, message) -> str:
if self.contact.is_chatroom:
if is_send:
display_name = Me().name
else:
display_name = message[12].remark
else:
display_name = Me().name if is_send else self.contact.remark
return escape_js_and_html(display_name)
def text(self, doc, message):
type_ = message[2]
str_content = message[7]
str_time = message[8]
is_send = message[4]
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
display_name = self.get_display_name(is_send, message)
if self.output_type == Output.HTML:
avatar = self.get_avatar_path(is_send, message)
str_content = escape_js_and_html(str_content)
doc.write(
f'''{{ type:{1}, text: '{str_content}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
)
elif self.output_type == Output.TXT:
name = display_name
doc.write(
f'''{str_time} {name}\n{str_content}\n\n'''
)
elif self.output_type == Output.DOCX:
avatar = self.get_avatar_path(is_send, message, True)
content_cell = self.create_table(doc, is_send, avatar)
content_cell.paragraphs[0].add_run(str_content)
content_cell.paragraphs[0].font_size = shared.Inches(0.5)
if is_send:
p = content_cell.paragraphs[0]
p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
doc.add_paragraph()
def image(self, doc, message):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
type_ = message[2]
str_content = message[7]
str_time = message[8]
is_send = message[4]
BytesExtra = message[10]
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
if self.output_type == Output.HTML:
str_content = escape_js_and_html(str_content)
image_path = hard_link_db.get_image(str_content, BytesExtra, thumb=False)
if not os.path.exists(os.path.join(Me().wx_dir, image_path)):
image_thumb_path = hard_link_db.get_image(str_content, BytesExtra, thumb=True)
if not os.path.exists(os.path.join(Me().wx_dir, image_thumb_path)):
return
image_path = image_thumb_path
image_path = get_image_path(image_path, base_path=f'/data/聊天记录/{self.contact.remark}/image')
doc.write(
f'''{{ type:{type_}, text: '{image_path}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
)
elif self.output_type == Output.TXT:
doc.write(
f'''{str_time} {display_name}\n[图片]\n\n'''
)
elif self.output_type == Output.DOCX:
avatar = self.get_avatar_path(is_send, message, True)
content = self.create_table(doc, is_send, avatar)
run = content.paragraphs[0].add_run()
str_content = escape_js_and_html(str_content)
image_path = hard_link_db.get_image(str_content, BytesExtra, thumb=True)
if not os.path.exists(os.path.join(Me().wx_dir, image_path)):
image_thumb_path = hard_link_db.get_image(str_content, BytesExtra, thumb=False)
if not os.path.exists(os.path.join(Me().wx_dir, image_thumb_path)):
return
image_path = image_thumb_path
image_path = get_image_abs_path(image_path, base_path=f'/data/聊天记录/{self.contact.remark}/image')
try:
run.add_picture(image_path, height=shared.Inches(2))
doc.add_paragraph()
except Exception:
print("Error!image")
def audio(self, doc, message):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
str_content = message[7]
str_time = message[8]
is_send = message[4]
msgSvrId = message[9]
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
if self.output_type == Output.HTML:
try:
audio_path = media_msg_db.get_audio_path(msgSvrId, output_path=origin_docx_path + "/voice")
audio_path = "./voice/" + os.path.basename(audio_path)
voice_to_text = escape_js_and_html(media_msg_db.get_audio_text(str_content))
except:
logger.error(traceback.format_exc())
return
doc.write(
f'''{{ type:34, text:'{audio_path}',is_send:{is_send},avatar_path:'{avatar}',voice_to_text:'{voice_to_text}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
)
elif self.output_type == Output.TXT:
doc.write(
f'''{str_time} {display_name}\n[语音]\n\n'''
)
elif self.output_type == Output.DOCX:
avatar = self.get_avatar_path(is_send, message, True)
content_cell = self.create_table(doc, is_send, avatar)
content_cell.paragraphs[0].add_run('【表情包】')
content_cell.paragraphs[0].font_size = shared.Inches(0.5)
if is_send:
p = content_cell.paragraphs[0]
p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
doc.add_paragraph()
def emoji(self, doc, message):
str_content = message[7]
str_time = message[8]
is_send = message[4]
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
if self.output_type == Output.HTML:
emoji_path = get_emoji_url(str_content, thumb=True)
doc.write(
f'''{{ type:{3}, text: '{emoji_path}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
)
elif self.output_type == Output.TXT:
doc.write(
f'''{str_time} {display_name}\n[表情包]\n\n'''
)
elif self.output_type == Output.DOCX:
avatar = self.get_avatar_path(is_send, message, True)
content_cell = self.create_table(doc, is_send, avatar)
content_cell.paragraphs[0].add_run('【表情包】')
content_cell.paragraphs[0].font_size = shared.Inches(0.5)
if is_send:
p = content_cell.paragraphs[0]
p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
doc.add_paragraph()
def file(self, doc, message):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
bytesExtra = message[10]
str_time = message[8]
is_send = message[4]
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
if self.output_type == Output.HTML:
link = get_file(bytesExtra, thumb=True, output_path=origin_docx_path + '/file')
file_name = ''
file_path = './icon/file.png'
if link != "":
file_name = os.path.basename(link)
link = './file/' + file_name
doc.write(
f'''{{ type:49, text: '{file_path}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}',link: '{link}',sub_type:6,file_name: '{file_name}'}},'''
)
elif self.output_type == Output.TXT:
doc.write(
f'''{str_time} {display_name}\n[文件]\n\n'''
)
elif self.output_type == Output.DOCX:
avatar = self.get_avatar_path(is_send, message, True)
content_cell = self.create_table(doc, is_send, avatar)
content_cell.paragraphs[0].add_run('【文件】')
content_cell.paragraphs[0].font_size = shared.Inches(0.5)
if is_send:
p = content_cell.paragraphs[0]
p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
doc.add_paragraph()
def refermsg(self, doc, message):
"""
处理回复消息
@param doc:
@param message:
@return:
"""
str_time = message[8]
is_send = message[4]
content = parser_reply(message[11])
refer_msg = content.get('refer')
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
if self.output_type == Output.HTML:
contentText = escape_js_and_html(content.get('title'))
if refer_msg:
referText = f"{escape_js_and_html(refer_msg.get('displayname'))}{escape_js_and_html(refer_msg.get('content'))}"
doc.write(
f'''{{ type:49, text: '{contentText}',is_send:{is_send},sub_type:{content.get('type')},refer_text: '{referText}',avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
)
else:
doc.write(
f'''{{ type:49, text: '{contentText}',is_send:{is_send},sub_type:{content.get('type')},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
)
elif self.output_type == Output.TXT:
if refer_msg:
doc.write(
f'''{str_time} {display_name}\n{content.get('title')}\n引用:{refer_msg.get('displayname')}:{refer_msg.get('content')}\n\n'''
)
else:
doc.write(
f'''{str_time} {display_name}\n{content.get('title')}\n引用:未知\n\n'''
)
elif self.output_type == Output.DOCX:
avatar = self.get_avatar_path(is_send, message, True)
content_cell = self.create_table(doc, is_send, avatar)
content_cell.paragraphs[0].add_run(content.get('title'))
content_cell.paragraphs[0].font_size = shared.Inches(0.5)
reply_p = content_cell.add_paragraph()
reply_content = f"{refer_msg.get('displayname')}:{refer_msg.get('content')}" if refer_msg else '未知引用'
run = content_cell.paragraphs[1].add_run(reply_content)
'''设置被回复内容格式'''
run.font.color.rgb = shared.RGBColor(121, 121, 121)
run.font_size = shared.Inches(0.3)
run.font.highlight_color = WD_COLOR_INDEX.GRAY_25
if is_send:
p = content_cell.paragraphs[0]
p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
reply_p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
doc.add_paragraph()
def system_msg(self, doc, message):
str_content = message[7]
is_send = message[4]
str_time = message[8]
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
str_content = str_content.replace('<![CDATA[', "").replace(
' <a href="weixin://revoke_edit_click">重新编辑</a>]]>', "")
res = findall('(</{0,1}(img|revo|_wc_cus|a).*?>)', str_content)
for xmlstr, b in res:
str_content = str_content.replace(xmlstr, "")
if self.output_type == Output.HTML:
str_content = escape_js_and_html(str_content)
doc.write(
f'''{{ type:0, text: '{str_content}',is_send:{is_send},avatar_path:'',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:''}},'''
)
elif self.output_type == Output.TXT:
doc.write(
f'''{str_time} {str_content}\n\n'''
)
elif self.output_type == Output.DOCX:
doc.add_paragraph(str_content).alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
def video(self, doc, message):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
type_ = message[2]
str_content = message[7]
str_time = message[8]
is_send = message[4]
BytesExtra = message[10]
timestamp = message[5]
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
if self.output_type == Output.HTML:
video_path = hard_link_db.get_video(str_content, BytesExtra, thumb=False)
image_path = hard_link_db.get_video(str_content, BytesExtra, thumb=True)
if video_path is None and image_path is not None:
image_path = path.get_relative_path(image_path, base_path=f'/data/聊天记录/{self.contact.remark}/image')
try:
# todo 网络图片问题
print(origin_docx_path + image_path[1:])
os.utime(origin_docx_path + image_path[1:], (timestamp, timestamp))
doc.write(
f'''{{ type:3, text: '{image_path}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
)
except:
doc.write(
f'''{{ type:1, text: '视频丢失',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
)
return
if video_path is None and image_path is None:
return
video_path = f'{Me().wx_dir}/{video_path}'
if os.path.exists(video_path):
new_path = origin_docx_path + '/video/' + os.path.basename(video_path)
if not os.path.exists(new_path):
shutil.copy(video_path, os.path.join(origin_docx_path, 'video'))
os.utime(new_path, (timestamp, timestamp))
video_path = f'./video/{os.path.basename(video_path)}'
doc.write(
f'''{{ type:{type_}, text: '{video_path}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
)
elif self.output_type == Output.TXT:
doc.write(
f'''{str_time} {display_name}\n[视频]\n\n'''
)
elif self.output_type == Output.DOCX:
avatar = self.get_avatar_path(is_send, message, True)
content_cell = self.create_table(doc, is_send, avatar)
content_cell.paragraphs[0].add_run('【视频】')
content_cell.paragraphs[0].font_size = shared.Inches(0.5)
if is_send:
p = content_cell.paragraphs[0]
p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
doc.add_paragraph()
def create_table(self, doc, is_send, avatar_path):
'''
#! 创建一个1*2表格
#! isSend = 1 (0,0)存聊天内容,(0,1)存头像
#! isSend = 0 (0,0)存头像,(0,1)存聊天内容
#! 返回聊天内容的坐标
'''
table = doc.add_table(rows=1, cols=2, style='Normal Table')
table.cell(0, 1).height = shared.Inches(0.5)
table.cell(0, 0).height = shared.Inches(0.5)
if is_send:
'''表格右对齐'''
table.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
avatar = table.cell(0, 1).paragraphs[0].add_run()
'''插入头像,设置头像宽度'''
avatar.add_picture(avatar_path, width=shared.Inches(0.5))
'''设置单元格宽度跟头像一致'''
table.cell(0, 1).width = shared.Inches(0.5)
content_cell = table.cell(0, 0)
'''聊天内容右对齐'''
content_cell.paragraphs[0].paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
else:
avatar = table.cell(0, 0).paragraphs[0].add_run()
avatar.add_picture(avatar_path, width=shared.Inches(0.5))
'''设置单元格宽度'''
table.cell(0, 0).width = shared.Inches(0.5)
content_cell = table.cell(0, 1)
'''聊天内容垂直居中对齐'''
content_cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER
return content_cell
def music_share(self, doc, message):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
is_send = message[4]
timestamp = message[5]
content = music_share(message[11])
music_path = ''
if content.get('audio_url') != '':
music_path = get_music_path(content.get('audio_url'), content.get('title'),
output_path=origin_docx_path + '/music')
if music_path != '':
music_path = f'./music/{os.path.basename(music_path)}'
music_path = music_path.replace('\\', '/')
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
if self.output_type == Output.HTML:
if content.get('is_error') == False:
doc.write(
f'''{{ type:49, text:'{music_path}',is_send:{is_send},avatar_path:'{avatar}',link_url:'{content.get('link_url')}',
timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}',sub_type:3,title:'{content.get('title')}',
artist:'{content.get('artist')}', website_name:'{content.get('website_name')}'}},'''
)
def share_card(self, doc, message):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
is_send = message[4]
timestamp = message[5]
bytesExtra = message[10]
compress_content_ = message[11]
card_data = share_card(bytesExtra, compress_content_)
is_chatroom = 1 if self.contact.is_chatroom else 0
avatar = self.get_avatar_path(is_send, message)
display_name = self.get_display_name(is_send, message)
thumbnail = ''
if card_data.get('thumbnail'):
thumbnail = os.path.join(Me().wx_dir, card_data.get('thumbnail'))
if os.path.exists(thumbnail):
shutil.copy(thumbnail, os.path.join(origin_docx_path, 'image', os.path.basename(thumbnail)))
thumbnail = './image/' + os.path.basename(thumbnail)
else:
thumbnail = ''
app_logo = ''
if card_data.get('app_logo'):
app_logo = os.path.join(Me().wx_dir, card_data.get('app_logo'))
if os.path.exists(app_logo):
shutil.copy(app_logo, os.path.join(origin_docx_path, 'image', os.path.basename(app_logo)))
app_logo = './image/' + os.path.basename(app_logo)
else:
app_logo = ''
if self.output_type == Output.HTML:
doc.write(
f'''{{ type:49,sub_type:5, text:'',is_send:{is_send},avatar_path:'{avatar}',url:'{card_data.get('url')}',
timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}',title:'{card_data.get('title')}',
description:'{card_data.get('description')}',thumbnail:'{thumbnail}',app_logo:'{app_logo}',
app_name:'{card_data.get('app_name')}'
}},\n'''
)
def to_csv(self):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
os.makedirs(origin_docx_path, exist_ok=True)
filename = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}/{self.contact.remark}_utf8.csv"
columns = ['localId', 'TalkerId', 'Type', 'SubType',
'IsSender', 'CreateTime', 'Status', 'StrContent',
'StrTime', 'Remark', 'NickName', 'Sender']
if self.contact.is_chatroom:
packagemsg = PackageMsg()
messages = packagemsg.get_package_message_by_wxid(self.contact.wxid)
else:
messages = msg_db.get_messages(self.contact.wxid)
# 写入CSV文件
with open(filename, mode='w', newline='', encoding='utf-8-sig') as file:
writer = csv.writer(file)
writer.writerow(columns)
# 写入数据
writer.writerows(messages)
self.okSignal.emit('ok')
def to_html_(self):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
makedirs(origin_docx_path)
if self.contact.is_chatroom:
packagemsg = PackageMsg()
messages = packagemsg.get_package_message_by_wxid(self.contact.wxid)
else:
messages = msg_db.get_messages(self.contact.wxid)
filename = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}/{self.contact.remark}.html"
file_path = './app/resources/data/template.html'
if not os.path.exists(file_path):
resource_dir = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__)))
file_path = os.path.join(resource_dir, 'app', 'resources', 'data', 'template.html')
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
html_head, html_end = content.split('/*注意看这是分割线*/')
f = open(filename, 'w', encoding='utf-8')
f.write(html_head.replace("<title>Chat Records</title>", f"<title>{self.contact.remark}</title>"))
self.rangeSignal.emit(len(messages))
for index, message in enumerate(messages):
type_ = message[2]
sub_type = message[3]
timestamp = message[5]
if (type_ == 3 and self.message_types.get(3)) or (type_ == 34 and self.message_types.get(34)) or (
type_ == 47 and self.message_types.get(47)):
pass
else:
self.progressSignal.emit(1)
if self.is_5_min(timestamp):
str_time = message[8]
f.write(
f'''{{ type:0, text: '{str_time}',is_send:0,avatar_path:'',timestamp:{timestamp}}},'''
)
if type_ == 1 and self.message_types.get(type_):
self.text(f, message)
elif type_ == 3 and self.message_types.get(type_):
self.image(f, message)
elif type_ == 34 and self.message_types.get(type_):
self.audio(f, message)
elif type_ == 43 and self.message_types.get(type_):
self.video(f, message)
elif type_ == 47 and self.message_types.get(type_):
self.emoji(f, message)
elif type_ == 10000 and self.message_types.get(type_):
self.system_msg(f, message)
elif type_ == 49 and sub_type == 57 and self.message_types.get(1):
self.refermsg(f, message)
elif type_ == 49 and sub_type == 6 and self.message_types.get(4906):
self.file(f, message)
elif type_ == 49 and sub_type == 3 and self.message_types.get(4903):
self.music_share(f, message)
elif type_ == 49 and sub_type == 5 and self.message_types.get(4905):
self.share_card(f, message)
f.write(html_end)
f.close()
self.okSignal.emit(1)
def to_txt(self):
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
os.makedirs(origin_docx_path, exist_ok=True)
filename = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}/{self.contact.remark}.txt"
if self.contact.is_chatroom:
packagemsg = PackageMsg()
messages = packagemsg.get_package_message_by_wxid(self.contact.wxid)
else:
messages = msg_db.get_messages(self.contact.wxid)
total_steps = len(messages)
with open(filename, mode='w', newline='', encoding='utf-8') as f:
for index, message in enumerate(messages):
type_ = message[2]
sub_type = message[3]
self.progressSignal.emit(int((index + 1) / total_steps * 100))
if type_ == 1 and self.message_types.get(type_):
self.text(f, message)
elif type_ == 3 and self.message_types.get(type_):
self.image(f, message)
elif type_ == 34 and self.message_types.get(type_):
self.audio(f, message)
elif type_ == 43 and self.message_types.get(type_):
self.video(f, message)
elif type_ == 47 and self.message_types.get(type_):
self.emoji(f, message)
elif type_ == 10000 and self.message_types.get(type_):
self.system_msg(f, message)
elif type_ == 49 and sub_type == 57:
self.refermsg(f, message)
self.okSignal.emit(1)
def to_docx(self):
print('导出docx')
origin_docx_path = f"{os.path.abspath('.')}/data/聊天记录/{self.contact.remark}"
filename = os.path.join(origin_docx_path, f"{self.contact.remark}.docx")
makedirs(origin_docx_path)
doc = docx.Document()
doc.styles['Normal'].font.name = u'Cambria'
doc.styles['Normal']._element.rPr.rFonts.set(qn('w:eastAsia'), u'宋体')
if self.contact.is_chatroom:
packagemsg = PackageMsg()
messages = packagemsg.get_package_message_by_wxid(self.contact.wxid)
else:
messages = msg_db.get_messages(self.contact.wxid)
Me().save_avatar(os.path.join(f"{origin_docx_path}/avatar/{Me().wxid}.png"))
if self.contact.is_chatroom:
for message in messages:
if message[4]: # is_send
continue
try:
chatroom_avatar_path = f"{origin_docx_path}/avatar/{message[12].wxid}.png"
message[12].save_avatar(chatroom_avatar_path)
except:
print(message)
pass
else:
self.contact.save_avatar(os.path.join(f"{origin_docx_path}/avatar/{self.contact.wxid}.png"))
self.rangeSignal.emit(len(messages))
for index, message in enumerate(messages):
type_ = message[2]
sub_type = message[3]
timestamp = message[5]
self.progressSignal.emit(1)
if self.is_5_min(timestamp):
str_time = message[8]
doc.add_paragraph(str_time).alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
if type_ == 1 and self.message_types.get(type_):
self.text(doc, message)
elif type_ == 3 and self.message_types.get(type_):
self.image(doc, message)
elif type_ == 34 and self.message_types.get(type_):
self.audio(doc, message)
elif type_ == 43 and self.message_types.get(type_):
self.video(doc, message)
elif type_ == 47 and self.message_types.get(type_):
self.emoji(doc, message)
elif type_ == 10000 and self.message_types.get(type_):
self.system_msg(doc, message)
elif type_ == 49 and sub_type == 57 and self.message_types.get(1):
self.refermsg(doc, message)
elif type_ == 49 and sub_type == 6 and self.message_types.get(4906):
self.file(doc, message)
try:
doc.save(filename)
except PermissionError:
filename = filename[:-5] + f'{time.time()}' + '.docx'
doc.save(filename)
self.okSignal.emit(1)
def run(self):
if self.output_type == Output.DOCX:
self.to_docx()
elif self.output_type == Output.CSV:
self.to_csv()
elif self.output_type == Output.HTML:
self.to_html_()
elif self.output_type == Output.TXT:
self.to_txt()
def cancel(self):
self.requestInterruption()
class OutputMedia(QThread):
"""
导出语音消息
@ -975,16 +275,6 @@ class OutputImage(QThread):
finally:
self.progressSignal.emit(1)
self.okSingal.emit(47)
# sublist_length = len(messages) // self.child_thread_num
# index = 0
# for i in range(0, len(messages), sublist_length):
# child_messages = messages[i:i + sublist_length]
# self.child_threads[index] = OutputImageChild(self.contact, child_messages)
# self.child_threads[index].okSingal.connect(self.count1)
# self.child_threads[index].progressSignal.connect(self.progressSignal)
# self.child_threads[index].start()
# print('开启一个新线程')
# index += 1
class OutputImageChild(QThread):

View File

@ -42,8 +42,9 @@ class ExportDialog(QDialog):
self.export_choices = {"文本": True, "图片": True, "视频": True, "表情包": True} # 定义导出的数据类型,默认全部选择
elif file_type == 'txt':
self.export_type = Output.TXT
self.export_choices = {"文本": True, "图片": True, "语音": True, "视频": True,
"表情包": True} # 定义导出的数据类型,默认全部选择
self.export_choices = {"文本": True, "图片": True, "语音": True, "视频": True, "表情包": True,
'音乐与音频': True, '分享卡片': True, '文件': True,
'拍一拍等系统消息': True} # 定义导出的数据类型,默认全部选择
elif file_type == 'docx':
self.export_type = Output.DOCX
self.export_choices = {"文本": True, "图片": False, "语音": False, "视频": False,