增加了对BytesExtra的解析,基本达到图片视频微信能看本地就能看的程度。其他类型消息的BytesExtra也顺带解析了(一个道理,以后可以从这里扩展)

This commit is contained in:
STDquantum 2023-12-10 11:19:22 +08:00
parent bf8fe3c60c
commit 2cb3f4e5b8
4 changed files with 153 additions and 6 deletions

3
.gitignore vendored
View File

@ -16,4 +16,5 @@ app/DataBase/Msg/*
*.pyc
*.log
*.spec
test*
test*
wordcloud.html

View File

@ -30,6 +30,139 @@ def get_md5_from_xml(content, type_='img'):
return None
class tencent_struct:
def __setVals__(self, data, off):
if data:
self.__data = data
if self.__data:
self.__size = len(self.__data)
self.__off = off
def __readString(self):
try:
length = self.__readUleb()
res = self.__data[self.__off : self.__off + length]
self.__add(length)
except:
raise
return res.decode("utf-8")
def __readUleb(self):
try:
i = self.__data[self.__off]
self.__add()
if i & 0x80:
j = self.__data[self.__off]
i = i & 0x7F
i = i | (j << 7)
self.__add()
if i & 0x4000:
j = self.__data[self.__off]
i = i & 0x3FFF
i = i | (j << 14)
self.__add()
if i & 0x200000:
j = self.__data[self.__off]
i = i & 0x1FFFFF
i = i | (j << 21)
self.__add()
if i & 0x10000000:
j = self.__data[self.__off]
i = i & 0xFFFFFFF
i = i | (j << 28)
self.__add()
return i
except:
raise
def __readData(self):
try:
length = self.__readUleb()
data = self.__data[self.__off : self.__off + length]
self.__add(length)
return data
except:
raise
def __init__(self, data=None, off=0):
self.__data = data
self.__off = off
if self.__data:
self.__size = len(self.__data)
else:
self.__size = 0
def __add(self, value=1):
self.__off += value
if self.__off > self.__size:
raise "偏移量超出size"
def readStruct(self, struct_type):
current_dict = None
if isinstance(struct_type, str):
current_dict = getattr(self, struct_type)
else:
current_dict = struct_type
res = {}
try:
while self.__off < self.__size:
key = self.__readUleb()
key = key >> 3
if key == 0:
break
op = None
fieldName = ""
if key in current_dict:
op = current_dict[key][1]
fieldName = current_dict[key][0]
else:
break
if isinstance(op, dict):
if not key in res:
res[key] = []
current_struct = self.__readData()
recursion = tencent_struct(current_struct)
res[key].append((fieldName, recursion.readStruct(op)))
elif op != "":
res[key] = (fieldName, self.__contenttype__[op](self))
else:
break
except:
raise
return res
__struct1__ = {1: ("", "I"), 2: ("", "I")}
__msgInfo__ = {1: ("", "I"), 2: ("msg_info", "s")}
__bytesExtra__ = {
1: ("", __struct1__),
3: ("msg_info_struct", __msgInfo__),
}
def get_bytesExta_Content(self, data=None, off=0):
self.__setVals__(data, off)
try:
return self.readStruct("__bytesExtra__")
except:
raise
__contenttype__ = {
"s": __readString,
"I": __readUleb,
"P": __readData,
}
def parseBytes(content: bytes):
try:
bytesExtra = tencent_struct().get_bytesExta_Content(content)
return bytesExtra
except:
pass
def singleton(cls):
_instance = {}
@ -115,7 +248,13 @@ class HardLink:
finally:
video_db_lock.release()
def get_image(self, content, thumb=False):
def get_image(self, content, bytesExtra, thumb=False):
bytesDict = parseBytes(bytesExtra)
for msginfo in bytesDict[3]:
if msginfo[1][1][1] == (3 if thumb else 4):
pathh = msginfo[1][2][1] # wxid\FileStorage\...
pathh = "\\".join(pathh.split('\\')[1:])
return pathh
md5 = get_md5_from_xml(content)
if not md5:
return None
@ -129,7 +268,13 @@ class HardLink:
dat_image = os.path.join(root_path, dir1, dir0, dir2, data_image)
return dat_image
def get_video(self, content, thumb=False):
def get_video(self, content, bytesExtra, thumb=False):
bytesDict = parseBytes(bytesExtra)
for msginfo in bytesDict[3]:
if msginfo[1][1][1] == (3 if thumb else 4):
pathh = msginfo[1][2][1] # wxid\FileStorage\...
pathh = "\\".join(pathh.split('\\')[1:])
return pathh
md5 = get_md5_from_xml(content, type_='video')
if not md5:
return None

View File

@ -56,7 +56,7 @@ class Msg:
if not self.open_flag:
return None
sql = '''
select localId,TalkerId,Type,SubType,IsSender,CreateTime,Status,StrContent,strftime('%Y-%m-%d %H:%M:%S',CreateTime,'unixepoch','localtime') as StrTime,MsgSvrID
select localId,TalkerId,Type,SubType,IsSender,CreateTime,Status,StrContent,strftime('%Y-%m-%d %H:%M:%S',CreateTime,'unixepoch','localtime') as StrTime,MsgSvrID,BytesExtra
from MSG
where StrTalker=?
order by CreateTime

View File

@ -637,6 +637,7 @@ const chatMessages = [
str_time = message[8]
# print(type_, type(type_))
is_send = message[4]
BytesExtra = message[10]
# avatar = MePC().avatar_path if is_send else self.contact.avatar_path
# avatar = avatar.replace('\\', '\\\\')
avatar = 'myhead.png' if is_send else 'tahead.png'
@ -657,7 +658,7 @@ const chatMessages = [
f'''{{ type:{type_}, text: '{str_content}',is_send:{is_send},avatar_path:'{avatar}'}},'''
)
elif type_ == 3:
image_path = hard_link_db.get_image(content=str_content, thumb=False)
image_path = hard_link_db.get_image(str_content, BytesExtra, thumb=False)
image_path = path.get_relative_path(image_path, base_path=f'/data/聊天记录/{self.contact.remark}/image')
image_path = image_path.replace('\\', '/')
# print(f"tohtml:---{image_path}")
@ -669,7 +670,7 @@ const chatMessages = [
f'''{{ type:{type_}, text: '{image_path}',is_send:{is_send},avatar_path:'{avatar}'}},'''
)
elif type_ == 43:
video_path = hard_link_db.get_video(content=str_content, thumb=False)
video_path = hard_link_db.get_video(str_content, BytesExtra, thumb=False)
video_path = f'{MePC().wx_dir}/{video_path}'
if os.path.exists(video_path):
new_path = origin_docx_path + '/video/' + os.path.basename(video_path)