y_socket: add poll stdin, debug node init

This commit is contained in:
2025-07-28 23:03:36 +02:00
parent 6d5e060d6c
commit 65d4d8f67c
4 changed files with 139 additions and 18 deletions
+1 -1
View File
@@ -855,7 +855,7 @@ TEST(tensorProd_vs2d ){
tensorProd_TYPE_FLOAT(&M,M0,M1);
//cl2d_tensorProd_TYPE_FLOAT(&Mn,M0,M1,24,24);
//cl2d_tensorProd_TYPE_FLOAT(&Mn,M0,M1,32,32);
cl2d_tensorProd_TYPE_FLOAT(&Mn,M0,M1,64,16);
cl2d_tensorProd_TYPE_FLOAT(&Mn,M0,M1,16,16);
LOG("M->dim->rank = %ld\n",M->dim->rank);
//print_tensor_float(M,"M");
+3
View File
@@ -27,5 +27,8 @@ int y_NODE_T_cmp(y_NODE_T nodeA, y_NODE_T nodeB);
struct list_y_NODE_T * search_node_in_list_y_NODE_T(struct main_list_y_NODE_T *listNodes, y_NODE_T node);
int set_addr_y_NODE_T(y_NODE_T *node, char * addrStr);
void set_port_y_NODE_T(y_NODE_T *node, int port);
void set_str_port_y_NODE_T(y_NODE_T *node, char *str_port);
const char * put_y_NODE_T_in_string(y_NODE_T *node, char * dst);
#endif /* __Y_NODE_T_H__C */
+39 -3
View File
@@ -54,6 +54,7 @@ struct list_y_NODE_T * search_node_in_list_y_NODE_T(struct main_list_y_NODE_T *l
}
int set_addr_y_NODE_T(y_NODE_T *node, char * addrStr){
//memset(&(node->addr), 0, sizeof(struct sockaddr_storage));
int af = AF_INET, ret = -2;
for(int i=0; i<strlen(addrStr); ++i){
if(addrStr[i]=='.')
@@ -66,9 +67,11 @@ int set_addr_y_NODE_T(y_NODE_T *node, char * addrStr){
node->addr.ss_family = af;
if(af==AF_INET)
ret = inet_pton(af, addrStr, &(GET_IN_type_ADDR(&(node->addr),)));
else if(af == AF_INET6)
else if(af == AF_INET6){
//((struct sockaddr_in6*)(&(node->addr)))->sin6_flowinfo = 0;
ret = inet_pton(af, addrStr, (GET_IN_type_ADDR(&(node->addr), 6)));
}
return ret;
}
@@ -77,10 +80,43 @@ int set_addr_y_NODE_T(y_NODE_T *node, char * addrStr){
void set_port_y_NODE_T(y_NODE_T *node, int port){
int af = node->addr.ss_family;
if(af==AF_INET)
((struct sockaddr_in*)(&(node->addr)))->sin_port = port;
((struct sockaddr_in*)(&(node->addr)))->sin_port = htons(port);
else if(af == AF_INET6)
((struct sockaddr_in6*)(&(node->addr)))->sin6_port = port;
((struct sockaddr_in6*)(&(node->addr)))->sin6_port = htons(port);
}
void set_str_port_y_NODE_T(y_NODE_T *node, char *str_port){
int port = atoi(str_port);
set_port_y_NODE_T(node, port);
}
const char * put_y_NODE_T_in_string(y_NODE_T *node, char * dst){
#if 0
char host[NI_MAXHOST], service[NI_MAXSERV];
int status = getnameinfo((struct sockaddr*)&(node->addr), node->addr_len, host, NI_MAXHOST, service, NI_MAXSERV, NI_NUMERICHOST);
if(status==0)
return NULL;
// printf("debug: status ==0 : success: Received successfully from %s:%s\n", host,service);
// else
printf("getnameinfo: %s\n", gai_strerror(status));
sprintf(dst,"%s:[%s]",host,service);
#endif
char temp_addr[INET6_ADDRSTRLEN];
if(node->addr.ss_family == AF_INET){
struct sockaddr_in *sinaddrv4 = ((struct sockaddr_in*)&(node->addr));
if(inet_ntop(node->addr.ss_family, &(sinaddrv4->sin_addr),
temp_addr, INET6_ADDRSTRLEN) == NULL){
return NULL;
}
sprintf(dst, "%s:[%d]",temp_addr, ntohs(sinaddrv4->sin_port ));
}else if(node->addr.ss_family == AF_INET6){
struct sockaddr_in6 *sinaddrv6 = ((struct sockaddr_in6*)&(node->addr));
if(inet_ntop(node->addr.ss_family, &(sinaddrv6->sin6_addr),
temp_addr, INET6_ADDRSTRLEN) == NULL){
return NULL;
}
sprintf(dst, "%s:[%d]",temp_addr, ntohs(((struct sockaddr_in6*)&(node->addr))->sin6_port ));
}
return dst;
}
+91 -9
View File
@@ -56,10 +56,10 @@ size_t copy_list_y_ptr_STRING_to_one_string(char **p_dst_str, struct main_list_y
struct y_socket_t * y_socket_create(char *port, size_t size_fds, int nb_workers){
struct y_socket_t *sock_temp=malloc(sizeof(struct y_socket_t));
if(size_fds>=nbIpVersion)
if(size_fds>=nbIpVersion+1)
sock_temp->size_fds = size_fds;
else
sock_temp->size_fds = nbIpVersion;
sock_temp->size_fds = nbIpVersion+1;
sock_temp->fds = malloc(sock_temp->size_fds * sizeof(struct pollfd));
sock_temp->port=port;
@@ -74,7 +74,7 @@ struct y_socket_t * y_socket_create(char *port, size_t size_fds, int nb_workers)
return sock_temp;
}
struct y_socket_t * y_socket_create_(char * port){
return y_socket_create(port, 2, 2);
return y_socket_create(port, 3, 2);
}
void y_socket_free(struct y_socket_t *socket){
free(socket->fds);
@@ -106,6 +106,9 @@ void* update_nodes(void* arg)
struct main_list_y_NODE_T *nodes=argU->nodes;
#endif
#if 0
char host[NI_MAXHOST], service[NI_MAXSERV];
int status = getnameinfo((struct sockaddr*)&(node.addr), node.addr_len, host, NI_MAXHOST, service, NI_MAXSERV, NI_NUMERICHOST);
if(status)
@@ -113,7 +116,7 @@ void* update_nodes(void* arg)
// else
fprintf(stderr, "getnameinfo: %s\n", gai_strerror(status));
#endif
if(NULL == search_node_in_list_y_NODE_T(nodes, node))
push_back_list_y_NODE_T(nodes, node);
@@ -126,6 +129,8 @@ struct arg_send_file{
char * filename;
};
#define TEMP_ADDR 1
//void y_socket_send_file_for_all_nodes(struct pollfd *fds, struct main_list_y_NODE_T *nodes, char * filename)
void* y_socket_send_file_for_all_nodes(void* arg){
struct arg_send_file *argS=(struct arg_send_file*)arg;
@@ -133,8 +138,9 @@ void* y_socket_send_file_for_all_nodes(void* arg){
struct pollfd *fds=argS->fds;
struct main_list_y_NODE_T *nodes=argS->nodes;
char * filename=argS->filename;
#if TEMP_ADDR
char tempAddr[BUF_SIZE+1];
#endif
int c_af;
// char host[NI_MAXHOST], service[NI_MAXSERV];
char buf_send[BUF_SIZE+1];
@@ -172,6 +178,7 @@ void* y_socket_send_file_for_all_nodes(void* arg){
for(struct list_y_NODE_T *local_list_current = nodes->begin_list; local_list_current; local_list_current=local_list_current->next ){
//memset(tempAddr, 0, BUF_SIZE+1);
c_af=(local_list_current->value).addr.ss_family;
#if TEMP_ADDR
if(c_af==AF_INET){
if(NULL == inet_ntop(c_af,
&(GET_IN_type_ADDR(&(local_list_current->value),)),
@@ -185,6 +192,7 @@ void* y_socket_send_file_for_all_nodes(void* arg){
fprintf(stderr, "error inet_ntop v6 :errno=%d\n",errno);
}
}
#endif
#if 0
off_t offset = 0;
ssize_t ret_sendfile ;
@@ -204,11 +212,16 @@ void* y_socket_send_file_for_all_nodes(void* arg){
retread
/*len_msgRet*/
){
#if TEMP_ADDR
fprintf(stderr, "Error sending response to %s\n",tempAddr);
}else
#endif
}else{
#if TEMP_ADDR
printf("debug: sending response to < %s >",tempAddr);
#endif
}
}
#endif
//memset(buf_send, 0, BUF_SIZE+1);
}
@@ -221,6 +234,7 @@ void y_socket_get_fds(struct pollfd * fds, char * port, char * addrDistant){
fds[v4].fd=-1; fds[v4].events = POLLIN;
fds[v6].fd=-1; fds[v6].events = POLLIN;
fds[2].fd=0; fds[2].events = POLLIN | POLLRDNORM | POLLRDBAND | POLLPRI ;
struct addrinfo hints, *result, *rp;
int status;
@@ -366,7 +380,7 @@ void *y_socket_poll_fds(void *arg){
//socklen_t len_peer_addr;
y_NODE_T node;
int af, status;
ssize_t nread;
ssize_t nread, buf_len;
char buf[BUF_SIZE];
struct main_list_y_ptr_STRING *m_str=create_var_list_y_ptr_STRING();
@@ -374,12 +388,13 @@ void *y_socket_poll_fds(void *arg){
// char msgRet[BUF_SIZE + NI_MAXHOST + NI_MAXSERV + 100];
// int len_msgRet;
// I had to initialize all attribute of addr to avoid error uninitialized value with valgrind, for example "sin6_flowinfo" in sockaddr_in6
memset(&(node.addr), 0, sizeof(struct sockaddr_storage));
node.addr_len = sizeof(struct sockaddr_storage);
for(;check_y_socket_go_on(argSock);){
printf("poll: wait events\n");
status = poll(fds, nbIpVersion, -1);
status = poll(fds, nbIpVersion + 1, -1);
if(status <= 0){
if(status == -1 && errno != EINTR){
perror("poll");
@@ -474,6 +489,73 @@ void *y_socket_poll_fds(void *arg){
///
}
}
// stdin poll
if(fds[2].revents){// && POLLIN
//pollEventRec = fds[1].events;
//printf("fd = %d\n event=%d\n\n",fds[1].fd,pollEventRec);
//fds[1].events = 0;
puts("Saisie du message : ");
memset(buf, 0, sizeof buf);
//scanf(" %"xstr(BUF_SIZE)"[^\n]%*c", buf);
buf_len = read(0,buf,BUF_SIZE);
printf("message saisi : %s\n len = %ld\n",buf, buf_len);
if(buf_len>6){
#if 1
char cmd[BUF_SIZE], dst_addr[BUF_SIZE], msg_buf[BUF_SIZE];
int index_buf=0, index_str=0;
for(; buf[index_buf]!=' '; ++index_buf){
cmd[index_str++]=buf[index_buf];
}
cmd[index_str]='\0';
printf("debug : index_str= %d; cmd=[%s]\n",index_str, cmd);
index_str=0;
while(buf[index_buf]==' '){++index_buf;}
for(; buf[index_buf]!=' '; ++index_buf){
dst_addr[index_str++]=buf[index_buf];
}
dst_addr[index_str]='\0';
while(buf[index_buf]==' '){++index_buf;}
index_str=0;
for(; buf[index_buf]!='\n'; ++index_buf)
msg_buf[index_str++]=buf[index_buf];
msg_buf[index_str++]='\0';
printf("debug : index_str=%d, dst_addr=[%s]\n", index_str, dst_addr);
#endif
if(strncmp(cmd, "sendto", 6)==0){
printf("debug : sendto match, dst_addr=[%s]\n", dst_addr);
if(set_addr_y_NODE_T(&node, dst_addr)){
printf("debug : set_addr_y_NODE_T done\n");
set_str_port_y_NODE_T(&node, argSock->port);
update_nodes(node, argSock->nodes);
af=(node.addr.ss_family == AF_INET6);
printf("debug : af = AF_INET=%d, af = AF_INET6=%d, vs af=[%d]\n",AF_INET, AF_INET6, af);
if(sendto(fds[af].fd, /*buf+index_buf , buf_len-index_buf,*/
msg_buf, index_str,
0,
(struct sockaddr*)(&(node.addr)), node.addr_len) == -1){
printf("message erreur sendto : %s\n\n",buf);
perror("sendto:");
close(fds[af].fd);
return NULL;
}
char dddnn[56];
put_y_NODE_T_in_string(&node, dddnn);
printf("debug: sendto : %s: msg :%s\n\n",dddnn, buf+index_buf);
}
}
}
}
#if 0
// printf("nread = %ld: buf=%s\nlen_buf=%ld\ncmp=%d\n",nread,buf,strlen(buf),strncmp(buf,"SHUTDOWN SERVER",15));