diff --git a/y_socket_t/include/y_socket_t/y_file_handler.h b/y_socket_t/include/y_socket_t/y_file_handler.h index 1066af6..bf5a1b6 100644 --- a/y_socket_t/include/y_socket_t/y_file_handler.h +++ b/y_socket_t/include/y_socket_t/y_file_handler.h @@ -11,23 +11,27 @@ #include "json_t/json_t.h" - +#include "list_t/list_t.h" void fileNameDateScore(char* filename, char * pre, char* post,size_t score); struct arg_send_file{ struct pollfd *fds; struct main_list_y_NODE_T *nodes; + y_NODE_T node; char * filename; + struct main_list_y_ptr_HEADER_T *m_ok_head_l_t; }; void* y_socket_send_file_for_all_nodes(void* arg); +void* y_socket_send_file_for_node(void* arg); enum cmd_type { cmd_update_kill, cmd_update_standby, cmd_update_wakeup, cmd_post_file, + cmd_post_ok, cmd_post_var, cmd_get_file, cmd_get_var, @@ -53,6 +57,7 @@ typedef struct header_t { enum cmd_type cmd_t; // size_t seq; char eof; +// char ok; // void *content; size_t size_nameid; char * nameid;/* containerid: filename_src_dst_tm */ @@ -67,6 +72,7 @@ GEN_HEAD_PTR_LIST(y_ptr_HEADER_T) size_t set_tempAddr_from_node(char *tempAddr, y_NODE_T node); //void receve_from_node(struct pollfd *fds, char *msg, size_t count); -void receve_from_node(struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_list_y_ptr_STRING *m_str, char * srcAddr, char *filename); +void receve_from_node(struct pollfd *fds, struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_list_y_ptr_STRING *m_str, y_NODE_T node /* char * srcAddr*/, char *filename); +long y_append_to_ok_header_l_(struct main_list_y_ptr_HEADER_T *m_ok_head_l_t, char *nameid ); #endif /*Y_FILE_HANDLER_T_H__C*/ diff --git a/y_socket_t/include/y_socket_t/y_list_string.h b/y_socket_t/include/y_socket_t/y_list_string.h index aa79bba..4b2e2ba 100644 --- a/y_socket_t/include/y_socket_t/y_list_string.h +++ b/y_socket_t/include/y_socket_t/y_list_string.h @@ -21,5 +21,6 @@ GEN_HEAD_PTR_LIST(y_ptr_STRING) size_t total_size_list_y_ptr_STRING(struct main_list_y_ptr_STRING *mstr); size_t copy_list_y_ptr_STRING_to_one_string(char **p_dst_str, struct main_list_y_ptr_STRING *mstr); +struct main_list_y_ptr_STRING * split_str_to_main_list_y_ptr_STRING(char *str_org, char sep, size_t limit_size_str_org); #endif /* Y_PTR_STRING_T_H__C */ diff --git a/y_socket_t/src/y_socket_t/y_file_handler.c b/y_socket_t/src/y_socket_t/y_file_handler.c index 1934765..51b1045 100644 --- a/y_socket_t/src/y_socket_t/y_file_handler.c +++ b/y_socket_t/src/y_socket_t/y_file_handler.c @@ -2,6 +2,8 @@ #include "y_socket_t/y_file_handler.h" +#define TTL_SOCKDRAM 10 + //#include "y_socket_t/y_node_t.h" GEN_LIST_ALL(y_ptr_MSG_CONTENT_T) @@ -73,11 +75,76 @@ int funcCmp_y_ptr_HEADER_T(y_ptr_HEADER_T h1, y_ptr_HEADER_T h2){ }else return 1; } +int funcCmp_y_ptr_HEADER_T_fn_nameid_mask(y_ptr_HEADER_T h1, y_ptr_HEADER_T h2){ + if(h1==NULL || h2==NULL) return -1; + struct main_list_y_ptr_STRING * m_h1_nameid = split_str_to_main_list_y_ptr_STRING(h1->nameid,'_', h1->size_nameid); + struct main_list_y_ptr_STRING * m_h2_nameid = split_str_to_main_list_y_ptr_STRING(h2->nameid,'_', h2->size_nameid); + + //int count_match = 0; + struct main_list_TYPE_SIZE_T * m_index_not_match = create_var_list_TYPE_SIZE_T(); + int ret = 0; + if(m_h1_nameid->size != m_h2_nameid->size) { + ret = -2; + }else{ + for(struct list_y_ptr_STRING *l_h1_ = m_h1_nameid->end_list, *l_h2_ = m_h2_nameid->end_list; l_h1_ && l_h2_; l_h1_ = l_h1_->preview, l_h2_=l_h2_->preview){ + if((l_h1_->index >= m_h1_nameid->size - 2) || (l_h1_->index < m_h1_nameid->size - 3)){ + if(strcmp(l_h1_->value->buf, l_h2_->value->buf )==0){ + //++count_match ; + } + else{ + push_back_list_TYPE_SIZE_T(m_index_not_match, l_h1_->index); + } + } + + } + } + + if(m_index_not_match->size) ret = -1; + else ret = 0; + + + purge_ptr_type_list_y_ptr_STRING(m_h1_nameid); + purge_ptr_type_list_y_ptr_STRING(m_h2_nameid); + free_all_var_list_TYPE_SIZE_T(m_index_not_match); + printf("check_if_in_ok_header_l_ ret=%d, %s ns %s\n",ret,h1->nameid, h2->nameid); + return ret; +} + + +long check_if_in_ok_header_l_(struct main_list_y_ptr_HEADER_T *m_ok_head_l_t, char *nameid ){ + + y_ptr_HEADER_T current_header = create_y_ptr_HEADER_T(nameid, strlen(nameid), cmd_post_ok ); + struct list_y_ptr_HEADER_T * l_ocate_header = search_first_occ_from_begin_in_list_y_ptr_HEADER_T(m_ok_head_l_t, current_header, funcCmp_y_ptr_HEADER_T_fn_nameid_mask); + free_y_ptr_HEADER_T(current_header); + + if(l_ocate_header){ + return 1; + } + return 0; +} + +long y_append_to_ok_header_l_(struct main_list_y_ptr_HEADER_T *m_ok_head_l_t, char *nameid ){ + + y_ptr_HEADER_T current_header = create_y_ptr_HEADER_T(nameid, strlen(nameid), cmd_post_ok ); + struct list_y_ptr_HEADER_T * l_ocate_header = search_first_occ_from_begin_in_list_y_ptr_HEADER_T(m_ok_head_l_t, current_header, funcCmp_y_ptr_HEADER_T); + if(l_ocate_header){ + free_y_ptr_HEADER_T(current_header); + printf("debug: already in m_ok_head_l_t"); + return -1; // already in list + + } + else{ + push_back_list_y_ptr_HEADER_T(m_ok_head_l_t, current_header); + printf("debug: push_back_list_y_ptr_HEADER_T when l_ocate_header == NULL, m_head_l_t->size=%ld\n",m_ok_head_l_t->size); + return m_ok_head_l_t->size; + } +} + #define TEST_DUPLICATE_SEQ()\ if(temp_curr->value->seq == cnt->seq){\ printf("debug: index_nearest_seq=%ld: seq equal: doublon problem ? seq:%ld appuyer sur une touche\n", index_nearest_seq, cnt->seq);\ free_y_ptr_MSG_CONTENT_T(cnt); \ - getchar();\ + /*getchar();*/\ return -2;\ } @@ -376,6 +443,7 @@ char * time_id(){ //return filename; } + #if 0 struct arg_send_file{ struct pollfd *fds; @@ -404,9 +472,118 @@ size_t set_tempAddr_from_node(char *tempAddr, y_NODE_T node) { return ret_len; } +/// +/// +/// + + +#define TEMP_ADDR 1 + +/// +void* y_socket_send_file_for_node(void* arg){ + struct arg_send_file *argS=(struct arg_send_file*)arg; + + struct pollfd *fds=argS->fds; + y_NODE_T node=argS->node; + char * filename=argS->filename; +#if TEMP_ADDR + char tempAddr[64]; +#endif + int c_af; +// char host[NI_MAXHOST], service[NI_MAXSERV]; + char buf_send[BUF_SIZE+1]; + int fd_file; + int retread; +#if 0 + int status = getnameinfo((struct sockaddr*)&(node.addr), node.addr_len, host, NI_MAXHOST, service, NI_MAXSERV, NI_NUMERICHOST); + if(status) + // printf("debug: status ==0 : success: Received successfully from %s:%s\n", host,service); + // else + fprintf(stderr, "getnameinfo: %s\n", gai_strerror(status)); + + + if(NULL == search_node_in_list_y_NODE_T(nodes, node)) + push_back_list_y_NODE_T(nodes, node); + +#endif + + size_t seq = 0;//, len_buf_header=0, + size_t len_local_header_=0; + char * timeid = time_id(); + char nameid[BUF_SIZE/2]; + struct main_list_y_ptr_STRING *m_str_name_f=split_str_to_main_list_y_ptr_STRING(filename, '/', -1); + char * name_f=m_str_name_f->end_list->value->buf; + // char srcAddr[64]; + // set_tempAddr_from_node(srcAddr, node); + + set_tempAddr_from_node(tempAddr, node); + c_af=(node).addr.ss_family; + + sprintf(nameid, "%s_%s_%s_%s",name_f, tempAddr, tempAddr, timeid); + +for(int tour_i=0;(tour_i<4) && (check_if_in_ok_header_l_(argS->m_ok_head_l_t, nameid) == 0); ++tour_i){ + + fd_file = open( filename , O_RDONLY); + if(fd_file == -1){ + fprintf(stderr,"error opening file |%s| for reading\n",filename); + return NULL; + } + /* + if(tour_i==0) seq = 1; + else */ + seq=0; + + len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"dst\" : \"%s\" , \"tm\" : \"%s\" }",filename, seq,tempAddr, timeid); + while((retread = read(fd_file, buf_send+len_local_header_, BUF_SIZE - len_local_header_) ) > 0 ){ + buf_send[len_local_header_ + retread]='\0'; + if(sendto(fds[(c_af==AF_INET6)].fd, + buf_send, retread+len_local_header_, + 0, + (struct sockaddr*)&((node).addr), + (node).addr_len) != + retread + len_local_header_ + ){ + fprintf(stderr, "Error sending response to %s\n",tempAddr); + }else{ + printf("debug: sending response to < %s > seq=[%ld] ",tempAddr,seq); + } + + ++seq; + len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"dst\" : \"%s\" , \"tm\" : \"%s\" }",filename, seq,tempAddr, timeid); + } + len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"EOF\" : true , \"dst\" : \"%s\" , \"tm\" : \"%s\" }",filename, seq, tempAddr,timeid); + if(sendto(fds[(c_af==AF_INET6)].fd, + buf_send, len_local_header_, + 0, + (struct sockaddr*)&((node).addr), + (node).addr_len) != + len_local_header_ + ){ + fprintf(stderr, "Error ending response to %s, len_buf_header=%ld\n", tempAddr, len_local_header_); + }else{ + printf("debug: ending response to < %s > [%ld] EOF",tempAddr,seq); + } + + close(fd_file); + printf("debug: fd=%d closed: filename=%s, for %s\n",fd_file,filename, tempAddr); + + size_t delay = 4000000; + printf("debug: wait %ld before checking, in tour:%d\n",delay, tour_i); + usleep(delay); +} + + + + free(timeid); + purge_ptr_type_list_y_ptr_STRING(m_str_name_f); + return NULL; +} + + + +/// /* */ -#define TEMP_ADDR 1 //void y_socket_send_file_for_all_nodes(struct pollfd *fds, struct main_list_y_NODE_T *nodes, char * filename) void* y_socket_send_file_for_all_nodes(void* arg){ @@ -509,16 +686,19 @@ void record_buffer_to_file(void *arg){ } */ #if 1 -void receve_from_node(struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_list_y_ptr_STRING *m_str, char * srcAddr, char *filename ){ +void receve_from_node(struct pollfd *fds, struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_list_y_ptr_STRING *m_str, y_NODE_T node /*char * srcAddr*/, char *filename ){ //printf("\ndebug: <<<< receve_from_node %s %ld\n\n",msg,count); + char srcAddr[64]; + set_tempAddr_from_node(srcAddr, node); - + struct main_list_y_ptr_STRING *m_str_name_f=split_str_to_main_list_y_ptr_STRING(filename, '/', -1); + char * name_f=m_str_name_f->end_list->value->buf; //size_t size_m_str = 0; struct list_y_ptr_STRING * local_current_no_rec = m_str->begin_list; struct list_y_ptr_STRING * local_current; for(local_current = local_current_no_rec; local_current; local_current = local_current->next){ char *buf_loc = local_current->value->buf; - char nameid[BUF_SIZE]=""; + char nameid[BUF_SIZE/2]=""; size_t size_nameid=0; struct js_value * js_header_v = create_js_value(buf_loc,NULL); //struct js_value *js_cmd_v = get_js_value_of_key("cmd", js_header_v ); @@ -549,8 +729,15 @@ void receve_from_node(struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_l char *content = buf_loc+ length_js_header; size_t size_content = strlen(content);// js_header_v->length - length_js_header; enum cmd_type cmd_t = cmd_post_file; - size_nameid = sprintf(nameid, "%s_%s_%s_%s",filename,srcAddr, value_of_(js_dst_v)->type.string,value_of_(js_tm_v)->type.string); + //char *timeid = value_of_(js_tm_v)->type.string; +#if 0 + size_nameid = sprintf(nameid, "%s_%s_%s_%s",name_f /*filename*/, srcAddr, value_of_(js_dst_v)->type.string, timeid/*value_of_(js_tm_v)->type.string*/); +#endif + size_nameid = sprintf(nameid, "%s_%s_%s_%s",name_f, srcAddr, value_of_(js_dst_v)->type.string, value_of_(js_tm_v)->type.string); printf("debug: nameid = %s\n", nameid); + + //int intTimeid = atoi(timeid); + y_ptr_MSG_CONTENT_T y_msg_cnt=create_y_ptr_MSG_CONTENT_T(nameid, size_nameid, content, size_content, cmd_t, seq_local,eof); long ret_app = y_append_content_to_header_l(m_head_l_t,y_msg_cnt); if(ret_app != -2){ @@ -565,9 +752,23 @@ void receve_from_node(struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_l struct list_y_ptr_HEADER_T * l_head_to_remove = pull_index_from_list_y_ptr_HEADER_T(m_head_l_t, local_header->index); free_y_ptr_HEADER_T(l_head_to_remove->value); free(l_head_to_remove); + // sendto srcAddr { "cmd" : "post ok nameid" } again ! + char buf[BUF_SIZE]; + size_t len_buf = sprintf(buf, "{ \"cmd\" : \"post ok %s\" }", nameid); + if(sendto(fds[(node.addr.ss_family==AF_INET6)].fd, buf, len_buf, 0, (struct sockaddr*)&((node).addr), (node).addr_len) != len_buf){ + fprintf(stderr, "Error sending ok %s to %s\n", nameid,srcAddr); + }else{ + printf("debug: sending OK %s to < %s > ",nameid,srcAddr); + } - } - } + + }/*else if(intTimeNow-intTimeid > TTL_SOCKDRAM){ + struct list_y_ptr_HEADER_T * l_head_to_remove = pull_index_from_list_y_ptr_HEADER_T(m_head_l_t, local_header->index); + free_y_ptr_HEADER_T(l_head_to_remove->value); + free(l_head_to_remove); + } + */ + } }else{ printf("debug: tm missing!"); @@ -613,6 +814,8 @@ void receve_from_node(struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_l m_str = NULL; } + purge_ptr_type_list_y_ptr_STRING(m_str_name_f); + //free(timeNow); } #endif diff --git a/y_socket_t/src/y_socket_t/y_list_string.c b/y_socket_t/src/y_socket_t/y_list_string.c index 2425c04..8144cb6 100644 --- a/y_socket_t/src/y_socket_t/y_list_string.c +++ b/y_socket_t/src/y_socket_t/y_list_string.c @@ -67,7 +67,7 @@ struct main_list_y_ptr_STRING * split_str_to_main_list_y_ptr_STRING(char *str_or char *cur_str = str_org; size_t size_buf=0; size_t size_org_cur = 0; - while(cur_str && (cur_str-str_org < limit_size_str_org)){ + while(*cur_str && (cur_str-str_org < limit_size_str_org)){ if(*cur_str != sep) ++cur_str; else{ size_buf = cur_str - buf; diff --git a/y_socket_t/src/y_socket_t/y_socket_t.c b/y_socket_t/src/y_socket_t/y_socket_t.c index 7bfc1d6..e8aae60 100644 --- a/y_socket_t/src/y_socket_t/y_socket_t.c +++ b/y_socket_t/src/y_socket_t/y_socket_t.c @@ -346,6 +346,7 @@ struct arg_handler_{ struct y_socket_t *sock; struct argWorker *argw ; struct main_list_y_ptr_HEADER_T *m_head_l_t; + struct main_list_y_ptr_HEADER_T *m_ok_head_l_t; }; //void y_socket_handler_(char * buf, struct pollfd *fds, struct y_socket_t *sock) @@ -387,11 +388,14 @@ void* y_socket_handler_(void *arg){ struct arg_send_file *argS=malloc(sizeof(struct arg_send_file)); argS->fds=fds; argS->nodes=nodes; + argS->node=argH->node; argS->filename=filename; + argS->m_ok_head_l_t=argH->m_ok_head_l_t; push_back_list_TYPE_PTR(argw->list_arg, argS); push_back_list_TYPE_PTR(argw->list_arg, filename); struct y_task_t task_send={ - .func=y_socket_send_file_for_all_nodes, + //.func=y_socket_send_file_for_all_nodes, + .func=y_socket_send_file_for_node, .arg=argS, .status=TASK_PENDING, }; @@ -413,14 +417,15 @@ void* y_socket_handler_(void *arg){ if(strncmp(buf+5,"file",4)==0){ char *filename = buf+10; //index_f=strcpy(filename, buf + 10); - int index_f = strlen(filename); + /* + int index_f = strlen(filename); printf("debug: receve_from_node : file: %s\n",filename); for(--index_f; index_f>=0;--index_f){ if(filename[index_f]=='/') { ++index_f; break; } - } + }*/ #if 0 //struct list_y_ptr_STRING * last_record_=NULL; @@ -462,9 +467,9 @@ void* y_socket_handler_(void *arg){ #else struct main_list_y_ptr_HEADER_T *m_head_l_t = argH->m_head_l_t; - char srcAddr[BUF_SIZE]; - set_tempAddr_from_node(srcAddr, argH->node); - receve_from_node(m_head_l_t, m_str,srcAddr, filename + index_f); + //char srcAddr[BUF_SIZE]; + //set_tempAddr_from_node(srcAddr, argH->node); + receve_from_node(fds, m_head_l_t, m_str,argH->node/* srcAddr*/, filename /*+ index_f*/); m_str = NULL; #endif /* @@ -474,7 +479,10 @@ void* y_socket_handler_(void *arg){ */ // kill_all_workers(argw); // printf("debug: kill_all\n"); - } + }else if(strncmp(buf+5,"ok",2)==0){ + char *nameid = buf+8; + y_append_to_ok_header_l_(argH->m_ok_head_l_t,nameid ); + } } @@ -567,7 +575,7 @@ void handle_input_kbd(char *buf, ssize_t buf_len ,void *arg){ } #endif -void handle_buf_socket_rec(struct main_list_y_ptr_HEADER_T *m_head_l_t,struct main_list_y_ptr_STRING *m_str, y_NODE_T node, struct main_list_ptr_y_WORKER_T * workers, struct argExecTasQ *argx, struct main_list_TYPE_PTR * list_arg, void * arg){ +void handle_buf_socket_rec(struct main_list_y_ptr_HEADER_T *m_ok_head_l_t, struct main_list_y_ptr_HEADER_T *m_head_l_t,struct main_list_y_ptr_STRING *m_str, y_NODE_T node, struct main_list_ptr_y_WORKER_T * workers, struct argExecTasQ *argx, struct main_list_TYPE_PTR * list_arg, void * arg){ struct y_socket_t * argSock = (struct y_socket_t*)arg; struct pollfd *fds = argSock->fds; @@ -597,6 +605,7 @@ void handle_buf_socket_rec(struct main_list_y_ptr_HEADER_T *m_head_l_t,struct ma ptr_argHandl->node=node; ptr_argHandl->argw=workers->begin_list->value->arg; ptr_argHandl->m_head_l_t=m_head_l_t; + ptr_argHandl->m_ok_head_l_t=m_ok_head_l_t; push_back_list_TYPE_PTR(list_arg, ptr_argHandl); struct y_task_t task_handl = { @@ -681,6 +690,8 @@ void *y_socket_poll_fds(void *arg){ char buf[BUF_SIZE+1]; struct main_list_y_ptr_STRING *m_str=NULL;//=create_var_list_y_ptr_STRING(); struct main_list_y_ptr_HEADER_T *m_head_l_t = create_var_list_y_ptr_HEADER_T(); + struct main_list_y_ptr_HEADER_T *m_ok_head_l_t = create_var_list_y_ptr_HEADER_T(); + // char *temp_all_buf=NULL; // char msgRet[BUF_SIZE + NI_MAXHOST + NI_MAXSERV + 100]; @@ -768,7 +779,7 @@ void *y_socket_poll_fds(void *arg){ } if(m_str){ printf("debug: call handle_buf_socket_rec\n"); - handle_buf_socket_rec(m_head_l_t,m_str, node, workers, argx, list_arg, arg); + handle_buf_socket_rec(m_ok_head_l_t,m_head_l_t,m_str, node, workers, argx, list_arg, arg); m_str=NULL; } @@ -876,7 +887,9 @@ void *y_socket_poll_fds(void *arg){ kill_all_workers(workers->begin_list->value->arg); printf("debug: kill all done\n"); purge_ptr_type_list_y_ptr_HEADER_T(m_head_l_t); - printf("debug: purge_ptr_type_list_y_ptr_HEADER_T done\n"); + printf("debug: purge_ptr_type_list_y_ptr_HEADER_T m_head_l_t done\n"); + purge_ptr_type_list_y_ptr_HEADER_T(m_ok_head_l_t); + printf("debug: purge_ptr_type_list_y_ptr_HEADER_T m_ok_head_l_t done\n"); ///// ///// /////