import hashlib import os import re import threading import time import docx import pandas as pd import requests from docx import shared from docx.enum.table import WD_ALIGN_VERTICAL from docx.enum.text import WD_COLOR_INDEX, WD_PARAGRAPH_ALIGNMENT from docxcompose.composer import Composer import rcontact import sys from PyQt5.QtGui import QIcon from PyQt5.QtWidgets import * from PyQt5.QtCore import * from PyQt5.QtGui import QPixmap path = 'D:\Project\PythonProject\WeChat' # conRemark = '曹雨萱' # self_wxid = rcontact.get_self_wxid() # ta_wxid = rcontact.get_one_wxid(conRemark) ''' #! 创建emoji目录,存放emoji文件 ''' def mkdir(path): path = path.strip() path = path.rstrip("\\") if os.path.exists(path): return False os.makedirs(path) return True mkdir(path+'/emoji') # mkdir('..//db_tables') ''' #! 将wxid使用MD5编码加密 #! 加密结果是用户头像路径 ''' def avatar_md5(wxid): m = hashlib.md5() # 参数必须是byte类型,否则报Unicode-objects must be encoded before hashing错误 m.update(bytes(wxid.encode('utf-8'))) return m.hexdigest() ''' #! 获取头像文件完整路径 ''' def get_avator(wxid): avatar = avatar_md5(wxid) avatar_path = path + "/avatar/" Path = avatar_path + avatar[:2] + '/' + avatar[2:4] for root, dirs, files in os.walk(path): for file in files: if avatar in file: avatar = file break return Path + '/' + avatar def read_csv(conRemark): ''' :param conRemark: (str) 要导出的联系人备注名 :return: pandas数据 ''' user_data = pd.read_csv(f'{path}/db_tables/{conRemark}.csv') '''将浮点数转化成字符串类型,否则会舍入影响时间结果''' user_data['createTime'] = user_data['createTime'].astype(str) # print(user_data) return user_data def download_emoji(content, img_path): ''' #! 下载emoji文件 #! #! ''' # if 1: try: # print(img_path) url = content.split('cdnurl = "')[1].split('"')[0] print(url) url = ':'.join(url.split('*#*')) if 'amp;' in url: url = ''.join(url.split('amp;')) print('emoji downloading!!!') resp = requests.get(url) with open(f'{path}/emoji/{img_path}', 'wb') as f: f.write(resp.content) except Exception: print("emoji download error") def time_format(timestamp): ''' #! 将字符串类型的时间戳转换成日期 #! 返回格式化的时间字符串 #! %Y-%m-%d %H:%M:%S ''' # print(timestamp) # timestamp = timestamp[:-5] timestamp = float(timestamp[:-3] + '.' + timestamp[-3:]) # print(timestamp) time_tuple = time.localtime(timestamp) # print(time.strftime("%Y-%m-%d %H:%M:%S", time_tuple)) # quit() return time.strftime("%Y-%m-%d %H:%M:%S", time_tuple) def IS_5_min(last_m, now_m): ''' #! 判断两次聊天时间是不是大于五分钟 #! 若大于五分钟则显示时间 #! 否则不显示 ''' last_m = last_m[:-5] last_m = float(last_m + '.' + last_m[-5:2]) now_m = now_m[:-5] now_m = float(now_m + '.' + now_m[-5:2]) '''两次聊天记录时间差,单位是秒''' time_sub = now_m - last_m return time_sub >= 300 def judge_type(Type): pass '''合并word文档到一个文件里''' def merge_docx(conRemark, n): origin_docx_path = f"{path}/{conRemark}" all_word = os.listdir(origin_docx_path) all_file_path = [] for i in range(n): file_name = f"{conRemark}{i}.docx" all_file_path.append(origin_docx_path + '/' + file_name) filename = f"{conRemark}.docx" # print(all_file_path) doc = docx.Document() doc.save(origin_docx_path + '/' + filename) master = docx.Document(origin_docx_path + '/' + filename) middle_new_docx = Composer(master) num = 0 for word in all_file_path: word_document = docx.Document(word) word_document.add_page_break() if num != 0: middle_new_docx.append(word_document) num = num + 1 middle_new_docx.save(origin_docx_path + '/' + filename) class MyThread(QThread): signal = pyqtSignal(str) self_text = pyqtSignal(str) ta_text = pyqtSignal(str) bar = pyqtSignal(int) def __init__(self): super(MyThread, self).__init__() self.ta_info = {} self.self_info = {} self.textBrowser = None self.num = 0 self.total_num = 1 def get_avator(self): self.wxid = self.self_info['wxid'] self.ta_wxid = self.ta_info['wxid'] self.avator = get_avator(self.wxid) print(self.avator) self.ta_avator = get_avator(self.ta_wxid) print(self.ta_avator) # quit() self.img_self = open(self.avator, 'rb') self.img_ta = open(self.ta_avator, 'rb') def read_csv(self, conRemark): ''' :param conRemark: (str) 要导出的联系人备注名 :return: pandas数据 ''' user_data = pd.read_csv(f'{path}/db_tables/{conRemark}.csv') '''将浮点数转化成字符串类型,否则会舍入影响时间结果''' user_data['createTime'] = user_data['createTime'].astype(str) # print(user_data) self.total_num = len(user_data) return user_data def create_table(self,doc, isSend): ''' #! 创建一个1*2表格 #! isSend = 1 (0,0)存聊天内容,(0,1)存头像 #! isSend = 0 (0,0)存头像,(0,1)存聊天内容 #! 返回聊天内容的坐标 ''' table = doc.add_table(rows=1, cols=2, style='Normal Table') table.cell(0, 1).height = shared.Inches(0.5) table.cell(0, 0).height = shared.Inches(0.5) text_size = 1 if isSend: '''表格右对齐''' table.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT avatar = table.cell(0, 1).paragraphs[0].add_run() '''插入头像,设置头像宽度''' avatar.add_picture(self.img_self, width=shared.Inches(0.5)) '''设置单元格宽度跟头像一致''' table.cell(0, 1).width = shared.Inches(0.5) content_cell = table.cell(0, 0) '''聊天内容右对齐''' content_cell.paragraphs[0].paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT else: avatar = table.cell(0, 0).paragraphs[0].add_run() avatar.add_picture(self.img_ta, width=shared.Inches(0.5)) '''设置单元格宽度''' table.cell(0, 0).width = shared.Inches(0.5) content_cell = table.cell(0, 1) '''聊天内容垂直居中对齐''' content_cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER return content_cell def text(self, doc, isSend, message, status): if status == 5: message += '(未发出) ' content_cell = self.create_table(doc, isSend) content_cell.paragraphs[0].add_run(message) content_cell.paragraphs[0].font_size = shared.Inches(0.5) self.self_text.emit(message) if isSend: p = content_cell.paragraphs[0] p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT doc.add_paragraph() def image(self, doc, isSend, Type, content, imgPath): ''' #! 插入聊天图片 #! isSend = 1 只有缩略图 #! isSend = 0 有原图 :param doc: :param isSend: :param Type: :param content: :param imgPath: :return: ''' content = self.create_table(doc, isSend) run = content.paragraphs[0].add_run() imgPath = imgPath.split('//th_')[-1] Path = None if Type == 3: Path = f'{path}/image2//{imgPath[:2]}//{imgPath[2:4]}' elif Type == 47: Path = '{path}/emoji' for root, dirs, files in os.walk(Path): for file in files: if isSend: if imgPath + 'hd' in file: imgPath = file try: run.add_picture(f'{Path}/{imgPath}', height=shared.Inches(2)) doc.add_paragraph() except Exception: print("Error!image") return elif imgPath in file: imgPath = file break try: run.add_picture(f'{Path}/{imgPath}', height=shared.Inches(2)) doc.add_paragraph() except Exception: print("Error!image") # run.add_picture(f'{Path}/{imgPath}', height=shared.Inches(2)) def emoji(self, doc, isSend, content, imgPath): ''' #! 添加表情包 :param isSend: :param content: :param imgPath: :return: ''' if 1: # try: Path = f'{path}/emoji/{imgPath}' is_Exist = os.path.exists(Path) if not is_Exist: '''表情包不存在,则下载表情包到emoji文件夹中''' download_emoji(content, imgPath) self.image(doc, isSend, Type=47, content=content, imgPath=imgPath) # except Exception: # print("can't find emoji!") def wx_file(self, doc, isSend, content, status): ''' #! 添加微信文件 :param isSend: :param content: :param status: :return: ''' pattern = re.compile(r"