wechat-automation
0
总安装量
4
周安装量
安装命令
npx skills add https://github.com/cacr92/wereply --skill wechat-automation
Agent 安装分布
openclaw
2
cursor
2
opencode
2
replit
2
mcpjam
1
Skill 文档
WeChat Automation Skill
Expert guidance for WeChat monitoring and automation using wxauto (Windows) and Accessibility API (macOS).
Overview
WeReply uses Platform-specific Agents to monitor WeChat conversations and control the input box:
- Windows Agent: Python 3.12 + wxauto v4
- macOS Agent: Swift + Accessibility API
- Communication: JSON protocol via stdin/stdout with Rust Orchestrator
Architecture Pattern
微信窗口
↓ (UI Automation)
Platform Agent
├─ 监听消息（实时轮询）
├─ 提取消息内容
├─ 发送到 Orchestrator (JSON via stdout)
└─ 接收命令 (JSON via stdin)
↓
控制输入框（写入建议）
Windows Agent – wxauto v4
Installation and Setup
# 安装依赖
pip install wxauto==4.0.0
# 确保微信已登录且窗口可见
Message Monitoring Pattern
import json
import time
import sys
from wxauto import WeChat
class WeChatMonitor:
    """Poll WeChat through wxauto and forward new messages to the orchestrator.

    Each poll reads the message list, looks at the last entry only, and --
    when its derived id differs from the previously forwarded one -- prints
    it to stdout as a single JSON line.
    """

    def __init__(self, interval_ms: int = 500):
        """Create the wxauto client and store the polling interval.

        Args:
            interval_ms: Delay between polls in milliseconds (default 500).
        """
        self.wechat = WeChat()
        self.interval_ms = interval_ms
        # Id of the most recently forwarded message; None until the first one.
        self.last_message_id = None

    def start_monitoring(self):
        """Poll for new messages until interrupted or an error occurs.

        Only the last entry of each poll is inspected, so when several
        messages arrive within one interval the earlier ones are not
        forwarded. Ctrl-C and any other exception end the loop and are
        reported as an ``Error`` payload instead of propagating.
        """
        try:
            while True:
                messages = self.wechat.GetAllMessage()
                if messages:
                    latest_message = messages[-1]
                    # Skip the entry if it is the one forwarded last time.
                    message_id = self._generate_message_id(latest_message)
                    if message_id != self.last_message_id:
                        self.last_message_id = message_id
                        self._send_message_to_orchestrator(latest_message)
                # Wait out the polling interval.
                time.sleep(self.interval_ms / 1000.0)
        except KeyboardInterrupt:
            self._send_error("监听被用户中断")
        except Exception as e:
            self._send_error(f"监听错误: {str(e)}")

    def _generate_message_id(self, message) -> str:
        """Build a de-duplication id from sender, time and a hash of the content.

        ``hash()`` of a str is salted per interpreter process, so ids are only
        comparable within a single run.

        NOTE(review): assumes ``message`` is dict-like with 'content',
        'sender' and 'time' keys -- confirm against what wxauto returns.
        """
        content = message.get('content', '')
        sender = message.get('sender', '')
        timestamp = message.get('time', '')
        return f"{sender}:{timestamp}:{hash(content)}"

    def _send_message_to_orchestrator(self, message):
        """Print a ``MessageNew`` payload for ``message`` as one JSON line.

        The payload has the keys "type", "content", "sender" and "timestamp";
        missing message fields are sent as empty strings.
        """
        payload = {
            "type": "MessageNew",
            "content": message.get('content', ''),
            "sender": message.get('sender', ''),
            "timestamp": message.get('time', ''),
        }
        # stdout is the channel to the orchestrator; flush so the line is
        # delivered immediately.
        print(json.dumps(payload, ensure_ascii=False), flush=True)

    def _send_error(self, error_message: str):
        """Print an ``Error`` payload carrying ``error_message`` as one JSON line."""
        payload = {
            "type": "Error",
            "message": error_message,
        }
        print(json.dumps(payload, ensure_ascii=False), flush=True)
# Example usage: poll every 500 ms until the monitor stops.
if __name__ == '__main__':
    WeChatMonitor(interval_ms=500).start_monitoring()
Input Box Control Pattern
class WeChatInputWriter:
    """Wrapper around the wxauto client used to drive the WeChat input box."""

    def __init__(self):
        """Create the wxauto client."""
        self.wechat = WeChat()

    def write_to_input(self, content: str) -> bool:
        """Pass ``content`` to WeChat via ``SendMsg``.

        NOTE(review): the name ``SendMsg`` suggests the text is sent rather
        than only placed in the input box -- confirm against the wxauto docs.

        Args:
            content: Text to write.

        Returns:
            True on success; False if wxauto raised (the error is reported
            to the orchestrator).
        """
        try:
            self.wechat.SendMsg(content)
            return True
        except Exception as e:
            self._send_error(f"写入失败: {str(e)}")
            return False

    def clear_input(self) -> bool:
        """Clear the input box via ``ClearMsg``.

        NOTE(review): verify that the installed wxauto version provides
        ``ClearMsg``.

        Returns:
            True on success; False if wxauto raised (the error is reported
            to the orchestrator).
        """
        try:
            self.wechat.ClearMsg()
            return True
        except Exception as e:
            self._send_error(f"清空失败: {str(e)}")
            return False

    def _send_error(self, error_message: str):
        """Print an ``Error`` payload carrying ``error_message`` as one JSON line."""
        payload = {
            "type": "Error",
            "message": error_message,
        }
        print(json.dumps(payload, ensure_ascii=False), flush=True)
Command Handling Pattern
import sys
import json
import threading
class AgentCommandHandler:
    """Read JSON commands from stdin and answer with JSON lines on stdout."""

    def __init__(self):
        """Create the input writer and mark the handler as running."""
        self.input_writer = WeChatInputWriter()
        # Cleared by a 'Stop' command; checked by the listener loop.
        self.running = True

    def start_command_listener(self):
        """Start the stdin listener on a daemon thread."""
        thread = threading.Thread(target=self._listen_commands, daemon=True)
        thread.start()

    def _listen_commands(self):
        """Consume one JSON command per stdin line until EOF or 'Stop'.

        Blank lines are skipped. Lines that are not valid JSON, or whose JSON
        value is not an object, are reported as an ``Error`` and the loop
        keeps going. Any other exception ends the loop and is reported.
        """
        try:
            for line in sys.stdin:
                if not self.running:
                    break
                text = line.strip()
                if not text:
                    continue
                try:
                    command = json.loads(text)
                except json.JSONDecodeError:
                    self._send_error(f"无效的 JSON 命令: {text}")
                    continue
                # A JSON array/number/string has no .get(); reject it here
                # rather than letting it terminate the listener.
                if not isinstance(command, dict):
                    self._send_error(f"无效的 JSON 命令: {text}")
                    continue
                self._handle_command(command)
                if not self.running:
                    break
        except Exception as e:
            self._send_error(f"命令监听错误: {str(e)}")

    def _handle_command(self, command: dict):
        """Dispatch a decoded command on its 'type' field.

        Handles 'WriteInput', 'ClearInput', 'HealthCheck' and 'Stop'; any
        other type is reported as an ``Error``.
        """
        cmd_type = command.get('type')
        if cmd_type == 'WriteInput':
            content = command.get('content', '')
            success = self.input_writer.write_to_input(content)
            self._send_response(success)
        elif cmd_type == 'ClearInput':
            success = self.input_writer.clear_input()
            self._send_response(success)
        elif cmd_type == 'HealthCheck':
            self._send_health_status()
        elif cmd_type == 'Stop':
            # Ends the listener loop after this command.
            self.running = False
            self._send_response(True)
        else:
            self._send_error(f"未知命令类型: {cmd_type}")

    def _send_response(self, success: bool):
        """Print a ``CommandResponse`` payload with the given success flag."""
        payload = {
            "type": "CommandResponse",
            "success": success,
        }
        print(json.dumps(payload, ensure_ascii=False), flush=True)

    def _send_health_status(self):
        """Print a ``HealthStatus`` payload identifying this agent."""
        payload = {
            "type": "HealthStatus",
            "status": "ok",
            "agent_type": "windows_wxauto",
        }
        print(json.dumps(payload, ensure_ascii=False), flush=True)

    def _send_error(self, error_message: str):
        """Print an ``Error`` payload carrying ``error_message``."""
        payload = {
            "type": "Error",
            "message": error_message,
        }
        print(json.dumps(payload, ensure_ascii=False), flush=True)
macOS Agent – Accessibility API
Swift Implementation Pattern
import Cocoa
import ApplicationServices
/// Polls WeChat through the Accessibility API on a repeating timer and
/// prints each newly seen latest message to stdout as one JSON line.
class WeChatMonitor {
    private let intervalMs: Int
    private var monitoringTimer: Timer?
    /// Id of the most recently forwarded message; nil until the first one.
    private var lastMessageId: String?

    /// - Parameter intervalMs: Polling interval in milliseconds (default 500).
    init(intervalMs: Int = 500) {
        self.intervalMs = intervalMs
    }

    /// Starts the polling timer and runs the main run loop.
    ///
    /// If the process is not trusted for Accessibility, the system prompt is
    /// requested and the method returns without starting the timer.
    func startMonitoring() {
        guard AXIsProcessTrusted() else {
            let promptKey = kAXTrustedCheckOptionPrompt.takeUnretainedValue() as String
            let options = [promptKey: true]
            AXIsProcessTrustedWithOptions(options as CFDictionary)
            return
        }

        let pollInterval = TimeInterval(intervalMs) / 1000.0
        monitoringTimer = Timer.scheduledTimer(withTimeInterval: pollInterval, repeats: true) { [weak self] _ in
            self?.checkForNewMessages()
        }
        RunLoop.main.run()
    }

    /// Forwards the last extracted message when its id differs from the
    /// previously forwarded one. Does nothing if WeChat is not running or no
    /// messages could be extracted.
    private func checkForNewMessages() {
        guard let wechatApp = getWeChatApplication(),
              let messages = extractMessages(from: wechatApp),
              let newest = messages.last else {
            return
        }

        let newestId = generateMessageId(message: newest)
        guard newestId != lastMessageId else {
            return
        }
        lastMessageId = newestId
        sendMessageToOrchestrator(message: newest)
    }

    /// Returns the accessibility element of the running application whose
    /// bundle identifier is "com.tencent.xinWeChat", or nil if none is running.
    private func getWeChatApplication() -> AXUIElement? {
        let candidate = NSWorkspace.shared.runningApplications.first { app in
            app.bundleIdentifier == "com.tencent.xinWeChat"
        }
        guard let wechatApp = candidate else {
            return nil
        }
        return AXUIElementCreateApplication(wechatApp.processIdentifier)
    }

    /// Placeholder: reads the application's children but always returns nil.
    ///
    /// A real implementation has to walk the windows, locate the chat window
    /// and extract its messages, which depends on WeChat's UI hierarchy.
    private func extractMessages(from app: AXUIElement) -> [[String: String]]? {
        var messagesValue: AnyObject?
        let result = AXUIElementCopyAttributeValue(app, kAXChildrenAttribute as CFString, &messagesValue)

        guard result == .success, let windows = messagesValue as? [AXUIElement] else {
            return nil
        }

        return nil // Placeholder
    }

    /// Builds a de-duplication id from sender, timestamp and the content's hashValue.
    private func generateMessageId(message: [String: String]) -> String {
        let sender = message["sender"] ?? ""
        let timestamp = message["timestamp"] ?? ""
        let contentHash = (message["content"] ?? "").hashValue
        return "\(sender):\(timestamp):\(contentHash)"
    }

    /// Prints a "MessageNew" payload for `message` as one JSON line and
    /// flushes stdout. Nothing is printed if serialization fails.
    private func sendMessageToOrchestrator(message: [String: String]) {
        let payload: [String: Any] = [
            "type": "MessageNew",
            "content": message["content"] ?? "",
            "sender": message["sender"] ?? "",
            "timestamp": message["timestamp"] ?? ""
        ]

        guard let jsonData = try? JSONSerialization.data(withJSONObject: payload),
              let jsonString = String(data: jsonData, encoding: .utf8) else {
            return
        }
        print(jsonString)
        fflush(stdout)
    }
}
Input Writer (Swift)
/// Writes text into the WeChat input box through the Accessibility API.
class WeChatInputWriter {
    /// Sets the value of WeChat's input field to `content`.
    ///
    /// - Returns: `true` when the attribute was set; `false` (after reporting
    ///   an "Error" payload on stdout) when WeChat or the input field cannot
    ///   be found, or the Accessibility call fails.
    func writeToInput(content: String) -> Bool {
        guard let wechatApp = getWeChatApplication() else {
            sendError(message: "未找到微信应用")
            return false
        }

        guard let inputField = findInputField(in: wechatApp) else {
            sendError(message: "未找到输入框")
            return false
        }

        let value = content as CFTypeRef
        let result = AXUIElementSetAttributeValue(inputField, kAXValueAttribute as CFString, value)

        guard result == .success else {
            sendError(message: "写入失败: \(result.rawValue)")
            return false
        }
        return true
    }

    /// Placeholder: always returns nil.
    ///
    /// A real implementation has to walk WeChat's UI hierarchy to locate the
    /// input field element.
    private func findInputField(in app: AXUIElement) -> AXUIElement? {
        return nil // Placeholder
    }

    /// Returns the accessibility element of the running application whose
    /// bundle identifier is "com.tencent.xinWeChat", or nil if none is running.
    private func getWeChatApplication() -> AXUIElement? {
        let runningApps = NSWorkspace.shared.runningApplications
        guard let wechatApp = runningApps.first(where: { $0.bundleIdentifier == "com.tencent.xinWeChat" }) else {
            return nil
        }
        return AXUIElementCreateApplication(wechatApp.processIdentifier)
    }

    /// Prints an "Error" payload carrying `message` as one JSON line and
    /// flushes stdout. Nothing is printed if serialization fails.
    private func sendError(message: String) {
        let payload: [String: Any] = [
            "type": "Error",
            "message": message
        ]

        if let jsonData = try? JSONSerialization.data(withJSONObject: payload),
           let jsonString = String(data: jsonData, encoding: .utf8) {
            print(jsonString)
            fflush(stdout)
        }
    }
}
Message Deduplication Strategy
Time-based Deduplication
from datetime import datetime, timedelta
class MessageDeduplicator:
    """Remember message ids for a fixed time window to detect repeats."""

    def __init__(self, window_seconds: int = 5):
        """Set up an empty record.

        Args:
            window_seconds: How long, in seconds, an id is remembered.
        """
        self.window_seconds = window_seconds
        # Maps message_id -> datetime at which it was first recorded.
        self.seen_messages = {}

    def is_duplicate(self, message_id: str) -> bool:
        """Return True if ``message_id`` is still remembered, else record it.

        Expired ids are dropped before the lookup. The timestamp of an id
        that is already remembered is not refreshed.
        """
        current_time = datetime.now()
        self._clean_old_messages(current_time)

        already_seen = message_id in self.seen_messages
        if not already_seen:
            self.seen_messages[message_id] = current_time
        return already_seen

    def _clean_old_messages(self, now: datetime):
        """Keep only the ids recorded strictly after ``now - window_seconds``."""
        expiry = now - timedelta(seconds=self.window_seconds)
        retained = {}
        for msg_id, recorded_at in self.seen_messages.items():
            if recorded_at > expiry:
                retained[msg_id] = recorded_at
        self.seen_messages = retained
Performance Optimization
Polling Interval Tuning
class AdaptiveMonitor:
    """Polling interval that is short while messages arrive and grows when idle."""

    def __init__(self, min_interval_ms: int = 200, max_interval_ms: int = 1000):
        """Store the interval bounds (converted to seconds) and start at the minimum.

        Args:
            min_interval_ms: Interval used right after a new message.
            max_interval_ms: Upper bound the interval grows towards when idle.
        """
        self.idle_count = 0
        self.min_interval = min_interval_ms / 1000.0
        self.max_interval = max_interval_ms / 1000.0
        self.current_interval = self.min_interval

    def get_next_interval(self, has_new_message: bool) -> float:
        """Return the delay, in seconds, to wait before the next poll.

        A new message resets the interval to the minimum. Otherwise the idle
        counter is incremented and, once it exceeds 5, the interval is
        multiplied by 1.2 per call, capped at the maximum.
        """
        if has_new_message:
            self.idle_count = 0
            self.current_interval = self.min_interval
            return self.current_interval

        self.idle_count += 1
        if self.idle_count > 5:
            grown = self.current_interval * 1.2
            self.current_interval = min(grown, self.max_interval)
        return self.current_interval
Memory Optimization
import gc
class MemoryEfficientMonitor:
    """Keep only the most recent messages in a bounded buffer."""

    def __init__(self):
        """Create an empty buffer limited to 100 messages."""
        self.message_buffer_size = 100
        self.message_buffer = []

    def add_message(self, message):
        """Append ``message`` and drop the oldest entries beyond the size limit.

        The excess is computed explicitly because slicing with
        ``[-size:]`` keeps the whole list when ``size`` is 0.
        """
        self.message_buffer.append(message)

        overflow = len(self.message_buffer) - self.message_buffer_size
        if overflow > 0:
            # Trim in place so no second list has to be allocated.
            del self.message_buffer[:overflow]
            # NOTE(review): once the buffer is full this runs a full
            # collection on every append -- confirm it is worth the cost.
            gc.collect()
Error Handling and Recovery
Graceful Degradation
class RobustAgent:
    """Run monitoring with a bounded number of retries.

    Relies on ``start_monitoring`` and ``_send_error`` being provided
    elsewhere (they are not defined in this class).
    """

    def __init__(self):
        """Set the retry policy: 3 attempts, 2 seconds between attempts."""
        self.max_retries = 3
        self.retry_delay_seconds = 2

    def monitor_with_retry(self):
        """Call ``start_monitoring`` until it returns or attempts run out.

        Each failure is reported through ``_send_error``. After the last
        failed attempt a final error is reported and the process exits with
        status 1. With ``max_retries`` of 0 nothing is attempted.

        Raises:
            SystemExit: After ``max_retries`` consecutive failures.
        """
        for retry_count in range(1, self.max_retries + 1):
            try:
                self.start_monitoring()
                return  # Monitoring finished without raising.
            except Exception as e:
                self._send_error(f"监听失败 (尝试 {retry_count}/{self.max_retries}): {str(e)}")
                if retry_count < self.max_retries:
                    time.sleep(self.retry_delay_seconds)
                else:
                    self._send_error("监听失败次数过多，Agent 退出")
                    sys.exit(1)
Health Check
class HealthMonitor:
    """Emit heartbeat payloads and remember when the last one was sent."""

    def __init__(self):
        """Record the creation time as the last heartbeat."""
        # Intended heartbeat period in seconds; not read by this class.
        self.heartbeat_interval = 10
        self.last_heartbeat = time.time()

    def send_heartbeat(self):
        """Print a ``Heartbeat`` payload as one JSON line and update ``last_heartbeat``."""
        heartbeat = dict(type="Heartbeat", timestamp=time.time(), status="ok")
        line = json.dumps(heartbeat, ensure_ascii=False)
        print(line, flush=True)
        self.last_heartbeat = time.time()
Security Considerations
Input Validation
def validate_command(command: dict, max_content_length: int = 10000) -> bool:
    """Return True if ``command`` is an acceptable orchestrator command.

    A command is accepted when it is a dict whose 'type' is one of
    'WriteInput', 'ClearInput', 'HealthCheck' or 'Stop'. For 'WriteInput'
    the optional 'content' must additionally be a string of at most
    ``max_content_length`` characters. Malformed input of any shape yields
    False rather than raising.

    Args:
        command: Decoded command to check.
        max_content_length: Maximum number of characters (not bytes)
            allowed in 'content'.
    """
    if not isinstance(command, dict):
        return False

    cmd_type = command.get('type')

    # Only the predefined command types are accepted.
    valid_types = ('WriteInput', 'ClearInput', 'HealthCheck', 'Stop')
    if not isinstance(cmd_type, str) or cmd_type not in valid_types:
        return False

    # Guard against oversized or non-text content.
    if cmd_type == 'WriteInput':
        content = command.get('content', '')
        if not isinstance(content, str):
            return False
        if len(content) > max_content_length:
            return False

    return True
Privacy Protection
def sanitize_message_for_logging(message: dict) -> dict:
    """Return a shallow copy of ``message`` that is safe to log.

    A 'content' value longer than 50 characters is cut to its first 50
    characters followed by '...'. The input is not modified.
    """
    sanitized = message.copy()

    try:
        text = sanitized['content']
    except KeyError:
        return sanitized

    # Never log the complete message body.
    if len(text) > 50:
        sanitized['content'] = text[:50] + '...'
    return sanitized
Testing Guidelines
Unit Testing
import unittest
from unittest.mock import Mock, patch
class TestWeChatMonitor(unittest.TestCase):
    """Unit tests for message de-duplication and monitor construction."""

    def test_message_deduplication(self):
        """The same id is new on first sight and a duplicate on the second."""
        deduplicator = MessageDeduplicator(window_seconds=5)

        message_id = "test_message_1"

        # First sighting must not count as a duplicate.
        self.assertFalse(deduplicator.is_duplicate(message_id))

        # Second sighting within the window must.
        self.assertTrue(deduplicator.is_duplicate(message_id))

    def test_monitor_initialization(self):
        """Constructing the monitor stores the interval and has no last id."""
        import sys

        # The monitor binds the client with ``from wxauto import WeChat``, so
        # the name has to be patched in the module that defines
        # WeChatMonitor; patching 'wxauto.WeChat' would leave that binding
        # pointing at the real class.
        monitor_module = sys.modules[WeChatMonitor.__module__]
        with patch.object(monitor_module, 'WeChat') as mock_wechat:
            monitor = WeChatMonitor(interval_ms=500)

        mock_wechat.assert_called_once_with()
        self.assertEqual(monitor.interval_ms, 500)
        self.assertIsNone(monitor.last_message_id)
When to Use This Skill
Activate this skill when:
- Implementing WeChat message monitoring
- Developing Platform Agents (Windows/macOS)
- Working with wxauto or Accessibility API
- Handling message extraction and deduplication
- Implementing input box control
- Optimizing Agent performance
- Handling Agent errors and recovery
- Setting up IPC communication with Orchestrator