#define _GNU_SOURCE #include "network.h" #include "swap.h" #include "http_rel.h" #include "cJSON.h" #include "tools/log/log.h" #include #include #include #include #include #include #include #include #include #include static void safe_strcpy(char *dst, size_t dst_size, const char *src) { if (!src) { dst[0] = '\0'; return; } size_t len = strlen(src); if (len >= dst_size) len = dst_size - 1; memcpy(dst, src, len); dst[len] = '\0'; } /* 主解析 */ int rbt_parse_json(const char *json_text, rbt_msg *out) { memset(out, 0, sizeof(*out)); // 统一清 0,gid 天然 '\0' cJSON *root = cJSON_Parse(json_text); if (!root) return -1; /* 1. 取群号(可能没有) */ cJSON *gid = cJSON_GetObjectItemCaseSensitive(root, "group_id"); if (cJSON_IsString(gid)) safe_strcpy(out->gid, sizeof(out->gid), gid->valuestring); else if (cJSON_IsNumber(gid)) // 有些框架是数字 snprintf(out->gid, sizeof(out->gid), "%d", gid->valueint); /* 2. 用户号 */ cJSON *uid = cJSON_GetObjectItemCaseSensitive(root, "user_id"); if (cJSON_IsString(uid)) safe_strcpy(out->uid, sizeof(out->uid), uid->valuestring); else if (cJSON_IsNumber(uid)) snprintf(out->uid, sizeof(out->uid), "%d", uid->valueint); /* 3. 昵称在 sender 对象里 */ cJSON *sender = cJSON_GetObjectItemCaseSensitive(root, "sender"); if (cJSON_IsObject(sender)) { cJSON *nick = cJSON_GetObjectItemCaseSensitive(sender, "nickname"); safe_strcpy(out->nickname, sizeof(out->nickname), cJSON_IsString(nick) ? nick->valuestring : NULL); } /* 4. 原始消息 */ cJSON *raw = cJSON_GetObjectItemCaseSensitive(root, "raw_message"); safe_strcpy(out->raw_message, sizeof(out->raw_message), cJSON_IsString(raw) ? raw->valuestring : NULL); /* 5. 消息类型 */ cJSON *type = cJSON_GetObjectItemCaseSensitive(root, "message_type"); if (cJSON_IsString(type)) { if (strcmp(type->valuestring, "group") == 0) out->message_type = 'g'; else if (strcmp(type->valuestring, "private") == 0) out->message_type = 'p'; /* else 保持 0 */ } cJSON_Delete(root); return 0; // 成功 } int init_network(int port) { int fd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in addr = {0}; addr.sin_family = AF_INET; // 和 socket() 一致 addr.sin_port = htons(port); // 端口号必须网络字节序 addr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0:本机所有网卡 bind(fd, (struct sockaddr *)&addr, sizeof(addr)); return fd; } ssize_t read_req(int fd, void *buf) { ssize_t n = read(fd, buf, MAX_MESSAGE_BUF); if (n == 0) /* 写端已关闭,管道永不会再有数据 */ return -1; return (n > 0) ? n : -1; } int process_message(char *req,log_manager *logger) { const char *body = http_get_body(req); rbt_msg message; rbt_parse_json(body,&message); make_swap((void*)&message); logs *log = malloc(sizeof log); if(snprintf(log->log,sizeof(log->log), "%s message %s processd ok\n",message.nickname,message.raw_message)<1024); logger->in_log(log,logger); } int iss_work(netm *self,char *command) { int i = self->last_alc+1; //查询空闲线程 while(self->pool[i].fifo_fd ==0) { if(ipool[i].fifo_fd[0],command,strlen(command)); //设置线程程为working atomic_fetch_sub(&self->pool[i].status,1); self->last_alc = i; } void *pth_module(void *args_p) { args *argms = (args*)args_p; pth_m *pmd = argms->pth; log_manager *logger = argms->log; //参数解析 char name[256] = {'\0'}; sprintf(name,"chatrebot%lu",pmd->pthread_id); int swap = create_swap(name); //创建共享内存 char swap_arg[64] = {'\0'}; sprintf(swap_arg,"%d",swap); pid_t id = fork(); if(id == 0) { char *args[]={ "Pluginmanager", "--swap",swap_arg, NULL}; execv("Run_pluhginmanager",args); } //拉起python插件管理器 for(;;){ //线程池中,单个线程模型 char req[64*1024]; //从管道中读取请求,并解析,无内容时休眠 int n = read_req(pmd->fifo_fd[0],req); //管道关闭时退出; if (n == EOF) { return NULL; break; } else{ process_message(req,logger); atomic_fetch_add(&pmd->status, 1); } } } int start_pool(netm *self) { for(int i = 0;ipool[i].fifo_fd); //启动线程 args arg; arg.pth =&self->pool[i]; arg.log = self->logmanager; self->pool[i].status = 1; pthread_create(&self->pool[i].pthread_id,NULL,pth_module,(void*)&arg); } } int shutdown_pool(netm *self) { for(int i = 0;ipool[i].fifo_fd[0]); } } int server_run(int port,int fifo_fd,netm *self) { int epfd = epoll_create1(EPOLL_CLOEXEC); // 推荐 if (epfd == -1) { perror("epoll_create1"); exit(EXIT_FAILURE); } char iss_buf[256]; int http_fd = init_network(port); struct epoll_event events; for(;;) { int nf = epoll_wait(epfd,&events,1,-1); if (nf == -1) { perror("epoll_wait"); break; } if(events.data.fd ==http_fd) { sprintf(iss_buf,"s/%d/e",accept4(http_fd,NULL,NULL,SOCK_NONBLOCK | SOCK_CLOEXEC)); self->iss_work(self,iss_buf); } if(events.data.fd == fifo_fd) { char command; while(read(fifo_fd,&command,1)==1) { switch(command){ case 'q': //退出逻辑 break; case 'u': //插件更新逻辑 break; } } } } } void *run_network(void *self_d) { netm *self = (netm*)self_d; self->start_pool(self); server_run(self->port,self->fifo_fd[1],self); self->shutdown_pool(self); } int init_networkmanager(netm *self,int *fifo,log_manager *logmanager,int port) { self->run_network = run_network; self->iss_work = iss_work; self->start_pool = start_pool; self->shutdown_pool = shutdown_pool; //装载方法 self->fifo_fd[0]= fifo[0]; self->fifo_fd[1]= fifo[1]; self->last_alc = 0; //初始化参数 self->logmanager = logmanager; self->port = port; }