#define _GNU_SOURCE #include "network.h" #include "swap.h" #include "http_rel.h" #include "cJSON.h" #include "tools/log/log.h" #include "tools/quit/quit.h" #include #include #include #include #include #include #include #include #include #include static void safe_strcpy(char *dst, size_t dst_size, const char *src) { if (!src) { dst[0] = '\0'; return; } size_t len = strlen(src); if (len >= dst_size) len = dst_size - 1; memcpy(dst, src, len); dst[len] = '\0'; } /* 主解析 */ int rbt_parse_json(const char *json_text, rbt_msg *out) { memset(out, 0, sizeof(*out)); // 统一清 0,gid 天然 '\0' cJSON *root = cJSON_Parse(json_text); if (!root) return -1; /* 1. 取群号(可能没有) */ cJSON *gid = cJSON_GetObjectItemCaseSensitive(root, "group_id"); if (cJSON_IsString(gid)) safe_strcpy(out->gid, sizeof(out->gid), gid->valuestring); else if (cJSON_IsNumber(gid)) // 有些框架是数字 snprintf(out->gid, sizeof(out->gid), "%d", gid->valueint); /* 2. 用户号 */ cJSON *uid = cJSON_GetObjectItemCaseSensitive(root, "user_id"); if (cJSON_IsString(uid)) safe_strcpy(out->uid, sizeof(out->uid), uid->valuestring); else if (cJSON_IsNumber(uid)) snprintf(out->uid, sizeof(out->uid), "%d", uid->valueint); /* 3. 昵称在 sender 对象里 */ cJSON *sender = cJSON_GetObjectItemCaseSensitive(root, "sender"); if (cJSON_IsObject(sender)) { cJSON *nick = cJSON_GetObjectItemCaseSensitive(sender, "nickname"); safe_strcpy(out->nickname, sizeof(out->nickname), cJSON_IsString(nick) ? nick->valuestring : NULL); } /* 4. 原始消息 */ cJSON *raw = cJSON_GetObjectItemCaseSensitive(root, "raw_message"); safe_strcpy(out->raw_message, sizeof(out->raw_message), cJSON_IsString(raw) ? raw->valuestring : NULL); /* 5. 消息类型 */ cJSON *type = cJSON_GetObjectItemCaseSensitive(root, "message_type"); if (cJSON_IsString(type)) { if (strcmp(type->valuestring, "group") == 0) out->message_type = 'g'; else if (strcmp(type->valuestring, "private") == 0) out->message_type = 'p'; /* else 保持 0 */ } cJSON_Delete(root); return 0; // 成功 } int init_network(int port) { int fd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in addr = {0}; addr.sin_family = AF_INET; // 和 socket() 一致 addr.sin_port = htons(port); // 端口号必须网络字节序 addr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0:本机所有网卡 bind(fd, (struct sockaddr *)&addr, sizeof(addr)); listen(fd,10); return fd; } ssize_t read_req(int fd, void *buf) { // TODO 修改读取任务函数 ssize_t n = read(fd, buf, MAX_MESSAGE_BUF); if (n == 0) /* 写端已关闭,管道永不会再有数据 */ return -1; return (n > 0) ? n : -1; } int process_message(char *req,log_manager *logger) { //TODO 修改管道命令解析 if(req ==NULL ) return 0; int fd; char front,rear; sscanf(req,"%s/%d/%s",&rear,&fd,&rear); char *req_buf = recv_http_request(fd); const char *body = http_get_body(req_buf); free(req_buf); rbt_msg message; rbt_parse_json(body,&message); make_swap((void*)&message); logs *log = malloc(sizeof log); if(snprintf(log->log,sizeof(log->log), "%s message %s processd ok\n",message.nickname,message.raw_message)<1024); logger->in_log(log,logger); } int iss_work(netm *self,char *command) { int i = self->last_alc+1; //查询空闲线程 while(self->pool[i].fifo_fd ==0) { if(ipool[i].fifo_fd[0],command,strlen(command)); //设置线程程为working atomic_fetch_sub(&self->pool[i].status,1); self->last_alc = i; } void *pth_module(void *args_p) { net_args *argms = (net_args*)args_p; pth_m *pmd = argms->pth; log_manager *logger = argms->log; //参数解析 char name[256] = {'\0'}; sprintf(name,"chatrebot%lu",pmd->pthread_id); int swap = create_swap(name); //创建共享内存 char swap_arg[64] = {'\0'}; sprintf(swap_arg,"%d",swap); pid_t id = fork(); if(id == 0) { char *args[]={ "Pluginmanager", "--swap",swap_arg, NULL}; execv("Run_pluhginmanager",args); } logs *pth_log = (logs*)malloc(sizeof(logs)); sprintf(pth_log->log,"PID:%lu launched python plugines\n",pmd->pthread_id); logger->in_log(pth_log,logger); //拉起python插件管理器 for(;;){ //线程池中,单个线程模型 char req[64*1024]; //从管道中读取请求,并解析,无内容时休眠 int n = read_req(pmd->fifo_fd[0],req); //管道关闭时退出; if (n == EOF) { return NULL; break; } else{ process_message(req,logger); atomic_fetch_add(&pmd->status, 1); } } } int start_pool(netm *self) { for(int i = 0;ipool[i].fifo_fd); //启动线程 net_args arg; arg.pth =&self->pool[i]; arg.log = self->logmanager; self->pool[i].status = 1; pthread_create(&self->pool[i].pthread_id,NULL,pth_module,(void*)&arg); } } int shutdown_pool(netm *self) { for(int i = 0;ipool[i].status == -1) continue; self->pool[i].status = -1; close(self->pool[i].fifo_fd[1]); } return 1; } int server_run(int port,int fifo_fd,netm *self) { int epfd = epoll_create1(EPOLL_CLOEXEC); // 推荐 if (epfd == -1) { perror("epoll_create1"); exit(EXIT_FAILURE); } struct epoll_event ev; //设置epoll同时监听控制管道与http请求 ev.events = EPOLLIN; ev.data.fd = fifo_fd; epoll_ctl(epfd, EPOLL_CTL_ADD, fifo_fd, &ev); char iss_buf[256]; self->http_fd = init_network(port); ev.data.fd = self->http_fd; epoll_ctl(epfd, EPOLL_CTL_ADD, self->http_fd, &ev); struct epoll_event events; self->epoll_fd = epfd; for(;;) { /*工作循环-----------------------------*/ int nf = epoll_wait(epfd,&events,1,-1); if (nf == -1) { perror("epoll_wait"); break; } if(events.data.fd ==self->http_fd) { int nt_fd = accept4(self->http_fd,NULL,NULL,SOCK_NONBLOCK | SOCK_CLOEXEC); if(nt_fd == -1) continue; sprintf(iss_buf,"s/%d/e",nt_fd); self->iss_work(self,iss_buf); } if(events.data.fd == fifo_fd) { char command; while(read(fifo_fd,&command,1)==1) { switch(command){ case 'q': //退出逻辑 quit_server(self); return 1; break; case 'u': //插件更新逻辑 break; } } } /*工作循环----------------------------*/ } } void *run_network(void *self_d) { netm *self = (netm*)self_d; self->start_pool(self); server_run(self->port,self->fifo_fd[0],self); self->shutdown_pool(self); } int init_networkmanager(netm *self,int *fifo,log_manager *logmanager,int port) { self->run_network = run_network; self->iss_work = iss_work; self->start_pool = start_pool; self->shutdown_pool = shutdown_pool; //装载方法 self->fifo_fd[0]= fifo[0]; self->fifo_fd[1]= fifo[1]; self->last_alc = 0; //初始化参数 self->logmanager = logmanager; self->port = port; }