Files
chat_rebot-connect-with-one…/c/network/network.c

282 lines
7.8 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#define _GNU_SOURCE
#include "network.h"
#include "swap.h"
#include "http_rel.h"
#include "cJSON.h"
#include "tools/log/log.h"
#include "tools/quit/quit.h"
#include <semaphore.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stddef.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/epoll.h>
/*
 * Copy the NUL-terminated string `src` into `dst`, truncating when needed so
 * that at most `dst_size` bytes (terminator included) are written. The result
 * is always NUL-terminated whenever anything is written at all.
 *
 * dst      - destination buffer; nothing happens when it is NULL
 * dst_size - capacity of `dst` in bytes; nothing happens when it is 0
 * src      - source string; NULL is treated as the empty string
 */
static void safe_strcpy(char *dst, size_t dst_size, const char *src)
{
    /* A zero-sized buffer cannot hold even the terminator, and dst_size - 1
     * below would wrap around to SIZE_MAX. */
    if (!dst || dst_size == 0)
        return;

    if (!src) {
        dst[0] = '\0';
        return;
    }

    size_t len = strlen(src);
    if (len >= dst_size)
        len = dst_size - 1;
    memcpy(dst, src, len);
    dst[len] = '\0';
}
/* 主解析 */
int rbt_parse_json(const char *json_text, rbt_msg *out)
{
memset(out, 0, sizeof(*out)); // 统一清 0gid 天然 '\0'
cJSON *root = cJSON_Parse(json_text);
if (!root) return -1;
/* 1. 取群号(可能没有) */
cJSON *gid = cJSON_GetObjectItemCaseSensitive(root, "group_id");
if (cJSON_IsString(gid))
safe_strcpy(out->gid, sizeof(out->gid), gid->valuestring);
else if (cJSON_IsNumber(gid)) // 有些框架是数字
snprintf(out->gid, sizeof(out->gid), "%d", gid->valueint);
/* 2. 用户号 */
cJSON *uid = cJSON_GetObjectItemCaseSensitive(root, "user_id");
if (cJSON_IsString(uid))
safe_strcpy(out->uid, sizeof(out->uid), uid->valuestring);
else if (cJSON_IsNumber(uid))
snprintf(out->uid, sizeof(out->uid), "%d", uid->valueint);
/* 3. 昵称在 sender 对象里 */
cJSON *sender = cJSON_GetObjectItemCaseSensitive(root, "sender");
if (cJSON_IsObject(sender)) {
cJSON *nick = cJSON_GetObjectItemCaseSensitive(sender, "nickname");
safe_strcpy(out->nickname, sizeof(out->nickname),
cJSON_IsString(nick) ? nick->valuestring : NULL);
}
/* 4. 原始消息 */
cJSON *raw = cJSON_GetObjectItemCaseSensitive(root, "raw_message");
safe_strcpy(out->raw_message, sizeof(out->raw_message),
cJSON_IsString(raw) ? raw->valuestring : NULL);
/* 5. 消息类型 */
cJSON *type = cJSON_GetObjectItemCaseSensitive(root, "message_type");
if (cJSON_IsString(type)) {
if (strcmp(type->valuestring, "group") == 0)
out->message_type = 'g';
else if (strcmp(type->valuestring, "private") == 0)
out->message_type = 'p';
/* else 保持 0 */
}
cJSON_Delete(root);
return 0; // 成功
}
/*
 * Create a TCP/IPv4 socket bound to 0.0.0.0:`port` and put it into the
 * listening state with a backlog of 10.
 *
 * port - TCP port in host byte order, 0..65535
 *
 * Returns the listening descriptor, or -1 on failure (invalid port, or
 * socket/bind/listen failed; the reason is printed with perror and the
 * descriptor is closed so nothing leaks).
 */
int init_network(int port)
{
    /* htons() would silently truncate an out-of-range value. */
    if (port < 0 || port > 65535) {
        fprintf(stderr, "init_network: invalid port %d\n", port);
        return -1;
    }

    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) {
        perror("socket");
        return -1;
    }

    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;                /* must match socket() */
    addr.sin_port = htons((in_port_t)port);   /* network byte order */
    addr.sin_addr.s_addr = htonl(INADDR_ANY); /* 0.0.0.0: every local interface */

    if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        perror("bind");
        close(fd);
        return -1;
    }
    if (listen(fd, 10) == -1) {
        perror("listen");
        close(fd);
        return -1;
    }
    return fd;
}
/*
 * Read one chunk from `fd` into `buf` and NUL-terminate it.
 *
 * buf must have room for MAX_MESSAGE_BUF bytes. At most MAX_MESSAGE_BUF - 1
 * bytes are read so that the terminator always fits; callers treat the
 * buffer as a C string.
 *
 * Returns the number of bytes read (> 0), or -1 on error or when read()
 * reports end-of-file (the write end is closed and no more data will come).
 */
ssize_t read_req(int fd, void *buf)
{
    // TODO: rework the request-reading function
    if (!buf)
        return -1;

    char *bytes = buf;
    ssize_t n;
    do {
        n = read(fd, bytes, (MAX_MESSAGE_BUF) - 1);
    } while (n == -1 && errno == EINTR); /* a signal is not a closed pipe */

    if (n <= 0)
        return -1;

    bytes[n] = '\0';
    return n;
}
/*
 * Handle one request taken from a worker pipe.
 *
 * Only requests whose first byte is 's' are processed: the HTTP body is
 * extracted with http_get_body(), parsed with rbt_parse_json(), handed to
 * make_swap(), and a log entry is queued through logger->in_log().
 *
 * req    - NUL-terminated request text
 * logger - log manager that receives the heap-allocated entry
 *          (presumably it takes ownership of the entry - confirm in the
 *          log module)
 *
 * Returns 0 when the request is ignored (does not start with 's'),
 *         1 when the message was parsed and handed to make_swap(),
 *        -1 on NULL arguments or when the body is not valid JSON.
 */
int process_message(char *req, log_manager *logger)
{
    // TODO: rework the pipe command parsing
    if (!req || !logger)
        return -1;
    if (req[0] != 's')
        return 0;

    const char *body = http_get_body(req);
    rbt_msg message;
    if (rbt_parse_json(body, &message) != 0)
        return -1; /* nothing meaningful to forward */

    make_swap((void *)&message);

    /* sizeof *entry, not sizeof entry: the latter is only pointer-sized and
     * made the snprintf below write past the allocation. */
    logs *entry = malloc(sizeof *entry);
    if (entry) {
        /* snprintf truncates to the buffer, so no extra length check is needed. */
        snprintf(entry->log, sizeof(entry->log),
                 "%s message %s processd ok\n", message.nickname, message.raw_message);
        logger->in_log(entry, logger);
    }
    /* Logging is best effort: the message was already handed off. */
    return 1;
}
/*
 * Dispatch `command` to an idle worker thread.
 *
 * Workers are scanned round-robin, starting after the one used last
 * (self->last_alc). A worker is idle when its status is 1: start_pool sets
 * 1, this function subtracts 1 when handing out work, the worker adds 1 back
 * when it is done, and shutdown_pool marks a stopped worker with -1.
 *
 * The command is written to the worker pipe's write end, fifo_fd[1]; the
 * worker reads from fifo_fd[0].
 *
 * Returns 0 when the command was handed over, -1 on NULL arguments, when no
 * worker is idle after one full scan, or when the write fails. On -1 nothing
 * was dispatched and the caller still owns whatever the command refers to.
 */
int iss_work(netm *self, char *command)
{
    if (!self || !command)
        return -1;

    size_t len = strlen(command);
    int first = self->last_alc + 1;

    /* Look for an idle worker; the modulo keeps the index inside the pool. */
    for (int probe = 0; probe < MAX_POOL; probe++) {
        int i = (first + probe) % MAX_POOL;
        if (atomic_load(&self->pool[i].status) != 1)
            continue;

        /* Mark the worker busy before it can see the command, so its own
         * "done" increment can never run ahead of this decrement. */
        atomic_fetch_sub(&self->pool[i].status, 1);

        /* Send the command to the idle worker. */
        ssize_t written = write(self->pool[i].fifo_fd[1], command, len);
        if (written != (ssize_t)len) {
            atomic_fetch_add(&self->pool[i].status, 1); /* undo: nothing was queued */
            return -1;
        }

        self->last_alc = i;
        return 0;
    }
    return -1; /* every worker is busy or shut down */
}
/*
 * Worker thread entry point.
 *
 * args_p points to a net_args holding this worker's pool slot (pth) and the
 * log manager (log); both pointers are copied out immediately.
 *
 * The worker creates a swap area named after its thread id, launches the
 * plugin manager in a child process with "--swap <id>", then loops: read a
 * request from its pipe, process it, and mark itself idle again by adding 1
 * to its status. It returns NULL once read_req() reports that the pipe is
 * closed or failed.
 */
void *pth_module(void *args_p)
{
    /* Unpack the arguments. */
    net_args *argms = (net_args *)args_p;
    pth_m *pmd = argms->pth;
    log_manager *logger = argms->log;

    /* pthread_self() instead of pmd->pthread_id: the latter is stored by
     * pthread_create() in the parent and may not be written yet when this
     * thread starts running. */
    unsigned long tid = (unsigned long)pthread_self();

    /* Create the shared-memory swap area. */
    char name[256] = {'\0'};
    snprintf(name, sizeof name, "chatrebot%lu", tid);
    /* NOTE(review): create_swap()'s result is passed on unchecked - confirm
     * how it reports failure. */
    int swap = create_swap(name);

    char swap_arg[64] = {'\0'};
    snprintf(swap_arg, sizeof swap_arg, "%d", swap);

    /* Launch the Python plugin manager. */
    pid_t id = fork();
    if (id == 0) {
        char *args[] = {
            "Pluginmanager",
            "--swap", swap_arg,
            NULL};
        execv("Run_pluhginmanager", args);
        /* Only reached when execv failed. Without this the child would fall
         * through and run a duplicate of the worker loop below. */
        _exit(127);
    }
    if (id == -1) {
        perror("fork");
    } else {
        logs *pth_log = malloc(sizeof *pth_log);
        if (pth_log) {
            snprintf(pth_log->log, sizeof(pth_log->log),
                     "PID:%lu launched python plugines\n", tid);
            logger->in_log(pth_log, logger);
        }
    }

    /* Single-worker loop of the thread pool. */
    for (;;) {
        /* NOTE(review): read_req() reads up to MAX_MESSAGE_BUF bytes; this
         * buffer must be at least that large - confirm the macro's value. */
        char req[64 * 1024];

        /* Blocks until a request arrives on the pipe. */
        ssize_t n = read_req(pmd->fifo_fd[0], req);
        if (n < 0)
            return NULL; /* pipe closed: exit the worker */

        /* process_message() treats req as a C string, so terminate it. */
        size_t end = ((size_t)n < sizeof req) ? (size_t)n : sizeof req - 1;
        req[end] = '\0';

        process_message(req, logger);
        atomic_fetch_add(&pmd->status, 1); /* idle again */
    }
}
/*
 * Start the MAX_POOL worker threads.
 *
 * For each pool slot a pipe is created (dispatcher writes, worker reads),
 * the slot's status is set to 1, and a thread running pth_module is started.
 * A slot whose pipe or thread cannot be created is marked with status -1,
 * the value shutdown_pool treats as "already stopped".
 *
 * Returns 0 when every worker started, -1 when self is NULL or at least one
 * worker could not be started (the others keep running).
 */
int start_pool(netm *self)
{
    if (!self)
        return -1;

    int rc = 0;
    for (int i = 0; i < MAX_POOL; i++) {
        /* Create the pipe for this worker.
         * NOTE(review): the pipe ends are not close-on-exec, so the plugin
         * manager processes exec'd by the workers inherit them; a worker may
         * then never see end-of-file when the write end is closed here. */
        if (pipe(self->pool[i].fifo_fd) == -1) {
            perror("pipe");
            self->pool[i].fifo_fd[0] = -1;
            self->pool[i].fifo_fd[1] = -1;
            self->pool[i].status = -1;
            rc = -1;
            continue;
        }

        /* The arguments must outlive this loop iteration: the new thread
         * reads them at some later point, so a stack local would dangle.
         * The block is deliberately left allocated (one small block per
         * worker); this function does not free it on success. */
        net_args *arg = malloc(sizeof *arg);
        int err = arg ? 0 : ENOMEM;
        if (arg) {
            arg->pth = &self->pool[i];
            arg->log = self->logmanager;
            self->pool[i].status = 1;
            /* Start the worker thread. */
            err = pthread_create(&self->pool[i].pthread_id, NULL, pth_module, arg);
        }
        if (err != 0) {
            fprintf(stderr, "start_pool: worker %d: %s\n", i, strerror(err));
            free(arg);
            close(self->pool[i].fifo_fd[0]);
            close(self->pool[i].fifo_fd[1]);
            self->pool[i].fifo_fd[0] = -1;
            self->pool[i].fifo_fd[1] = -1;
            self->pool[i].status = -1;
            rc = -1;
        }
    }
    return rc;
}
/*
 * Ask every worker in the pool to stop.
 *
 * Each slot that is not already marked -1 is marked -1 and the write end of
 * its pipe (fifo_fd[1]) is closed; pth_module returns once its read on the
 * other end reports that the pipe is closed. Slots already marked -1 are
 * skipped, so the function can be called more than once.
 *
 * Always returns 1.
 */
int shutdown_pool(netm *self)
{
    int slot = 0;
    while (slot < MAX_POOL) {
        if (self->pool[slot].status != -1) {
            self->pool[slot].status = -1;
            close(self->pool[slot].fifo_fd[1]);
        }
        slot++;
    }
    return 1;
}
/*
 * Main event loop: waits on the control pipe and the HTTP listening socket.
 *
 * port    - TCP port passed to init_network()
 * fifo_fd - read end of the control pipe; one-byte commands:
 *           'q' calls quit_server(self) and leaves the loop,
 *           'u' is reserved for plugin updates (currently a no-op)
 * self    - network manager; http_fd and epoll_fd are stored in it
 *
 * Every accepted connection is announced to a worker as "s/<fd>/e" through
 * self->iss_work().
 *
 * Returns 1 after a 'q' command, -1 when setup or epoll_wait() fails.
 * Exits the process if the epoll instance cannot be created.
 */
int server_run(int port, int fifo_fd, netm *self)
{
    int epfd = epoll_create1(EPOLL_CLOEXEC);
    if (epfd == -1) {
        perror("epoll_create1");
        exit(EXIT_FAILURE);
    }

    /* Watch both the control pipe and the HTTP listening socket. */
    struct epoll_event ev = {0};
    ev.events = EPOLLIN;
    ev.data.fd = fifo_fd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, fifo_fd, &ev) == -1) {
        perror("epoll_ctl(control pipe)");
        close(epfd);
        return -1;
    }

    self->http_fd = init_network(port);
    if (self->http_fd == -1) {
        fprintf(stderr, "server_run: cannot listen on port %d\n", port);
        close(epfd);
        return -1;
    }
    ev.data.fd = self->http_fd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, self->http_fd, &ev) == -1) {
        perror("epoll_ctl(http socket)");
        close(epfd);
        return -1;
    }

    char iss_buf[256];
    struct epoll_event events;
    self->epoll_fd = epfd;

    for (;;) {
        /* ----------------------------- work loop */
        int nf = epoll_wait(epfd, &events, 1, -1);
        if (nf == -1) {
            if (errno == EINTR)
                continue; /* interrupted by a signal, not a real failure */
            perror("epoll_wait");
            break;
        }
        if (nf == 0)
            continue;

        if (events.data.fd == self->http_fd) {
            int nt_fd = accept4(self->http_fd, NULL, NULL, SOCK_NONBLOCK | SOCK_CLOEXEC);
            if (nt_fd == -1)
                continue;
            snprintf(iss_buf, sizeof iss_buf, "s/%d/e", nt_fd);
            self->iss_work(self, iss_buf);
        } else if (events.data.fd == fifo_fd) {
            /* One byte per wake-up: the pipe is registered level-triggered,
             * so pending bytes wake us again at once, and a blocking pipe
             * can no longer stall the loop the way a drain loop would. */
            char command;
            ssize_t got = read(fifo_fd, &command, 1);
            if (got == 1) {
                switch (command) {
                case 'q':
                    /* Shutdown. */
                    quit_server(self);
                    return 1;
                case 'u':
                    /* Plugin update: not implemented yet. */
                    break;
                default:
                    break;
                }
            } else if (got == 0) {
                /* Writer closed the control pipe: stop watching it, or
                 * epoll_wait would report it as readable forever. */
                epoll_ctl(epfd, EPOLL_CTL_DEL, fifo_fd, NULL);
            }
        }
        /* ----------------------------- work loop */
    }
    return -1;
}
/*
 * Thread entry point of the network manager.
 *
 * self_d is the netm instance. Starts the worker pool, runs the server loop
 * on self->port with the read end of the control pipe (self->fifo_fd[0]),
 * and shuts the pool down once the loop returns.
 *
 * Always returns NULL.
 */
void *run_network(void *self_d)
{
    netm *self = (netm *)self_d;

    self->start_pool(self);
    server_run(self->port, self->fifo_fd[0], self);
    self->shutdown_pool(self);

    /* A void * function must return a value: the thread library reads it as
     * the thread's exit status. */
    return NULL;
}
/*
 * Initialise a network manager object.
 *
 * self       - object to fill in
 * fifo       - control pipe, fifo[0] and fifo[1] are copied into self->fifo_fd
 * logmanager - log manager stored for the worker threads
 * port       - TCP port the server will listen on
 *
 * Installs the method pointers (run_network, iss_work, start_pool,
 * shutdown_pool) and resets last_alc to 0. No thread or socket is created
 * here.
 *
 * Returns 0 on success, -1 when self or fifo is NULL.
 */
int init_networkmanager(netm *self, int *fifo, log_manager *logmanager, int port)
{
    if (!self || !fifo)
        return -1;

    /* Install the methods. */
    self->run_network = run_network;
    self->iss_work = iss_work;
    self->start_pool = start_pool;
    self->shutdown_pool = shutdown_pool;

    /* Initialise the parameters. */
    self->fifo_fd[0] = fifo[0];
    self->fifo_fd[1] = fifo[1];
    self->last_alc = 0;
    self->logmanager = logmanager;
    self->port = port;

    return 0;
}