WeChat Automation Skill
Expert guidance for WeChat monitoring and automation using wxauto (Windows) and Accessibility API (macOS).
Overview
WeReply uses Platform-specific Agents to monitor WeChat conversations and control the input box:
- Windows Agent: Python 3.12 + wxauto v4
- macOS Agent: Swift + Accessibility API
- Communication: JSON protocol via stdin/stdout with Rust Orchestrator
Architecture Pattern
微信窗口
↓ (UI Automation)
Platform Agent
├→ 监听消息(定时轮询)
├→ 提取消息内容
├→ 发送到 Orchestrator (JSON via stdout)
└→ 接收命令 (JSON via stdin)
↓
控制输入框(写入建议)
Windows Agent - wxauto v4
Installation and Setup
bash
1# 安装依赖
2pip install wxauto==4.0.0
3
4# 确保微信已登录且窗口可见
Message Monitoring Pattern
python
1import json
2import time
3import sys
4from wxauto import WeChat
5
6class WeChatMonitor:
7 def __init__(self, interval_ms: int = 500):
8 """
9 初始化微信监听器
10
11 Args:
12 interval_ms: 监听间隔(毫秒),默认 500ms
13 """
14 self.wechat = WeChat()
15 self.interval_ms = interval_ms
16 self.last_message_id = None
17
18 def start_monitoring(self):
19 """开始监听微信消息"""
20 try:
21 while True:
22 # 获取当前聊天窗口的最新消息
23 messages = self.wechat.GetAllMessage()
24
25 if messages and len(messages) > 0:
26 latest_message = messages[-1]
27
28 # 检查是否是新消息(避免重复处理)
29 message_id = self._generate_message_id(latest_message)
30 if message_id != self.last_message_id:
31 self.last_message_id = message_id
32 self._send_message_to_orchestrator(latest_message)
33
34 # 间隔等待
35 time.sleep(self.interval_ms / 1000.0)
36
37 except KeyboardInterrupt:
38 self._send_error("监听被用户中断")
39 except Exception as e:
40 self._send_error(f"监听错误: {str(e)}")
41
42 def _generate_message_id(self, message) -> str:
43 """生成消息唯一ID(用于去重)"""
44 # 结合时间戳、发送者、内容生成ID
45 content = message.get('content', '')
46 sender = message.get('sender', '')
47 timestamp = message.get('time', '')
48 return f"{sender}:{timestamp}:{hash(content)}"
49
50 def _send_message_to_orchestrator(self, message):
51 """
52 发送消息到 Rust Orchestrator
53
54 格式:
55 {
56 "type": "MessageNew",
57 "content": "消息内容",
58 "sender": "发送者",
59 "timestamp": "2024-01-23T10:30:00"
60 }
61 """
62 payload = {
63 "type": "MessageNew",
64 "content": message.get('content', ''),
65 "sender": message.get('sender', ''),
66 "timestamp": message.get('time', '')
67 }
68
69 # 输出到 stdout(Rust 会读取)
70 print(json.dumps(payload, ensure_ascii=False), flush=True)
71
72 def _send_error(self, error_message: str):
73 """发送错误信息到 Orchestrator"""
74 payload = {
75 "type": "Error",
76 "message": error_message
77 }
78 print(json.dumps(payload, ensure_ascii=False), flush=True)
79
80# 使用示例
81if __name__ == '__main__':
82 monitor = WeChatMonitor(interval_ms=500)
83 monitor.start_monitoring()
python
1class WeChatInputWriter:
2 def __init__(self):
3 self.wechat = WeChat()
4
5 def write_to_input(self, content: str) -> bool:
6 """
7 写入内容到微信输入框
8
9 Args:
10 content: 要写入的文本
11
12 Returns:
13 bool: 写入是否成功
14 """
15 try:
16 # 使用 wxauto 写入输入框
17 self.wechat.SendMsg(content)
18 return True
19 except Exception as e:
20 self._send_error(f"写入失败: {str(e)}")
21 return False
22
23 def clear_input(self) -> bool:
24 """清空输入框"""
25 try:
26 # wxauto v4 提供的清空方法
27 self.wechat.ClearMsg()
28 return True
29 except Exception as e:
30 self._send_error(f"清空失败: {str(e)}")
31 return False
32
33 def _send_error(self, error_message: str):
34 """发送错误到 Orchestrator"""
35 payload = {
36 "type": "Error",
37 "message": error_message
38 }
39 print(json.dumps(payload, ensure_ascii=False), flush=True)
Command Handling Pattern
python
1import sys
2import json
3import threading
4
5class AgentCommandHandler:
6 def __init__(self):
7 self.input_writer = WeChatInputWriter()
8 self.running = True
9
10 def start_command_listener(self):
11 """监听来自 Orchestrator 的命令(stdin)"""
12 thread = threading.Thread(target=self._listen_commands, daemon=True)
13 thread.start()
14
15 def _listen_commands(self):
16 """从 stdin 读取命令"""
17 try:
18 for line in sys.stdin:
19 if not self.running:
20 break
21
22 try:
23 command = json.loads(line.strip())
24 self._handle_command(command)
25 except json.JSONDecodeError:
26 self._send_error(f"无效的 JSON 命令: {line}")
27
28 except Exception as e:
29 self._send_error(f"命令监听错误: {str(e)}")
30
31 def _handle_command(self, command: dict):
32 """处理命令"""
33 cmd_type = command.get('type')
34
35 if cmd_type == 'WriteInput':
36 content = command.get('content', '')
37 success = self.input_writer.write_to_input(content)
38 self._send_response(success)
39
40 elif cmd_type == 'ClearInput':
41 success = self.input_writer.clear_input()
42 self._send_response(success)
43
44 elif cmd_type == 'HealthCheck':
45 self._send_health_status()
46
47 else:
48 self._send_error(f"未知命令类型: {cmd_type}")
49
50 def _send_response(self, success: bool):
51 """发送命令执行结果"""
52 payload = {
53 "type": "CommandResponse",
54 "success": success
55 }
56 print(json.dumps(payload, ensure_ascii=False), flush=True)
57
58 def _send_health_status(self):
59 """发送健康状态"""
60 payload = {
61 "type": "HealthStatus",
62 "status": "ok",
63 "agent_type": "windows_wxauto"
64 }
65 print(json.dumps(payload, ensure_ascii=False), flush=True)
66
67 def _send_error(self, error_message: str):
68 """发送错误"""
69 payload = {
70 "type": "Error",
71 "message": error_message
72 }
73 print(json.dumps(payload, ensure_ascii=False), flush=True)
macOS Agent - Accessibility API
Swift Implementation Pattern
swift
1import Cocoa
2import ApplicationServices
3
4class WeChatMonitor {
5 private var monitoringTimer: Timer?
6 private var lastMessageId: String?
7 private let intervalMs: Int
8
9 init(intervalMs: Int = 500) {
10 self.intervalMs = intervalMs
11 }
12
13 func startMonitoring() {
14 // 请求 Accessibility 权限
15 if !AXIsProcessTrusted() {
16 let options = [kAXTrustedCheckOptionPrompt.takeUnretainedValue() as String: true]
17 AXIsProcessTrustedWithOptions(options as CFDictionary)
18 return
19 }
20
21 // 启动定时器
22 monitoringTimer = Timer.scheduledTimer(
23 withTimeInterval: TimeInterval(intervalMs) / 1000.0,
24 repeats: true
25 ) { [weak self] _ in
26 self?.checkForNewMessages()
27 }
28
29 RunLoop.main.run()
30 }
31
32 private func checkForNewMessages() {
33 guard let wechatApp = getWeChatApplication() else {
34 return
35 }
36
37 // 使用 Accessibility API 获取消息
38 if let messages = extractMessages(from: wechatApp) {
39 if let latestMessage = messages.last {
40 let messageId = generateMessageId(message: latestMessage)
41
42 if messageId != lastMessageId {
43 lastMessageId = messageId
44 sendMessageToOrchestrator(message: latestMessage)
45 }
46 }
47 }
48 }
49
50 private func getWeChatApplication() -> AXUIElement? {
51 let runningApps = NSWorkspace.shared.runningApplications
52 guard let wechatApp = runningApps.first(where: { $0.bundleIdentifier == "com.tencent.xinWeChat" }) else {
53 return nil
54 }
55
56 return AXUIElementCreateApplication(wechatApp.processIdentifier)
57 }
58
59 private func extractMessages(from app: AXUIElement) -> [[String: String]]? {
60 // 使用 Accessibility API 提取消息列表
61 // 这需要深入分析微信的 UI 层次结构
62
63 var messagesValue: AnyObject?
64 let result = AXUIElementCopyAttributeValue(app, kAXChildrenAttribute as CFString, &messagesValue)
65
66 guard result == .success, let windows = messagesValue as? [AXUIElement] else {
67 return nil
68 }
69
70 // 遍历窗口,找到聊天窗口,提取消息
71 // 具体实现需要根据微信的 UI 结构调整
72
73 return nil // Placeholder
74 }
75
76 private func generateMessageId(message: [String: String]) -> String {
77 let content = message["content"] ?? ""
78 let sender = message["sender"] ?? ""
79 let timestamp = message["timestamp"] ?? ""
80 return "\(sender):\(timestamp):\(content.hashValue)"
81 }
82
83 private func sendMessageToOrchestrator(message: [String: String]) {
84 let payload: [String: Any] = [
85 "type": "MessageNew",
86 "content": message["content"] ?? "",
87 "sender": message["sender"] ?? "",
88 "timestamp": message["timestamp"] ?? ""
89 ]
90
91 if let jsonData = try? JSONSerialization.data(withJSONObject: payload),
92 let jsonString = String(data: jsonData, encoding: .utf8) {
93 print(jsonString, terminator: "\n")
94 fflush(stdout)
95 }
96 }
97}
swift
1class WeChatInputWriter {
2 func writeToInput(content: String) -> Bool {
3 guard let wechatApp = getWeChatApplication() else {
4 sendError(message: "未找到微信应用")
5 return false
6 }
7
8 // 查找输入框
9 guard let inputField = findInputField(in: wechatApp) else {
10 sendError(message: "未找到输入框")
11 return false
12 }
13
14 // 写入内容
15 var value = content as CFTypeRef
16 let result = AXUIElementSetAttributeValue(inputField, kAXValueAttribute as CFString, value)
17
18 if result == .success {
19 return true
20 } else {
21 sendError(message: "写入失败: \(result.rawValue)")
22 return false
23 }
24 }
25
26 private func findInputField(in app: AXUIElement) -> AXUIElement? {
27 // 使用 Accessibility API 查找输入框
28 // 需要遍历 UI 层次结构找到输入框元素
29 return nil // Placeholder
30 }
31
32 private func getWeChatApplication() -> AXUIElement? {
33 // 同上
34 return nil
35 }
36
37 private func sendError(message: String) {
38 let payload: [String: Any] = [
39 "type": "Error",
40 "message": message
41 ]
42
43 if let jsonData = try? JSONSerialization.data(withJSONObject: payload),
44 let jsonString = String(data: jsonData, encoding: .utf8) {
45 print(jsonString, terminator: "\n")
46 fflush(stdout)
47 }
48 }
49}
Message Deduplication Strategy
Time-based Deduplication
python
1from datetime import datetime, timedelta
2
3class MessageDeduplicator:
4 def __init__(self, window_seconds: int = 5):
5 """
6 消息去重器
7
8 Args:
9 window_seconds: 去重时间窗口(秒)
10 """
11 self.seen_messages = {} # {message_id: timestamp}
12 self.window_seconds = window_seconds
13
14 def is_duplicate(self, message_id: str) -> bool:
15 """检查消息是否重复"""
16 now = datetime.now()
17
18 # 清理过期的消息记录
19 self._clean_old_messages(now)
20
21 # 检查是否已见过
22 if message_id in self.seen_messages:
23 return True
24
25 # 记录新消息
26 self.seen_messages[message_id] = now
27 return False
28
29 def _clean_old_messages(self, now: datetime):
30 """清理过期的消息记录"""
31 cutoff = now - timedelta(seconds=self.window_seconds)
32 self.seen_messages = {
33 msg_id: timestamp
34 for msg_id, timestamp in self.seen_messages.items()
35 if timestamp > cutoff
36 }
Polling Interval Tuning
python
1class AdaptiveMonitor:
2 def __init__(self, min_interval_ms: int = 200, max_interval_ms: int = 1000):
3 """
4 自适应监听间隔
5
6 当有活跃消息时,使用较短间隔(200ms)
7 当长时间无消息时,逐渐增加到最大间隔(1000ms)
8 """
9 self.min_interval = min_interval_ms / 1000.0
10 self.max_interval = max_interval_ms / 1000.0
11 self.current_interval = self.min_interval
12 self.idle_count = 0
13
14 def get_next_interval(self, has_new_message: bool) -> float:
15 """获取下次轮询间隔"""
16 if has_new_message:
17 # 有新消息,使用最短间隔
18 self.current_interval = self.min_interval
19 self.idle_count = 0
20 else:
21 # 无新消息,逐渐增加间隔
22 self.idle_count += 1
23 if self.idle_count > 5: # 5次无消息后开始增加间隔
24 self.current_interval = min(
25 self.current_interval * 1.2,
26 self.max_interval
27 )
28
29 return self.current_interval
Memory Optimization
python
1import gc
2
3class MemoryEfficientMonitor:
4 def __init__(self):
5 self.message_buffer_size = 100 # 只保留最近100条消息
6 self.message_buffer = []
7
8 def add_message(self, message):
9 """添加消息到缓冲区"""
10 self.message_buffer.append(message)
11
12 # 超过缓冲区大小,清理旧消息
13 if len(self.message_buffer) > self.message_buffer_size:
14 self.message_buffer = self.message_buffer[-self.message_buffer_size:]
15 gc.collect() # 触发垃圾回收
Error Handling and Recovery
Graceful Degradation
python
1class RobustAgent:
2 def __init__(self):
3 self.max_retries = 3
4 self.retry_delay_seconds = 2
5
6 def monitor_with_retry(self):
7 """带重试的监听"""
8 retry_count = 0
9
10 while retry_count < self.max_retries:
11 try:
12 self.start_monitoring()
13 break # 成功,跳出循环
14 except Exception as e:
15 retry_count += 1
16 self._send_error(f"监听失败 (尝试 {retry_count}/{self.max_retries}): {str(e)}")
17
18 if retry_count < self.max_retries:
19 time.sleep(self.retry_delay_seconds)
20 else:
21 self._send_error("监听失败次数过多,Agent 退出")
22 sys.exit(1)
Health Check
python
1class HealthMonitor:
2 def __init__(self):
3 self.last_heartbeat = time.time()
4 self.heartbeat_interval = 10 # 每10秒发送一次心跳
5
6 def send_heartbeat(self):
7 """发送心跳到 Orchestrator"""
8 payload = {
9 "type": "Heartbeat",
10 "timestamp": time.time(),
11 "status": "ok"
12 }
13 print(json.dumps(payload, ensure_ascii=False), flush=True)
14 self.last_heartbeat = time.time()
Security Considerations
python
1def validate_command(command: dict) -> bool:
2 """验证来自 Orchestrator 的命令"""
3 # 检查命令类型
4 if 'type' not in command:
5 return False
6
7 cmd_type = command['type']
8
9 # 只接受预定义的命令类型
10 valid_types = ['WriteInput', 'ClearInput', 'HealthCheck', 'Stop']
11 if cmd_type not in valid_types:
12 return False
13
14 # 验证内容长度(防止恶意超长内容)
15 if cmd_type == 'WriteInput':
16 content = command.get('content', '')
17 if len(content) > 10000: # 最大10KB
18 return False
19
20 return True
Privacy Protection
python
1def sanitize_message_for_logging(message: dict) -> dict:
2 """清理消息中的敏感信息(用于日志)"""
3 sanitized = message.copy()
4
5 # 不记录完整的消息内容
6 if 'content' in sanitized:
7 content = sanitized['content']
8 if len(content) > 50:
9 sanitized['content'] = content[:50] + '...'
10
11 return sanitized
Testing Guidelines
Unit Testing
python
1import unittest
2from unittest.mock import Mock, patch
3
4class TestWeChatMonitor(unittest.TestCase):
5 def test_message_deduplication(self):
6 """测试消息去重"""
7 deduplicator = MessageDeduplicator(window_seconds=5)
8
9 message_id = "test_message_1"
10
11 # 第一次应该不是重复
12 self.assertFalse(deduplicator.is_duplicate(message_id))
13
14 # 第二次应该是重复
15 self.assertTrue(deduplicator.is_duplicate(message_id))
16
17 @patch('wxauto.WeChat')
18 def test_monitor_initialization(self, mock_wechat):
19 """测试监听器初始化"""
20 monitor = WeChatMonitor(interval_ms=500)
21 self.assertEqual(monitor.interval_ms, 500)
22 self.assertIsNone(monitor.last_message_id)
When to Use This Skill
Activate this skill when:
- Implementing WeChat message monitoring
- Developing Platform Agents (Windows/macOS)
- Working with wxauto or Accessibility API
- Handling message extraction and deduplication
- Implementing input box control
- Optimizing Agent performance
- Handling Agent errors and recovery
- Setting up IPC communication with Orchestrator