modified: README.md

new file:   dependence.py
	new file:   package.py
	new file:   packup.bat
	new file:   packup.sh
	new file:   requirements.txt
	new file:   scripts/__init__.py
	new file:   scripts/file_store_api.py
	new file:   src/__init__.py
	new file:   src/config.toml
	new file:   src/modules/__init__.py
	new file:   src/modules/plugin_modules.py
	new file:   src/modules/user_module.py
	new file:   src/process.py
	new file:   test.py
This commit is contained in:
JianFeeeee
2025-08-13 22:43:25 +08:00
parent 30be06cefb
commit f787218e51
15 changed files with 760 additions and 2 deletions

256
README.md
View File

@ -1,3 +1,255 @@
# chat_rebot_plugen_support
以下是针对您提供的SDK工具的开发者文档包含使用说明、工作流程和最佳实践
聊天机器人插件开发支持
---
# 插件SDK工具文档
## 1. 工具概览
### 文件结构
```
sdk/
├── dist/ # 打包输出目录
├── src/ # 插件源码目录
│ ├── packages/ # 依赖库目录(自动生成)
│ ├── process.py # 插件主逻辑文件
│ └── config.toml # 插件配置文件
├── packup.bat # Windows打包脚本
├── packup.sh # Linux/macOS打包脚本
├── dependence.py # 依赖安装工具
├── package.py # 打包工具
└── requirements.txt # 依赖声明文件
```
## 2. 快速开始
### 2.1 开发流程
1.`src/process.py`中开发插件逻辑
2. 添加依赖到 `requirements.txt`
3. 运行打包脚本:
```bash
# Windows
./packup.bat
# Linux/macOS
chmod +x packup.sh
./packup.sh
```
### 2.2 示例插件结构
`process.py` 最小示例:
```python
from src.modules.plugin_modules import BasePlugin
class MyPlugin(BasePlugin):
def after_save(self):
self.ctx.user.send_message("Hello World")
return "ok"
```
## 3. SDK工具详解
### 3.1 dependence.py
**功能**:安装依赖到 `src/packages/`
| 参数 | 说明 |
| ---------------- | -------------------------------- |
| `--no-deps` | 仅安装直接依赖(不含依赖的依赖) |
| `--no-upgrade` | 禁用自动升级 |
**示例**
```bash
python dependence.py --no-deps
```
### 3.2 package.py
**打包规则**
1. 自动检测继承 `BasePlugin`的类
2. 打包生成 `dist/{插件类名小写}.zip`
3. 包含文件:
- `process.py`
- `config.toml`
- `packages/`(依赖目录)
**错误处理**
| 错误码 | 说明 | 解决方案 |
| ------ | -------------------- | --------------- |
| 1 | process.py未找到 | 检查src目录结构 |
| 2 | 未找到BasePlugin子类 | 检查类继承关系 |
### 3.3 打包脚本
**packup.bat/packup.sh** 执行顺序:
1. 安装依赖dependence.py
2. 打包插件package.py
3. 输出到dist目录
## 4. 测试与调试
### 4.1 测试流程
使用 `test.py`进行本地测试:
```python
from sdk.src.process import MyPlugin
from src.modules.plugin_modules import MessageContext
# 模拟上下文
ctx = MessageContext(uid="test", gid=None, raw_message="hello", id="bot")
# 测试插件
plugin = MyPlugin(ctx)
print(plugin.after_save()) # 应该输出"ok"
```
### 4.2 调试技巧
```python
# 在process.py中添加调试代码
import pdb; pdb.set_trace() # 添加断点
# 查看可用属性
print(dir(self.ctx.user))
```
## 5. 最佳实践
### 5.1 依赖管理
✅ **推荐做法**
```text
# requirements.txt 示例
requests==2.31.0 # 固定版本
numpy>=1.21.0 # 最低版本限制
```
❌ **应避免**
```text
pytorch # 无版本声明
```
### 5.2 配置建议
`config.toml`标准结构:
```toml
[plugin]
name = "my_plugin"
version = "1.0.0"
[settings]
timeout = 30
```
## 6. 常见问题
### Q1: 打包后插件不生效
- ✅ 检查类名是否继承 `BasePlugin`
- ✅ 确认ZIP内文件在根目录不在src子目录
### Q2:
## **7. `self.ctx` 核心对象方法/属性总表**
#### **1. 基础信息**
| 属性/方法 | 类型 | 说明 | 示例 |
| --------------- | ----------------- | -------------------------------------- | ------------------------------ |
| `raw_message` | `str` | 用户原始消息文本 | `ctx.raw_message` → "hello" |
| `response` | `Optional[str]` | 可设置的响应内容(设置后拦截后续处理) | `ctx.response = "ok"` |
| `rebot_id` | `str` | 当前机器人ID | `print(ctx.rebot_id)` |
| `_processed` | `bool` | 标记消息是否已被处理(自动管理) | `if ctx._processed: ...` |
#### **2. 用户相关 (`ctx.user`)**
| 属性/方法 | 类型 | 说明 | 示例 |
| --------------------------- | ----------------- | --------------------------------------------- | --------------------------------------- |
| `user.user_id` | `str` | 用户唯一ID | `uid = ctx.user.user_id` |
| `user.nickname` | `Optional[str]` | 用户昵称自动从API获取 | `greet = f"Hi {ctx.user.nickname}"` |
| `user.messages` | `List[dict]` | 用户历史消息记录(需 `after_load`后才有值) | `last_msg = ctx.user.messages[-1]` |
| `user.send_message()` | `method` | **发送私聊消息** | `ctx.user.send_message("Hello")` |
| `user.set_input_status()` | `method` | 设置用户输入状态(如"typing" | `ctx.user.set_input_status("typing")` |
#### **3. 群组相关 (`ctx.group`)**
> *仅当消息来自群聊时可用*
>
> | 属性/方法 | 类型 | 说明 | 示例 |
> | ------------------------ | ----------------- | ------------------------------------- | --------------------------------------- |
> | `group.group_id` | `str` | 群组唯一ID | `gid = ctx.group.group_id` |
> | `group.nickname` | `Optional[str]` | 群名称自动从API获取 | `print(ctx.group.nickname)` |
> | `group.users` | `List[dict]` | 群成员列表 | `members = ctx.group.users` |
> | `group.current_user` | `User` | 当前发言用户(即 `ctx.user`的引用) | `sender = ctx.group.current_user` |
> | `group.send_message()` | `method` | **发送群消息** | `ctx.group.send_message("@all 通知")` |
> | `group.messages` | `List[dict]` | 群聊历史消息 | `last_msg = ctx.group.messages[-1]` |
在群聊消息中current_user中的message存储了用户在群里的近十条消息。
#### **4. 数据存储 (`ctx.chat_manager`)**
| 方法 | 参数 | 说明 | 示例 |
| --------------------------- | --------------------------------------------------------- | -------------------- | ---------------------------------------------------------------------------------- |
| `save_private_message()` | `user: User, role: str, content: str` | 保存私聊消息到数据库 | `ctx.chat_manager.save_private_message(ctx.user, "user", "hi")` |
| `save_group_message()` | `group: Group, role: str, content: str, sender_id: str` | 保存群消息 | `ctx.chat_manager.save_group_message(ctx.group, "user", "hi", ctx.user.user_id)` |
| `load_private_messages()` | `user: User` → `List[dict]` | 加载用户私聊历史 | `ctx.user.messages = ctx.chat_manager.load_private_messages(ctx.user)` |
| `load_group_messages()` | `group: Group` → `List[dict]` | 加载群聊历史 | `ctx.group.messages = ctx.chat_manager.load_group_messages(ctx.group)` |
#### **5. 插件配置 (`self.config`)**
| 方法/属性 | 说明 | 示例 |
| ------------------------------- | ---------------------------------------------------- | ------------------------------------------------- |
| `self.config` | **自动加载**的配置字典(来自 `config.toml` | `timeout = self.config.get("timeout", 30)` |
| `self.save_config()` | 保存修改后的配置 | `self.save_config({"key": "value"})` |
| `self._get_plugin_resource()` | 从插件ZIP包内读取文件 | `data = self._get_plugin_resource("data.json")` |
---
#### **6. 调试工具**
| 属性 | 类型 | 说明 |
| ------------------- | -------- | ------------------------------------------------------------- |
| `ctx.phase` | `str` | 当前处理阶段(`before_load`/`after_load`/`after_save` |
| `ctx.intercepted` | `bool` | 是否已被其他插件拦截 |
### **典型使用场景**
#### 场景1消息预处理
```python
def before_load(self):
if "admin" in ctx.raw_message:
if not self.check_admin(ctx.user.user_id):
ctx.response = "权限不足" # 拦截请求
return
```
#### 场景2响应生成
```python
def after_save(self):
if "天气" in ctx.raw_message:
city = extract_city(ctx.raw_message) # 自定义解析逻辑
ctx.response = get_weather(city) # 返回天气信息
```
### **注意事项**
1. **`ctx.group` 可能为 `None`**:私聊消息时需判空
```python
if ctx.group: # 群聊专属逻辑
```
2. **配置热更新**:修改 `self.config` 后需手动调用 `save_config()`
3. **大文件处理**:通过 `_get_plugin_resource()` 读取ZIP内资源避免解压

87
dependence.py Normal file
View File

@ -0,0 +1,87 @@
#!/usr/bin/env python3
import os
import sys
import shutil
import subprocess
from pathlib import Path
def check_requirements_file(requirements_file="requirements.txt"):
"""检查 requirements.txt 是否存在"""
if not os.path.exists(requirements_file):
print(f"❌ 错误:未找到 {requirements_file}!请确保它在当前目录。")
print(f"👉 生成 requirements.txt 的方法:`pip freeze > requirements.txt`")
sys.exit(1)
def install_deps_to_folder(
requirements_file="requirements.txt",
target_dir="package",
upgrade=True,
no_deps=False
):
"""
安装依赖到目标文件夹
:param requirements_file: 依赖文件路径
:param target_dir: 目标目录(默认 package
:param upgrade: 是否更新已安装的包
:param no_deps: 是否跳过依赖包(仅安装直接依赖)
"""
target_path = Path(target_dir).resolve()
# 1. 如果目标文件夹已存在,提醒用户确认覆盖
if target_path.exists():
print(f"⚠️ 警告:目标文件夹 {target_path} 已存在!")
choice = input("是否删除并重建?(y/N) ").strip().lower()
if choice != "y":
print("⏹ 用户取消操作")
return False
shutil.rmtree(target_path)
target_path.mkdir(parents=True)
# 2. 构造 pip 安装命令
pip_cmd = [
sys.executable, "-m", "pip", "install",
"-r", requirements_file,
"--target", str(target_path),
]
if upgrade:
pip_cmd.append("--upgrade")
if no_deps:
pip_cmd.append("--no-deps")
# 3. 执行安装
print(f"\n📦 正在安装依赖到 {target_path} ...")
result = subprocess.run(pip_cmd, capture_output=True, text=True)
if result.returncode != 0:
print("\n❌ 安装失败!错误信息:")
print(result.stderr)
return False
# 4. 成功时打印提示
print("\n✅ 安装成功!依赖已保存到:")
print(f"{target_path}\n")
# 5. 打印后续使用说明
return True
def main():
check_requirements_file()
dir = "src"
dir = os.path.join(dir,"packages")
install_success = install_deps_to_folder(
requirements_file="requirements.txt",
target_dir=dir,
upgrade=True,
no_deps=False, # 设为 True 仅安装直接依赖
)
if install_success:
# 检查是否安装了 pip 依赖
package_size = sum(f.stat().size for f in Path("package").glob("**/*") if f.is_file())
print(f"📂 安装大小: {package_size / 1024 / 1024:.2f} MB")
else:
print("\n❌ 安装失败!请检查报错信息。")
if __name__ == "__main__":
main()

62
package.py Normal file
View File

@ -0,0 +1,62 @@
import os
import re
import shutil
import ast
from pathlib import Path
def find_plugin_class(process_file: str) -> str:
"""从process.py中找到继承BasePlugin的类名"""
with open(process_file, "r", encoding="utf-8") as f:
content = f.read()
# 使用AST解析Python代码比正则更可靠
tree = ast.parse(content)
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
# 检查是否继承BasePlugin
for base in node.bases:
if isinstance(base, ast.Name) and base.id == "BasePlugin":
return node.name
raise ValueError("❌ 未找到继承自BasePlugin的类请检查process.py")
def create_zip_package(class_name: str):
"""打包成ZIP文件类名全小写"""
src_path = Path("src")
zip_name = class_name.lower() # 类名转全小写作为ZIP名
temp_dir = Path(f"temp_{zip_name}") # 临时打包目录
# 1. 创建临时目录
temp_dir.mkdir(exist_ok=True)
# 2. 复制需要的文件
shutil.copytree(src_path / "packages", temp_dir / "packages")
shutil.copy(src_path / "process.py", temp_dir)
shutil.copy(src_path / "config.toml", temp_dir)
# 3. 生成ZIP
shutil.make_archive(f"dist/{zip_name}", "zip", temp_dir)
# 4. 清理临时目录
shutil.rmtree(temp_dir)
print(f"✅ 打包完成dist/{zip_name}.zip")
def main():
process_file = "src/process.py"
if not os.path.exists(process_file):
print(f"❌ 错误:{process_file} 文件不存在!")
return
try:
plugin_class = find_plugin_class(process_file)
print(f"找到插件类: {plugin_class} → 包名: {plugin_class.lower()}.zip")
Path("dist").mkdir(exist_ok=True) # 创建dist目录
create_zip_package(plugin_class)
except Exception as e:
print(f"❌ 打包失败:{e}")
if __name__ == "__main__":
main()

21
packup.bat Normal file
View File

@ -0,0 +1,21 @@
@echo off
REM 依次运行dependence.py和package.py的批处理脚本
echo 正在运行依赖安装脚本...
python dependence.py
if %ERRORLEVEL% neq 0 (
echo 运行dependence.py失败! 错误码: %ERRORLEVEL%
pause
exit /b %ERRORLEVEL%
)
echo 正在运行打包脚本...
python package.py
if %ERRORLEVEL% neq 0 (
echo 运行package.py失败! 错误码: %ERRORLEVEL%
pause
exit /b %ERRORLEVEL%
)
echo 所有脚本执行完毕!
pause

18
packup.sh Normal file
View File

@ -0,0 +1,18 @@
#!/bin/bash
# 依次运行dependence.py和package.py的shell脚本
echo "正在运行依赖安装脚本..."
python3 dependence.py
if [ $? -ne 0 ]; then
echo "运行dependence.py失败! 错误码: $?"
exit $?
fi
echo "正在运行打包脚本..."
python3 package.py
if [ $? -ne 0 ]; then
echo "运行package.py失败! 错误码: $?"
exit $?
fi
echo "所有脚本执行完毕!"

0
requirements.txt Normal file
View File

0
scripts/__init__.py Normal file
View File

65
scripts/file_store_api.py Normal file
View File

@ -0,0 +1,65 @@
import json
import os
import logging
import sqlite3
import os
import time
import toml
from pathlib import Path
from http import HTTPStatus
from datetime import datetime
class ConfigManager:
"""配置管理类,处理应用配置"""
def __init__(self, config_path="src"):
self.config = {}
self.config_path = config_path
self.build_config_dict()
def build_config_dict(self) -> dict[str, str]:
config_dict = {}
for config_file in Path(self.config_path).rglob("*.toml"):
if not config_file.is_file():
continue
# 获取相对路径的父目录名
rel_path = config_file.relative_to(self.config_path)
parent_name = rel_path.parent.name if rel_path.parent.name else None
if parent_name:
key = parent_name
else:
key = config_file.stem # 去掉扩展名
config_dict[key] = str(config_file.absolute())
self.config = config_dict
def load_config(self,name):
"""加载配置文件"""
if not os.path.exists(self.config[name]):
return {}
with open(self.config[name], 'r', encoding='utf-8') as f:
try:
return toml.load(f)
except toml.TomlDecodeError:
return {}
def save_config(self, key=None, value=None):
"""保存配置项"""
if key is not None and value is not None:
# 如果提供了 key 和 value则更新单个值
self.config[key] = value
with open(self.config_path, 'w', encoding='utf-8') as f:
toml.dump(self.config, f)
def update_config(self, config_dict):
"""更新配置字典"""
self.config.update(config_dict)
self.save_config()
class ChatManager:
def __init__(self):
return None

0
src/__init__.py Normal file
View File

2
src/config.toml Normal file
View File

@ -0,0 +1,2 @@
[test]
message = "helloworld"

0
src/modules/__init__.py Normal file
View File

View File

@ -0,0 +1,55 @@
from abc import ABC, abstractmethod
from typing import Optional
from dataclasses import dataclass
from src.modules import user_module as usermod
import scripts.file_store_api as file_M
class MessageContext:
"""封装消息处理的上下文数据"""
def __init__(self, uid: str, gid: Optional[str], raw_message: str,id:str):
self.raw_message = raw_message
self._processed = False
self.response: Optional[str] = None
# 核心服务实例化
self.chat_manager = file_M.ChatManager()
self.user = usermod.User(user_id=uid)
self.rebot_id = id
# 动态加载数据
if gid:
self.group = usermod.Group(group_id=gid)
self.group.current_user = self.user
else:
self.group = None
@dataclass
class PluginPermission:
access_private: bool = False # 允许处理私聊消息
access_group: bool = True # 允许处理群消息
read_history: bool = False # 允许读取历史记录
from pathlib import Path
import os
class BasePlugin(ABC):
def __init__(self, ctx: MessageContext):
self.ctx = ctx
self._config_manager = file_M.ConfigManager(self._get_plugin_config_path())
@property
def config(self) -> dict:
"""直接访问插件配置的快捷方式"""
return self._config_manager.load_config(self.name)
def _get_plugin_config_path(self) -> str:
"""获取插件配置目录路径config/插件名)"""
plugin_name = self.__class__.__name__.lower()
return str(Path("config") / plugin_name)
def save_config(self, config_dict: dict = None) -> bool:
"""快捷保存配置"""
if config_dict:
self._config_manager.update_config({"config": config_dict})
return self._config_manager.save_config()

View File

@ -0,0 +1,42 @@
import json
import time
class User:
def __init__(self, user_id):
self.user_id = user_id
self.nickname = "test"
self.messages = []
self.signal = True
def set_input_status(self, status):
payload = json.dumps({
"user_id": self.user_id,
"event_type": status
})
headers = {
'Content-Type': 'application/json'
}
while self.signal:
print(f"刷新 {self.nickname} 的输入状态为: {status}")
time.sleep(0.5)
def send_message(self, message):
print(f"send message{0}".format(message))
class Group:
def __init__(self, group_id,user=None,users=None):
self.group_id = group_id
self.current_user = user
self.nickname = "test"
self.users = users
self.get_group_users()
self.messages =[]
def get_group_users(self):
return(["user1","user2"])
def send_message(self,message):
print(f"send message{0}".format(message))

13
src/process.py Normal file
View File

@ -0,0 +1,13 @@
import threading
from src.modules.plugin_modules import BasePlugin, MessageContext
import time
class Hello_world(BasePlugin):
def before_load(self):#加载消息前
self.ctx.user.send_message(self.ctx.config.get("test").get("message"))
def after_load(self):#加载消息后
self.ctx.user.send_message(self.ctx.user.messages)
def after_save(self):#保存用户消息后
self.ctx.user.send_message("hello_world")
return "ok"

141
test.py Normal file
View File

@ -0,0 +1,141 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import os
import importlib
from pathlib import Path
from typing import Type, List
from unittest.mock import Mock
# 添加src目录到系统路径
# 导入基础类
from src.modules.plugin_modules import MessageContext, BasePlugin
class PluginTester:
"""
插件测试助手类
创建模拟上下文和测试配置环境
"""
def __init__(self, plugin_class: Type[BasePlugin], config_dir: str = "test_configs"):
self.plugin_class = plugin_class
self.config_dir = config_dir
os.makedirs(self.config_dir, exist_ok=True)
# 创建模拟上下文
self.mock_ctx = MessageContext(
uid="test_user",
gid="test_group",
raw_message="测试消息",
id="test_bot"
)
# 创建插件实例
self.plugin = self.plugin_class(self.mock_ctx)
def test_before_load(self):
"""测试before_load方法如果存在"""
if hasattr(self.plugin, 'before_load'):
print(f"\n正在测试 {self.plugin_class.__name__} 的 before_load")
try:
result = self.plugin.before_load()
print(f"before_load 执行成功,返回值: {result}")
return True
except Exception as e:
print(f"before_load 执行出错: {str(e)}")
return False
else:
print(f"\n{self.plugin_class.__name__} 中没有找到 before_load 方法")
return None
def test_after_load(self):
"""测试after_load方法如果存在"""
if hasattr(self.plugin, 'after_load'):
print(f"\n正在测试 {self.plugin_class.__name__} 的 after_load")
try:
result = self.plugin.after_load()
print(f"after_load 执行成功,返回值: {result}")
return True
except Exception as e:
print(f"after_load 执行出错: {str(e)}")
return False
else:
print(f"\n{self.plugin_class.__name__} 中没有找到 after_load 方法")
return None
def test_after_save(self):
"""测试after_save方法如果存在"""
if hasattr(self.plugin, 'after_save'):
print(f"\n正在测试 {self.plugin_class.__name__} 的 after_save")
try:
result = self.plugin.after_save()
print(f"after_save 执行成功,返回值: {result}")
return True
except Exception as e:
print(f"after_save 执行出错: {str(e)}")
return False
else:
print(f"\n{self.plugin_class.__name__} 中没有找到 after_save 方法")
return None
def find_plugin_classes(module_path: str) -> List[Type[BasePlugin]]:
"""
在process.py中查找所有继承自BasePlugin的类
返回类类型列表
"""
plugin_classes = []
# 动态导入模块
module_name = os.path.splitext(os.path.basename(module_path))[0]
spec = importlib.util.spec_from_file_location(module_name, module_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# 查找所有继承自BasePlugin的类
for name, obj in vars(module).items():
try:
if isinstance(obj, type) and issubclass(obj, BasePlugin) and obj != BasePlugin:
plugin_classes.append(obj)
except TypeError:
continue
return plugin_classes
def main():
# process.py文件路径
process_path = Path("src/process.py")
if not process_path.exists():
print(f"错误: 在 {process_path} 没有找到 process.py")
return
# 查找process.py中的所有插件类
plugin_classes = find_plugin_classes(str(process_path))
if not plugin_classes:
print("在 process.py 中没有找到插件类")
return
print(f"发现了 {len(plugin_classes)} 个需要测试的插件类:")
for i, plugin_class in enumerate(plugin_classes, 1):
print(f"{i}. {plugin_class.__name__}")
# 测试每个插件类
for plugin_class in plugin_classes:
print(f"\n{'='*50}")
print(f"正在测试插件: {plugin_class.__name__}")
tester = PluginTester(plugin_class)
# 按照自然顺序测试生命周期方法
tester.test_before_load()
tester.test_after_load()
# 如果需要测试配置保存
if hasattr(plugin_class, 'after_save'):
tester.test_after_save()
print(f"{'='*50}\n")
if __name__ == "__main__":
main()