From 65d4d8f67cda91929e42c3e026ddf872f51b43b9 Mon Sep 17 00:00:00 2001 From: fanasina Date: Mon, 28 Jul 2025 23:03:36 +0200 Subject: [PATCH] y_socket: add poll stdin, debug node init --- tensor_t/test_cl/is_good.c | 2 +- y_socket_t/include/y_socket_t/y_node_t.h | 3 + y_socket_t/src/y_socket_t/y_node_t.c | 44 ++++++++- y_socket_t/src/y_socket_t/y_socket_t.c | 108 ++++++++++++++++++++--- 4 files changed, 139 insertions(+), 18 deletions(-) diff --git a/tensor_t/test_cl/is_good.c b/tensor_t/test_cl/is_good.c index a8c337b..f68c95e 100644 --- a/tensor_t/test_cl/is_good.c +++ b/tensor_t/test_cl/is_good.c @@ -855,7 +855,7 @@ TEST(tensorProd_vs2d ){ tensorProd_TYPE_FLOAT(&M,M0,M1); //cl2d_tensorProd_TYPE_FLOAT(&Mn,M0,M1,24,24); //cl2d_tensorProd_TYPE_FLOAT(&Mn,M0,M1,32,32); - cl2d_tensorProd_TYPE_FLOAT(&Mn,M0,M1,64,16); + cl2d_tensorProd_TYPE_FLOAT(&Mn,M0,M1,16,16); LOG("M->dim->rank = %ld\n",M->dim->rank); //print_tensor_float(M,"M"); diff --git a/y_socket_t/include/y_socket_t/y_node_t.h b/y_socket_t/include/y_socket_t/y_node_t.h index a4b7b68..922a76f 100644 --- a/y_socket_t/include/y_socket_t/y_node_t.h +++ b/y_socket_t/include/y_socket_t/y_node_t.h @@ -27,5 +27,8 @@ int y_NODE_T_cmp(y_NODE_T nodeA, y_NODE_T nodeB); struct list_y_NODE_T * search_node_in_list_y_NODE_T(struct main_list_y_NODE_T *listNodes, y_NODE_T node); int set_addr_y_NODE_T(y_NODE_T *node, char * addrStr); void set_port_y_NODE_T(y_NODE_T *node, int port); +void set_str_port_y_NODE_T(y_NODE_T *node, char *str_port); + +const char * put_y_NODE_T_in_string(y_NODE_T *node, char * dst); #endif /* __Y_NODE_T_H__C */ diff --git a/y_socket_t/src/y_socket_t/y_node_t.c b/y_socket_t/src/y_socket_t/y_node_t.c index 3cc15ed..2d320d7 100644 --- a/y_socket_t/src/y_socket_t/y_node_t.c +++ b/y_socket_t/src/y_socket_t/y_node_t.c @@ -54,6 +54,7 @@ struct list_y_NODE_T * search_node_in_list_y_NODE_T(struct main_list_y_NODE_T *l } int set_addr_y_NODE_T(y_NODE_T *node, char * addrStr){ + //memset(&(node->addr), 0, sizeof(struct sockaddr_storage)); int af = AF_INET, ret = -2; for(int i=0; iaddr.ss_family = af; if(af==AF_INET) ret = inet_pton(af, addrStr, &(GET_IN_type_ADDR(&(node->addr),))); - else if(af == AF_INET6) - ret = inet_pton(af, addrStr, (GET_IN_type_ADDR(&(node->addr), 6))); + else if(af == AF_INET6){ + //((struct sockaddr_in6*)(&(node->addr)))->sin6_flowinfo = 0; + ret = inet_pton(af, addrStr, (GET_IN_type_ADDR(&(node->addr), 6))); + } return ret; } @@ -77,10 +80,43 @@ int set_addr_y_NODE_T(y_NODE_T *node, char * addrStr){ void set_port_y_NODE_T(y_NODE_T *node, int port){ int af = node->addr.ss_family; if(af==AF_INET) - ((struct sockaddr_in*)(&(node->addr)))->sin_port = port; + ((struct sockaddr_in*)(&(node->addr)))->sin_port = htons(port); else if(af == AF_INET6) - ((struct sockaddr_in6*)(&(node->addr)))->sin6_port = port; + ((struct sockaddr_in6*)(&(node->addr)))->sin6_port = htons(port); } +void set_str_port_y_NODE_T(y_NODE_T *node, char *str_port){ + int port = atoi(str_port); + set_port_y_NODE_T(node, port); +} +const char * put_y_NODE_T_in_string(y_NODE_T *node, char * dst){ +#if 0 + char host[NI_MAXHOST], service[NI_MAXSERV]; + int status = getnameinfo((struct sockaddr*)&(node->addr), node->addr_len, host, NI_MAXHOST, service, NI_MAXSERV, NI_NUMERICHOST); + if(status==0) + return NULL; + // printf("debug: status ==0 : success: Received successfully from %s:%s\n", host,service); + // else + printf("getnameinfo: %s\n", gai_strerror(status)); + sprintf(dst,"%s:[%s]",host,service); +#endif + char temp_addr[INET6_ADDRSTRLEN]; + if(node->addr.ss_family == AF_INET){ + struct sockaddr_in *sinaddrv4 = ((struct sockaddr_in*)&(node->addr)); + if(inet_ntop(node->addr.ss_family, &(sinaddrv4->sin_addr), + temp_addr, INET6_ADDRSTRLEN) == NULL){ + return NULL; + } + sprintf(dst, "%s:[%d]",temp_addr, ntohs(sinaddrv4->sin_port )); + }else if(node->addr.ss_family == AF_INET6){ + struct sockaddr_in6 *sinaddrv6 = ((struct sockaddr_in6*)&(node->addr)); + if(inet_ntop(node->addr.ss_family, &(sinaddrv6->sin6_addr), + temp_addr, INET6_ADDRSTRLEN) == NULL){ + return NULL; + } + sprintf(dst, "%s:[%d]",temp_addr, ntohs(((struct sockaddr_in6*)&(node->addr))->sin6_port )); + } + return dst; +} diff --git a/y_socket_t/src/y_socket_t/y_socket_t.c b/y_socket_t/src/y_socket_t/y_socket_t.c index 96df691..de3f50e 100644 --- a/y_socket_t/src/y_socket_t/y_socket_t.c +++ b/y_socket_t/src/y_socket_t/y_socket_t.c @@ -56,10 +56,10 @@ size_t copy_list_y_ptr_STRING_to_one_string(char **p_dst_str, struct main_list_y struct y_socket_t * y_socket_create(char *port, size_t size_fds, int nb_workers){ struct y_socket_t *sock_temp=malloc(sizeof(struct y_socket_t)); - if(size_fds>=nbIpVersion) + if(size_fds>=nbIpVersion+1) sock_temp->size_fds = size_fds; else - sock_temp->size_fds = nbIpVersion; + sock_temp->size_fds = nbIpVersion+1; sock_temp->fds = malloc(sock_temp->size_fds * sizeof(struct pollfd)); sock_temp->port=port; @@ -74,7 +74,7 @@ struct y_socket_t * y_socket_create(char *port, size_t size_fds, int nb_workers) return sock_temp; } struct y_socket_t * y_socket_create_(char * port){ - return y_socket_create(port, 2, 2); + return y_socket_create(port, 3, 2); } void y_socket_free(struct y_socket_t *socket){ free(socket->fds); @@ -106,6 +106,9 @@ void* update_nodes(void* arg) struct main_list_y_NODE_T *nodes=argU->nodes; #endif + +#if 0 + char host[NI_MAXHOST], service[NI_MAXSERV]; int status = getnameinfo((struct sockaddr*)&(node.addr), node.addr_len, host, NI_MAXHOST, service, NI_MAXSERV, NI_NUMERICHOST); if(status) @@ -113,7 +116,7 @@ void* update_nodes(void* arg) // else fprintf(stderr, "getnameinfo: %s\n", gai_strerror(status)); - +#endif if(NULL == search_node_in_list_y_NODE_T(nodes, node)) push_back_list_y_NODE_T(nodes, node); @@ -126,6 +129,8 @@ struct arg_send_file{ char * filename; }; +#define TEMP_ADDR 1 + //void y_socket_send_file_for_all_nodes(struct pollfd *fds, struct main_list_y_NODE_T *nodes, char * filename) void* y_socket_send_file_for_all_nodes(void* arg){ struct arg_send_file *argS=(struct arg_send_file*)arg; @@ -133,8 +138,9 @@ void* y_socket_send_file_for_all_nodes(void* arg){ struct pollfd *fds=argS->fds; struct main_list_y_NODE_T *nodes=argS->nodes; char * filename=argS->filename; - +#if TEMP_ADDR char tempAddr[BUF_SIZE+1]; +#endif int c_af; // char host[NI_MAXHOST], service[NI_MAXSERV]; char buf_send[BUF_SIZE+1]; @@ -172,7 +178,8 @@ void* y_socket_send_file_for_all_nodes(void* arg){ for(struct list_y_NODE_T *local_list_current = nodes->begin_list; local_list_current; local_list_current=local_list_current->next ){ //memset(tempAddr, 0, BUF_SIZE+1); c_af=(local_list_current->value).addr.ss_family; - if(c_af==AF_INET){ +#if TEMP_ADDR + if(c_af==AF_INET){ if(NULL == inet_ntop(c_af, &(GET_IN_type_ADDR(&(local_list_current->value),)), tempAddr, BUF_SIZE/*(argSock->local_list_current->value).addr_len*/)){ @@ -185,6 +192,7 @@ void* y_socket_send_file_for_all_nodes(void* arg){ fprintf(stderr, "error inet_ntop v6 :errno=%d\n",errno); } } +#endif #if 0 off_t offset = 0; ssize_t ret_sendfile ; @@ -204,11 +212,16 @@ void* y_socket_send_file_for_all_nodes(void* arg){ retread /*len_msgRet*/ ){ - fprintf(stderr, "Error sending response to %s\n",tempAddr); - }else - printf("debug: sending response to < %s >",tempAddr); -#endif +#if TEMP_ADDR + fprintf(stderr, "Error sending response to %s\n",tempAddr); +#endif + }else{ +#if TEMP_ADDR + printf("debug: sending response to < %s >",tempAddr); +#endif + } } +#endif //memset(buf_send, 0, BUF_SIZE+1); } @@ -221,6 +234,7 @@ void y_socket_get_fds(struct pollfd * fds, char * port, char * addrDistant){ fds[v4].fd=-1; fds[v4].events = POLLIN; fds[v6].fd=-1; fds[v6].events = POLLIN; + fds[2].fd=0; fds[2].events = POLLIN | POLLRDNORM | POLLRDBAND | POLLPRI ; struct addrinfo hints, *result, *rp; int status; @@ -366,7 +380,7 @@ void *y_socket_poll_fds(void *arg){ //socklen_t len_peer_addr; y_NODE_T node; int af, status; - ssize_t nread; + ssize_t nread, buf_len; char buf[BUF_SIZE]; struct main_list_y_ptr_STRING *m_str=create_var_list_y_ptr_STRING(); @@ -374,12 +388,13 @@ void *y_socket_poll_fds(void *arg){ // char msgRet[BUF_SIZE + NI_MAXHOST + NI_MAXSERV + 100]; // int len_msgRet; - +// I had to initialize all attribute of addr to avoid error uninitialized value with valgrind, for example "sin6_flowinfo" in sockaddr_in6 + memset(&(node.addr), 0, sizeof(struct sockaddr_storage)); node.addr_len = sizeof(struct sockaddr_storage); for(;check_y_socket_go_on(argSock);){ printf("poll: wait events\n"); - status = poll(fds, nbIpVersion, -1); + status = poll(fds, nbIpVersion + 1, -1); if(status <= 0){ if(status == -1 && errno != EINTR){ perror("poll"); @@ -474,6 +489,73 @@ void *y_socket_poll_fds(void *arg){ /// } } + // stdin poll + if(fds[2].revents){// && POLLIN + //pollEventRec = fds[1].events; + //printf("fd = %d\n event=%d\n\n",fds[1].fd,pollEventRec); + //fds[1].events = 0; + puts("Saisie du message : "); + memset(buf, 0, sizeof buf); + //scanf(" %"xstr(BUF_SIZE)"[^\n]%*c", buf); + buf_len = read(0,buf,BUF_SIZE); + printf("message saisi : %s\n len = %ld\n",buf, buf_len); + if(buf_len>6){ +#if 1 + char cmd[BUF_SIZE], dst_addr[BUF_SIZE], msg_buf[BUF_SIZE]; + int index_buf=0, index_str=0; + for(; buf[index_buf]!=' '; ++index_buf){ + cmd[index_str++]=buf[index_buf]; + } + cmd[index_str]='\0'; + printf("debug : index_str= %d; cmd=[%s]\n",index_str, cmd); + + index_str=0; + while(buf[index_buf]==' '){++index_buf;} + for(; buf[index_buf]!=' '; ++index_buf){ + dst_addr[index_str++]=buf[index_buf]; + } + dst_addr[index_str]='\0'; + while(buf[index_buf]==' '){++index_buf;} + index_str=0; + for(; buf[index_buf]!='\n'; ++index_buf) + msg_buf[index_str++]=buf[index_buf]; + msg_buf[index_str++]='\0'; + + printf("debug : index_str=%d, dst_addr=[%s]\n", index_str, dst_addr); +#endif + + + if(strncmp(cmd, "sendto", 6)==0){ + printf("debug : sendto match, dst_addr=[%s]\n", dst_addr); + if(set_addr_y_NODE_T(&node, dst_addr)){ + printf("debug : set_addr_y_NODE_T done\n"); + set_str_port_y_NODE_T(&node, argSock->port); + update_nodes(node, argSock->nodes); + af=(node.addr.ss_family == AF_INET6); + + + printf("debug : af = AF_INET=%d, af = AF_INET6=%d, vs af=[%d]\n",AF_INET, AF_INET6, af); + + if(sendto(fds[af].fd, /*buf+index_buf , buf_len-index_buf,*/ + msg_buf, index_str, + 0, + (struct sockaddr*)(&(node.addr)), node.addr_len) == -1){ + printf("message erreur sendto : %s\n\n",buf); + perror("sendto:"); + close(fds[af].fd); + return NULL; + } + char dddnn[56]; + put_y_NODE_T_in_string(&node, dddnn); + printf("debug: sendto : %s: msg :%s\n\n",dddnn, buf+index_buf); + + + } + } + } + + } + #if 0 // printf("nread = %ld: buf=%s\nlen_buf=%ld\ncmp=%d\n",nread,buf,strlen(buf),strncmp(buf,"SHUTDOWN SERVER",15));