Files
chat_rebot-connect-with-one…/src/modules/user_modules.py
JianFeeeee a0e9bf1b44 modified: src/mainprocess.py
modified:   src/modules/user_modules.py
modified:   src/plugin_manager.py
2025-08-17 11:27:07 +08:00

127 lines
5.0 KiB
Python

import json
import requests
import time
import src.file_store_api as filer
config_m = filer.ConfigManager()
url = config_m.load_config("config")
mainurl = url.get("app").get("send_url")
class User:
def __init__(self, user_id,url = mainurl):
self.user_id = user_id
self.url = url
self.nickname = None
self.get_name()
self.messages = []
self.signal = True
self.db = filer.ChatManager()
def get_name(self):
"""
获取用户的名称
"""
try:
response = requests.post('{0}/ArkSharePeer'.format(self.url),json={'user_id':self.user_id})
except:
return 0
if response.status_code == 200:
# 解析返回的JSON数据
response_data = response.json()
# 检查是否有错误信息
if response_data.get("status") == "ok":
# 获取用户卡片信息
ark_json = response_data.get("data", {})
ark_msg_str = ark_json.get("arkMsg", "{}")
try:
ark_json = json.loads(ark_msg_str) # 字符串转字典
except json.JSONDecodeError:
ark_json = {}
user_nick = ark_json.get("meta", {}).get("contact", {}).get("nickname")
self.nickname = user_nick
else:
print(f"请求失败,错误信息: {response_data.get('errMsg')}")
else:
print(f"请求失败,状态码: {response.status_code}")
def set_input_status(self, status):
payload = json.dumps({
"user_id": self.user_id,
"event_type": status
})
headers = {
'Content-Type': 'application/json'
}
while self.signal:
print(f"刷新 {self.nickname} 的输入状态为: {status}")
requests.request("POST","{0}/set_input_status".format(self.url), headers=headers, data=payload)
time.sleep(0.5)
def send_message(self, message):#发送消息
requests.post(url='{0}/send_private_msg'.format(self.url), json={'user_id':self.user_id, 'message':message})
self.db.save_private_message(self,role = 'assistant',content=message)#保存发送的消息
def send_file(self,dir,name):#上传文件
requests.post(url='{0}/upload_private_file'.format(self.url), json={'user_id':self.user_id, 'file':dir,"name":name})
self.db.save_private_message(self,role = 'assistant',content="发送了名为{name}的文件")
class Group:
def __init__(self, group_id,url = mainurl,user=None,users=None):
self.url = url
self.group_id = group_id
self.current_user = user
self.nickname = None
self.get_group_name()
self.users = users
self.get_group_users()
self.messages =[]
self.db = filer.ChatManager()
def get_group_name(self):
"""
获取群组的名称
"""
try:
response = requests.post('{0}/ArkSharePeer'.format(self.url), json={'group_id': self.group_id})
except:
return 0
if response.status_code == 200:
response_data = response.json()
if response_data.get("status") == "ok":
ark_json = response_data.get("data", {})
ark_msg_str = ark_json.get("arkJson", "{}")
try:
ark_json = json.loads(ark_msg_str)
except json.JSONDecodeError:
ark_json = {}
group_name = ark_json.get("meta", {}).get("contact", {}).get("nickname")
self.nickname = group_name
else:
print(f"请求失败,错误信息: {response_data.get('errMsg')}")
else:
print(f"请求失败,状态码: {response.status_code}")
def get_group_users(self):
try:
response = requests.post('{0}/get_group_member_list'.format(self.url), json={'group_id':self.group_id,'no_cache': False})
except:
return 0
if response.status_code == 200:
response_data = response.json()
if response_data.get("status") == "ok":
group_users = response_data.get("data", {})
self.users = group_users
else:
print(f"请求失败,错误信息: {response_data.get('errMsg')}")
else:
print(f"请求失败,状态码: {response.status_code}")
def send_message(self,message):#发送消息
requests.post(url='{0}/send_group_msg'.format(self.url), json={'group_id': self.group_id, 'message': message})
self.db.save_group_message(self,'assistant',message, sender_id=0)#保存发送的消息
def upload_file(self,dir,name,id):#上传文件
requests.post(url='{0}/upload_group_file'.format(self.url), json={'group_id': self.group_id, 'file': dir , "name": name,"folder_id": id})
self.db.save_group_message(self,'assistant',"上传了名为{name}的文件到群文件夹{id}", sender_id=0)#保存发送的消息