y_socket: add check go_on workers before handling buf

This commit is contained in:
2025-10-11 08:08:41 +02:00
parent f232cd3826
commit 7a07b43038
3 changed files with 41 additions and 4 deletions
+1
View File
@@ -33,5 +33,6 @@ void set_str_port_y_NODE_T(y_NODE_T *node, char *str_port);
const char * put_y_NODE_T_in_string(y_NODE_T *node, char * dst); const char * put_y_NODE_T_in_string(y_NODE_T *node, char * dst);
void update_nodes(y_NODE_T node, struct main_list_y_NODE_T *nodes); void update_nodes(y_NODE_T node, struct main_list_y_NODE_T *nodes);
void * remove_node_from_nodes(void* arg);
#endif /* __Y_NODE_T_H__C */ #endif /* __Y_NODE_T_H__C */
+15
View File
@@ -123,6 +123,21 @@ const char * put_y_NODE_T_in_string(y_NODE_T *node, char * dst){
return dst; return dst;
} }
void * remove_node_from_nodes(void* arg){
struct arg_send_file *argS=(struct arg_send_file*)arg;
struct main_list_y_NODE_T *nodes=argS->nodes;
y_NODE_T node=argS->node;
struct list_y_NODE_T * l_node = search_node_in_list_y_NODE_T(nodes, node);
if(l_node) remove_index_from_list_y_NODE_T(nodes, l_node->index );
return NULL;
}
void update_nodes(y_NODE_T node, struct main_list_y_NODE_T *nodes){ void update_nodes(y_NODE_T node, struct main_list_y_NODE_T *nodes){
#if 0 #if 0
void* update_nodes(void* arg) void* update_nodes(void* arg)
+25 -4
View File
@@ -410,6 +410,25 @@ void* y_socket_handler_(void *arg){
pthread_mutex_unlock(sock->mut_go_on); pthread_mutex_unlock(sock->mut_go_on);
// kill_all_workers(argw); // kill_all_workers(argw);
// printf("debug: kill_all\n"); // printf("debug: kill_all\n");
}else if(strncmp(buf+7,"remove node",11)==0){
if(set_addr_y_NODE_T(&(argH->node), buf + 19)){
set_str_port_y_NODE_T(&(argH->node), argH->sock->port);
struct arg_send_file *argS=malloc(sizeof(struct arg_send_file));
argS->fds=fds;
argS->nodes=nodes;
argS->node=argH->node;
argS->filename=NULL;
argS->m_ok_head_l_t = argH->m_ok_head_l_t;
push_back_list_TYPE_PTR(argw->list_arg, argS);
struct y_task_t task_send={
//.func=y_socket_send_file_for_all_nodes,
.func=remove_node_from_nodes,
.arg=argS,
.status=TASK_PENDING,
};
push_tasQ(argw->argx->tasQ, task_send);
}
} }
} }
@@ -717,10 +736,12 @@ void *y_socket_poll_fds(void *arg){
if(m_str == NULL) if(m_str == NULL)
m_str=create_var_list_y_ptr_STRING(); m_str=create_var_list_y_ptr_STRING();
memset(buf, 0, BUF_SIZE+1); memset(buf, 0, BUF_SIZE+1);
pthread_mutex_lock(argSock->mut_go_on);
int w_go_on = argSock->go_on;
pthread_mutex_unlock(argSock->mut_go_on);
#if 1 #if 1
while((nread = recvfrom(fds[af].fd, buf, BUF_SIZE, 0, while(w_go_on && ((nread = recvfrom(fds[af].fd, buf, BUF_SIZE, 0,
(struct sockaddr *)&(node.addr), &(node.addr_len))) == BUF_SIZE){ (struct sockaddr *)&(node.addr), &(node.addr_len))) == BUF_SIZE)){
if(buf[nread-1]=='\n') if(buf[nread-1]=='\n')
buf[nread-1]='\0'; buf[nread-1]='\0';
@@ -737,7 +758,7 @@ void *y_socket_poll_fds(void *arg){
//printf("debug: out nread: %ld vs BUF_SIZE :%d \n",nread, BUF_SIZE); //printf("debug: out nread: %ld vs BUF_SIZE :%d \n",nread, BUF_SIZE);
if(nread == -1) if(nread == -1)
fprintf(stderr,"error recvfrom\n"); fprintf(stderr,"error recvfrom\n");
else if(nread >= 0 && nread < BUF_SIZE){ else if(w_go_on && (nread >= 0 && nread < BUF_SIZE)){
if(nread && buf[nread-1]=='\n' if(nread && buf[nread-1]=='\n'
) buf[nread-1]='\0'; ) buf[nread-1]='\0';
buf[nread]='\0'; buf[nread]='\0';