278 lines
7.7 KiB
C
278 lines
7.7 KiB
C
#define _GNU_SOURCE
|
||
|
||
#include "network.h"
|
||
#include "swap.h"
|
||
#include "http_rel.h"
|
||
#include "cJSON.h"
|
||
#include "tools/log/log.h"
|
||
#include "tools/quit/quit.h"
|
||
#include <semaphore.h>
|
||
#include <unistd.h>
|
||
#include <stdio.h>
|
||
#include <stdlib.h>
|
||
#include <string.h>
|
||
#include <stddef.h>
|
||
#include <errno.h>
|
||
#include <sys/socket.h>
|
||
#include <netinet/in.h>
|
||
#include <sys/epoll.h>
|
||
|
||
/*
 * Bounded string copy that always leaves dst NUL-terminated.
 *
 * dst      destination buffer
 * dst_size total capacity of dst in bytes, terminator included
 * src      NUL-terminated source; NULL is treated as the empty string
 *
 * The copy is truncated to dst_size - 1 bytes when src is longer.
 * Nothing is written when dst is NULL or dst_size is 0 (there is no room
 * even for the terminator, and dst_size - 1 would wrap to SIZE_MAX).
 */
static void safe_strcpy(char *dst, size_t dst_size, const char *src)
{
    if (!dst || dst_size == 0)
        return;

    if (!src) {
        dst[0] = '\0';
        return;
    }

    size_t len = strlen(src);
    if (len >= dst_size)
        len = dst_size - 1;

    memcpy(dst, src, len);
    dst[len] = '\0';
}
|
||
|
||
/* 主解析 */
|
||
int rbt_parse_json(const char *json_text, rbt_msg *out)
|
||
{
|
||
memset(out, 0, sizeof(*out)); // 统一清 0,gid 天然 '\0'
|
||
|
||
cJSON *root = cJSON_Parse(json_text);
|
||
if (!root) return -1;
|
||
|
||
/* 1. 取群号(可能没有) */
|
||
cJSON *gid = cJSON_GetObjectItemCaseSensitive(root, "group_id");
|
||
if (cJSON_IsString(gid))
|
||
safe_strcpy(out->gid, sizeof(out->gid), gid->valuestring);
|
||
else if (cJSON_IsNumber(gid)) // 有些框架是数字
|
||
snprintf(out->gid, sizeof(out->gid), "%d", gid->valueint);
|
||
|
||
/* 2. 用户号 */
|
||
cJSON *uid = cJSON_GetObjectItemCaseSensitive(root, "user_id");
|
||
if (cJSON_IsString(uid))
|
||
safe_strcpy(out->uid, sizeof(out->uid), uid->valuestring);
|
||
else if (cJSON_IsNumber(uid))
|
||
snprintf(out->uid, sizeof(out->uid), "%d", uid->valueint);
|
||
|
||
/* 3. 昵称在 sender 对象里 */
|
||
cJSON *sender = cJSON_GetObjectItemCaseSensitive(root, "sender");
|
||
if (cJSON_IsObject(sender)) {
|
||
cJSON *nick = cJSON_GetObjectItemCaseSensitive(sender, "nickname");
|
||
safe_strcpy(out->nickname, sizeof(out->nickname),
|
||
cJSON_IsString(nick) ? nick->valuestring : NULL);
|
||
}
|
||
|
||
/* 4. 原始消息 */
|
||
cJSON *raw = cJSON_GetObjectItemCaseSensitive(root, "raw_message");
|
||
safe_strcpy(out->raw_message, sizeof(out->raw_message),
|
||
cJSON_IsString(raw) ? raw->valuestring : NULL);
|
||
|
||
/* 5. 消息类型 */
|
||
cJSON *type = cJSON_GetObjectItemCaseSensitive(root, "message_type");
|
||
if (cJSON_IsString(type)) {
|
||
if (strcmp(type->valuestring, "group") == 0)
|
||
out->message_type = 'g';
|
||
else if (strcmp(type->valuestring, "private") == 0)
|
||
out->message_type = 'p';
|
||
/* else 保持 0 */
|
||
}
|
||
|
||
cJSON_Delete(root);
|
||
return 0; // 成功
|
||
}
|
||
|
||
/*
 * Create a TCP listening socket bound to 0.0.0.0:<port>.
 *
 * port  TCP port in host byte order, 0..65535 (0 lets the kernel pick one)
 *
 * Returns the listening descriptor, or -1 on failure. On failure the
 * reason is reported with perror() and no descriptor is left open.
 */
int init_network(int port)
{
    /* htons() would silently truncate anything outside 16 bits. */
    if (port < 0 || port > 65535) {
        errno = EINVAL;
        perror("init_network: port");
        return -1;
    }

    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) {
        perror("socket");
        return -1;
    }

    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;                     /* must match socket() */
    addr.sin_port = htons((unsigned short)port);   /* network byte order */
    addr.sin_addr.s_addr = htonl(INADDR_ANY);      /* 0.0.0.0: every local interface */

    if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        perror("bind");
        close(fd);
        return -1;
    }

    if (listen(fd, 10) == -1) {
        perror("listen");
        close(fd);
        return -1;
    }

    return fd;
}
|
||
|
||
ssize_t read_req(int fd, void *buf)
|
||
{
|
||
ssize_t n = read(fd, buf, MAX_MESSAGE_BUF);
|
||
if (n == 0) /* 写端已关闭,管道永不会再有数据 */
|
||
return -1;
|
||
return (n > 0) ? n : -1;
|
||
}
|
||
|
||
/*
 * Handle one HTTP request: extract its body, parse the JSON event,
 * publish it through make_swap() and queue a log record.
 *
 * req     NUL-terminated HTTP request text
 * logger  log manager whose in_log() receives the record
 *
 * Returns 0 on success, -1 if an argument is NULL, the body cannot be
 * parsed, or the log record cannot be allocated. Nothing is published
 * when parsing fails.
 *
 * NOTE(review): the log record is handed to logger->in_log() and not freed
 * here; this assumes in_log() takes ownership -- confirm.
 */
int process_message(char *req, log_manager *logger)
{
    if (!req || !logger)
        return -1;

    const char *body = http_get_body(req);

    rbt_msg message;
    if (rbt_parse_json(body, &message) != 0)
        return -1;

    make_swap((void *)&message);

    /* sizeof *log, not sizeof log: the latter is only the pointer size. */
    logs *log = malloc(sizeof *log);
    if (!log)
        return -1;

    /* snprintf truncates to the record's buffer; truncation is acceptable. */
    snprintf(log->log, sizeof(log->log),
             "%s message %s processd ok\n",
             message.nickname, message.raw_message);

    logger->in_log(log, logger);
    return 0;
}
|
||
|
||
/*
 * Dispatch a command to an idle worker thread.
 *
 * self     network manager owning the worker pool
 * command  NUL-terminated, non-empty command text
 *
 * Workers are scanned round-robin starting after the last one used. A
 * worker counts as idle while its status is > 0, busy at 0, and stopped
 * when negative. The chosen worker is marked busy *before* the command is
 * written so its counter cannot be bumped twice, and the command goes to
 * fifo_fd[1], the write end of the worker's pipe (the worker reads from
 * fifo_fd[0]).
 *
 * If every live worker is busy this call keeps rescanning until one
 * becomes idle (busy-wait, as the original search loop intended).
 *
 * Returns 0 once the command has been written, -1 on bad arguments, when
 * every worker is stopped, or when the write fails.
 */
int iss_work(netm *self, char *command)
{
    if (!self || !command)
        return -1;

    size_t len = strlen(command);
    if (len == 0)
        return -1;

    /* Keep the starting point inside the pool even if last_alc is stale. */
    int start = self->last_alc;
    if (start < 0 || start >= (MAX_POOL))
        start = 0;

    for (;;) {
        int stopped = 0;

        for (int step = 1; step <= (MAX_POOL); step++) {
            int i = (start + step) % (MAX_POOL);
            int status = atomic_load(&self->pool[i].status);

            if (status < 0) {
                stopped++;
                continue;
            }
            if (status == 0)
                continue;   /* busy */

            /* Mark the worker as working, then hand it the command. */
            atomic_fetch_sub(&self->pool[i].status, 1);

            ssize_t written = write(self->pool[i].fifo_fd[1], command, len);
            if (written != (ssize_t)len) {
                /* Nothing usable was delivered: give the slot back. */
                atomic_fetch_add(&self->pool[i].status, 1);
                return -1;
            }

            self->last_alc = i;
            return 0;
        }

        /* No live worker left: spinning would never end. */
        if (stopped == (MAX_POOL))
            return -1;
    }
}
|
||
|
||
void *pth_module(void *args_p)
|
||
{
|
||
net_args *argms = (net_args*)args_p;
|
||
pth_m *pmd = argms->pth;
|
||
log_manager *logger = argms->log;
|
||
//参数解析
|
||
char name[256] = {'\0'};
|
||
sprintf(name,"chatrebot%lu",pmd->pthread_id);
|
||
int swap = create_swap(name);
|
||
//创建共享内存
|
||
char swap_arg[64] = {'\0'};
|
||
sprintf(swap_arg,"%d",swap);
|
||
pid_t id = fork();
|
||
if(id == 0)
|
||
{
|
||
char *args[]={
|
||
"Pluginmanager",
|
||
"--swap",swap_arg,
|
||
NULL};
|
||
execv("Run_pluhginmanager",args);
|
||
}
|
||
logs *pth_log = (logs*)malloc(sizeof(logs));
|
||
sprintf(pth_log->log,"PID:%lu launched python plugines\n",pmd->pthread_id);
|
||
|
||
logger->in_log(pth_log,logger);
|
||
//拉起python插件管理器
|
||
for(;;){
|
||
//线程池中,单个线程模型
|
||
|
||
char req[64*1024];
|
||
//从管道中读取请求,并解析,无内容时休眠
|
||
int n = read_req(pmd->fifo_fd[0],req);
|
||
//管道关闭时退出;
|
||
|
||
if (n == EOF) {
|
||
return NULL;
|
||
break;
|
||
}
|
||
else{
|
||
process_message(req,logger);
|
||
atomic_fetch_add(&pmd->status, 1);
|
||
}
|
||
}
|
||
}
|
||
|
||
/*
 * Create the pipe and the thread for every slot of the worker pool.
 *
 * self  network manager owning the pool
 *
 * Each thread receives its own heap-allocated net_args. A stack variable
 * reused across iterations would be overwritten, or go out of scope,
 * before the new thread gets to read it. The allocation is deliberately
 * left alive for the lifetime of the worker because the thread reads it at
 * an unspecified time after creation.
 *
 * A slot whose pipe, allocation or thread creation fails is marked stopped
 * (status -1) with its pipe closed; the remaining slots are still started.
 *
 * Returns 0 when every worker started, -1 if self is NULL or any slot
 * failed.
 */
int start_pool(netm *self)
{
    if (!self)
        return -1;

    int rc = 0;

    for (int i = 0; i < MAX_POOL; i++) {
        /* Open the command pipe for this worker. */
        if (pipe(self->pool[i].fifo_fd) == -1) {
            perror("pipe");
            self->pool[i].status = -1;
            rc = -1;
            continue;
        }

        net_args *arg = malloc(sizeof *arg);
        if (!arg) {
            perror("malloc");
            close(self->pool[i].fifo_fd[0]);
            close(self->pool[i].fifo_fd[1]);
            self->pool[i].status = -1;
            rc = -1;
            continue;
        }
        arg->pth = &self->pool[i];
        arg->log = self->logmanager;

        /* Idle must be visible before the thread can start taking work. */
        self->pool[i].status = 1;

        /* Start the worker thread. */
        int err = pthread_create(&self->pool[i].pthread_id, NULL,
                                 pth_module, (void *)arg);
        if (err != 0) {
            fprintf(stderr, "pthread_create: %s\n", strerror(err));
            free(arg);
            close(self->pool[i].fifo_fd[0]);
            close(self->pool[i].fifo_fd[1]);
            self->pool[i].status = -1;
            rc = -1;
        }
    }

    return rc;
}
|
||
|
||
/*
 * Ask every worker that is not already stopped to exit.
 *
 * A slot whose status is -1 is left alone. Any other slot is marked -1 and
 * the write end of its pipe is closed, which makes the worker's blocking
 * read report end-of-stream. The threads are not joined here.
 *
 * Always returns 1.
 */
int shutdown_pool(netm *self)
{
    int idx = 0;

    while (idx < MAX_POOL) {
        if (self->pool[idx].status != -1) {
            self->pool[idx].status = -1;
            close(self->pool[idx].fifo_fd[1]);
        }
        idx++;
    }

    return 1;
}
|
||
|
||
/*
 * Main accept/control loop.
 *
 * port     TCP port to listen on
 * fifo_fd  read end of the control pipe; single-byte commands:
 *            'q'  quit: calls quit_server(self) and returns
 *            'u'  plugin update (not implemented yet)
 * self     network manager; http_fd and epoll_fd are stored in it
 *
 * One epoll instance watches both the control pipe and the listening
 * socket. Each accepted connection is handed to the worker pool as the
 * command "s/<fd>/e".
 *
 * Returns 1 after a quit command, -1 when set-up fails or epoll_wait()
 * fails with anything other than EINTR. The process exits if the epoll
 * instance itself cannot be created.
 *
 * NOTE(review): on the quit and epoll_wait-failure paths the epoll and
 * listening descriptors stay open (they remain reachable through self);
 * presumably quit_server() or the caller releases them -- confirm.
 */
int server_run(int port, int fifo_fd, netm *self)
{
    int epfd = epoll_create1(EPOLL_CLOEXEC);
    if (epfd == -1) {
        perror("epoll_create1");
        exit(EXIT_FAILURE);
    }

    /* Watch the control pipe and the HTTP listener with the same epoll. */
    struct epoll_event ev = {0};
    ev.events = EPOLLIN;
    ev.data.fd = fifo_fd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, fifo_fd, &ev) == -1) {
        perror("epoll_ctl");
        close(epfd);
        return -1;
    }

    self->http_fd = init_network(port);
    if (self->http_fd < 0) {
        close(epfd);
        return -1;
    }

    ev.data.fd = self->http_fd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, self->http_fd, &ev) == -1) {
        perror("epoll_ctl");
        close(self->http_fd);
        self->http_fd = -1;
        close(epfd);
        return -1;
    }

    self->epoll_fd = epfd;

    for (;;) {
        /* ---- work loop ---- */
        struct epoll_event event;
        int nf = epoll_wait(epfd, &event, 1, -1);
        if (nf == -1) {
            if (errno == EINTR)
                continue;   /* a signal is not a reason to stop serving */
            perror("epoll_wait");
            break;
        }
        if (nf == 0)
            continue;

        if (event.data.fd == self->http_fd) {
            int nt_fd = accept4(self->http_fd, NULL, NULL,
                                SOCK_NONBLOCK | SOCK_CLOEXEC);
            if (nt_fd == -1)
                continue;

            char iss_buf[256];
            snprintf(iss_buf, sizeof iss_buf, "s/%d/e", nt_fd);
            /*
             * NOTE(review): the dispatch result is not inspected; if
             * dispatch can fail, nt_fd is never closed -- confirm the
             * contract of iss_work.
             */
            self->iss_work(self, iss_buf);
        } else if (event.data.fd == fifo_fd) {
            /*
             * One read per wake-up: epoll is level-triggered, so leftover
             * bytes wake us again, while looping on read() would block
             * forever on a blocking pipe once it is drained.
             */
            char cmds[64];
            ssize_t got = read(fifo_fd, cmds, sizeof cmds);

            if (got > 0) {
                for (ssize_t k = 0; k < got; k++) {
                    switch (cmds[k]) {
                    case 'q':
                        /* Quit. */
                        quit_server(self);
                        return 1;
                    case 'u':
                        /* Plugin update: not implemented yet. */
                        break;
                    default:
                        break;
                    }
                }
            } else if (got == 0 || (errno != EINTR && errno != EAGAIN)) {
                /*
                 * Writer closed the pipe, or it is broken: stop watching
                 * it, otherwise epoll_wait() would return immediately
                 * forever.
                 */
                epoll_ctl(epfd, EPOLL_CTL_DEL, fifo_fd, NULL);
            }
        }
        /* ---- end of work loop ---- */
    }

    return -1;
}
|
||
|
||
void *run_network(void *self_d)
|
||
{
|
||
netm *self = (netm*)self_d;
|
||
self->start_pool(self);
|
||
server_run(self->port,self->fifo_fd[0],self);
|
||
self->shutdown_pool(self);
|
||
}
|
||
|
||
/*
 * Initialise a network manager instance.
 *
 * self        instance to fill in
 * fifo        control pipe as a two-element array: [0] read end,
 *             [1] write end
 * logmanager  log manager handed on to the workers
 * port        TCP port the server will listen on
 *
 * Installs the method pointers, copies the control-pipe descriptors and
 * stores the configuration. No descriptor is opened here.
 *
 * Returns 0 on success, -1 if self or fifo is NULL.
 */
int init_networkmanager(netm *self, int *fifo, log_manager *logmanager, int port)
{
    if (!self || !fifo)
        return -1;

    /* Install the methods. */
    self->run_network = run_network;
    self->iss_work = iss_work;
    self->start_pool = start_pool;
    self->shutdown_pool = shutdown_pool;

    /* Initialise the parameters. */
    self->fifo_fd[0] = fifo[0];
    self->fifo_fd[1] = fifo[1];
    self->last_alc = 0;
    self->logmanager = logmanager;
    self->port = port;

    return 0;
}