V免签PC监控端1.0源码

Wanruihui微信电脑版收款监控脚本:自动化识别与推送付款信息

一、脚本概述

该脚本基于Python编写,利用requests库发送HTTP请求,结合json、time、hashlib等库处理数据和时间戳,以及uiautomation库自动化操作微信电脑版界面。脚本通过识别微信窗口中的收款到账通知,自动提取收款金额和收款时间信息,并通过API推送至服务器,实现收款信息的实时监控与推送。

图片[1]-V免签PC监控端1.0源码-玩锐汇爱分享 图片[2]-V免签PC监控端1.0源码-玩锐汇爱分享 图片[3]-V免签PC监控端1.0源码-玩锐汇爱分享

二、主要功能

  1. 实时监控:脚本能够持续监控微信电脑版界面,一旦检测到新的收款到账通知,立即提取相关信息。
  2. 信息提取:通过正则表达式匹配,准确提取收款金额和收款时间信息。
  3. 签名验证:在发送请求前,通过MD5算法生成签名,确保数据的安全性。
  4. 心跳发送:通过定时发送心跳包,保持与服务器的连接,确保脚本的稳定运行。
  5. 日志记录:详细记录脚本的运行日志,方便用户查看和排查问题。

三、使用说明

  1. 配置API密钥:在使用脚本前,需要配置正确的API密钥,以便能够成功推送数据至服务器。
  2. 启动脚本:运行main函数,脚本将自动进入监控状态,开始识别并推送收款信息。
  3. 注意事项:请确保微信电脑版窗口处于显示状态,以便脚本能够正确识别收款到账通知。

一、脚本概述

该脚本基于Python编写,利用requests库发送HTTP请求,结合json、time、hashlib等库处理数据和时间戳,以及uiautomation库自动化操作微信电脑版界面。脚本通过识别微信窗口中的收款到账通知,自动提取收款金额和收款时间信息,并通过API推送至服务器,实现收款信息的实时监控与推送。

二、主要功能

  1. 实时监控:脚本能够持续监控微信电脑版界面,一旦检测到新的收款到账通知,立即提取相关信息。
  2. 信息提取:通过正则表达式匹配,准确提取收款金额和收款时间信息。
  3. 签名验证:在发送请求前,通过MD5算法生成签名,确保数据的安全性。
  4. 心跳发送:通过定时发送心跳包,保持与服务器的连接,确保脚本的稳定运行。
  5. 日志记录:详细记录脚本的运行日志,方便用户查看和排查问题。

三、使用说明

  1. 配置API密钥:在使用脚本前,需要配置正确的API密钥,以便能够成功推送数据至服务器。
  2. 启动脚本:运行main函数,脚本将自动进入监控状态,开始识别并推送收款信息。
  3. 注意事项:请确保微信电脑版窗口处于显示状态,以便脚本能够正确识别收款到账通知。

四、应用场景

该脚本适用于需要实时监控微信收款信息的各种场景,如电商平台、线下店铺、个人收款等。通过自动化处理,可以大大提高收款监控的效率和准确性,减少人工操作的繁琐和错误。

5,源码如下

import requests


import json


import time


import hashlib


import re


import uiautomation as automation


import os


import logging


import threading

# 定义保存和读取全局变量的文件路径


INFO_FILE = 'last_payment_info.json'


# 配置日志(如果还没有配置的话)


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 初始化全局变量


last_matched_info = None


last_push_time = None


last_pushed_amount = None

def save_last_info(timestamp, amount):


try:


logging.info(f"保存最后付款信息:: {timestamp}, {amount}")


with open(INFO_FILE, 'w', encoding='utf-8') as f:


json.dump({'last_push_time': timestamp, 'last_pushed_amount': amount}, f)


logging.info(f"保存最后付款信息到:{INFO_FILE}: {timestamp}, {amount}")


except Exception as e:


logging.info(f"保存最后付款信息失败: {str(e)}")

def load_last_info():


try:


with open(INFO_FILE, 'r', encoding='utf-8') as f:


return json.load(f), False # 返回数据和“不是第一次运行”的标志


except FileNotFoundError:


return {'last_push_time': None, 'last_pushed_amount': None}, True # 如果是第一次运行,返回初始数据和“是第一次运行”的标志

# PHP应用程序的URL前缀


BASE_URL = '你的网址'

# 识别收款金额+收款时间信息


def explore_control(control, depth, target_depth):


global last_matched_info, amount


try:


name = control.Name


if name:


if depth == target_depth:


# 匹配收款时间信息


match = re.search(r'收款到账通知[\s\S]*?(\d{2}月\d{2}日 \d{2}:\d{2})', name)


if match:


time_str = match.group(1)


current_year = time.strftime('%Y')


full_time_str = f"{current_year}年{time_str}"


timestamp = int(time.mktime(time.strptime(full_time_str, '%Y年%m月%d日 %H:%M')))

# 匹配收款金额信息


match_amount = re.search(r'收款金额¥([\d.]+)', name)


if match_amount:


amount = match_amount.group(1)


last_matched_info = (timestamp, amount)


return


# 递归处理子控件


for child in control.GetChildren():


explore_control(child, depth + 4, target_depth)


except Exception as e:


logging.info(f"发生错误: {str(e)}")

def process_wechat_window(wechat_window, prev_info, order_type, api_key):


global last_matched_info, last_push_time, last_pushed_amount


if wechat_window.Exists(0):


explore_control(wechat_window, 0, 60)


if last_matched_info and last_matched_info != prev_info:


timestamp, amount = last_matched_info


if last_push_time != timestamp or last_pushed_amount != amount:


logging.info(f"检测到新付款: {last_matched_info}")


logging.info("-----------------------------------------------------------------")


logging.info("正在推送数据...")


logging.info("-----------------------------------------------------------------")

# 向服务器发送付款成功请求


push_payment_data(timestamp, order_type, float(amount), api_key)


# 如果检测到新的付款信息,则更新并保存


save_last_info(timestamp, amount)


# 更新上一次推送的信息


last_push_time = timestamp


last_pushed_amount = amount

prev_info = last_matched_info


else:


logging.info("无法获取到窗口,请保持微信支付窗口显示...")


return prev_info


def md5_sign(timestamp, api_key):


sign_str = f"{timestamp}{api_key}"


hashed = hashlib.md5(sign_str.encode('utf-8')).hexdigest()


return hashed

# 调用getState接口获取监控端状态


def get_state(api_key, timestamp):


url = f"{BASE_URL}getState"


params = {


't': timestamp,


'sign': md5_sign(timestamp, api_key)


}


response = requests.get(url, params=params)


if response.status_code == 200:


data = response.json()


if data['code'] == 1:


logging.info("获取监控端状态成功: %s", data['data'])


else:


logging.info("获取监控端状态失败: %s", data['msg'])


else:


logging.info("请求失败:", response.status_code)


# 心跳发送线程类


class HeartbeatThread(threading.Thread):


def __init__(self, api_key, interval=350):


super().__init__()


self.api_key = api_key


self.interval = interval


self.stop_event = threading.Event()

def run(self):


while not self.stop_event.is_set():


send_heartbeat(self.api_key)


time.sleep(self.interval)

def stop(self):


self.stop_event.set()


# 调用appHeart接口发送心跳


def send_heartbeat(api_key):


timestamp = int(time.time())


sign = md5_sign(timestamp, api_key)


url = f"{BASE_URL}appHeart"


params = {


't': timestamp,


'sign': sign


}


response = requests.get(url, params=params)


if response.status_code == 200:


data = response.json()


if data['code'] == 1:


logging.info("心跳发送成功")


else:


logging.info("心跳发送失败: %s", data['msg'])


else:


logging.info("请求失败:", response.status_code)

def md5_sign_push(timestamp, order_type, price, api_key):


# 生成签名


data = f"{order_type}{price}{timestamp}{api_key}"


return hashlib.md5(data.encode()).hexdigest()

def push_payment_data(timestamp, order_type, price, api_key):


# 构建请求参数


sign = md5_sign_push(timestamp, order_type, price, api_key)


params = {


't': timestamp,


'type': order_type,


'price': price,


'sign': sign


}

# 发送POST请求


response = requests.post(f"{BASE_URL}appPush", data=params)

# 检查响应状态码


if response.status_code == 200:


data = response.json()


if data['code'] == 1:


logging.info("推送付款数据成功")

else:


logging.info("推送付款数据失败: %s", data['msg'])


else:


logging.info("请求失败:", response.status_code)

def main():


global last_push_time, last_pushed_amount

# 尝试从文件中加载上次的信息


# 从load_last_info接收两个值:last_info_dict(字典)和is_first_run(布尔值)


last_info_dict, is_first_run = load_last_info()


# 现在可以从字典中获取值,而不是从元组


last_push_time = last_info_dict.get('last_push_time')


last_pushed_amount = last_info_dict.get('last_pushed_amount')


# 检查文件是否存在,如果不存在则创建一个初始文件


if not os.path.exists(INFO_FILE):


initial_time = int(time.time()) # 设置初始时间戳为当前时间


initial_amount = '0.00' # 设置初始金额为0.00,可以根据需要调整


save_last_info(initial_time, initial_amount)


logging.info("初始文件已创建")

order_type = '1' # 替换为实际的订单类型


api_key = '你的aip密匙




' # 替换为实际的API密钥

get_state(api_key, int(time.time()))


# 创建并启动心跳发送线程


heartbeat_thread = HeartbeatThread(api_key)


heartbeat_thread.start()


# 主循环


while True:


try:


# 获取微信窗口


wechat_window = automation.WindowControl(searchDepth=1, ClassName='ChatWnd')


# 处理微信窗口并获取新的付款信息


prev_info = process_wechat_window(wechat_window, (last_push_time, last_pushed_amount), order_type, api_key)

# 如果检测到新的付款信息,并且不是第一次运行,则推送并保存


if last_matched_info and (last_push_time != last_matched_info[0] or last_pushed_amount != last_matched_info[1]) and not is_first_run:


timestamp, amount = last_matched_info


push_payment_data(timestamp, order_type, float(amount), api_key)


save_last_info(timestamp, amount)


logging.info(f"新的付款信息已推送并保存: 时间戳={timestamp}, 收款金额={amount}")

except Exception as e:


logging.info(f"发生错误: {str(e)}")

time.sleep(2) # 暂停2秒后继续监听

# 主程序入口


if __name__ == "__main__":


logging.info("-----------------------------------------------------------------")


logging.info("欢迎使用wanruihui微信电脑版收款监控脚本...")


logging.info("-----------------------------------------------------------------")


main()

 

 

本文最后更新于2024年05月04日,若涉及的内容可能已经失效,直接留言反馈补链即可,我们会处理,谢谢。

V免签PC监控端1.0源码-玩锐汇爱分享
V免签PC监控端1.0源码
此内容为付费阅读,请付费后查看
99
限时特惠
9999
立即购买
您当前未登录!建议登陆后购买,可保存购买订单
付费阅读
© 版权声明
THE END
喜欢就支持一下吧
点赞8赞赏 分享