修复log记录错误(第一个节点无法落盘),更新http解析为现成库实现,优化退出逻辑。

This commit is contained in:
2025-11-21 11:40:08 +08:00
parent 331c6b9f89
commit 26dac05e5b
11 changed files with 30579 additions and 115 deletions

View File

@ -77,16 +77,92 @@ int rbt_parse_json(const char *json_text, rbt_msg *out)
return 0; // 成功
}
int init_http_network(int port)
/**
* @brief 初始化HTTP监听socket所有错误通过logmanager记录
* @param port 监听端口
* @param logger 日志管理器实例指针
* @return 成功返回监听fd失败返回-1并记录日志
*/
int init_http_network(int port, log_manager *logger)
{
int fd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(fd, F_GETFL);
logs *log;
int fd;
/* 1. 创建socket */
fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd == -1) {
log = malloc(sizeof(logs));
// cppcheck-suppress uninitdata
snprintf(log->log, sizeof(log->log),
"[FATAL] socket() failed: %s", strerror(errno));
logger->in_log(log, logger);
return -1;
}
/* 2. 设置SO_REUSEADDR避免TIME_WAIT状态导致bind失败 */
int opt = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1) {
log = malloc(sizeof(logs));
snprintf(log->log, sizeof(log->log),
"[ERROR] setsockopt(SO_REUSEADDR) on fd=%d failed: %s",
fd, strerror(errno));
logger->in_log(log, logger);
close(fd);
return -1;
}
/* 3. 设置为非阻塞模式配合epoll使用 */
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) {
log = malloc(sizeof(logs));
snprintf(log->log, sizeof(log->log),
"[ERROR] fcntl(F_GETFL) on fd=%d failed: %s", fd, strerror(errno));
logger->in_log(log, logger);
close(fd);
return -1;
}
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
log = malloc(sizeof(logs));
snprintf(log->log, sizeof(log->log),
"[ERROR] fcntl(O_NONBLOCK) on fd=%d failed: %s", fd, strerror(errno));
logger->in_log(log, logger);
close(fd);
return -1;
}
/* 4. 绑定到指定端口 */
struct sockaddr_in addr = {0};
addr.sin_family = AF_INET; // 和 socket() 一致
addr.sin_port = htons(port); // 端口号必须网络字节序
addr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0:本机所有网卡
bind(fd, (struct sockaddr *)&addr, sizeof(addr));
listen(fd,10);
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听所有网卡
if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
log = malloc(sizeof(logs));
snprintf(log->log, sizeof(log->log),
"[FATAL] bind(port %d) failed: %s (fd=%d)",
port, strerror(errno), fd);
logger->in_log(log, logger);
close(fd);
return -1;
}
/* 5. 开始监听 */
if (listen(fd, 10) == -1) {
log = malloc(sizeof(logs));
snprintf(log->log, sizeof(log->log),
"[FATAL] listen(fd=%d, backlog=10) failed: %s",
fd, strerror(errno));
logger->in_log(log, logger);
close(fd);
return -1;
}
/* 6. 成功日志 */
log = malloc(sizeof(logs));
snprintf(log->log, sizeof(log->log),
"[HTTP] Successfully listening on port %d (fd=%d)", port, fd);
logger->in_log(log, logger);
return fd;
}
@ -99,36 +175,62 @@ ssize_t read_req(int fd, void *buf)
return (n > 0) ? n : -1;
}
int process_message(char *req,log_manager *logger)
{
//TODO 修改管道命令解析
if(req ==NULL )
return 0;
int process_message(char *req, log_manager *logger) {
if(req == NULL) return 0;
int fd;
char front,rear;
sscanf(req,"%s/%d/%s",&rear,&fd,&rear);
char type[16], end[16];
if(sscanf(req, "%15s/%d/%15s", type, &fd, end) != 3) {
free(req);
return -1;
}
char *req_buf = recv_http_request(fd);
if(!req_buf) { // 检查返回值
close(fd);
free(req);
return -1;
}
const char *body = http_get_body(req_buf);
free(req_buf);
rbt_msg message;
rbt_parse_json(body,&message);
make_swap((void*)&message);
logs *log = malloc(sizeof log);
if(snprintf(log->log,sizeof(log->log),
"%s message %s processd ok\n",message.nickname,message.raw_message)<1024);
logger->in_log(log,logger);
if(rbt_parse_json(body, &message) == 0) {
make_swap((void*)&message);
logs *log = malloc(sizeof(logs));
// cppcheck-suppress uninitdata
snprintf(log->log, sizeof(log->log), "%s message %s processed ok\n",
message.nickname, message.raw_message);
logger->in_log(log, logger);
}
// ****** 修复2发送HTTP响应 ******
const char *response =
"HTTP/1.1 200 OK\r\n"
"Content-Type: text/plain\r\n"
"Content-Length: 2\r\n"
"\r\n"
"OK";
write(fd, response, strlen(response));
close(fd);
free(req);
return 0;
}
int iss_work(netm *self,char *command)
{
int i = self->last_alc+1;
int i = self->last_alc;
//查询空闲线程
while(self->pool[i].fifo_fd ==0)
while(atomic_load(&(self->pool[i].status)) ==0)
{
if(i<MAX_POOL)
i++;
else
else{
i=0;
}
}
//向空闲线程发送数据
write(self->pool[i].fifo_fd[0],command,strlen(command));
@ -143,8 +245,9 @@ void *pth_module(void *args_p)
pth_m *pmd = argms->pth;
log_manager *logger = argms->log;
//参数解析
free(args_p);
char name[256] = {'\0'};
sprintf(name,"chatrebot%lu",pmd->pthread_id);
sprintf(name,"chatrebot%lu",pthread_self());
int swap = create_swap(name);
//创建共享内存
char swap_arg[64] = {'\0'};
@ -159,16 +262,17 @@ void *pth_module(void *args_p)
execv("Run_pluhginmanager",args);
}
logs *pth_log = (logs*)malloc(sizeof(logs));
sprintf(pth_log->log,"PID:%lu launched python plugines\n",pmd->pthread_id);
// cppcheck-suppress uninitdata
sprintf(pth_log->log,"PID:%lu launched python plugines\n",pthread_self());
logger->in_log(pth_log,logger);
//拉起python插件管理器
for(;;){
//线程池中,单个线程模型
char req[64];
char *req = (char*)malloc(MAX_MESSAGE_BUF);
//从管道中读取请求,并解析,无内容时休眠
int n = read_req(pmd->fifo_fd[0],req);
int n = read_req(pmd->fifo_fd[0],(void*)req);
//管道关闭时退出;
if (n == EOF) {
@ -176,6 +280,9 @@ void *pth_module(void *args_p)
break;
}
else{
pth_log = (logs*)malloc(sizeof(logs));
sprintf(pth_log->log,"processd message");
logger->in_log(pth_log,logger);
process_message(req,logger);
atomic_fetch_add(&pmd->status, 1);
}
@ -189,11 +296,12 @@ int start_pool(netm *self)
//为线程开辟管道
pipe(self->pool[i].fifo_fd);
//启动线程
net_args arg;
arg.pth =&self->pool[i];
arg.log = self->logmanager;
net_args *arg = (net_args*)malloc(sizeof(net_args));
arg->pth = &self->pool[i];
arg->log = self->logmanager;
self->pool[i].status = 1;
pthread_create(&self->pool[i].pthread_id,NULL,pth_module,(void*)&arg);
pthread_create(&self->pool[i].pthread_id,NULL,pth_module,(void*)arg);
}
}
@ -211,7 +319,7 @@ int shutdown_pool(netm *self)
int server_run(int port,int fifo_fd,netm *self)
{
int epfd = epoll_create1(EPOLL_CLOEXEC); // 推荐
int epfd = epoll_create1(EPOLL_CLOEXEC);
if (epfd == -1) {
perror("epoll_create1");
exit(EXIT_FAILURE);
@ -222,7 +330,7 @@ int server_run(int port,int fifo_fd,netm *self)
ev.data.fd = fifo_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, fifo_fd, &ev);
char iss_buf[256];
self->http_fd = init_http_network(port);
self->http_fd = init_http_network(port,self->logmanager);
ev.data.fd = self->http_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, self->http_fd, &ev);
@ -261,7 +369,6 @@ int server_run(int port,int fifo_fd,netm *self)
break;
case 'u':
//插件更新逻辑
break;
}
}
@ -289,8 +396,9 @@ int init_networkmanager(netm *self,int *fifo,log_manager *logmanager,int port)
self->fifo_fd[0]= fifo[0];
self->fifo_fd[1]= fifo[1];
self->last_alc = 0;
self->port = port;
//初始化参数
self->logmanager = logmanager;
self->err_indictor = (indiector*)malloc(sizeof(indiector));
self->port = port;
return 0;
}