WeChatMsg/app/analysis/analysis.py

290 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from collections import Counter
from PyQt5.QtCore import QFile, QTextStream, QIODevice
import sys
sys.path.append('.')
from app.DataBase import msg_db, MsgType
from pyecharts import options as opts
from pyecharts.charts import WordCloud, Calendar, Bar, Line
from app.resources import resource_rc
import tkinter as tk
import os
from tkinter import messagebox
import shutil
# Bind an attribute of the compiled Qt resource module so the `resource_rc`
# import is referenced (presumably this keeps the ':/data/...' resources
# available and stops linters from dropping the import — TODO confirm).
var = resource_rc.qt_resource_name
# Pixel size shared by the calendar, bar and line charts in this module.
charts_width = 800
charts_height = 450
# Pixel size of the word-cloud chart.
wordcloud_width = 780
wordcloud_height = 720
class StopwordsWindow(tk.Tk):
    """Small Tk window for managing the word-cloud stopword list.

    The window lets the user append whitespace-separated words to the
    stopwords file and restore that file from a backup copy. The backup is
    created from the stopwords file the first time the window is opened and
    no backup exists yet.
    """

    # Relative paths are resolved against the current working directory each
    # time they are used.
    STOPWORDS_PATH = './app/data/stopwords.txt'
    DEFAULT_STOPWORDS_PATH = './app/data/default_stopwords.txt'

    def __init__(self):
        """Load the current stopwords, ensure a backup exists, build the UI."""
        super().__init__()
        self.title("屏蔽词管理")
        # Set the size of the window
        self.geometry("250x170")

        stopwords_file = os.path.abspath(self.STOPWORDS_PATH)
        default_file = os.path.abspath(self.DEFAULT_STOPWORDS_PATH)

        # In-memory mirror of the words currently stored in the stopwords file.
        self.original_stopwords = self._read_stopwords(stopwords_file)

        # Keep a pristine copy so "undo" has something to restore from.
        if not os.path.exists(default_file):
            shutil.copy(stopwords_file, default_file)

        # UI elements
        self.label = tk.Label(self, text="输入你想添加的屏蔽词:")
        self.label.pack(pady=10)
        self.user_input = tk.StringVar()
        entry = tk.Entry(self, textvariable=self.user_input)
        entry.pack(pady=10)
        self.add_button = tk.Button(self, text="添加屏蔽词", command=self.add_stopwords)
        self.add_button.pack(pady=5)
        self.undo_button = tk.Button(self, text="撤销", command=self.undo_stopwords)
        self.undo_button.pack(pady=5)

    @staticmethod
    def _read_stopwords(path):
        """Return the lines of the UTF-8 text file *path* as a set."""
        with open(path, "r", encoding="utf-8") as stopword_file:
            return set(stopword_file.read().splitlines())

    def _save_new_words(self, words):
        """Merge *words* into the stopword set and rewrite the stopwords file.

        The in-memory set is only replaced after the file has been written,
        so a failed write does not leave memory and disk out of sync. Words
        are written sorted so the file content is deterministic.
        """
        merged = self.original_stopwords | set(words)
        target = os.path.abspath(self.STOPWORDS_PATH)
        with open(target, 'w', encoding="utf-8") as stopword_file:
            stopword_file.write("\n".join(sorted(merged)))
        self.original_stopwords = merged

    def _restore_defaults(self):
        """Copy the backup over the stopwords file and reload the set.

        Reloading matters: without it, words removed by the undo would still
        be reported as duplicates and would be written back by the next add.
        """
        target = os.path.abspath(self.STOPWORDS_PATH)
        shutil.copy(os.path.abspath(self.DEFAULT_STOPWORDS_PATH), target)
        self.original_stopwords = self._read_stopwords(target)

    def add_stopwords(self):
        """Add the whitespace-separated words typed in the entry.

        Nothing is written when the entry is empty or when any of the words
        is already a stopword; the user is told why in both cases.
        """
        word_list = self.user_input.get().split()
        if not word_list:
            # Avoid rewriting the file and reporting success for no input.
            messagebox.showinfo("提示", "请输入至少一个屏蔽词")
            return

        # Check if each word already exists before appending
        duplicates = [word for word in word_list if word in self.original_stopwords]
        if duplicates:
            message = f"Words {', '.join(duplicates)} already exist in the original file."
            messagebox.showinfo("Duplicate Words", message)
            return

        self._save_new_words(word_list)
        messagebox.showinfo("Success", "屏蔽词已添加")

    def undo_stopwords(self):
        """Revert the stopwords file to the backup copy and report the result."""
        try:
            self._restore_defaults()
        except (OSError, UnicodeError) as e:
            messagebox.showerror("Error", f"An error occurred: {e}")
        else:
            messagebox.showinfo("Undo", "屏蔽词已还原至原始状态")
def _no_wordcloud_data():
    """Return the placeholder result used when there is nothing to chart."""
    return {
        'chart_data': None,
        'keyword': "没有聊天你想分析啥",
        'max_num': "0",
        'dialogs': []
    }


def _load_stopwords(stopwords_file):
    """Return the stopword set, preferring the file on disk.

    Falls back to the ``:/data/stopwords.txt`` Qt resource when the file
    cannot be read, and to an empty set when that resource cannot be opened
    either, so the caller always receives a set.
    """
    try:
        with open(stopwords_file, "r", encoding="utf-8") as stopword_file:
            return set(stopword_file.read().splitlines())
    except (OSError, UnicodeDecodeError):
        pass

    file = QFile(':/data/stopwords.txt')
    if not file.open(QIODevice.ReadOnly | QIODevice.Text):
        return set()
    stream = QTextStream(file)
    stream.setCodec('utf-8')
    content = stream.readAll()
    file.close()
    return set(content.splitlines())


def wordcloud(wxid, is_Annual_report=False, year='2023'):
    """Build word-cloud chart options from a contact's text messages.

    Args:
        wxid: Contact identifier passed through to ``msg_db``.
        is_Annual_report: Passed through to ``msg_db.get_messages_by_type``.
        year: Passed through to ``msg_db.get_messages_by_type``.

    Returns:
        A dict with the keys ``chart_data`` (the value of
        ``WordCloud.dump_options_with_quotes()``, or ``None`` when there is
        nothing to chart), ``keyword`` (the most frequent word), ``max_num``
        (its count as a string) and ``dialogs`` (the result of
        ``msg_db.get_messages_by_keyword`` for that word).

    Note:
        Opens a ``StopwordsWindow`` and runs its ``mainloop()`` before the
        stopwords are read, so edits made in that window apply to this call.
    """
    import jieba

    txt_messages = msg_db.get_messages_by_type(wxid, MsgType.TEXT, is_Annual_report, year)
    if not txt_messages:
        return _no_wordcloud_data()

    # Index 7 of each row is used as the message text
    # (presumably the content column — verify against msg_db).
    text = ''.join(message[7] for message in txt_messages)
    total_msg_len = len(text)

    # Segment the text with jieba and count word frequencies.
    word_count = Counter(jieba.cut(text))

    # Let the user edit the stopword list before it is applied.
    stopwords_window = StopwordsWindow()
    stopwords_window.mainloop()
    stopwords = _load_stopwords('./app/data/stopwords.txt')

    # Drop single-character tokens and stopwords, then keep the 100 most
    # frequent words (stable sort: ties keep first-seen order).
    candidates = [
        (word, count)
        for word, count in word_count.items()
        if len(word) > 1 and word not in stopwords
    ]
    text_data = sorted(candidates, key=lambda item: item[1], reverse=True)[:100]
    if not text_data:
        # Every token was filtered out; there is no keyword to report.
        return _no_wordcloud_data()

    keyword, max_num = text_data[0]
    w = (
        WordCloud(init_opts=opts.InitOpts(width=f"{wordcloud_width}px", height=f"{wordcloud_height}px"))
        .add(series_name="聊天文字", data_pair=text_data, word_size_range=[20, 100])
        .set_global_opts(
            title_opts=opts.TitleOpts(
                title="词云图", subtitle=f"总计{total_msg_len}",
                title_textstyle_opts=opts.TextStyleOpts(font_size=23)
            ),
            tooltip_opts=opts.TooltipOpts(is_show=True),
            legend_opts=opts.LegendOpts(is_show=False)
        )
    )
    return {
        'chart_data': w.dump_options_with_quotes(),
        'keyword': keyword,
        'max_num': str(max_num),
        'dialogs': msg_db.get_messages_by_keyword(wxid, keyword, num=5, max_len=12)
    }
def calendar_chart(wxid, is_Annual_report=False, year='2023'):
    """Build a calendar heat-map of per-day message counts.

    Args:
        wxid: Contact identifier passed through to ``msg_db``.
        is_Annual_report: When true the calendar covers ``year`` and is
            titled accordingly; otherwise it spans the first to last date
            found in the data.
        year: Year used for the annual-report range and title.

    Returns:
        ``False`` when ``msg_db.get_messages_by_days`` yields no data,
        otherwise ``{'chart_data': <Calendar chart>}``.
    """
    calendar_data = msg_db.get_messages_by_days(wxid, is_Annual_report, year)
    if not calendar_data:
        return False

    # Each row is indexed as (date, count).
    daily_counts = [row[1] for row in calendar_data]
    lowest = min(daily_counts)
    highest = max(daily_counts)
    first_day = calendar_data[0][0]
    last_day = calendar_data[-1][0]
    print(first_day, '---->', last_day)

    if is_Annual_report:
        calendar_days, calendar_title = year, f'{year}年聊天情况'
    else:
        calendar_days, calendar_title = (first_day, last_day), '和Ta的聊天情况'

    chart = Calendar(
        init_opts=opts.InitOpts(width=f"{charts_width}px", height=f"{charts_height}px")
    )
    chart.add(
        "",
        calendar_data,
        calendar_opts=opts.CalendarOpts(range_=calendar_days)
    )
    # The colour scale runs from the quietest to the busiest day.
    colour_scale = opts.VisualMapOpts(
        max_=highest,
        min_=lowest,
        orient="horizontal",
        pos_bottom="0px",
        pos_left="0px",
    )
    chart.set_global_opts(
        title_opts=opts.TitleOpts(title=calendar_title),
        visualmap_opts=colour_scale,
        legend_opts=opts.LegendOpts(is_show=False)
    )
    return {'chart_data': chart}
def month_count(wxid, is_Annual_report=False, year='2023'):
    """Build a bar chart of the number of messages per month.

    Args:
        wxid: Contact identifier passed through to ``msg_db``.
        is_Annual_report: Passed through to ``msg_db.get_messages_by_month``.
        year: Passed through to ``msg_db.get_messages_by_month``.

    Returns:
        ``{'chart_data': <Bar chart>}``. When there are no rows the chart is
        empty and its colour scale falls back to the range 0..0 instead of
        raising ``ValueError``.
    """
    # Treat a falsy result (e.g. None or an empty list) as "no rows".
    msg_data = msg_db.get_messages_by_month(wxid, is_Annual_report, year) or []
    # Each row is indexed as (month label, message count).
    x_axis = [row[0] for row in msg_data]
    y_data = [row[1] for row in msg_data]
    m = (
        Bar(init_opts=opts.InitOpts(width=f"{charts_width}px", height=f"{charts_height}px"))
        .add_xaxis(x_axis)
        .add_yaxis("消息数量", y_data,
                   label_opts=opts.LabelOpts(is_show=False),
                   itemstyle_opts=opts.ItemStyleOpts(color="skyblue"),
                   )
        .set_global_opts(
            title_opts=opts.TitleOpts(title="逐月统计", subtitle=None),
            datazoom_opts=opts.DataZoomOpts(),
            toolbox_opts=opts.ToolboxOpts(),
            visualmap_opts=opts.VisualMapOpts(
                # `default` keeps min()/max() from raising on an empty series.
                min_=min(y_data, default=0),
                max_=max(y_data, default=0),
                dimension=1,  # map colour by the second dimension (the y axis)
                is_piecewise=False,  # continuous rather than segmented scale
                range_color=["#66ccff", "#003366"],  # colour range
                type_="color",
                pos_right="0%",
            ),
        )
    )
    return {
        'chart_data': m
    }
def hour_count(wxid, is_Annual_report=False, year='2023'):
    """Build a line chart of message counts grouped by hour.

    Args:
        wxid: Contact identifier passed through to ``msg_db``.
        is_Annual_report: Passed through to ``msg_db.get_messages_by_hour``.
        year: Passed through to ``msg_db.get_messages_by_hour``.

    Returns:
        ``{'chart_data': <Line chart>}``.
    """
    msg_data = msg_db.get_messages_by_hour(wxid, is_Annual_report, year)
    print(msg_data)
    # Each row is indexed as (hour label, message count).
    y_data = [row[1] for row in msg_data]
    x_axis = [row[0] for row in msg_data]

    # Mark the extremes and the average on the series.
    extremes = opts.MarkPointOpts(
        data=[
            opts.MarkPointItem(type_="max", name="最大值"),
            opts.MarkPointItem(type_="min", name="最小值", value=10),
        ]
    )
    average_line = opts.MarkLineOpts(
        data=[opts.MarkLineItem(type_="average", name="平均值")]
    )

    chart = Line(
        init_opts=opts.InitOpts(width=f"{charts_width}px", height=f"{charts_height}px")
    )
    chart.add_xaxis(xaxis_data=x_axis)
    chart.add_yaxis(
        series_name="聊天频率",
        y_axis=y_data,
        markpoint_opts=extremes,
        markline_opts=average_line,
    )
    chart.set_global_opts(
        title_opts=opts.TitleOpts(title="聊天时段", subtitle=None),
    )
    chart.set_series_opts(label_opts=opts.LabelOpts(is_show=False))
    return {'chart_data': chart}
class Analysis:
    """Empty placeholder class; it defines no attributes or methods."""
if __name__ == '__main__':
    # Manual run: initialise the message database, then build each chart for
    # one contact and render the pyecharts objects to HTML files.
    msg_db.init_database(path='../DataBase/Msg/MSG.db')
    target_wxid = 'wxid_27hqbq7vx5hf22'

    # The word cloud result holds dumped options, so it is not rendered here.
    w_data = wordcloud(target_wxid, is_Annual_report=True, year='2023')

    c = calendar_chart(target_wxid, is_Annual_report=False, year='2023')
    c['chart_data'].render("./data/聊天统计/calendar.html")

    m = month_count(target_wxid, is_Annual_report=False, year='2023')
    m['chart_data'].render("./data/聊天统计/month_num.html")

    h = hour_count(target_wxid)
    h['chart_data'].render("./data/聊天统计/hour_count.html")