y_socket: debug recvfrom buf ending, prepare handling varoable

This commit is contained in:
2025-10-11 21:13:45 +02:00
parent 6aa1e9b245
commit 6527394785
3 changed files with 133 additions and 86 deletions
@@ -66,6 +66,18 @@ size_t copy_list_y_ptr_STRING_to_one_string(char **p_dst_str, struct main_list_y
#endif /* y_ptr_STRING */ #endif /* y_ptr_STRING */
struct y_variable{
char * name;
void * value;
};
typedef struct y_variable * y_ptr_VARIABLE;
struct y_variable * create_y_ptr_VARIABLE(const char *name, size_t size_value);
GENERATE_LIST_ALL(y_ptr_VARIABLE)
GEN_HEAD_PTR_LIST(y_ptr_VARIABLE)
struct y_socket_t{ struct y_socket_t{
struct pollfd *fds; struct pollfd *fds;
size_t size_fds; size_t size_fds;
+87 -67
View File
@@ -752,7 +752,7 @@ void record_buffer_to_file(void *arg){
} }
*/ */
#if 1
void receve_from_node(struct pollfd *fds, struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_list_y_ptr_STRING *m_str, y_NODE_T node /*char * srcAddr*/, char *filename ){ void receve_from_node(struct pollfd *fds, struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_list_y_ptr_STRING *m_str, y_NODE_T node /*char * srcAddr*/, char *filename ){
//printf("\ndebug: <<<< receve_from_node %s %ld\n\n",msg,count); //printf("\ndebug: <<<< receve_from_node %s %ld\n\n",msg,count);
char srcAddr[64]; char srcAddr[64];
@@ -772,88 +772,109 @@ void receve_from_node(struct pollfd *fds, struct main_list_y_ptr_HEADER_T *m_hea
//printf("debug: index=[%ld] \n BEGIN file ***\n%s\n END\n",local_current->index,buf_loc); //printf("debug: index=[%ld] \n BEGIN file ***\n%s\n END\n",local_current->index,buf_loc);
if(js_header_v){ if(js_header_v){
struct js_value *js_cmd = get_js_value_of_key("cmd", js_header_v );
if(js_cmd && js_cmd->type.object.value->code_type == jstype_string){
char * buf_cmd_v = js_cmd->type.object.value->type.string;
struct js_value *js_seq_v = get_js_value_of_key("seq", js_header_v ); struct js_value *js_seq_v = get_js_value_of_key("seq", js_header_v );
char eof=0; char eof=0;
if(js_seq_v){ if(js_seq_v){
if(js_seq_v->type.object.value->code_type == jstype_number){ if(js_seq_v->type.object.value->code_type == jstype_number){
size_t seq_local = (long)(js_seq_v->type.object.value->type.number); size_t seq_local = (long)(js_seq_v->type.object.value->type.number);
printf("debug: \n*********seq_local=%ld ***\n\n",seq_local); printf("debug: \n*********seq_local=%ld ***\n\n",seq_local);
struct js_value *js_eof_v = get_js_value_of_key("EOF", js_header_v ); struct js_value *js_eof_v = get_js_value_of_key("EOF", js_header_v );
if(js_eof_v){ if(js_eof_v){
// size_m_str = seq_local; // size_m_str = seq_local;
eof=1; eof=1;
printf("debug: \n****************************end of file ***\n\n"); printf("debug: \n****************************end of file ***\n\n");
//printf("debug: \n****************************end of file ***\n%s\n**********************************\n",buf_loc); //printf("debug: \n****************************end of file ***\n%s\n**********************************\n",buf_loc);
} }
struct js_value *js_dst_v = get_js_value_of_key("dst", js_header_v ); struct js_value *js_dst_v = get_js_value_of_key("dst", js_header_v );
if(js_dst_v){ if(js_dst_v){
struct js_value *js_tm_v = get_js_value_of_key("tm", js_header_v ); struct js_value *js_tm_v = get_js_value_of_key("tm", js_header_v );
if(js_tm_v){ if(js_tm_v){
size_t length_js_header = js_org_str_length(js_header_v); size_t length_js_header = js_org_str_length(js_header_v);
char *content = buf_loc+ length_js_header; char *content = buf_loc+ length_js_header;
size_t size_content = strlen(content);// js_header_v->length - length_js_header; size_t size_content = strlen(content);// js_header_v->length - length_js_header;
enum cmd_type cmd_t = cmd_post_file; enum cmd_type cmd_t = cmd_post_file;
//char *timeid = value_of_(js_tm_v)->type.string; //char *timeid = value_of_(js_tm_v)->type.string;
#if 0 #if 0
size_nameid = sprintf(nameid, "%s_%s_%s_%s",name_f /*filename*/, srcAddr, value_of_(js_dst_v)->type.string, timeid/*value_of_(js_tm_v)->type.string*/); size_nameid = sprintf(nameid, "%s_%s_%s_%s",name_f /*filename*/, srcAddr, value_of_(js_dst_v)->type.string, timeid/*value_of_(js_tm_v)->type.string*/);
#endif #endif
size_nameid = sprintf(nameid, "%s_%s_%s_%s",name_f, srcAddr, value_of_(js_dst_v)->type.string, value_of_(js_tm_v)->type.string); size_nameid = sprintf(nameid, "%s_%s_%s_%s",name_f, srcAddr, value_of_(js_dst_v)->type.string, value_of_(js_tm_v)->type.string);
printf("debug: nameid = %s\n", nameid); printf("debug: nameid = %s\n", nameid);
//int intTimeid = atoi(timeid); //int intTimeid = atoi(timeid);
y_ptr_MSG_CONTENT_T y_msg_cnt=create_y_ptr_MSG_CONTENT_T(nameid, size_nameid, content, size_content, cmd_t, seq_local,eof); y_ptr_MSG_CONTENT_T y_msg_cnt=create_y_ptr_MSG_CONTENT_T(nameid, size_nameid, content, size_content, cmd_t, seq_local,eof);
long ret_app = y_append_content_to_header_l(m_head_l_t,y_msg_cnt); long ret_app = y_append_content_to_header_l(m_head_l_t,y_msg_cnt);
if(ret_app != -2){ if(ret_app != -2){
struct list_y_ptr_HEADER_T * local_header = check_if_all_contents_done_from_headers(m_head_l_t, y_msg_cnt); struct list_y_ptr_HEADER_T * local_header = check_if_all_contents_done_from_headers(m_head_l_t, y_msg_cnt);
if(local_header){ if(local_header){
struct main_list_y_ptr_MSG_CONTENT_T *m_content_l = local_header->value->m_content_l; struct main_list_y_ptr_MSG_CONTENT_T *m_content_l = local_header->value->m_content_l;
struct list_y_ptr_MSG_CONTENT_T * tmpCnt_l = m_content_l->begin_list; struct list_y_ptr_MSG_CONTENT_T * tmpCnt_l = m_content_l->begin_list;
while(tmpCnt_l){ if(strncmp(buf_cmd_v+5,"file",4)==0){
printf("debug: nameid:%s seq = %ld eof %d\n\n%s\n",tmpCnt_l->value->nameid, tmpCnt_l->value->seq, tmpCnt_l->value->eof, tmpCnt_l->value->content); int fd_file ;
tmpCnt_l=tmpCnt_l->next; if((fd_file = open(tmpCnt_l->value->nameid, O_WRONLY | O_CREAT ,
} S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)) == -1){
struct list_y_ptr_HEADER_T * l_head_to_remove = pull_index_from_list_y_ptr_HEADER_T(m_head_l_t, local_header->index); fprintf(stderr,"erreur write %s\n",tmpCnt_l->value->nameid);
free_y_ptr_HEADER_T(l_head_to_remove->value); break;//return NULL;
free(l_head_to_remove); }
// sendto srcAddr { "cmd" : "post ok nameid" } again !
char buf[BUF_SIZE];
size_t len_buf = sprintf(buf, "{ \"cmd\" : \"post ok %s\" }", nameid);
if(sendto(fds[(node.addr.ss_family==AF_INET6)].fd, buf, len_buf, 0, (struct sockaddr*)&((node).addr), (node).addr_len) != len_buf){
fprintf(stderr, "Error sending ok %s to %s\n", nameid,srcAddr);
}else{
printf("debug: sending OK %s to < %s > ",nameid,srcAddr);
}
}/*else if(intTimeNow-intTimeid > TTL_SOCKDRAM){ while(tmpCnt_l){
struct list_y_ptr_HEADER_T * l_head_to_remove = pull_index_from_list_y_ptr_HEADER_T(m_head_l_t, local_header->index); write(fd_file, tmpCnt_l->value->content, tmpCnt_l->value->size_content);
free_y_ptr_HEADER_T(l_head_to_remove->value); printf("debug: nameid:%s seq = %ld eof %d\n\n%s\n",tmpCnt_l->value->nameid, tmpCnt_l->value->seq, tmpCnt_l->value->eof, tmpCnt_l->value->content);
free(l_head_to_remove); tmpCnt_l=tmpCnt_l->next;
} }
*/ close(fd_file);
}else if(strncmp(buf_cmd_v+5,"var",3)==0){
}
struct list_y_ptr_HEADER_T * l_head_to_remove = pull_index_from_list_y_ptr_HEADER_T(m_head_l_t, local_header->index);
free_y_ptr_HEADER_T(l_head_to_remove->value);
free(l_head_to_remove);
// sendto srcAddr { "cmd" : "post ok nameid" } again !
char buf[BUF_SIZE];
size_t len_buf = sprintf(buf, "{ \"cmd\" : \"post ok %s\" }", nameid);
if(sendto(fds[(node.addr.ss_family==AF_INET6)].fd, buf, len_buf, 0, (struct sockaddr*)&((node).addr), (node).addr_len) != len_buf){
fprintf(stderr, "Error sending ok %s to %s\n", nameid,srcAddr);
}else{
printf("debug: sending OK %s to < %s > ",nameid,srcAddr);
}
}/*else if(intTimeNow-intTimeid > TTL_SOCKDRAM){
struct list_y_ptr_HEADER_T * l_head_to_remove = pull_index_from_list_y_ptr_HEADER_T(m_head_l_t, local_header->index);
free_y_ptr_HEADER_T(l_head_to_remove->value);
free(l_head_to_remove);
}
*/
}
}else{
printf("debug: tm missing!");
} }
}
}else{ else{
printf("debug: tm missing!"); printf("debug: dst missing!");
} }
} }
else{ else{
printf("debug: dst missing!"); printf("debug: \n SSSSSSSSSSSSSSSEEEEEEEEEEEEEEQQQQQQQQQQQQQ type:%d \n",js_seq_v->type.object.value->code_type);
} }
}else{
printf("debug: \n NNNNNNNNNNNNNNNNOOOOOOOOOOOOOSSSSSSSSSSSSSSSEEEEEEEEEEEEEEQQQQQQQQQQQQQ :type header : %d \n",js_header_v->code_type);
} }
else{
printf("debug: \n SSSSSSSSSSSSSSSEEEEEEEEEEEEEEQQQQQQQQQQQQQ type:%d \n",js_seq_v->type.object.value->code_type);
}
}else{ }else{
printf("debug: \n NO CMD :type header : %d \n",js_header_v->code_type);
printf("debug: \n NNNNNNNNNNNNNNNNOOOOOOOOOOOOOSSSSSSSSSSSSSSSEEEEEEEEEEEEEEQQQQQQQQQQQQQ :type header : %d \n",js_header_v->code_type);
} }
free_js_value(js_header_v); free_js_value(js_header_v);
}else{ }else{
@@ -885,7 +906,6 @@ void receve_from_node(struct pollfd *fds, struct main_list_y_ptr_HEADER_T *m_hea
//free(timeNow); //free(timeNow);
} }
#endif
/* /*
char filename[500]; char filename[500];
int fd_file; int fd_file;
+31 -16
View File
@@ -5,6 +5,28 @@
//#include "y_socket_t/y_list_string.h" //#include "y_socket_t/y_list_string.h"
//#include "json_t/json_t.h" //#include "json_t/json_t.h"
struct y_variable * create_y_ptr_VARIABLE(const char *name, size_t size_value){
struct y_variable *variable=malloc(sizeof(struct y_variable));
size_t len_name = strlen(name);
variable->name=malloc(len_name+1);
variable->value=malloc(size_value);
if(name){
memcpy(variable->name, name, len_name+1);
if(name[len_name]!='\0')
variable->name[len_name]='\0';
}
return variable;
}
GEN_LIST_ALL(y_ptr_VARIABLE)
GEN_FUNC_PTR_LIST_FREE(y_ptr_VARIABLE){
free(arg->name);
free(arg->value);
free(arg);
}
const int af_array[nbIpVersion]={AF_INET, AF_INET6}; const int af_array[nbIpVersion]={AF_INET, AF_INET6};
struct y_socket_t * y_socket_create(char *port, size_t size_fds, int nb_workers){ struct y_socket_t * y_socket_create(char *port, size_t size_fds, int nb_workers){
@@ -288,23 +310,18 @@ void* y_socket_handler_(void *arg){
else if(strncmp(buf, "post", 4)==0){ else if(strncmp(buf, "post", 4)==0){
if(strncmp(buf+5,"file",4)==0){ if(strncmp(buf+5,"file",4)==0){
char *filename = buf+10; char *filename = buf+10;
struct main_list_y_ptr_HEADER_T *m_head_l_t = argH->m_head_l_t; receve_from_node(fds, argH->m_head_l_t, m_str,argH->node, filename );
//char srcAddr[BUF_SIZE]; m_str = NULL;
//set_tempAddr_from_node(srcAddr, argH->node);
receve_from_node(fds, m_head_l_t, m_str,argH->node/* srcAddr*/, filename /*+ index_f*/);
m_str = NULL;
/*
pthread_mutex_lock(sock->mut_go_on);
sock->go_on = 0;
pthread_mutex_unlock(sock->mut_go_on);
*/
// kill_all_workers(argw);
// printf("debug: kill_all\n");
}else if(strncmp(buf+5,"ok",2)==0){ }else if(strncmp(buf+5,"ok",2)==0){
char *nameid = buf+8; char *nameid = buf+8;
y_append_to_ok_header_l_(argH->m_ok_head_l_t,nameid ); y_append_to_ok_header_l_(argH->m_ok_head_l_t,nameid );
}else if(strncmp(buf+5,"var",3)==0){
char *var_nameid = buf+9;
receve_from_node(fds, argH->m_head_l_t, m_str,argH->node, var_nameid );
m_str = NULL;
} }
} }
} }
@@ -435,8 +452,7 @@ void *y_socket_poll_fds(void *arg){
while((nread = recvfrom(fds[af].fd, buf, BUF_SIZE, 0, while((nread = recvfrom(fds[af].fd, buf, BUF_SIZE, 0,
(struct sockaddr *)&(node.addr), &(node.addr_len))) == BUF_SIZE){ (struct sockaddr *)&(node.addr), &(node.addr_len))) == BUF_SIZE){
if(buf[nread-1]=='\n') //if(buf[nread-1]=='\n') buf[nread-1]='\0';
buf[nread-1]='\0';
buf[nread]='\0'; buf[nread]='\0';
y_ptr_STRING y_buf = create_y_ptr_STRING(buf, nread); y_ptr_STRING y_buf = create_y_ptr_STRING(buf, nread);
@@ -451,8 +467,7 @@ void *y_socket_poll_fds(void *arg){
if(nread == -1) if(nread == -1)
fprintf(stderr,"error recvfrom\n"); fprintf(stderr,"error recvfrom\n");
else if(nread >= 0 && nread < BUF_SIZE){ else if(nread >= 0 && nread < BUF_SIZE){
if(nread && buf[nread-1]=='\n' //if(nread && buf[nread-1]=='\n') buf[nread-1]='\0';
) buf[nread-1]='\0';
buf[nread]='\0'; buf[nread]='\0';
//printf("msg: %s\n",buf); //printf("msg: %s\n",buf);
y_ptr_STRING y_buf = create_y_ptr_STRING(buf, nread); y_ptr_STRING y_buf = create_y_ptr_STRING(buf, nread);