新增导出AI对话专用TXT

This commit is contained in:
SiYuan 2024-07-06 17:51:39 +08:00
parent 8e33a2db57
commit 8be7d0d34e
5 changed files with 171 additions and 8 deletions

View File

@ -3,6 +3,7 @@ import random
import sqlite3
import threading
import traceback
from collections import defaultdict
from datetime import datetime, date
from typing import Tuple
@ -222,7 +223,7 @@ class Msg:
# result.sort(key=lambda x: x[5])
# return self.add_sender(result)
def get_messages_all(self,time_range=None):
def get_messages_all(self, time_range=None):
if time_range:
start_time, end_time = convert_to_timestamp(time_range)
sql = f'''
@ -242,6 +243,45 @@ class Msg:
result.sort(key=lambda x: x[5])
return result
def get_messages_group_by_day(
self,
username_: str,
time_range: Tuple[int | float | str | date, int | float | str | date] = None,
) -> dict:
"""
return dict {
date: messages
}
"""
if not self.open_flag:
return {}
if time_range:
start_time, end_time = convert_to_timestamp(time_range)
sql = f'''
select localId,TalkerId,Type,SubType,IsSender,CreateTime,Status,StrContent,strftime('%Y-%m-%d %H:%M:%S',CreateTime,'unixepoch','localtime') as StrTime,MsgSvrID,BytesExtra,CompressContent,DisplayContent
from MSG
where StrTalker=? AND type=1
{'AND CreateTime>' + str(start_time) + ' AND CreateTime<' + str(end_time) if time_range else ''}
order by CreateTime;
'''
try:
lock.acquire(True)
self.cursor.execute(sql, [username_])
result = self.cursor.fetchall()
finally:
lock.release()
result = parser_chatroom_message(result) if username_.__contains__('@chatroom') else result
# 按天分组存储聊天记录
grouped_results = defaultdict(list)
for row in result:
'2024-01-01'
date = row[8][:10] # 获取日期部分
grouped_results[date].append(row) # 将消息加入对应的日期列表中
return grouped_results
def get_messages_length(self):
sql = '''
select count(*)
@ -329,7 +369,7 @@ class Msg:
result = self.cursor.fetchall()
return result
def get_messages_by_keyword(self, username_, keyword, num=5, max_len=10,time_range=None, year_='all'):
def get_messages_by_keyword(self, username_, keyword, num=5, max_len=10, time_range=None, year_='all'):
if not self.open_flag:
return None
if time_range:
@ -491,7 +531,7 @@ class Msg:
lock.release()
return result
def get_messages_by_hour(self, username_, time_range=None,year_='all'):
def get_messages_by_hour(self, username_, time_range=None, year_='all'):
result = []
if not self.open_flag:
return result
@ -535,7 +575,7 @@ class Msg:
lock.release()
return result
def get_latest_time_of_message(self, username_='', time_range=None,year_='all'):
def get_latest_time_of_message(self, username_='', time_range=None, year_='all'):
if not self.open_flag:
return None
if time_range:
@ -743,7 +783,7 @@ class Msg:
def get_send_messages_number_by_hour(
self,
time_range: Tuple[int | float | str | date, int | float | str | date] = None,
)->list:
) -> list:
"""
统计每个小时时段自己总共发了多少消息从最多到最少排序\n
return be like [('23', 9526), ('00', 7890), ('22', 7600), ..., ('05', 29)]
@ -773,11 +813,12 @@ class Msg:
finally:
lock.release()
return result
def get_message_length(
self,
username_='',
time_range: Tuple[int | float | str | date, int | float | str | date] = None,
)->int:
) -> int:
"""
统计自己总共发消息的字数包含type=1的文本和type=49,subtype=57里面自己发的文本
"""
@ -806,9 +847,9 @@ class Msg:
return None
try:
lock.acquire(True)
self.cursor.execute(sql_type_1,[username_])
self.cursor.execute(sql_type_1, [username_])
result_type_1 = self.cursor.fetchall()[0][0]
self.cursor.execute(sql_type_49,[username_])
self.cursor.execute(sql_type_49, [username_])
result_type_49 = self.cursor.fetchall()
except sqlite3.DatabaseError:
logger.error(f'{traceback.format_exc()}\n数据库损坏请删除msg文件夹重试')
@ -822,6 +863,7 @@ class Msg:
sum_type_49 += len(content["title"])
sum_type_1 = result_type_1 if result_type_1 else 0
return sum_type_1 + sum_type_49
def close(self):
if self.open_flag:
try:

View File

@ -45,6 +45,7 @@ class ContactInfo(QWidget, Ui_Form):
self.toCSVAct = QAction(Icon.ToCSV, '导出CSV', self)
self.toHtmlAct = QAction(Icon.ToHTML, '导出HTML', self)
self.toTxtAct = QAction(Icon.ToTXT, '导出TXT', self)
self.toAiTxtAct = QAction(Icon.ToTXT, '导出AI对话专用TXT', self)
self.toJsonAct = QAction(Icon.ToTXT, '导出json', self)
self.toolButton_output.setPopupMode(QToolButton.MenuButtonPopup)
self.toolButton_output.clicked.connect(self.toolButton_show)
@ -52,6 +53,7 @@ class ContactInfo(QWidget, Ui_Form):
menu.addAction(self.toCSVAct)
menu.addAction(self.toHtmlAct)
menu.addAction(self.toTxtAct)
menu.addAction(self.toAiTxtAct)
menu.addAction(self.toJsonAct)
self.toolButton_output.setMenu(menu)
self.toolButton_output.setIcon(Icon.Output)
@ -61,6 +63,7 @@ class ContactInfo(QWidget, Ui_Form):
self.toCSVAct.triggered.connect(self.output)
self.toTxtAct.triggered.connect(self.output)
self.toJsonAct.triggered.connect(self.output)
self.toAiTxtAct.triggered.connect(self.output)
def set_contact(self, contact: Contact):
self.view_userinfo.set_contact(contact)
@ -129,6 +132,9 @@ class ContactInfo(QWidget, Ui_Form):
elif self.sender() == self.toTxtAct:
dialog = ExportDialog(self.contact, title='选择导出的消息类型', file_type='txt', parent=self)
result = dialog.exec_() # 使用exec_()获取用户的操作结果
elif self.sender() == self.toAiTxtAct:
dialog = ExportDialog(self.contact, title='选择导出的消息类型', file_type='ai_txt', parent=self)
result = dialog.exec_() # 使用exec_()获取用户的操作结果
elif self.sender() == self.toJsonAct:
dialog = ExportDialog(self.contact, title='选择导出的消息类型', file_type='json', parent=self)
result = dialog.exec_() # 使用exec_()获取用户的操作结果

View File

@ -62,6 +62,9 @@ class ExportDialog(QDialog, Ui_Dialog):
self.export_choices = {"文本": True, "图片": True, "语音": True, "视频": True, "表情包": True,
'音乐与音频': True, '分享卡片': True, '文件': True,
'拍一拍等系统消息': True} # 定义导出的数据类型,默认全部选择
elif file_type == 'ai_txt':
self.export_type = Output.AI_TXT
self.export_choices = {"文本": True} # 定义导出的数据类型,默认全部选择
elif file_type == 'docx':
self.export_type = Output.DOCX
self.export_choices = {"文本": True, "图片": False, "语音": False, "视频": False,

View File

@ -0,0 +1,96 @@
import os
import re
from app.DataBase import msg_db
from app.util.compress_content import parser_reply, share_card
from app.util.exporter.exporter import ExporterBase
def remove_privacy_info(text):
    """
    Replace privacy-sensitive fragments of *text* with '[<kind> xxx]' markers.

    The patterns are applied in this order: phone, email, id_card, password,
    account. Numeric and email patterns are delimited with explicit
    look-arounds instead of ``\\b``: ``\\b`` is Unicode-aware, so a number or
    address written directly after CJK characters has no word boundary in
    front of it and would be left untouched.

    :param text: message text to sanitise
    :return: the text with every match replaced
    """
    patterns = {
        # Mainland mobile number, optionally prefixed with +86 / 86.
        'phone': r'(?<!\d)(?:\+?86[-\s]?)?1[3-9]\d{9}(?!\d)',
        # E-mail address; the TLD class must not contain a literal '|'.
        'email': r'(?<![A-Za-z0-9._%+-])[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}(?![A-Za-z0-9])',
        # 18-character ID (last character may be X/x) or legacy 15-digit ID.
        # The alternatives are grouped and the longer one is tried first so an
        # 18-digit ID is never cut after its first 15 digits.
        'id_card': r'(?<!\d)(?:\d{17}[\dXx]|\d{15})(?!\d)',
        # Keyword followed by optional separators and a value.
        'password': r'\b(?:password|pwd|pass|psw)[\s=:]*\S+\b',
        'account': r'\b(?:account|username|user|acct)[\s=:]*\S+\b'
    }

    for key, pattern in patterns.items():
        text = re.sub(pattern, f'[{key} xxx]', text)

    return text
class AiTxtExporter(ExporterBase):
    """
    Export a conversation as a compact plain-text transcript.

    Messages are written under one header line per day. A speaker label is
    only written when the sender side (message[4]) differs from the previous
    written message, so consecutive messages of the same side share one label.
    export() only dispatches Type 1 (text) messages to text(); the other
    handlers below are not invoked from within this class.
    """
    # IsSender flag of the last message a label was produced for.
    # -1 means "no message written yet", so the next message gets a label.
    last_is_send = -1

    def title(self, message):
        """Return '\\n<display name>:' when the sender side changed, else ''."""
        is_send = message[4]
        if is_send == self.last_is_send:
            return ''
        self.last_is_send = is_send
        return '\n' + self.get_display_name(is_send, message) + ':'

    def text(self, doc, message):
        """Write a text message (message[7]) with private data masked."""
        str_content = remove_privacy_info(message[7])
        doc.write(
            f'''{self.title(message)}{str_content} '''
        )

    def image(self, doc, message):
        """Write a placeholder for an image message."""
        doc.write(
            f'''{self.title(message)}[图片]'''
        )

    def audio(self, doc, message):
        """Write a placeholder for a voice message."""
        doc.write(
            f'''{self.title(message)}[语音]'''
        )

    def emoji(self, doc, message):
        """Write a placeholder for a sticker message."""
        doc.write(
            f'''{self.title(message)}[表情包]'''
        )

    def file(self, doc, message):
        """Write a placeholder for a file message."""
        doc.write(
            f'''{self.title(message)}[文件]'''
        )

    def system_msg(self, doc, message):
        """Write a system message as '<time> <content>' without CDATA/revoke markup."""
        str_content = message[7]
        str_time = message[8]
        str_content = str_content.replace('<![CDATA[', "").replace(
            ' <a href="weixin://revoke_edit_click">重新编辑</a>]]>', "")
        doc.write(
            f'''{str_time} {str_content}'''
        )

    def video(self, doc, message):
        """Write a placeholder for a video message."""
        doc.write(
            f'''{self.title(message)}[视频]'''
        )

    def export(self):
        """
        Write '<remark>_chat.txt' under self.origin_path.

        Emits progressSignal with a 0-100 percentage after each processed
        message and okSignal(1) once the file has been written.
        """
        print(f"【开始导出 TXT {self.contact.remark}")
        origin_path = self.origin_path
        os.makedirs(origin_path, exist_ok=True)
        filename = os.path.join(origin_path, self.contact.remark + '_chat.txt')
        messages_by_day = msg_db.get_messages_group_by_day(self.contact.wxid, time_range=self.time_range)
        # Progress is measured against the total number of messages, not the
        # number of days, so the percentage stays within 0-100 and never
        # restarts at the beginning of a new day.
        total_steps = sum(len(day_messages) for day_messages in messages_by_day.values())
        processed = 0
        with open(filename, mode='w', newline='', encoding='utf-8') as f:
            for day, day_messages in messages_by_day.items():
                f.write(f"\n\n{'*' * 20}{day}{'*' * 20}\n")
                # Each day section starts with an explicit speaker label, even
                # when the same side wrote the last message of the previous day.
                self.last_is_send = -1
                for message in day_messages:
                    processed += 1
                    self.progressSignal.emit(int(processed / total_steps * 100))
                    type_ = message[2]
                    if type_ == 1 and self.message_types.get(type_):
                        self.text(f, message)
        print(f"【完成导出 TXT {self.contact.remark}")
        self.okSignal.emit(1)

View File

@ -10,6 +10,7 @@ from PyQt5.QtWidgets import QFileDialog
from docx.oxml.ns import qn
from docxcompose.composer import Composer
from app.util.exporter.exporter_ai_txt import AiTxtExporter
from app.util.exporter.exporter_csv import CSVExporter
from app.util.exporter.exporter_docx import DocxExporter
from app.util.exporter.exporter_html import HtmlExporter
@ -44,6 +45,7 @@ class Output(QThread):
CONTACT_CSV = 4
TXT = 5
JSON = 6
AI_TXT = 7
Batch = 10086
def __init__(self, contact, type_=DOCX, message_types={}, sub_type=[], time_range=None, parent=None):
@ -158,6 +160,9 @@ class Output(QThread):
elif type_ == self.TXT:
# print('批量导出txt')
self.to_txt(contact, self.message_types, True)
elif type_ == self.AI_TXT:
# print('批量导出txt')
self.to_ai_txt(contact, self.message_types, True)
elif type_ == self.CSV:
self.to_csv(contact, self.message_types, True)
elif type_ == self.HTML:
@ -232,6 +237,15 @@ class Output(QThread):
Child.okSignal.connect(self.okSignal if not is_batch else self.batch_finish_one)
Child.start()
def to_ai_txt(self, contact, message_types, is_batch=False):
    """
    Start an AiTxtExporter worker for *contact*.

    The worker is kept in self.children and its progress signal is forwarded
    to self.progress. For a single export the range and ok signals are
    forwarded to this object's own signals; for a batch export only the ok
    signal is connected, to self.batch_finish_one.
    """
    # NOTE(review): type_ is self.TXT although an AI_TXT constant exists --
    # confirm whether ExporterBase relies on this value.
    exporter = AiTxtExporter(contact, type_=self.TXT, message_types=message_types, time_range=self.time_range)
    self.children.append(exporter)
    exporter.progressSignal.connect(self.progress)
    if is_batch:
        on_finished = self.batch_finish_one
    else:
        exporter.rangeSignal.connect(self.rangeSignal)
        on_finished = self.okSignal
    exporter.okSignal.connect(on_finished)
    exporter.start()
def to_html(self, contact, message_types, is_batch=False):
Child = HtmlExporter(contact, type_=self.output_type, message_types=message_types, time_range=self.time_range)
self.children.append(Child)
@ -284,6 +298,8 @@ class Output(QThread):
self.contact_to_csv()
elif self.output_type == self.TXT:
self.to_txt(self.contact, self.message_types)
elif self.output_type == self.AI_TXT:
self.to_ai_txt(self.contact, self.message_types)
elif self.output_type == self.CSV:
self.to_csv(self.contact, self.message_types)
elif self.output_type == self.HTML: