Wanruihui微信电脑版收款监控脚本:自动化识别与推送付款信息
一、脚本概述
该脚本基于Python编写,利用requests库发送HTTP请求,结合json、time、hashlib等库处理数据和时间戳,以及uiautomation库自动化操作微信电脑版界面。脚本通过识别微信窗口中的收款到账通知,自动提取收款金额和收款时间信息,并通过API推送至服务器,实现收款信息的实时监控与推送。
二、主要功能
- 实时监控:脚本能够持续监控微信电脑版界面,一旦检测到新的收款到账通知,立即提取相关信息。
- 信息提取:通过正则表达式匹配,准确提取收款金额和收款时间信息。
- 签名验证:在发送请求前,通过MD5算法生成签名,确保数据的安全性。
- 心跳发送:通过定时发送心跳包,保持与服务器的连接,确保脚本的稳定运行。
- 日志记录:详细记录脚本的运行日志,方便用户查看和排查问题。
三、使用说明
- 配置API密钥:在使用脚本前,需要配置正确的API密钥,以便能够成功推送数据至服务器。
- 启动脚本:运行main函数,脚本将自动进入监控状态,开始识别并推送收款信息。
- 注意事项:请确保微信电脑版窗口处于显示状态,以便脚本能够正确识别收款到账通知。
一、脚本概述
该脚本基于Python编写,利用requests库发送HTTP请求,结合json、time、hashlib等库处理数据和时间戳,以及uiautomation库自动化操作微信电脑版界面。脚本通过识别微信窗口中的收款到账通知,自动提取收款金额和收款时间信息,并通过API推送至服务器,实现收款信息的实时监控与推送。
二、主要功能
- 实时监控:脚本能够持续监控微信电脑版界面,一旦检测到新的收款到账通知,立即提取相关信息。
- 信息提取:通过正则表达式匹配,准确提取收款金额和收款时间信息。
- 签名验证:在发送请求前,通过MD5算法生成签名,确保数据的安全性。
- 心跳发送:通过定时发送心跳包,保持与服务器的连接,确保脚本的稳定运行。
- 日志记录:详细记录脚本的运行日志,方便用户查看和排查问题。
三、使用说明
- 配置API密钥:在使用脚本前,需要配置正确的API密钥,以便能够成功推送数据至服务器。
- 启动脚本:运行main函数,脚本将自动进入监控状态,开始识别并推送收款信息。
- 注意事项:请确保微信电脑版窗口处于显示状态,以便脚本能够正确识别收款到账通知。
四、应用场景
该脚本适用于需要实时监控微信收款信息的各种场景,如电商平台、线下店铺、个人收款等。通过自动化处理,可以大大提高收款监控的效率和准确性,减少人工操作的繁琐和错误。
5,源码如下
import requests
import json
import time
import hashlib
import re
import uiautomation as automation
import os
import logging
import threading
# 定义保存和读取全局变量的文件路径
INFO_FILE = 'last_payment_info.json'
# 配置日志(如果还没有配置的话)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 初始化全局变量
last_matched_info = None
last_push_time = None
last_pushed_amount = None
def save_last_info(timestamp, amount):
try:
logging.info(f"保存最后付款信息:: {timestamp}, {amount}")
with open(INFO_FILE, 'w', encoding='utf-8') as f:
json.dump({'last_push_time': timestamp, 'last_pushed_amount': amount}, f)
logging.info(f"保存最后付款信息到:{INFO_FILE}: {timestamp}, {amount}")
except Exception as e:
logging.info(f"保存最后付款信息失败: {str(e)}")
def load_last_info():
try:
with open(INFO_FILE, 'r', encoding='utf-8') as f:
return json.load(f), False # 返回数据和“不是第一次运行”的标志
except FileNotFoundError:
return {'last_push_time': None, 'last_pushed_amount': None}, True # 如果是第一次运行,返回初始数据和“是第一次运行”的标志
# PHP应用程序的URL前缀
BASE_URL = '你的网址'
# 识别收款金额+收款时间信息
def explore_control(control, depth, target_depth):
global last_matched_info, amount
try:
name = control.Name
if name:
if depth == target_depth:
# 匹配收款时间信息
match = re.search(r'收款到账通知[\s\S]*?(\d{2}月\d{2}日 \d{2}:\d{2})', name)
if match:
time_str = match.group(1)
current_year = time.strftime('%Y')
full_time_str = f"{current_year}年{time_str}"
timestamp = int(time.mktime(time.strptime(full_time_str, '%Y年%m月%d日 %H:%M')))
# 匹配收款金额信息
match_amount = re.search(r'收款金额¥([\d.]+)', name)
if match_amount:
amount = match_amount.group(1)
last_matched_info = (timestamp, amount)
return
# 递归处理子控件
for child in control.GetChildren():
explore_control(child, depth + 4, target_depth)
except Exception as e:
logging.info(f"发生错误: {str(e)}")
def process_wechat_window(wechat_window, prev_info, order_type, api_key):
global last_matched_info, last_push_time, last_pushed_amount
if wechat_window.Exists(0):
explore_control(wechat_window, 0, 60)
if last_matched_info and last_matched_info != prev_info:
timestamp, amount = last_matched_info
if last_push_time != timestamp or last_pushed_amount != amount:
logging.info(f"检测到新付款: {last_matched_info}")
logging.info("-----------------------------------------------------------------")
logging.info("正在推送数据...")
logging.info("-----------------------------------------------------------------")
# 向服务器发送付款成功请求
push_payment_data(timestamp, order_type, float(amount), api_key)
# 如果检测到新的付款信息,则更新并保存
save_last_info(timestamp, amount)
# 更新上一次推送的信息
last_push_time = timestamp
last_pushed_amount = amount
prev_info = last_matched_info
else:
logging.info("无法获取到窗口,请保持微信支付窗口显示...")
return prev_info
def md5_sign(timestamp, api_key):
sign_str = f"{timestamp}{api_key}"
hashed = hashlib.md5(sign_str.encode('utf-8')).hexdigest()
return hashed
# 调用getState接口获取监控端状态
def get_state(api_key, timestamp):
url = f"{BASE_URL}getState"
params = {
't': timestamp,
'sign': md5_sign(timestamp, api_key)
}
response = requests.get(url, params=params)
if response.status_code == 200:
data = response.json()
if data['code'] == 1:
logging.info("获取监控端状态成功: %s", data['data'])
else:
logging.info("获取监控端状态失败: %s", data['msg'])
else:
logging.info("请求失败:", response.status_code)
# 心跳发送线程类
class HeartbeatThread(threading.Thread):
def __init__(self, api_key, interval=350):
super().__init__()
self.api_key = api_key
self.interval = interval
self.stop_event = threading.Event()
def run(self):
while not self.stop_event.is_set():
send_heartbeat(self.api_key)
time.sleep(self.interval)
def stop(self):
self.stop_event.set()
# 调用appHeart接口发送心跳
def send_heartbeat(api_key):
timestamp = int(time.time())
sign = md5_sign(timestamp, api_key)
url = f"{BASE_URL}appHeart"
params = {
't': timestamp,
'sign': sign
}
response = requests.get(url, params=params)
if response.status_code == 200:
data = response.json()
if data['code'] == 1:
logging.info("心跳发送成功")
else:
logging.info("心跳发送失败: %s", data['msg'])
else:
logging.info("请求失败:", response.status_code)
def md5_sign_push(timestamp, order_type, price, api_key):
# 生成签名
data = f"{order_type}{price}{timestamp}{api_key}"
return hashlib.md5(data.encode()).hexdigest()
def push_payment_data(timestamp, order_type, price, api_key):
# 构建请求参数
sign = md5_sign_push(timestamp, order_type, price, api_key)
params = {
't': timestamp,
'type': order_type,
'price': price,
'sign': sign
}
# 发送POST请求
response = requests.post(f"{BASE_URL}appPush", data=params)
# 检查响应状态码
if response.status_code == 200:
data = response.json()
if data['code'] == 1:
logging.info("推送付款数据成功")
else:
logging.info("推送付款数据失败: %s", data['msg'])
else:
logging.info("请求失败:", response.status_code)
def main():
global last_push_time, last_pushed_amount
# 尝试从文件中加载上次的信息
# 从load_last_info接收两个值:last_info_dict(字典)和is_first_run(布尔值)
last_info_dict, is_first_run = load_last_info()
# 现在可以从字典中获取值,而不是从元组
last_push_time = last_info_dict.get('last_push_time')
last_pushed_amount = last_info_dict.get('last_pushed_amount')
# 检查文件是否存在,如果不存在则创建一个初始文件
if not os.path.exists(INFO_FILE):
initial_time = int(time.time()) # 设置初始时间戳为当前时间
initial_amount = '0.00' # 设置初始金额为0.00,可以根据需要调整
save_last_info(initial_time, initial_amount)
logging.info("初始文件已创建")
order_type = '1' # 替换为实际的订单类型
api_key = '你的aip密匙
' # 替换为实际的API密钥
get_state(api_key, int(time.time()))
# 创建并启动心跳发送线程
heartbeat_thread = HeartbeatThread(api_key)
heartbeat_thread.start()
# 主循环
while True:
try:
# 获取微信窗口
wechat_window = automation.WindowControl(searchDepth=1, ClassName='ChatWnd')
# 处理微信窗口并获取新的付款信息
prev_info = process_wechat_window(wechat_window, (last_push_time, last_pushed_amount), order_type, api_key)
# 如果检测到新的付款信息,并且不是第一次运行,则推送并保存
if last_matched_info and (last_push_time != last_matched_info[0] or last_pushed_amount != last_matched_info[1]) and not is_first_run:
timestamp, amount = last_matched_info
push_payment_data(timestamp, order_type, float(amount), api_key)
save_last_info(timestamp, amount)
logging.info(f"新的付款信息已推送并保存: 时间戳={timestamp}, 收款金额={amount}")
except Exception as e:
logging.info(f"发生错误: {str(e)}")
time.sleep(2) # 暂停2秒后继续监听
# 主程序入口
if __name__ == "__main__":
logging.info("-----------------------------------------------------------------")
logging.info("欢迎使用wanruihui微信电脑版收款监控脚本...")
logging.info("-----------------------------------------------------------------")
main()
本文最后更新于2024年05月04日,若涉及的内容可能已经失效,直接留言反馈补链即可,我们会处理,谢谢。
© 版权声明
THE END