更新解密文件

This commit is contained in:
zhoushuaikang 2023-12-08 21:42:04 +08:00
parent d7ebe9c1d5
commit 5bee934ed2
2 changed files with 274 additions and 78 deletions

View File

@ -1,12 +1,24 @@
import hashlib
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: getwxinfo.py
# Description:
# Author: xaoyaoo
# Date: 2023/08/21
# 微信数据库采用的加密算法是256位的AES-CBC。数据库的默认的页大小是4096字节即4KB其中每一个页都是被单独加解密的。
# 加密文件的每一个页都有一个随机的初始化向量,它被保存在每一页的末尾。
# 加密文件的每一页都存有着消息认证码算法使用的是HMAC-SHA1安卓数据库使用的是SHA512。它也被保存在每一页的末尾。
# 每一个数据库文件的开头16字节都保存了一段唯一且随机的盐值作为HMAC的验证和数据的解密。
# 用来计算HMAC的key与解密的key是不同的解密用的密钥是主密钥和之前提到的16字节的盐值通过PKCS5_PBKF2_HMAC1密钥扩展算法迭代64000次计算得到的。而计算HMAC的密钥是刚提到的解密密钥和16字节盐值异或0x3a的值通过PKCS5_PBKF2_HMAC1密钥扩展算法迭代2次计算得到的。
# 为了保证数据部分长度是16字节即AES块大小的整倍数每一页的末尾将填充一段空字节使得保留字段的长度为48字节。
# 综上加密文件结构为第一页4KB数据前16字节为盐值紧接着4032字节数据再加上16字节IV和20字节HMAC以及12字节空字节而后的页均是4048字节长度的加密数据段和48字节的保留段。
# -------------------------------------------------------------------------------
import argparse
import hmac
import hashlib
import os
from typing import Union, List
from Cryptodome.Cipher import AES
from app.log import log, logger
# from Crypto.Cipher import AES # 如果上面的导入失败,可以尝试使用这个
SQLITE_FILE_HEADER = "SQLite format 3\x00" # SQLite文件头
@ -17,15 +29,22 @@ DEFAULT_ITER = 64000
# 通过密钥解密数据库
@log
def decrypt(key: str, db_path, out_path):
if not os.path.exists(db_path):
return f"[-] db_path:'{db_path}' File not found!"
"""
通过密钥解密数据库
:param key: 密钥 64位16进制字符串
:param db_path: 待解密的数据库路径(必须是文件)
:param out_path: 解密后的数据库输出路径(必须是文件)
:return:
"""
if not os.path.exists(db_path) or not os.path.isfile(db_path):
return False, f"[-] db_path:'{db_path}' File not found!"
if not os.path.exists(os.path.dirname(out_path)):
return f"[-] out_path:'{out_path}' File not found!"
return False, f"[-] out_path:'{out_path}' File not found!"
if len(key) != 64:
logger.error(f"[-] key:'{key}' Error!")
return -1
return False, f"[-] key:'{key}' Len Error!"
password = bytes.fromhex(key.strip())
with open(db_path, "rb") as file:
blist = file.read()
@ -33,6 +52,8 @@ def decrypt(key: str, db_path, out_path):
salt = blist[:16]
byteKey = hashlib.pbkdf2_hmac("sha1", password, salt, DEFAULT_ITER, KEY_SIZE)
first = blist[16:DEFAULT_PAGESIZE]
if len(salt) != 16:
return False, f"[-] db_path:'{db_path}' File Error!"
mac_salt = bytes([(salt[i] ^ 58) for i in range(16)])
mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE)
@ -40,8 +61,7 @@ def decrypt(key: str, db_path, out_path):
hash_mac.update(b'\x01\x00\x00\x00')
if hash_mac.digest() != first[-32:-12]:
logger.error(f"[-] Password Error! (key:'{key}'; db_path:'{db_path}'; out_path:'{out_path}' )")
return -1
return False, f"[-] Key Error! (key:'{key}'; db_path:'{db_path}'; out_path:'{out_path}' )"
newblist = [blist[i:i + DEFAULT_PAGESIZE] for i in range(DEFAULT_PAGESIZE, len(blist), DEFAULT_PAGESIZE)]
@ -57,19 +77,22 @@ def decrypt(key: str, db_path, out_path):
decrypted = t.decrypt(i[:-48])
deFile.write(decrypted)
deFile.write(i[-48:])
return [True, db_path, out_path, key]
return True, [db_path, out_path, key]
@log
def batch_decrypt(key: str, db_path: Union[str, List[str]], out_path: str):
def batch_decrypt(key: str, db_path: Union[str, List[str]], out_path: str, is_logging: bool = False):
if not isinstance(key, str) or not isinstance(out_path, str) or not os.path.exists(out_path) or len(key) != 64:
return f"[-] (key:'{key}' or out_path:'{out_path}') Error!"
error = f"[-] (key:'{key}' or out_path:'{out_path}') Error!"
if is_logging: print(error)
return False, error
process_list = []
if isinstance(db_path, str):
if not os.path.exists(db_path):
return f"[-] db_path:'{db_path}' not found!"
error = f"[-] db_path:'{db_path}' not found!"
if is_logging: print(error)
return False, error
if os.path.isfile(db_path):
inpath = db_path
@ -87,7 +110,10 @@ def batch_decrypt(key: str, db_path: Union[str, List[str]], out_path: str):
os.makedirs(os.path.dirname(outpath))
process_list.append([key, inpath, outpath])
else:
return f"[-] db_path:'{db_path}' Error "
error = f"[-] db_path:'{db_path}' Error "
if is_logging: print(error)
return False, error
elif isinstance(db_path, list):
rt_path = os.path.commonprefix(db_path)
if not os.path.exists(rt_path):
@ -95,7 +121,9 @@ def batch_decrypt(key: str, db_path: Union[str, List[str]], out_path: str):
for inpath in db_path:
if not os.path.exists(inpath):
return f"[-] db_path:'{db_path}' not found!"
erreor = f"[-] db_path:'{db_path}' not found!"
if is_logging: print(erreor)
return False, erreor
inpath = os.path.normpath(inpath)
rel = os.path.relpath(os.path.dirname(inpath), rt_path)
@ -104,7 +132,9 @@ def batch_decrypt(key: str, db_path: Union[str, List[str]], out_path: str):
os.makedirs(os.path.dirname(outpath))
process_list.append([key, inpath, outpath])
else:
return f"[-] db_path:'{db_path}' Error "
error = f"[-] db_path:'{db_path}' Error "
if is_logging: print(error)
return False, error
result = []
for i in process_list:
@ -115,12 +145,63 @@ def batch_decrypt(key: str, db_path: Union[str, List[str]], out_path: str):
for dir in dirs:
if not os.listdir(os.path.join(root, dir)):
os.rmdir(os.path.join(root, dir))
return result
if is_logging:
print("=" * 32)
success_count = 0
fail_count = 0
for code, ret in result:
if code == False:
print(ret)
fail_count += 1
else:
print(f'[+] "{ret[0]}" -> "{ret[1]}"')
success_count += 1
print("-" * 32)
print(f"[+] 共 {len(result)} 个文件, 成功 {success_count} 个, 失败 {fail_count}")
print("=" * 32)
return True, result
if __name__ == '__main__':
# 调用 decrypt 函数,并传入参数
key = "2aafab10af7940328bb92ac9d2a8ab5fc07a685646b14f2e9ae6948a7060c0fc"
db_path = "E:\86390\Documents\WeChat Files\wxid_27hqbq7vx5hf22\FileStorage\CustomEmotion\\71\\71CE49ED3CE9E57E43E07F802983BF45"
out_path = "./test/1.png"
print(decrypt(key, db_path, out_path))
def encrypt(key: str, db_path, out_path):
"""
通过密钥加密数据库
:param key: 密钥 64位16进制字符串
:param db_path: 待加密的数据库路径(必须是文件)
:param out_path: 加密后的数据库输出路径(必须是文件)
:return:
"""
if not os.path.exists(db_path) or not os.path.isfile(db_path):
return False, f"[-] db_path:'{db_path}' File not found!"
if not os.path.exists(os.path.dirname(out_path)):
return False, f"[-] out_path:'{out_path}' File not found!"
if len(key) != 64:
return False, f"[-] key:'{key}' Len Error!"
password = bytes.fromhex(key.strip())
with open(db_path, "rb") as file:
blist = file.read()
salt = os.urandom(16) # 生成随机盐值
byteKey = hashlib.pbkdf2_hmac("sha1", password, salt, DEFAULT_ITER, KEY_SIZE)
# 计算消息认证码
mac_salt = bytes([(salt[i] ^ 58) for i in range(16)])
mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE)
hash_mac = hmac.new(mac_key, blist[:-32], hashlib.sha1)
hash_mac.update(b'\x01\x00\x00\x00')
mac_digest = hash_mac.digest()
newblist = [blist[i:i + DEFAULT_PAGESIZE] for i in range(DEFAULT_PAGESIZE, len(blist), DEFAULT_PAGESIZE)]
with open(out_path, "wb") as enFile:
enFile.write(salt) # 写入盐值
enFile.write(mac_digest) # 写入消息认证码
for i in newblist:
t = AES.new(byteKey, AES.MODE_CBC, os.urandom(16)) # 生成随机的初始向量
encrypted = t.encrypt(i) # 加密数据块
enFile.write(encrypted)
return True, [db_path, out_path, key]

View File

@ -5,24 +5,58 @@
# Author: xaoyaoo
# Date: 2023/08/21
# -------------------------------------------------------------------------------
import hmac
import hashlib
import ctypes
import json
import psutil
import os
import re
import winreg
import pymem
from win32com.client import Dispatch
from app.log import log
import psutil
import sys
from typing import List, Union
ReadProcessMemory = ctypes.windll.kernel32.ReadProcessMemory
void_p = ctypes.c_void_p
ReadProcessMemory = ctypes.windll.kernel32.ReadProcessMemory
void_p = ctypes.c_void_p
# 获取exe文件的位数
def get_exe_bit(file_path):
"""
获取 PE 文件的位数: 32 位或 64
:param file_path: PE 文件路径(可执行文件)
:return: 如果遇到错误则返回 64
"""
try:
with open(file_path, 'rb') as f:
dos_header = f.read(2)
if dos_header != b'MZ':
print('get exe bit error: Invalid PE file')
return 64
# Seek to the offset of the PE signature
f.seek(60)
pe_offset_bytes = f.read(4)
pe_offset = int.from_bytes(pe_offset_bytes, byteorder='little')
# Seek to the Machine field in the PE header
f.seek(pe_offset + 4)
machine_bytes = f.read(2)
machine = int.from_bytes(machine_bytes, byteorder='little')
if machine == 0x14c:
return 32
elif machine == 0x8664:
return 64
else:
print('get exe bit error: Unknown architecture: %s' % hex(machine))
return 64
except IOError:
print('get exe bit error: File not found or cannot be opened')
return 64
# 读取内存中的字符串(非key部分)
@log
def get_info_without_key(h_process, address, n_size=64):
array = ctypes.create_string_buffer(n_size)
if ReadProcessMemory(h_process, void_p(address), array, n_size, 0) == 0: return "None"
@ -31,7 +65,6 @@ def get_info_without_key(h_process, address, n_size=64):
return text.strip() if text.strip() != "" else "None"
@log
def pattern_scan_all(handle, pattern, *, return_multiple=False, find_num=100):
next_region = 0
found = []
@ -56,39 +89,117 @@ def pattern_scan_all(handle, pattern, *, return_multiple=False, find_num=100):
return found
@log
def get_info_wxid(h_process):
find_num = 100
addrs = pattern_scan_all(h_process, br'\\FileStorage', return_multiple=True, find_num=find_num)
addrs = pattern_scan_all(h_process, br'\\Msg\\FTSContact', return_multiple=True, find_num=find_num)
wxids = []
for addr in addrs:
array = ctypes.create_string_buffer(33)
if ReadProcessMemory(h_process, void_p(addr - 21), array, 33, 0) == 0: return "None"
array = bytes(array) # .decode('utf-8', errors='ignore')
array = array.split(br'\FileStorage')[0]
for part in [b'}', b'\x7f', b'\\']:
if part in array:
array = array.split(part)[1]
wxids.append(array.decode('utf-8', errors='ignore'))
break
array = ctypes.create_string_buffer(80)
if ReadProcessMemory(h_process, void_p(addr - 30), array, 80, 0) == 0: return "None"
array = bytes(array) # .split(b"\\")[0]
array = array.split(b"\\Msg")[0]
array = array.split(b"\\")[-1]
wxids.append(array.decode('utf-8', errors='ignore'))
wxid = max(wxids, key=wxids.count) if wxids else "None"
return wxid
# 读取内存中的key
@log
def get_key(h_process, address, address_len=8):
array = ctypes.create_string_buffer(address_len)
if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None"
address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址key地址
key = ctypes.create_string_buffer(32)
if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return "None"
key_string = bytes(key).hex()
return key_string
def get_info_filePath(wxid="all"):
if not wxid:
return "None"
try:
user_profile = os.environ.get("USERPROFILE")
path_3ebffe94 = os.path.join(user_profile, "AppData", "Roaming", "Tencent", "WeChat", "All Users", "config",
"3ebffe94.ini")
with open(path_3ebffe94, "r", encoding="utf-8") as f:
w_dir = f.read()
except Exception as e:
w_dir = "MyDocument:"
if w_dir == "MyDocument:":
try:
# 打开注册表路径
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders")
documents_path = winreg.QueryValueEx(key, "Personal")[0] # 读取文档实际目录路径
winreg.CloseKey(key) # 关闭注册表
documents_paths = os.path.split(documents_path)
if "%" in documents_paths[0]:
w_dir = os.environ.get(documents_paths[0].replace("%", ""))
w_dir = os.path.join(w_dir, os.path.join(*documents_paths[1:]))
# print(1, w_dir)
else:
w_dir = documents_path
except Exception as e:
profile = os.environ.get("USERPROFILE")
w_dir = os.path.join(profile, "Documents")
msg_dir = os.path.join(w_dir, "WeChat Files")
if wxid == "all" and os.path.exists(msg_dir):
return msg_dir
filePath = os.path.join(msg_dir, wxid)
return filePath if os.path.exists(filePath) else "None"
def get_key(db_path, addr_len):
def read_key_bytes(h_process, address, address_len=8):
array = ctypes.create_string_buffer(address_len)
if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None"
address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址key地址
key = ctypes.create_string_buffer(32)
if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return "None"
key_bytes = bytes(key)
return key_bytes
def verify_key(key, wx_db_path):
KEY_SIZE = 32
DEFAULT_PAGESIZE = 4096
DEFAULT_ITER = 64000
with open(wx_db_path, "rb") as file:
blist = file.read(5000)
salt = blist[:16]
byteKey = hashlib.pbkdf2_hmac("sha1", key, salt, DEFAULT_ITER, KEY_SIZE)
first = blist[16:DEFAULT_PAGESIZE]
mac_salt = bytes([(salt[i] ^ 58) for i in range(16)])
mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE)
hash_mac = hmac.new(mac_key, first[:-32], hashlib.sha1)
hash_mac.update(b'\x01\x00\x00\x00')
if hash_mac.digest() != first[-32:-12]:
return False
return True
phone_type1 = "iphone\x00"
phone_type2 = "android\x00"
phone_type3 = "ipad\x00"
pm = pymem.Pymem("WeChat.exe")
module_name = "WeChatWin.dll"
MicroMsg_path = os.path.join(db_path, "MSG", "MicroMsg.db")
type1_addrs = pm.pattern_scan_module(phone_type1.encode(), module_name, return_multiple=True)
type2_addrs = pm.pattern_scan_module(phone_type2.encode(), module_name, return_multiple=True)
type3_addrs = pm.pattern_scan_module(phone_type3.encode(), module_name, return_multiple=True)
type_addrs = type1_addrs if len(type1_addrs) >= 2 else type2_addrs if len(type2_addrs) >= 2 else type3_addrs if len(
type3_addrs) >= 2 else "None"
# print(type_addrs)
if type_addrs == "None":
return "None"
for i in type_addrs[::-1]:
for j in range(i, i - 2000, -addr_len):
key_bytes = read_key_bytes(pm.process_handle, j, addr_len)
if key_bytes == "None":
continue
if verify_key(key_bytes, MicroMsg_path):
return key_bytes.hex()
return "None"
# 读取微信信息(account,mobile,name,mail,wxid,key)
@log
def read_info(version_list, is_logging=False):
wechat_process = []
result = []
@ -108,12 +219,6 @@ def read_info(version_list, is_logging=False):
tmp_rd['pid'] = process.pid
tmp_rd['version'] = Dispatch("Scripting.FileSystemObject").GetFileVersion(process.exe())
bias_list = version_list.get(tmp_rd['version'], None)
if not isinstance(bias_list, list):
error = f"[-] WeChat Current Version {tmp_rd['version']} Is Not Supported"
if is_logging: print(error)
return -2
wechat_base_address = 0
for module in process.memory_maps(grouped=False):
if module.path and 'WeChatWin.dll' in module.path:
@ -126,20 +231,32 @@ def read_info(version_list, is_logging=False):
Handle = ctypes.windll.kernel32.OpenProcess(0x1F0FFF, False, process.pid)
name_baseaddr = wechat_base_address + bias_list[0]
account__baseaddr = wechat_base_address + bias_list[1]
mobile_baseaddr = wechat_base_address + bias_list[2]
mail_baseaddr = wechat_base_address + bias_list[3]
key_baseaddr = wechat_base_address + bias_list[4]
bias_list = version_list.get(tmp_rd['version'], None)
if not isinstance(bias_list, list) or len(bias_list) <= 4:
error = f"[-] WeChat Current Version Is Not Supported(maybe not get account,mobile,name,mail)"
if is_logging: print(error)
tmp_rd['account'] = "None"
tmp_rd['mobile'] = "None"
tmp_rd['name'] = "None"
tmp_rd['mail'] = "None"
return -2
else:
name_baseaddr = wechat_base_address + bias_list[0]
account__baseaddr = wechat_base_address + bias_list[1]
mobile_baseaddr = wechat_base_address + bias_list[2]
mail_baseaddr = wechat_base_address + bias_list[3]
# key_baseaddr = wechat_base_address + bias_list[4]
addrLen = 4 if tmp_rd['version'] in ["3.9.2.23", "3.9.2.26"] else 8
tmp_rd['account'] = get_info_without_key(Handle, account__baseaddr, 32) if bias_list[1] != 0 else "None"
tmp_rd['mobile'] = get_info_without_key(Handle, mobile_baseaddr, 64) if bias_list[2] != 0 else "None"
tmp_rd['name'] = get_info_without_key(Handle, name_baseaddr, 64) if bias_list[0] != 0 else "None"
tmp_rd['mail'] = get_info_without_key(Handle, mail_baseaddr, 64) if bias_list[3] != 0 else "None"
addrLen = get_exe_bit(process.exe()) // 8
tmp_rd['account'] = get_info_without_key(Handle, account__baseaddr, 32) if bias_list[1] != 0 else "None"
tmp_rd['mobile'] = get_info_without_key(Handle, mobile_baseaddr, 64) if bias_list[2] != 0 else "None"
tmp_rd['name'] = get_info_without_key(Handle, name_baseaddr, 64) if bias_list[0] != 0 else "None"
tmp_rd['mail'] = get_info_without_key(Handle, mail_baseaddr, 64) if bias_list[3] != 0 else "None"
tmp_rd['wxid'] = get_info_wxid(Handle)
tmp_rd['key'] = get_key(Handle, key_baseaddr, addrLen) if bias_list[4] != 0 else "None"
tmp_rd['filePath'] = get_info_filePath(tmp_rd['wxid']) if tmp_rd['wxid'] != "None" else "None"
tmp_rd['key'] = get_key(tmp_rd['filePath'], addrLen) if tmp_rd['filePath'] != "None" else "None"
result.append(tmp_rd)
if is_logging:
@ -154,8 +271,6 @@ def read_info(version_list, is_logging=False):
print("=" * 32)
return result
import os
import sys
@ -166,7 +281,7 @@ def resource_path(relative_path):
return os.path.join(base_path, relative_path)
@log
def get_info(VERSION_LIST):
result = read_info(VERSION_LIST, True) # 读取微信信息
return result