new file: dependence.py
new file: package.py new file: packup.bat new file: packup.sh new file: requirements.txt new file: scripts/__init__.py new file: scripts/file_store_api.py new file: src/__init__.py new file: src/config.toml new file: src/modules/__init__.py new file: src/modules/plugin_modules.py new file: src/modules/user_module.py new file: src/process.py new file: test.py
This commit is contained in:
87
dependence.py
Normal file
87
dependence.py
Normal file
@ -0,0 +1,87 @@
|
||||
#!/usr/bin/env python3
|
||||
import os
|
||||
import sys
|
||||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
def check_requirements_file(requirements_file="requirements.txt"):
|
||||
"""检查 requirements.txt 是否存在"""
|
||||
if not os.path.exists(requirements_file):
|
||||
print(f"❌ 错误:未找到 {requirements_file}!请确保它在当前目录。")
|
||||
print(f"👉 生成 requirements.txt 的方法:`pip freeze > requirements.txt`")
|
||||
sys.exit(1)
|
||||
|
||||
def install_deps_to_folder(
|
||||
requirements_file="requirements.txt",
|
||||
target_dir="package",
|
||||
upgrade=True,
|
||||
no_deps=False
|
||||
):
|
||||
"""
|
||||
安装依赖到目标文件夹
|
||||
:param requirements_file: 依赖文件路径
|
||||
:param target_dir: 目标目录(默认 package)
|
||||
:param upgrade: 是否更新已安装的包
|
||||
:param no_deps: 是否跳过依赖包(仅安装直接依赖)
|
||||
"""
|
||||
target_path = Path(target_dir).resolve()
|
||||
|
||||
# 1. 如果目标文件夹已存在,提醒用户确认覆盖
|
||||
if target_path.exists():
|
||||
print(f"⚠️ 警告:目标文件夹 {target_path} 已存在!")
|
||||
choice = input("是否删除并重建?(y/N) ").strip().lower()
|
||||
if choice != "y":
|
||||
print("⏹ 用户取消操作")
|
||||
return False
|
||||
shutil.rmtree(target_path)
|
||||
|
||||
target_path.mkdir(parents=True)
|
||||
|
||||
# 2. 构造 pip 安装命令
|
||||
pip_cmd = [
|
||||
sys.executable, "-m", "pip", "install",
|
||||
"-r", requirements_file,
|
||||
"--target", str(target_path),
|
||||
]
|
||||
|
||||
if upgrade:
|
||||
pip_cmd.append("--upgrade")
|
||||
if no_deps:
|
||||
pip_cmd.append("--no-deps")
|
||||
|
||||
# 3. 执行安装
|
||||
print(f"\n📦 正在安装依赖到 {target_path} ...")
|
||||
result = subprocess.run(pip_cmd, capture_output=True, text=True)
|
||||
|
||||
if result.returncode != 0:
|
||||
print("\n❌ 安装失败!错误信息:")
|
||||
print(result.stderr)
|
||||
return False
|
||||
|
||||
# 4. 成功时打印提示
|
||||
print("\n✅ 安装成功!依赖已保存到:")
|
||||
print(f" → {target_path}\n")
|
||||
# 5. 打印后续使用说明
|
||||
return True
|
||||
|
||||
def main():
|
||||
check_requirements_file()
|
||||
dir = "src"
|
||||
dir = os.path.join(dir,"packages")
|
||||
install_success = install_deps_to_folder(
|
||||
requirements_file="requirements.txt",
|
||||
target_dir=dir,
|
||||
upgrade=True,
|
||||
no_deps=False, # 设为 True 仅安装直接依赖
|
||||
)
|
||||
|
||||
if install_success:
|
||||
# 检查是否安装了 pip 依赖
|
||||
package_size = sum(f.stat().size for f in Path("package").glob("**/*") if f.is_file())
|
||||
print(f"📂 安装大小: {package_size / 1024 / 1024:.2f} MB")
|
||||
else:
|
||||
print("\n❌ 安装失败!请检查报错信息。")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
62
package.py
Normal file
62
package.py
Normal file
@ -0,0 +1,62 @@
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import ast
|
||||
from pathlib import Path
|
||||
|
||||
def find_plugin_class(process_file: str) -> str:
|
||||
"""从process.py中找到继承BasePlugin的类名"""
|
||||
with open(process_file, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
|
||||
# 使用AST解析Python代码,比正则更可靠
|
||||
tree = ast.parse(content)
|
||||
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.ClassDef):
|
||||
# 检查是否继承BasePlugin
|
||||
for base in node.bases:
|
||||
if isinstance(base, ast.Name) and base.id == "BasePlugin":
|
||||
return node.name
|
||||
|
||||
raise ValueError("❌ 未找到继承自BasePlugin的类!请检查process.py")
|
||||
|
||||
def create_zip_package(class_name: str):
|
||||
"""打包成ZIP文件,类名全小写"""
|
||||
src_path = Path("src")
|
||||
zip_name = class_name.lower() # 类名转全小写作为ZIP名
|
||||
temp_dir = Path(f"temp_{zip_name}") # 临时打包目录
|
||||
|
||||
# 1. 创建临时目录
|
||||
temp_dir.mkdir(exist_ok=True)
|
||||
|
||||
# 2. 复制需要的文件
|
||||
shutil.copytree(src_path / "packages", temp_dir / "packages")
|
||||
shutil.copy(src_path / "process.py", temp_dir)
|
||||
shutil.copy(src_path / "config.toml", temp_dir)
|
||||
|
||||
# 3. 生成ZIP
|
||||
shutil.make_archive(f"dist/{zip_name}", "zip", temp_dir)
|
||||
|
||||
# 4. 清理临时目录
|
||||
shutil.rmtree(temp_dir)
|
||||
print(f"✅ 打包完成:dist/{zip_name}.zip")
|
||||
|
||||
def main():
|
||||
process_file = "src/process.py"
|
||||
if not os.path.exists(process_file):
|
||||
print(f"❌ 错误:{process_file} 文件不存在!")
|
||||
return
|
||||
|
||||
try:
|
||||
plugin_class = find_plugin_class(process_file)
|
||||
print(f"找到插件类: {plugin_class} → 包名: {plugin_class.lower()}.zip")
|
||||
|
||||
Path("dist").mkdir(exist_ok=True) # 创建dist目录
|
||||
create_zip_package(plugin_class)
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ 打包失败:{e}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
21
packup.bat
Normal file
21
packup.bat
Normal file
@ -0,0 +1,21 @@
|
||||
@echo off
|
||||
REM 依次运行dependence.py和package.py的批处理脚本
|
||||
|
||||
echo 正在运行依赖安装脚本...
|
||||
python dependence.py
|
||||
if %ERRORLEVEL% neq 0 (
|
||||
echo 运行dependence.py失败! 错误码: %ERRORLEVEL%
|
||||
pause
|
||||
exit /b %ERRORLEVEL%
|
||||
)
|
||||
|
||||
echo 正在运行打包脚本...
|
||||
python package.py
|
||||
if %ERRORLEVEL% neq 0 (
|
||||
echo 运行package.py失败! 错误码: %ERRORLEVEL%
|
||||
pause
|
||||
exit /b %ERRORLEVEL%
|
||||
)
|
||||
|
||||
echo 所有脚本执行完毕!
|
||||
pause
|
18
packup.sh
Normal file
18
packup.sh
Normal file
@ -0,0 +1,18 @@
|
||||
#!/bin/bash
|
||||
# 依次运行dependence.py和package.py的shell脚本
|
||||
|
||||
echo "正在运行依赖安装脚本..."
|
||||
python3 dependence.py
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "运行dependence.py失败! 错误码: $?"
|
||||
exit $?
|
||||
fi
|
||||
|
||||
echo "正在运行打包脚本..."
|
||||
python3 package.py
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "运行package.py失败! 错误码: $?"
|
||||
exit $?
|
||||
fi
|
||||
|
||||
echo "所有脚本执行完毕!"
|
1
requirements.txt
Normal file
1
requirements.txt
Normal file
@ -0,0 +1 @@
|
||||
rcon
|
0
scripts/__init__.py
Normal file
0
scripts/__init__.py
Normal file
65
scripts/file_store_api.py
Normal file
65
scripts/file_store_api.py
Normal file
@ -0,0 +1,65 @@
|
||||
import json
|
||||
import os
|
||||
import logging
|
||||
import sqlite3
|
||||
import os
|
||||
import time
|
||||
import toml
|
||||
from pathlib import Path
|
||||
from http import HTTPStatus
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class ConfigManager:
|
||||
"""配置管理类,处理应用配置"""
|
||||
def __init__(self, config_path="src"):
|
||||
self.config = {}
|
||||
self.config_path = config_path
|
||||
self.build_config_dict()
|
||||
|
||||
|
||||
def build_config_dict(self) -> dict[str, str]:
|
||||
config_dict = {}
|
||||
for config_file in Path(self.config_path).rglob("*.toml"):
|
||||
if not config_file.is_file():
|
||||
continue
|
||||
|
||||
# 获取相对路径的父目录名
|
||||
rel_path = config_file.relative_to(self.config_path)
|
||||
parent_name = rel_path.parent.name if rel_path.parent.name else None
|
||||
|
||||
if parent_name:
|
||||
key = parent_name
|
||||
else:
|
||||
key = config_file.stem # 去掉扩展名
|
||||
|
||||
config_dict[key] = str(config_file.absolute())
|
||||
self.config = config_dict
|
||||
|
||||
def load_config(self,name):
|
||||
"""加载配置文件"""
|
||||
if not os.path.exists(self.config[name]):
|
||||
return {}
|
||||
with open(self.config[name], 'r', encoding='utf-8') as f:
|
||||
try:
|
||||
return toml.load(f)
|
||||
except toml.TomlDecodeError:
|
||||
return {}
|
||||
|
||||
def save_config(self, key=None, value=None):
|
||||
"""保存配置项"""
|
||||
if key is not None and value is not None:
|
||||
# 如果提供了 key 和 value,则更新单个值
|
||||
self.config[key] = value
|
||||
with open(self.config_path, 'w', encoding='utf-8') as f:
|
||||
toml.dump(self.config, f)
|
||||
|
||||
def update_config(self, config_dict):
|
||||
"""更新配置字典"""
|
||||
self.config.update(config_dict)
|
||||
self.save_config()
|
||||
|
||||
|
||||
class ChatManager:
|
||||
def __init__(self):
|
||||
return None
|
0
src/__init__.py
Normal file
0
src/__init__.py
Normal file
7
src/config.toml
Normal file
7
src/config.toml
Normal file
@ -0,0 +1,7 @@
|
||||
[mc_rcon]
|
||||
host = "127.0.0.1"
|
||||
port = 25570
|
||||
password = "233719"
|
||||
|
||||
[group]
|
||||
id = 814734369
|
0
src/modules/__init__.py
Normal file
0
src/modules/__init__.py
Normal file
55
src/modules/plugin_modules.py
Normal file
55
src/modules/plugin_modules.py
Normal file
@ -0,0 +1,55 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Optional
|
||||
from dataclasses import dataclass
|
||||
from src.modules import user_module as usermod
|
||||
import scripts.file_store_api as file_M
|
||||
|
||||
class MessageContext:
|
||||
"""封装消息处理的上下文数据"""
|
||||
def __init__(self, uid: str, gid: Optional[str], raw_message: str,id:str):
|
||||
self.raw_message = raw_message
|
||||
self._processed = False
|
||||
self.response: Optional[str] = None
|
||||
# 核心服务实例化
|
||||
self.chat_manager = file_M.ChatManager()
|
||||
self.user = usermod.User(user_id=uid)
|
||||
self.rebot_id = id
|
||||
|
||||
# 动态加载数据
|
||||
if gid:
|
||||
self.group = usermod.Group(group_id=gid)
|
||||
self.group.current_user = self.user
|
||||
else:
|
||||
self.group = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class PluginPermission:
|
||||
access_private: bool = False # 允许处理私聊消息
|
||||
access_group: bool = True # 允许处理群消息
|
||||
read_history: bool = False # 允许读取历史记录
|
||||
|
||||
from pathlib import Path
|
||||
import os
|
||||
|
||||
class BasePlugin(ABC):
|
||||
def __init__(self, ctx: MessageContext):
|
||||
self.ctx = ctx
|
||||
self._config_manager = file_M.ConfigManager(self._get_plugin_config_path())
|
||||
|
||||
@property
|
||||
def config(self) -> dict:
|
||||
"""直接访问插件配置的快捷方式"""
|
||||
return self._config_manager.load_config(self.name)
|
||||
|
||||
def _get_plugin_config_path(self) -> str:
|
||||
"""获取插件配置目录路径(config/插件名)"""
|
||||
plugin_name = self.__class__.__name__.lower()
|
||||
|
||||
return str(Path("config") / plugin_name)
|
||||
|
||||
def save_config(self, config_dict: dict = None) -> bool:
|
||||
"""快捷保存配置"""
|
||||
if config_dict:
|
||||
self._config_manager.update_config({"config": config_dict})
|
||||
return self._config_manager.save_config()
|
42
src/modules/user_module.py
Normal file
42
src/modules/user_module.py
Normal file
@ -0,0 +1,42 @@
|
||||
import json
|
||||
import time
|
||||
|
||||
|
||||
|
||||
class User:
|
||||
def __init__(self, user_id):
|
||||
self.user_id = user_id
|
||||
self.nickname = "test"
|
||||
self.messages = []
|
||||
self.signal = True
|
||||
|
||||
def set_input_status(self, status):
|
||||
payload = json.dumps({
|
||||
"user_id": self.user_id,
|
||||
"event_type": status
|
||||
})
|
||||
headers = {
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
while self.signal:
|
||||
print(f"刷新 {self.nickname} 的输入状态为: {status}")
|
||||
time.sleep(0.5)
|
||||
|
||||
def send_message(self, message):
|
||||
print(f"send message{0}".format(message))
|
||||
|
||||
class Group:
|
||||
def __init__(self, group_id,user=None,users=None):
|
||||
self.group_id = group_id
|
||||
self.current_user = user
|
||||
self.nickname = "test"
|
||||
self.users = users
|
||||
self.get_group_users()
|
||||
self.messages =[]
|
||||
|
||||
|
||||
def get_group_users(self):
|
||||
return(["user1","user2"])
|
||||
|
||||
def send_message(self,message):
|
||||
print(f"send message{0}".format(message))
|
51
src/process.py
Normal file
51
src/process.py
Normal file
@ -0,0 +1,51 @@
|
||||
import threading
|
||||
from src.modules.plugin_modules import BasePlugin, MessageContext
|
||||
import time
|
||||
|
||||
from rcon import Client
|
||||
import time
|
||||
# 服务器IP、端口、RCON密码
|
||||
|
||||
def send_message(message,host,port,password):
|
||||
"""
|
||||
发送消息到Minecraft服务器
|
||||
:param message: 要发送的消息列表
|
||||
"""
|
||||
try:
|
||||
with Client(host, passwd = password, port = port) as mcr:
|
||||
mcr.run('/say {}'.format(message))
|
||||
|
||||
|
||||
except ConnectionRefusedError as e:
|
||||
print("连接被拒绝,请检查服务器是否运行和RCON配置是否正确。错误信息:", e)
|
||||
except TimeoutError:
|
||||
print("连接超时,请检查服务器网络连接是否正常。")
|
||||
except Exception as e:
|
||||
print("发生未知错误:", e)
|
||||
|
||||
|
||||
class Mc_rcon(BasePlugin):
|
||||
def before_load(self):#加载消息前
|
||||
config = self.config
|
||||
gid = config.get("group").get("id")
|
||||
config = config.get("mc_rcon")
|
||||
if self.ctx.group is None:
|
||||
return "ok"
|
||||
print(self.ctx.group.group_id)
|
||||
if self.ctx.group.group_id == gid:#转发群消息至 Minecraft
|
||||
if "CQ:image" in self.ctx.raw_message:
|
||||
mesg = f"{self.ctx.user.nickname} 发送了一张图片"
|
||||
print(f"转发消息至 Minecraft: {mesg}")
|
||||
elif "CQ:video" in self.ctx.raw_message:
|
||||
mesg = f"{self.ctx.user.nickname} 发送了一段视频"
|
||||
print(f"转发消息至 Minecraft: {mesg}")
|
||||
elif "CQ:record" in self.ctx.raw_message:
|
||||
mesg = f"{self.ctx.user.nickname} 发送了一段语音"
|
||||
print(f"转发消息至 Minecraft: {mesg}")
|
||||
elif "CQ:file" in self.ctx.raw_message:
|
||||
mesg = f"{self.ctx.user.nickname} 发送了一个文件"
|
||||
print(f"转发消息至 Minecraft: {mesg}")
|
||||
else:
|
||||
print(f"转发消息至 Minecraft: {self.ctx.raw_message}")
|
||||
mesg = f"{self.ctx.user.nickname} 说 {self.ctx.raw_message}"
|
||||
send_message(message = mesg,host = config.get("host"),port = config.get("port"),password = config.get("password"))
|
141
test.py
Normal file
141
test.py
Normal file
@ -0,0 +1,141 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
import sys
|
||||
import os
|
||||
import importlib
|
||||
from pathlib import Path
|
||||
from typing import Type, List
|
||||
from unittest.mock import Mock
|
||||
|
||||
# 添加src目录到系统路径
|
||||
|
||||
# 导入基础类
|
||||
from src.modules.plugin_modules import MessageContext, BasePlugin
|
||||
|
||||
class PluginTester:
|
||||
"""
|
||||
插件测试助手类
|
||||
创建模拟上下文和测试配置环境
|
||||
"""
|
||||
def __init__(self, plugin_class: Type[BasePlugin], config_dir: str = "test_configs"):
|
||||
self.plugin_class = plugin_class
|
||||
self.config_dir = config_dir
|
||||
os.makedirs(self.config_dir, exist_ok=True)
|
||||
|
||||
# 创建模拟上下文
|
||||
self.mock_ctx = MessageContext(
|
||||
uid="test_user",
|
||||
gid="test_group",
|
||||
raw_message="测试消息",
|
||||
id="test_bot"
|
||||
)
|
||||
|
||||
# 创建插件实例
|
||||
self.plugin = self.plugin_class(self.mock_ctx)
|
||||
|
||||
def test_before_load(self):
|
||||
"""测试before_load方法(如果存在)"""
|
||||
if hasattr(self.plugin, 'before_load'):
|
||||
print(f"\n正在测试 {self.plugin_class.__name__} 的 before_load")
|
||||
try:
|
||||
result = self.plugin.before_load()
|
||||
print(f"before_load 执行成功,返回值: {result}")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"before_load 执行出错: {str(e)}")
|
||||
return False
|
||||
else:
|
||||
print(f"\n{self.plugin_class.__name__} 中没有找到 before_load 方法")
|
||||
return None
|
||||
|
||||
def test_after_load(self):
|
||||
"""测试after_load方法(如果存在)"""
|
||||
if hasattr(self.plugin, 'after_load'):
|
||||
print(f"\n正在测试 {self.plugin_class.__name__} 的 after_load")
|
||||
try:
|
||||
result = self.plugin.after_load()
|
||||
print(f"after_load 执行成功,返回值: {result}")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"after_load 执行出错: {str(e)}")
|
||||
return False
|
||||
else:
|
||||
print(f"\n{self.plugin_class.__name__} 中没有找到 after_load 方法")
|
||||
return None
|
||||
|
||||
def test_after_save(self):
|
||||
"""测试after_save方法(如果存在)"""
|
||||
if hasattr(self.plugin, 'after_save'):
|
||||
print(f"\n正在测试 {self.plugin_class.__name__} 的 after_save")
|
||||
try:
|
||||
result = self.plugin.after_save()
|
||||
print(f"after_save 执行成功,返回值: {result}")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"after_save 执行出错: {str(e)}")
|
||||
return False
|
||||
else:
|
||||
print(f"\n{self.plugin_class.__name__} 中没有找到 after_save 方法")
|
||||
return None
|
||||
|
||||
def find_plugin_classes(module_path: str) -> List[Type[BasePlugin]]:
|
||||
"""
|
||||
在process.py中查找所有继承自BasePlugin的类
|
||||
返回类类型列表
|
||||
"""
|
||||
plugin_classes = []
|
||||
|
||||
# 动态导入模块
|
||||
module_name = os.path.splitext(os.path.basename(module_path))[0]
|
||||
spec = importlib.util.spec_from_file_location(module_name, module_path)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
|
||||
# 查找所有继承自BasePlugin的类
|
||||
for name, obj in vars(module).items():
|
||||
try:
|
||||
if isinstance(obj, type) and issubclass(obj, BasePlugin) and obj != BasePlugin:
|
||||
plugin_classes.append(obj)
|
||||
except TypeError:
|
||||
continue
|
||||
|
||||
return plugin_classes
|
||||
|
||||
def main():
|
||||
# process.py文件路径
|
||||
process_path = Path("src/process.py")
|
||||
|
||||
if not process_path.exists():
|
||||
print(f"错误: 在 {process_path} 没有找到 process.py")
|
||||
return
|
||||
|
||||
# 查找process.py中的所有插件类
|
||||
plugin_classes = find_plugin_classes(str(process_path))
|
||||
|
||||
if not plugin_classes:
|
||||
print("在 process.py 中没有找到插件类")
|
||||
return
|
||||
|
||||
print(f"发现了 {len(plugin_classes)} 个需要测试的插件类:")
|
||||
for i, plugin_class in enumerate(plugin_classes, 1):
|
||||
print(f"{i}. {plugin_class.__name__}")
|
||||
|
||||
# 测试每个插件类
|
||||
for plugin_class in plugin_classes:
|
||||
print(f"\n{'='*50}")
|
||||
print(f"正在测试插件: {plugin_class.__name__}")
|
||||
|
||||
tester = PluginTester(plugin_class)
|
||||
|
||||
# 按照自然顺序测试生命周期方法
|
||||
tester.test_before_load()
|
||||
tester.test_after_load()
|
||||
|
||||
# 如果需要测试配置保存
|
||||
if hasattr(plugin_class, 'after_save'):
|
||||
tester.test_after_save()
|
||||
|
||||
print(f"{'='*50}\n")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Reference in New Issue
Block a user