From 3c5cae31cc1cc0328e210a90b24e08d654e08959 Mon Sep 17 00:00:00 2001 From: fanasina Date: Wed, 2 Jul 2025 01:07:09 +0200 Subject: [PATCH] debug list: now use local current list instead of current_list to avoid thread concurrency --- list_t/src/list_t/list_t.h | 45 +++-- y_socket_t/include/y_socket_t/y_socket_t.h | 4 +- y_socket_t/src/y_socket_t/y_socket_t.c | 190 ++++++++++++++++----- y_socket_t/test/is_good.c | 4 +- 4 files changed, 182 insertions(+), 61 deletions(-) diff --git a/list_t/src/list_t/list_t.h b/list_t/src/list_t/list_t.h index 774eed3..44573a9 100644 --- a/list_t/src/list_t/list_t.h +++ b/list_t/src/list_t/list_t.h @@ -30,11 +30,13 @@ void remove_all_list_in_##type(struct main_list_##type *var_list);\ void increment_list_##type(struct main_list_##type * var_list);\ void decrement_list_##type(struct main_list_##type * var_list);\ - struct list_##type * search_first_occ_with_mov_from_curr_in_list_##type(struct main_list_##type *var_list, type value, int (*funcCmp)(type, type), void (*incr_or_decr_mov)(struct main_list_##type *));\ + struct list_##type * search_first_occ_with_mov_from_curr_in_list_##type(struct main_list_##type *var_list, type value, int (*funcCmp)(type, type), struct list_##type * (*incr_or_decr_mov)(struct list_##type *));\ struct list_##type * search_first_occ_from_begin_in_list_##type(struct main_list_##type *var_list, type value, int (*funcCmp)(type, type));\ struct list_##type * pull_end_from_list_##type(struct main_list_##type *var_list);\ struct list_##type * pull_begin_from_list_##type(struct main_list_##type *var_list);\ void append_list_##type(struct main_list_##type *var_list, struct list_##type *list);\ + struct list_##type * local_increment_list_##type(struct list_##type *list);\ + struct list_##type * local_decrement_list_##type(struct list_##type *list);\ GENERATE_LIST_ALL(TYPE_CHAR) @@ -185,11 +187,11 @@ GENERATE_LIST_ALL(TYPE_PTR) }\ }\ void remove_all_list_in_##type(struct main_list_##type *var_list){\ - struct list_##type *tmp = var_list->begin_list;\ + struct list_##type *tmp = var_list->begin_list, *prec_tmp;\ while(tmp){\ - var_list->current_list = tmp;\ + prec_tmp = tmp;\ tmp = tmp->next;\ - free(var_list->current_list);\ + free(prec_tmp);\ }\ var_list->begin_list = NULL;\ var_list->current_list = NULL;\ @@ -209,16 +211,23 @@ GENERATE_LIST_ALL(TYPE_PTR) var_list->current_list = (var_list->current_list)->preview;\ --(var_list->current_index);\ }\ - struct list_##type * search_first_occ_with_mov_from_curr_in_list_##type(struct main_list_##type *var_list, type value, int (*funcCmp)(type, type), void (*incr_or_decr_mov)(struct main_list_##type *)){\ - for(; var_list->current_list; incr_or_decr_mov(var_list)){\ - if(0 == funcCmp(value, var_list->current_list->value))\ - return var_list->current_list;\ + struct list_##type * local_increment_list_##type(struct list_##type *list){\ + if(list) return list->next;\ + return NULL;\ + }\ + struct list_##type * local_decrement_list_##type(struct list_##type *list){\ + if (list) return list->preview;\ + return NULL;\ + }\ + struct list_##type * search_first_occ_with_mov_from_curr_in_list_##type(struct main_list_##type *var_list, type value, int (*funcCmp)(type, type), struct list_##type * (*incr_or_decr_mov)(struct list_##type *)){\ + for(struct list_##type *list_cur = var_list->current_list; list_cur; list_cur = incr_or_decr_mov(list_cur)){\ + if(0 == funcCmp(value, list_cur->value))\ + return list_cur;\ }\ return NULL;\ }\ struct list_##type * search_first_occ_from_begin_in_list_##type(struct main_list_##type *var_list, type value, int (*funcCmp)(type, type)){\ - move_current_to_begin_list_##type(var_list);\ - return search_first_occ_with_mov_from_curr_in_list_##type(var_list, value, funcCmp, increment_list_##type);\ + return search_first_occ_with_mov_from_curr_in_list_##type(var_list, value, funcCmp, local_increment_list_##type);\ }\ struct list_##type * pull_end_from_list_##type(struct main_list_##type *var_list){\ struct list_##type *ret = var_list->end_list;\ @@ -279,12 +288,12 @@ GENERATE_LIST_ALL(TYPE_PTR) #define GEN_FUNC_PTR_LIST_FREE(type)\ void remove_all_ptr_type_list_##type(struct main_list_##type *var_list){\ - struct list_##type *tmp = var_list->begin_list;\ + struct list_##type *tmp = var_list->begin_list, *prec_tmp;\ while(tmp){\ - var_list->current_list = tmp;\ + prec_tmp = tmp;\ tmp = tmp->next;\ - free_##type(var_list->current_list->value);\ - free(var_list->current_list);\ + free_##type(prec_tmp->value);\ + free(prec_tmp);\ }\ var_list->begin_list = NULL;\ var_list->current_list = NULL;\ @@ -293,12 +302,12 @@ GENERATE_LIST_ALL(TYPE_PTR) var_list->current_index = 0;\ }\ void purge_ptr_type_list_##type(struct main_list_##type *var_list){\ - struct list_##type *tmp = var_list->begin_list;\ + struct list_##type *tmp = var_list->begin_list, *prec_tmp;\ while(tmp){\ - var_list->current_list = tmp;\ + prec_tmp = tmp;\ tmp = tmp->next;\ - free_##type(var_list->current_list->value);\ - free(var_list->current_list);\ + free_##type(prec_tmp->value);\ + free(prec_tmp);\ }\ var_list->begin_list = NULL;\ var_list->current_list = NULL;\ diff --git a/y_socket_t/include/y_socket_t/y_socket_t.h b/y_socket_t/include/y_socket_t/y_socket_t.h index 7479abc..c3994bc 100644 --- a/y_socket_t/include/y_socket_t/y_socket_t.h +++ b/y_socket_t/include/y_socket_t/y_socket_t.h @@ -62,6 +62,7 @@ struct y_socket_t{ pthread_mutex_t *mut_nodes; int go_on; pthread_mutex_t *mut_go_on; + int nb_workers; }; @@ -74,7 +75,8 @@ struct argdst { }; //struct y_socket_t * y_socket_create_(char * port); -struct y_socket_t * y_socket_create(char * port, size_t size_fds); +struct y_socket_t * y_socket_create(char * port, size_t size_fds, int nb_workers); +struct y_socket_t * y_socket_create_(char * port); void y_socket_free(struct y_socket_t *socket); diff --git a/y_socket_t/src/y_socket_t/y_socket_t.c b/y_socket_t/src/y_socket_t/y_socket_t.c index 29031b4..f804768 100644 --- a/y_socket_t/src/y_socket_t/y_socket_t.c +++ b/y_socket_t/src/y_socket_t/y_socket_t.c @@ -23,8 +23,8 @@ GEN_FUNC_PTR_LIST_FREE(y_ptr_STRING){ size_t total_size_list_y_ptr_STRING(struct main_list_y_ptr_STRING *mstr){ size_t total_size=0; - for(move_current_to_begin_list_y_ptr_STRING(mstr); mstr->current_list; increment_list_y_ptr_STRING(mstr)){ - total_size += mstr->current_list->value->size; + for(struct list_y_ptr_STRING * local_current = mstr->begin_list; local_current; local_current = local_current->next){ + total_size += local_current->value->size; } printf("debug: totalsize :%ld\n",total_size); return total_size; @@ -38,11 +38,12 @@ size_t copy_list_y_ptr_STRING_to_one_string(char **p_dst_str, struct main_list_y char *cur_str = dst_str; size_t local_size=0; size_t count_size=0; - for(move_current_to_begin_list_y_ptr_STRING(mstr); mstr->current_list; increment_list_y_ptr_STRING(mstr)){ - local_size = mstr->current_list->value->size; + //for(move_current_to_begin_list_y_ptr_STRING(mstr); mstr->current_list; increment_list_y_ptr_STRING(mstr)) + for(struct list_y_ptr_STRING * local_current = mstr->begin_list; local_current; local_current = local_current->next){ + local_size = local_current->value->size; // printf("debug: local_size :%ld\n",local_size); for(size_t i=0; icurrent_list->value->buf[i]; + cur_str[i]=local_current->value->buf[i]; } count_size += local_size; // printf("debug: countsize :%ld \n",count_size); @@ -53,7 +54,7 @@ size_t copy_list_y_ptr_STRING_to_one_string(char **p_dst_str, struct main_list_y } -struct y_socket_t * y_socket_create(char *port, size_t size_fds){ +struct y_socket_t * y_socket_create(char *port, size_t size_fds, int nb_workers){ struct y_socket_t *sock_temp=malloc(sizeof(struct y_socket_t)); if(size_fds>=nbIpVersion) sock_temp->size_fds = size_fds; @@ -68,10 +69,12 @@ struct y_socket_t * y_socket_create(char *port, size_t size_fds){ sock_temp->go_on = 1; sock_temp->mut_go_on = malloc(sizeof(pthread_mutex_t)); pthread_mutex_init(sock_temp->mut_go_on, NULL); + sock_temp->nb_workers = nb_workers; + return sock_temp; } struct y_socket_t * y_socket_create_(char * port){ - return y_socket_create(port, 2); + return y_socket_create(port, 2, 2); } void y_socket_free(struct y_socket_t *socket){ free(socket->fds); @@ -90,7 +93,19 @@ int check_y_socket_go_on(struct y_socket_t *sock){ return ret; } +struct arg_update_nodes{ + + y_NODE_T node; + struct main_list_y_NODE_T *nodes; +}; void update_nodes(y_NODE_T node, struct main_list_y_NODE_T *nodes){ +#if 0 +void* update_nodes(void* arg) + struct arg_update_nodes * argU=(struct arg_update_nodes*)arg; + y_NODE_T node=argU->node; + struct main_list_y_NODE_T *nodes=argU->nodes; +#endif + char host[NI_MAXHOST], service[NI_MAXSERV]; int status = getnameinfo((struct sockaddr*)&(node.addr), node.addr_len, host, NI_MAXHOST, service, NI_MAXSERV, NI_NUMERICHOST); if(status) @@ -102,16 +117,24 @@ void update_nodes(y_NODE_T node, struct main_list_y_NODE_T *nodes){ if(NULL == search_node_in_list_y_NODE_T(nodes, node)) push_back_list_y_NODE_T(nodes, node); +// return NULL; } -struct send_arg{ - struct pollfd *fd; +struct arg_send_file{ + struct pollfd *fds; struct main_list_y_NODE_T *nodes; char * filename; }; -void y_socket_send_file_for_all_nodes(struct pollfd *fds, struct main_list_y_NODE_T *nodes, char * filename){ - char tempAddr[BUF_SIZE+1]; +//void y_socket_send_file_for_all_nodes(struct pollfd *fds, struct main_list_y_NODE_T *nodes, char * filename) +void* y_socket_send_file_for_all_nodes(void* arg){ + struct arg_send_file *argS=(struct arg_send_file*)arg; + + struct pollfd *fds=argS->fds; + struct main_list_y_NODE_T *nodes=argS->nodes; + char * filename=argS->filename; + + char tempAddr[BUF_SIZE+1]; int c_af; // char host[NI_MAXHOST], service[NI_MAXSERV]; char buf_send[BUF_SIZE+1]; @@ -132,7 +155,7 @@ void y_socket_send_file_for_all_nodes(struct pollfd *fds, struct main_list_y_NOD fd_file = open( filename , O_RDONLY); if(fd_file == -1){ fprintf(stderr,"error opening file |%s| for reading\n",filename); - return /*NULL*/; + return NULL; } @@ -143,21 +166,22 @@ void y_socket_send_file_for_all_nodes(struct pollfd *fds, struct main_list_y_NOD // sprintf(msgRet, "from %s:%s =%s",host, service, buf); // len_msgRet = strlen(msgRet); - printf("debug: sending response %s :\n",buf_send); + ///printf("debug: sending response %s :\n",buf_send); - FOR_LIST_FORM_BEGIN(y_NODE_T, nodes){ + //FOR_LIST_FORM_BEGIN(y_NODE_T, nodes) + for(struct list_y_NODE_T *local_list_current = nodes->begin_list; local_list_current; local_list_current=local_list_current->next ){ //memset(tempAddr, 0, BUF_SIZE+1); - c_af=(nodes->current_list->value).addr.ss_family; + c_af=(local_list_current->value).addr.ss_family; if(c_af==AF_INET){ if(NULL == inet_ntop(c_af, - &(GET_IN_type_ADDR(&(nodes->current_list->value),)), - tempAddr, BUF_SIZE/*(argSock->nodes->current_list->value).addr_len*/)){ + &(GET_IN_type_ADDR(&(local_list_current->value),)), + tempAddr, BUF_SIZE/*(argSock->local_list_current->value).addr_len*/)){ fprintf(stderr, "error inet_ntop v4\n"); } }else if(c_af==AF_INET6){ if(NULL == inet_ntop(c_af, - &(GET_IN_type_ADDR(&(nodes->current_list->value),6)), - tempAddr, BUF_SIZE /*(argSock->nodes->current_list->value).addr_len*/)){ + &(GET_IN_type_ADDR(&(local_list_current->value),6)), + tempAddr, BUF_SIZE /*(argSock->local_list_current->value).addr_len*/)){ fprintf(stderr, "error inet_ntop v6 :errno=%d\n",errno); } } @@ -168,28 +192,29 @@ void y_socket_send_file_for_all_nodes(struct pollfd *fds, struct main_list_y_NOD } #endif - printf("debug: destination %s :\n",tempAddr); + /// printf("debug: destination %s :\n",tempAddr); #if 1 if(sendto(fds[(c_af==AF_INET6)].fd, buf_send, retread, /*msgRet, len_msgRet,*/ 0, - (struct sockaddr*)&((nodes->current_list->value).addr), - (nodes->current_list->value).addr_len) != + (struct sockaddr*)&((local_list_current->value).addr), + (local_list_current->value).addr_len) != retread /*len_msgRet*/ ){ fprintf(stderr, "Error sending response to %s\n",tempAddr); }else - printf("debug: sending response to %s\n",tempAddr); + printf("debug: sending response to < %s >",tempAddr); #endif } //memset(buf_send, 0, BUF_SIZE+1); } close(fd_file); - printf("fd=%d closed: filename=%s\n",fd_file,filename); + printf("debug: fd=%d closed: filename=%s\n",fd_file,filename); + return NULL; } void y_socket_get_fds(struct pollfd * fds, char * port, char * addrDistant){ @@ -260,25 +285,73 @@ int flags = fcntl(fds[af].fd, F_GETFL); freeaddrinfo(result); } -void y_socket_handler_(char * buf, struct pollfd *fds, struct y_socket_t *sock){ +struct arg_handler_{ + char *buf; + struct pollfd *fds; + y_NODE_T node; + struct y_socket_t *sock; + struct argWorker *argw ; +}; + +//void y_socket_handler_(char * buf, struct pollfd *fds, struct y_socket_t *sock) +void* y_socket_handler_(void *arg){ + struct arg_handler_ *argH = (struct arg_handler_ *)arg; + char *buf=argH->buf; + struct pollfd *fds=argH->fds; + struct y_socket_t *sock=argH->sock; + struct argWorker *argw=argH->argw; + struct main_list_y_NODE_T *nodes = sock->nodes; + update_nodes(argH->node, nodes); + printf("\n\n:::::::::::::::::::::::::::handler: : \n\n%s\n\n::::::::::::::::::::::::::\n",buf); - if(strncmp(buf, "GET", 3)==0){ + if(strncmp(buf, "get", 3)==0){ if(strncmp(buf+4,"file",4)==0){ char *filename = buf + 9; - y_socket_send_file_for_all_nodes(fds, nodes, filename) ; + struct arg_send_file *argS=malloc(sizeof(struct arg_send_file)); + argS->fds=fds; + argS->nodes=nodes; + argS->filename=filename; + push_back_list_TYPE_PTR(argw->list_arg, argS); + struct y_task_t task_send={ + .func=y_socket_send_file_for_all_nodes, + .arg=argS, + .status=TASK_PENDING, + }; + push_tasQ(argw->argx->tasQ, task_send); + //y_socket_send_file_for_all_nodes(fds, nodes, filename) ; } } - if(strncmp(buf, "UPDATE", 6)==0){ + if(strncmp(buf, "update", 6)==0){ if(strncmp(buf+7,"kill",4)==0){ pthread_mutex_lock(sock->mut_go_on); sock->go_on = 0; pthread_mutex_unlock(sock->mut_go_on); +// kill_all_workers(argw); +// printf("debug: kill_all\n"); } } } void *y_socket_poll_fds(void *arg){ struct y_socket_t * argSock = (struct y_socket_t*)arg; +// // // + struct main_list_ptr_y_WORKER_T * workers = create_var_list_ptr_y_WORKER_T(); + struct main_list_TYPE_PTR * list_arg = create_var_list_TYPE_PTR(); + struct argExecTasQ *argx = create_argExecTasQ(); + pthread_mutex_t *mut_workers = malloc(sizeof(pthread_mutex_t)); + pthread_mutex_init(mut_workers, NULL); + + //int nb_workers=2; + for(int i=0; i< argSock->nb_workers; ++i){ + + struct y_worker_t *pw= create_ptr_y_WORKER_T(workers, list_arg, mut_workers, argx, GO_ON_WORKER, i); + + printf("debug:  - / -----  -- / --- %d workers %ld created\n",argSock->nb_workers,(pw->arg->pworker->id)); +//usleep(500); + } + +/// /// /// + struct pollfd *fds = argSock->fds; y_socket_get_fds(fds, argSock->port, NULL); @@ -294,7 +367,9 @@ void *y_socket_poll_fds(void *arg){ ssize_t nread; char buf[BUF_SIZE]; struct main_list_y_ptr_STRING *m_str=create_var_list_y_ptr_STRING(); - char *temp_all_buf=NULL; + +// char *temp_all_buf=NULL; + // char msgRet[BUF_SIZE + NI_MAXHOST + NI_MAXSERV + 100]; // int len_msgRet; @@ -337,21 +412,45 @@ void *y_socket_poll_fds(void *arg){ //printf("debug: out push_back_list_y_ptr_STRING of <%s>\n",buf); } - - update_nodes(node, argSock->nodes); - if(temp_all_buf){ +#if 0 + struct arg_update_nodes *argUP_N=malloc(sizeof(struct arg_update_nodes)); + argUP_N->node=node; + argUP_N->nodes=argSock->nodes; + push_back_list_TYPE_PTR(list_arg, argUP_N); + struct y_task_t task_update_node={ + .func=update_nodes, + .arg=argUP_N, + .status=TASK_PENDING, + }; + push_tasQ(argx->tasQ, task_update_node); +#endif + //update_nodes(node, argSock->nodes); + /* + if(temp_all_buf){ free(temp_all_buf); temp_all_buf=NULL; - } - /*size_t total_buf = */ copy_list_y_ptr_STRING_to_one_string(&temp_all_buf , m_str); - -//printf("msg : %s\n",buf); - //printf("msg : %s\n",temp_all_buf); - + }*/ + /*size_t total_buf = */ + char * temp_all_buf = NULL; + copy_list_y_ptr_STRING_to_one_string(&temp_all_buf , m_str); + push_back_list_TYPE_PTR(list_arg, temp_all_buf); + struct arg_handler_ *ptr_argHandl = malloc(sizeof(struct arg_handler_)); + ptr_argHandl->buf = temp_all_buf; + ptr_argHandl->fds=fds; + ptr_argHandl->sock=argSock; + ptr_argHandl->node=node; + ptr_argHandl->argw=workers->begin_list->value->arg; + + push_back_list_TYPE_PTR(list_arg, ptr_argHandl); + struct y_task_t task_handl = { + .func=y_socket_handler_, + .arg=ptr_argHandl, + .status=TASK_PENDING, + }; + push_tasQ(argx->tasQ, task_handl); /// - /// - y_socket_handler_(temp_all_buf, fds, argSock); + //y_socket_handler_(temp_all_buf, fds, argSock); /// } @@ -368,12 +467,23 @@ void *y_socket_poll_fds(void *arg){ } + + purge_ptr_type_list_y_ptr_STRING(m_str); +/* if(temp_all_buf){ free(temp_all_buf); temp_all_buf=NULL; } +*/ +//// //// //// + kill_all_workers(workers->begin_list->value->arg); + printf("debug: kill all done\n"); + +///// ///// ///// + + return NULL; } #define str(x) # x diff --git a/y_socket_t/test/is_good.c b/y_socket_t/test/is_good.c index 0eb956a..336ffa4 100644 --- a/y_socket_t/test/is_good.c +++ b/y_socket_t/test/is_good.c @@ -25,7 +25,7 @@ TEST(first){ - struct y_socket_t *firstSock = y_socket_create("1600", 2); + struct y_socket_t *firstSock = y_socket_create_("1600"); LOG("create y_socket_t in port |%s|\n",firstSock->port); y_socket_free(firstSock); @@ -164,7 +164,7 @@ TEST(searchNode){ TEST(pollThread){ - struct y_socket_t *argS=y_socket_create("1600", 2); + struct y_socket_t *argS=y_socket_create("1600", 2, 3); pthread_t pollTh; pthread_create(&pollTh, NULL, y_socket_poll_fds, (void*)argS);