diff --git a/y_socket_t/Makefile b/y_socket_t/Makefile index 8a21afd..5668c31 100644 --- a/y_socket_t/Makefile +++ b/y_socket_t/Makefile @@ -59,7 +59,7 @@ $(PROJECT_LIB): $(OBJ) $(CC) -shared -o $@ $^ $(CFLAGS) -$(YSOCKSRC_O): $(YSOCKSRC) $(YNODESRC_O) $(YFILEHANDLERSRC_O) +$(YSOCKSRC_O): $(YSOCKSRC) $(YNODESRC_O) $(YFILEHANDLERSRC_O) $(WORKSRC_0) $(YTASKSRC_0) $(YJSONSRC_O) $(CC) -o $@ -c $< $(CFLAGS) $(YNODESRC_O): $(YNODESRC) $(YLISTSRC_O) @@ -68,7 +68,7 @@ $(YNODESRC_O): $(YNODESRC) $(YLISTSRC_O) $(YY_STRINGSRC_O): $(YY_STRINGSRC) $(YLISTSRC_O) $(CC) -o $@ -c $< $(CFLAGS) -$(YFILEHANDLERSRC_O): $(YFILEHANDLERSRC) $(YSOCKSRC_O) +$(YFILEHANDLERSRC_O): $(YFILEHANDLERSRC) $(YSOCKSRC_O) $(YNODESRC_O) $(WORKSRC_0) $(YTASKSRC_0) $(YY_STRINGSRC_O) $(YJSONSRC_O) $(CC) -o $@ -c $< $(CFLAGS) $(DEPS): diff --git a/y_socket_t/include/y_socket_t/y_file_handler.h b/y_socket_t/include/y_socket_t/y_file_handler.h index 4ea9bab..1066af6 100644 --- a/y_socket_t/include/y_socket_t/y_file_handler.h +++ b/y_socket_t/include/y_socket_t/y_file_handler.h @@ -1,7 +1,17 @@ +/*file: include/y_socket_t/y_file_handler.h */ #ifndef Y_FILE_HANDLER_T_H__C #define Y_FILE_HANDLER_T_H__C #include "y_socket_t/y_socket_t.h" +#include "y_socket_t/y_node_t.h" +#include "y_socket_t/y_list_string.h" + +#include "y_worker_t/y_worker_t.h" +#include "y_worker_t/y_task_t.h" + +#include "json_t/json_t.h" + + void fileNameDateScore(char* filename, char * pre, char* post,size_t score); @@ -12,7 +22,51 @@ struct arg_send_file{ }; void* y_socket_send_file_for_all_nodes(void* arg); -void receve_from_node(struct pollfd *fds, char *msg, size_t count); +enum cmd_type { + cmd_update_kill, + cmd_update_standby, + cmd_update_wakeup, + cmd_post_file, + cmd_post_var, + cmd_get_file, + cmd_get_var, +}; + + +typedef struct msg_content_t { + enum cmd_type cmd_t; + size_t seq; + char eof; + size_t size_content; + char *content; + size_t size_nameid; + char * nameid;/* containerid: filename_src_dst_tm */ +} y_MSG_CONTENT_T; + +typedef struct msg_content_t * y_ptr_MSG_CONTENT_T; +GENERATE_LIST_ALL(y_ptr_MSG_CONTENT_T) +GEN_HEAD_PTR_LIST(y_ptr_MSG_CONTENT_T) + + +typedef struct header_t { + enum cmd_type cmd_t; +// size_t seq; + char eof; +// void *content; + size_t size_nameid; + char * nameid;/* containerid: filename_src_dst_tm */ + struct main_list_y_ptr_MSG_CONTENT_T * m_content_l; +} y_HEADER_T; + +typedef struct header_t * y_ptr_HEADER_T; + +GENERATE_LIST_ALL(y_ptr_HEADER_T) +GEN_HEAD_PTR_LIST(y_ptr_HEADER_T) + + +size_t set_tempAddr_from_node(char *tempAddr, y_NODE_T node); +//void receve_from_node(struct pollfd *fds, char *msg, size_t count); +void receve_from_node(struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_list_y_ptr_STRING *m_str, char * srcAddr, char *filename); #endif /*Y_FILE_HANDLER_T_H__C*/ diff --git a/y_socket_t/include/y_socket_t/y_socket_t.h b/y_socket_t/include/y_socket_t/y_socket_t.h index 33fa6ea..9b3c0b9 100644 --- a/y_socket_t/include/y_socket_t/y_socket_t.h +++ b/y_socket_t/include/y_socket_t/y_socket_t.h @@ -9,6 +9,8 @@ #include #include +#include + //#include #include #include @@ -46,6 +48,7 @@ enum ipVersions{ extern const int af_array[nbIpVersion];//={AF_INET, AF_INET6}; /* y_ptr_STRING */ + #if 0 struct y_string{ char * buf; @@ -68,7 +71,7 @@ struct y_socket_t{ size_t size_fds; char * port; struct main_list_y_NODE_T *nodes; - pthread_mutex_t *mut_nodes; +// pthread_mutex_t *mut_nodes; int go_on; pthread_mutex_t *mut_go_on; int nb_workers; diff --git a/y_socket_t/src/y_socket_t/y_file_handler.c b/y_socket_t/src/y_socket_t/y_file_handler.c index 092a0dd..1934765 100644 --- a/y_socket_t/src/y_socket_t/y_file_handler.c +++ b/y_socket_t/src/y_socket_t/y_file_handler.c @@ -4,7 +4,357 @@ //#include "y_socket_t/y_node_t.h" +GEN_LIST_ALL(y_ptr_MSG_CONTENT_T) +GEN_FUNC_PTR_LIST_FREE(y_ptr_MSG_CONTENT_T){ + free(arg->content); + free(arg->nameid); + free(arg); +} + + +GEN_LIST_ALL(y_ptr_HEADER_T) + +GEN_FUNC_PTR_LIST_FREE(y_ptr_HEADER_T){ +// free(arg->content); + free(arg->nameid); + purge_ptr_type_list_y_ptr_MSG_CONTENT_T(arg->m_content_l); + free(arg); +} + +y_ptr_MSG_CONTENT_T create_y_ptr_MSG_CONTENT_T(char *nameid, size_t size_nameid, char* content, size_t size_content, + enum cmd_type cmd_t, + size_t seq, + char eof +){ + y_ptr_MSG_CONTENT_T new_p_content = malloc(sizeof(struct msg_content_t)); + + new_p_content->size_content = size_content; + new_p_content->content = malloc(size_content+1); + memcpy(new_p_content->content, content, size_content); + new_p_content->content[size_content]='\0'; + new_p_content->size_nameid = size_nameid; + new_p_content->nameid = malloc(size_nameid+1); + memcpy(new_p_content->nameid, nameid, size_nameid); + new_p_content->nameid[size_nameid]='\0'; + + + new_p_content->cmd_t = cmd_t; + new_p_content->seq = seq; + new_p_content->eof = eof; + + return new_p_content; + +} + +y_ptr_HEADER_T create_y_ptr_HEADER_T(char *nameid, size_t size_nameid, enum cmd_type cmd_t ){ + y_ptr_HEADER_T new_header_t = malloc(sizeof(struct header_t)); + new_header_t->cmd_t = cmd_t; + new_header_t->size_nameid = size_nameid; + new_header_t->nameid = malloc(size_nameid+1); + memcpy(new_header_t->nameid, nameid, size_nameid); + new_header_t->nameid[size_nameid]='\0'; + + new_header_t->m_content_l = create_var_list_y_ptr_MSG_CONTENT_T(); + + return new_header_t; + + +} + +int funcCmp_y_ptr_HEADER_T(y_ptr_HEADER_T h1, y_ptr_HEADER_T h2){ + if(h1==NULL || h2==NULL) return -1; + if(h1->size_nameid == h2->size_nameid){ + if(h1->cmd_t == h2->cmd_t){ + return strcmp(h1->nameid, h2->nameid); + }else return (h1->cmd_t - h2->cmd_t); + }else if(h1->size_nameid < h2->size_nameid){ + return -1; + }else return 1; +} + +#define TEST_DUPLICATE_SEQ()\ + if(temp_curr->value->seq == cnt->seq){\ + printf("debug: index_nearest_seq=%ld: seq equal: doublon problem ? seq:%ld appuyer sur une touche\n", index_nearest_seq, cnt->seq);\ + free_y_ptr_MSG_CONTENT_T(cnt); \ + getchar();\ + return -2;\ + } + +long y_append_content_to_header_l(struct main_list_y_ptr_HEADER_T *m_head_l_t, y_ptr_MSG_CONTENT_T cnt){ + + y_ptr_HEADER_T current_header = create_y_ptr_HEADER_T(cnt->nameid, cnt->size_nameid, cnt->cmd_t); + struct list_y_ptr_HEADER_T * l_ocate_header = search_first_occ_from_begin_in_list_y_ptr_HEADER_T(m_head_l_t, current_header, funcCmp_y_ptr_HEADER_T); + printf("debug: search done, nameid:%s, #%ld\n",cnt->nameid, cnt->size_nameid); + if(l_ocate_header){ + free_y_ptr_HEADER_T(current_header); + pthread_mutex_t *mut_m_content_l = l_ocate_header->value->m_content_l->mut_list; + pthread_mutex_lock(mut_m_content_l); + struct list_y_ptr_MSG_CONTENT_T * current_cnt = l_ocate_header->value->m_content_l->current_list; + struct list_y_ptr_MSG_CONTENT_T * end_cnt = l_ocate_header->value->m_content_l->end_list; + struct list_y_ptr_MSG_CONTENT_T * begin_cnt = l_ocate_header->value->m_content_l->begin_list; + + if(begin_cnt == NULL){ + pthread_mutex_unlock(mut_m_content_l); + printf("debug: current_cnt==NULL, size_ m_content_l=%ld cnt->seq=%ld, push_back_list_y_ptr_MSG_CONTENT_T\n", l_ocate_header->value->m_content_l->size, cnt->seq); + push_back_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt); + printf("debug: after current_cnt==NULL, size_ m_content_l=%ld cnt->seq=%ld, push_back_list_y_ptr_MSG_CONTENT_T\n", l_ocate_header->value->m_content_l->size, cnt->seq); + return 0; + } + long last_seq = end_cnt->value->seq; + if(cnt->seq > last_seq){ + pthread_mutex_unlock(mut_m_content_l); + push_back_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt); + return 0; + } + + struct list_y_ptr_MSG_CONTENT_T *temp_curr = NULL; + + long from_current_seq = current_cnt->value->seq - cnt->seq; \ + size_t abs_cur_diff_seq = abs(from_current_seq); \ + size_t array_diff_seq[3] = {cnt->seq - begin_cnt->value->seq, abs_cur_diff_seq , end_cnt->value->seq - cnt->seq}; \ + size_t index_nearest_seq = ARG_MIN_ARRAY_TYPE_SIZE_T(array_diff_seq, 3);\ + if(index_nearest_seq == 0){\ + for(temp_curr = begin_cnt ;temp_curr && (temp_curr->value->seq < cnt->seq); temp_curr = temp_curr->next){} + if(temp_curr){ + l_ocate_header->value->m_content_l->current_list = temp_curr; + pthread_mutex_unlock(mut_m_content_l); + TEST_DUPLICATE_SEQ() + return insert_into_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, temp_curr->index, cnt); + }else{ + pthread_mutex_unlock(mut_m_content_l); + push_back_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt); + return 0; + } + }\ + else if(index_nearest_seq == 2){ + + for(temp_curr = end_cnt; temp_curr && temp_curr->value->seq > cnt->seq; temp_curr = temp_curr->preview) {} + if(temp_curr){ + l_ocate_header->value->m_content_l->current_list = temp_curr; + pthread_mutex_unlock(mut_m_content_l); + TEST_DUPLICATE_SEQ() + return insert_into_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, temp_curr->index + 1, cnt); + }else{ + pthread_mutex_unlock(mut_m_content_l); + push_front_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt); + return 0; + } + }else if(from_current_seq >= 0) { + for(temp_curr = current_cnt; temp_curr && temp_curr->value->seq > cnt->seq; temp_curr = temp_curr->preview) {} + if(temp_curr){ + l_ocate_header->value->m_content_l->current_list = temp_curr; + pthread_mutex_unlock(mut_m_content_l); + TEST_DUPLICATE_SEQ() + return insert_into_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, temp_curr->index + 1, cnt); + }else{ + pthread_mutex_unlock(mut_m_content_l); + push_front_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt); + return 0; + } + + }else{ + for(temp_curr = current_cnt ;temp_curr && (temp_curr->value->seq < cnt->seq); temp_curr = temp_curr->next){} + if(temp_curr){ + l_ocate_header->value->m_content_l->current_list = temp_curr; + pthread_mutex_unlock(mut_m_content_l); + TEST_DUPLICATE_SEQ() + return insert_into_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, temp_curr->index, cnt); + }else{ + pthread_mutex_unlock(mut_m_content_l); + push_back_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt); + return 0; + } + } + + +#if 0 + // cnt->seq < last_seq + while(current_cnt){ + printf("debug: last_seq: %ld, cnt->seq=%ld, current_cnt->value->seq:%ld\n", last_seq, cnt->seq,current_cnt->value->seq); + if(cnt->seq == current_cnt->value->seq){ + printf("debug: equal last_seq: %ld, cnt->seq=%ld, current_cnt->value->seq:%ld\n", last_seq, cnt->seq,current_cnt->value->seq); + //return -2; + } + if(cnt->seq < last_seq && cnt->seq > current_cnt->value->seq){ + pthread_mutex_unlock(mut_m_content_l); + /*if(cnt->seq > last_seq && cnt->seq < current_cnt->value->seq)*/ + return insert_into_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, current_cnt->index + 1, cnt); + }else if(current_cnt->preview==NULL){ + pthread_mutex_unlock(mut_m_content_l); + push_front_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt); + return 0; + } + last_seq = current_cnt->value->seq; + current_cnt = current_cnt->preview; + + } + printf("debug: some thing wrong here! last_seq: %ld, cnt->seq=%ld\n", last_seq, cnt->seq); + return -1; +#endif + } + else{ + push_back_list_y_ptr_MSG_CONTENT_T(current_header->m_content_l, cnt); + push_back_list_y_ptr_HEADER_T(m_head_l_t, current_header); + printf("debug: push_back_list_y_ptr_HEADER_T when l_ocate_header == NULL, m_head_l_t->size=%ld\n",m_head_l_t->size); + return 0; + } +} + + + +long y_append_content_to_header_l_from_end(struct main_list_y_ptr_HEADER_T *m_head_l_t, y_ptr_MSG_CONTENT_T cnt){ + + y_ptr_HEADER_T current_header = create_y_ptr_HEADER_T(cnt->nameid, cnt->size_nameid, cnt->cmd_t); + struct list_y_ptr_HEADER_T * l_ocate_header = search_first_occ_from_begin_in_list_y_ptr_HEADER_T(m_head_l_t, current_header, funcCmp_y_ptr_HEADER_T); + printf("debug: search done, nameid:%s, #%ld\n",cnt->nameid, cnt->size_nameid); + if(l_ocate_header){ + free_y_ptr_HEADER_T(current_header); + struct list_y_ptr_MSG_CONTENT_T * current_cnt = l_ocate_header->value->m_content_l->end_list; + if(current_cnt == NULL){ + printf("debug: current_cnt==NULL, size_ m_content_l=%ld cnt->seq=%ld, push_back_list_y_ptr_MSG_CONTENT_T\n", l_ocate_header->value->m_content_l->size, cnt->seq); + push_back_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt); + printf("debug: after current_cnt==NULL, size_ m_content_l=%ld cnt->seq=%ld, push_back_list_y_ptr_MSG_CONTENT_T\n", l_ocate_header->value->m_content_l->size, cnt->seq); + return 0; + } + long last_seq = current_cnt->value->seq + 1; + if(cnt->seq >= last_seq){ + push_back_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt); + return 0; + } + // cnt->seq < last_seq + while(current_cnt){ + printf("debug: last_seq: %ld, cnt->seq=%ld, current_cnt->value->seq:%ld\n", last_seq, cnt->seq,current_cnt->value->seq); + if(cnt->seq == current_cnt->value->seq){ + printf("debug: equal last_seq: %ld, cnt->seq=%ld, current_cnt->value->seq:%ld\n", last_seq, cnt->seq,current_cnt->value->seq); + //return -2; + } + if(cnt->seq < last_seq && cnt->seq > current_cnt->value->seq){ + /*if(cnt->seq > last_seq && cnt->seq < current_cnt->value->seq)*/ + return insert_into_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, current_cnt->index + 1, cnt); + }else if(current_cnt->preview==NULL){ + push_front_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt); + return 0; + } + last_seq = current_cnt->value->seq; + current_cnt = current_cnt->preview; + + } + printf("debug: some thing wrong here! last_seq: %ld, cnt->seq=%ld\n", last_seq, cnt->seq); + return -1; + + } + else{ + push_back_list_y_ptr_MSG_CONTENT_T(current_header->m_content_l, cnt); + push_back_list_y_ptr_HEADER_T(m_head_l_t, current_header); + printf("debug: push_back_list_y_ptr_HEADER_T when l_ocate_header == NULL, m_head_l_t->size=%ld\n",m_head_l_t->size); + return 0; + } +} + + +long y_append_content_to_header_l_from_begin(struct main_list_y_ptr_HEADER_T *m_head_l_t, y_ptr_MSG_CONTENT_T cnt){ + + y_ptr_HEADER_T current_header = create_y_ptr_HEADER_T(cnt->nameid, cnt->size_nameid, cnt->cmd_t); + struct list_y_ptr_HEADER_T * l_ocate_header = search_first_occ_from_begin_in_list_y_ptr_HEADER_T(m_head_l_t, current_header, funcCmp_y_ptr_HEADER_T); + printf("debug: search done, nameid:%s, #%ld\n",cnt->nameid, cnt->size_nameid); + if(l_ocate_header){ + free_y_ptr_HEADER_T(current_header); + struct list_y_ptr_MSG_CONTENT_T * current_cnt = l_ocate_header->value->m_content_l->begin_list; + if(current_cnt == NULL){ + printf("debug: current_cnt==NULL, size_ m_content_l=%ld cnt->seq=%ld, push_back_list_y_ptr_MSG_CONTENT_T\n", l_ocate_header->value->m_content_l->size, cnt->seq); + push_back_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt); + printf("debug: after current_cnt==NULL, size_ m_content_l=%ld cnt->seq=%ld, push_back_list_y_ptr_MSG_CONTENT_T\n", l_ocate_header->value->m_content_l->size, cnt->seq); + return 0; + } + long last_seq = -1; + while(current_cnt){ + printf("debug: last_seq: %ld, cnt->seq=%ld, current_cnt->value->seq:%ld\n", last_seq, cnt->seq,current_cnt->value->seq); + if(cnt->seq == current_cnt->value->seq){ + printf("debug: equal last_seq: %ld, cnt->seq=%ld, current_cnt->value->seq:%ld\n", last_seq, cnt->seq,current_cnt->value->seq); + //return -2; + } + if(cnt->seq > last_seq && cnt->seq < current_cnt->value->seq){ + return insert_into_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, current_cnt->index, cnt); + }else if(current_cnt->next==NULL){ + push_back_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt); + return 0; + } + last_seq = current_cnt->value->seq; + current_cnt = current_cnt->next; + + } + printf("debug: some thing wrong here! last_seq: %ld, cnt->seq=%ld\n", last_seq, cnt->seq); + return -1; + + } + else{ + push_back_list_y_ptr_MSG_CONTENT_T(current_header->m_content_l, cnt); + push_back_list_y_ptr_HEADER_T(m_head_l_t, current_header); + printf("debug: push_back_list_y_ptr_HEADER_T when l_ocate_header == NULL, m_head_l_t->size=%ld\n",m_head_l_t->size); + return 0; + } +} + +struct list_y_ptr_HEADER_T * check_if_all_contents_done_from_headers(struct main_list_y_ptr_HEADER_T *m_head_l_t, y_ptr_MSG_CONTENT_T cnt){ + + y_ptr_HEADER_T current_header = create_y_ptr_HEADER_T(cnt->nameid, cnt->size_nameid, cnt->cmd_t); + printf("debug: check_if_all_contents_done_from_headers, begin search\n"); + struct list_y_ptr_HEADER_T * l_ocate_header = search_first_occ_from_begin_in_list_y_ptr_HEADER_T (m_head_l_t, current_header, funcCmp_y_ptr_HEADER_T); + printf("debug: check_if_all_contents_done_from_headers, search done\n"); + free_y_ptr_HEADER_T(current_header); + if(l_ocate_header){ + printf("debug: check_if_all_contents_done_from_headers, l_ocate_header->index=%ld\n",l_ocate_header->index); + if(l_ocate_header->value->m_content_l){ + struct list_y_ptr_MSG_CONTENT_T *end_list = l_ocate_header->value->m_content_l->end_list; + printf("debug: check_if_all_contents_done_from_headers, l_ocate_header->m_content_l->size=%ld,\n",l_ocate_header->value->m_content_l->size); + if(end_list){ + printf("debug: check_if_all_contents_done_from_headers, end->eof=%d, end->seq=%ld, end->index = %ld\n",end_list->value->eof, end_list->value->seq, end_list->index); + // check if all contents are done! + if(end_list->value->eof && end_list->value->seq == end_list->index){ + return l_ocate_header; + } + }else{ + printf("debug: check_if_all_contents_done_from_headers, end_list==NULL\n"); + + } + }else{ + + printf("debug: check_if_all_contents_done_from_headers, l_ocate_header->value->m_content_l==NULL\n"); + } + + } + return NULL; +} + +int remove_content_from_headers(struct main_list_y_ptr_HEADER_T *m_head_l_t, y_ptr_MSG_CONTENT_T cnt){ + + y_ptr_HEADER_T current_header = create_y_ptr_HEADER_T(cnt->nameid, cnt->size_nameid, cnt->cmd_t); + struct list_y_ptr_HEADER_T * l_ocate_header = search_first_occ_from_begin_in_list_y_ptr_HEADER_T (m_head_l_t, current_header, funcCmp_y_ptr_HEADER_T); + free_y_ptr_HEADER_T(current_header); + if(l_ocate_header){ + struct list_y_ptr_MSG_CONTENT_T *end_list = l_ocate_header->value->m_content_l->end_list; + // check if all contents are done! + if(end_list->value->eof && end_list->value->seq == end_list->index){ //l_ocate_header->value->m_content_l->size - 1 + struct list_y_ptr_HEADER_T *current_list = pull_index_from_list_y_ptr_HEADER_T(m_head_l_t, l_ocate_header->index); + + if(current_list == l_ocate_header){ + + free_y_ptr_HEADER_T(l_ocate_header->value); + free(l_ocate_header); + } + return 0; + } + else{ + printf("some thing wrong here: EOF:%d seq_end:%ld, size m_content_l :%ld\n",end_list->value->eof, end_list->value->seq, l_ocate_header->value->m_content_l->size); + return 1; + } + } + else{ + printf("\n%s is not in header list\n",cnt->nameid); + return -1; + } +} void fileNameDateScore(char* filename, char * pre, char* post,size_t score){ // char *filename=malloc(256); @@ -15,6 +365,16 @@ void fileNameDateScore(char* filename, char * pre, char* post,size_t score){ //return filename; } +char * time_id(){ + // char *filename=malloc(256); + char *timeid=malloc(128); + time_t t = time(NULL); + struct tm tm = *localtime(&t); + sprintf(timeid,"%d%02d%02d%02d%02d%02d", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); + + return timeid; + //return filename; +} #if 0 struct arg_send_file{ @@ -24,68 +384,26 @@ struct arg_send_file{ }; #endif /* */ -void y_send_post_file_to_all_nodes(void *arg){ - struct arg_send_file *argS=(struct arg_send_file*)arg; - struct pollfd *fds=argS->fds; - struct main_list_y_NODE_T *nodes=argS->nodes; - char * filename=argS->filename; -#if TEMP_ADDR - char tempAddr[BUF_SIZE+1]; -#endif - int c_af; -// char host[NI_MAXHOST], service[NI_MAXSERV]; - char buf_send[BUF_SIZE+1]={0}; - int fd_file; - int retsprintf = snprintf(buf_send, 50,"post file %s", filename ); - printf("debug: buf_send=%s, size=%d\n",buf_send, retsprintf); - - for(struct list_y_NODE_T *local_list_current = nodes->begin_list; local_list_current; local_list_current=local_list_current->next ){ - //memset(tempAddr, 0, BUF_SIZE+1); - c_af=(local_list_current->value).addr.ss_family; -#if TEMP_ADDR - if(c_af==AF_INET){ - if(NULL == inet_ntop(c_af, - &(GET_IN_type_ADDR(&(local_list_current->value),)), +size_t set_tempAddr_from_node(char *tempAddr, y_NODE_T node) { + int c_af=(node).addr.ss_family; + if(c_af==AF_INET){ + if(NULL == inet_ntop(c_af, + &(GET_IN_type_ADDR(&(node),)), tempAddr, BUF_SIZE/*(argSock->local_list_current->value).addr_len*/)){ fprintf(stderr, "error inet_ntop v4\n"); } }else if(c_af==AF_INET6){ - if(NULL == inet_ntop(c_af, - &(GET_IN_type_ADDR(&(local_list_current->value),6)), + if(NULL == inet_ntop(c_af, + &(GET_IN_type_ADDR(&(node),6)), tempAddr, BUF_SIZE /*(argSock->local_list_current->value).addr_len*/)){ fprintf(stderr, "error inet_ntop v6 :errno=%d\n",errno); } } -#endif -#if 0 - off_t offset = 0; - ssize_t ret_sendfile ; - while((ret_sendfile = sendfile(fds[(c_af==AF_INET6)].fd ,fd_file, &offset, BUF_SIZE))>0){ - - } -#endif - /// printf("debug: destination %s :\n",tempAddr); + size_t ret_len = strlen(tempAddr); + return ret_len; +} - if(sendto(fds[(c_af==AF_INET6)].fd, - buf_send, retsprintf, - /*msgRet, len_msgRet,*/ - 0, - (struct sockaddr*)&((local_list_current->value).addr), - (local_list_current->value).addr_len) != - retsprintf - /*len_msgRet*/ - ){ -#if TEMP_ADDR - fprintf(stderr, "Error sending response to %s\n",tempAddr); -#endif - }else{ -#if TEMP_ADDR - printf("debug: sending %s to < %s >",buf_send,tempAddr); -#endif - } - } - } /* */ #define TEMP_ADDR 1 @@ -98,7 +416,7 @@ void* y_socket_send_file_for_all_nodes(void* arg){ struct main_list_y_NODE_T *nodes=argS->nodes; char * filename=argS->filename; #if TEMP_ADDR - char tempAddr[BUF_SIZE+1]; + char tempAddr[64]; #endif int c_af; // char host[NI_MAXHOST], service[NI_MAXSERV]; @@ -117,99 +435,68 @@ void* y_socket_send_file_for_all_nodes(void* arg){ push_back_list_y_NODE_T(nodes, node); #endif - fd_file = open( filename , O_RDONLY); + + size_t seq = 0;//, len_buf_header=0, + size_t len_local_header_=0; + char * timeid = time_id(); + for(struct list_y_NODE_T *local_list_current = nodes->begin_list; local_list_current; local_list_current=local_list_current->next ){ + + fd_file = open( filename , O_RDONLY); if(fd_file == -1){ fprintf(stderr,"error opening file |%s| for reading\n",filename); return NULL; } - - y_send_post_file_to_all_nodes(arg); - usleep(1); - //for(struct list_y_NODE_T *local_list_current = nodes->begin_list; local_list_current; local_list_current=local_list_current->next ) + set_tempAddr_from_node(tempAddr, local_list_current->value); + c_af=(local_list_current->value).addr.ss_family; - //memset(buf_send, 0, BUF_SIZE+1); - while((retread = read(fd_file, buf_send, BUF_SIZE) ) > 0 ){ - buf_send[retread]='\0'; - //memset(msgRet, 0, BUF_SIZE + NI_MAXHOST + NI_MAXSERV + 100); - // sprintf(msgRet, "from %s:%s =%s",host, service, buf); - - // len_msgRet = strlen(msgRet); - ///printf("debug: sending response %s :\n",buf_send); - - //FOR_LIST_FORM_BEGIN(y_NODE_T, nodes) - for(struct list_y_NODE_T *local_list_current = nodes->begin_list; local_list_current; local_list_current=local_list_current->next ){ - //memset(tempAddr, 0, BUF_SIZE+1); - c_af=(local_list_current->value).addr.ss_family; -#if TEMP_ADDR - if(c_af==AF_INET){ - if(NULL == inet_ntop(c_af, - &(GET_IN_type_ADDR(&(local_list_current->value),)), - tempAddr, BUF_SIZE/*(argSock->local_list_current->value).addr_len*/)){ - fprintf(stderr, "error inet_ntop v4\n"); - } - }else if(c_af==AF_INET6){ - if(NULL == inet_ntop(c_af, - &(GET_IN_type_ADDR(&(local_list_current->value),6)), - tempAddr, BUF_SIZE /*(argSock->local_list_current->value).addr_len*/)){ - fprintf(stderr, "error inet_ntop v6 :errno=%d\n",errno); - } - } -#endif -#if 0 - off_t offset = 0; - ssize_t ret_sendfile ; - while((ret_sendfile = sendfile(fds[(c_af==AF_INET6)].fd ,fd_file, &offset, BUF_SIZE))>0){ - - } -#endif - /// printf("debug: destination %s :\n",tempAddr); + seq=0; -#if 1 + len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"dst\" : \"%s\" , \"tm\" : \"%s\" }",filename, seq,tempAddr, timeid); + while((retread = read(fd_file, buf_send+len_local_header_, BUF_SIZE - len_local_header_) ) > 0 ){ + buf_send[len_local_header_ + retread]='\0'; if(sendto(fds[(c_af==AF_INET6)].fd, - buf_send, retread, - /*msgRet, len_msgRet,*/ + buf_send, retread+len_local_header_, 0, (struct sockaddr*)&((local_list_current->value).addr), (local_list_current->value).addr_len) != - retread - /*len_msgRet*/ + retread + len_local_header_ ){ -#if TEMP_ADDR fprintf(stderr, "Error sending response to %s\n",tempAddr); -#endif }else{ -#if TEMP_ADDR - printf("debug: sending response to < %s >",tempAddr); -#endif + printf("debug: sending response to < %s > seq=[%ld] ",tempAddr,seq); } + + ++seq; + len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"dst\" : \"%s\" , \"tm\" : \"%s\" }",filename, seq,tempAddr, timeid); } -#endif - -#if 0 - //memset(buf_send, 0, BUF_SIZE+1); - retread = sprintf(buf_send, "post file %s", filename); - if(sendto(fds[(c_af==AF_INET6)].fd, - buf_send, retread, - /*msgRet, len_msgRet,*/ - 0, - (struct sockaddr*)&((local_list_current->value).addr), - (local_list_current->value).addr_len) != - retread - /*len_msgRet*/ + len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"EOF\" : true , \"dst\" : \"%s\" , \"tm\" : \"%s\" }",filename, seq, tempAddr,timeid); + if(sendto(fds[(c_af==AF_INET6)].fd, + buf_send, len_local_header_, + 0, + (struct sockaddr*)&((local_list_current->value).addr), + (local_list_current->value).addr_len) != + len_local_header_ ){ - fprintf(stderr, "Error sending response to %s\n",tempAddr); - }else{ - printf("debug: sending response to < %s >",tempAddr); - } -#endif - - } + fprintf(stderr, "Error ending response to %s, len_buf_header=%ld\n", tempAddr, len_local_header_); + }else{ + printf("debug: ending response to < %s > [%ld] EOF",tempAddr,seq); + } close(fd_file); - printf("debug: fd=%d closed: filename=%s\n",fd_file,filename); + printf("debug: fd=%d closed: filename=%s, for index = %ld\n",fd_file,filename, local_list_current->index); + + } + + + + free(timeid); return NULL; } + + +//main_list_y_ptr_HEADER_T + /* struct arg_record_to_file{ int *is_file_to_record, @@ -221,8 +508,115 @@ void record_buffer_to_file(void *arg){ } */ -void receve_from_node(struct pollfd *fds, char *msg, size_t count){ - printf("\ndebug: <<<< receve_from_node %s %ld\n\n",msg,count); +#if 1 +void receve_from_node(struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_list_y_ptr_STRING *m_str, char * srcAddr, char *filename ){ + //printf("\ndebug: <<<< receve_from_node %s %ld\n\n",msg,count); + + + //size_t size_m_str = 0; + struct list_y_ptr_STRING * local_current_no_rec = m_str->begin_list; + struct list_y_ptr_STRING * local_current; + for(local_current = local_current_no_rec; local_current; local_current = local_current->next){ + char *buf_loc = local_current->value->buf; + char nameid[BUF_SIZE]=""; + size_t size_nameid=0; + struct js_value * js_header_v = create_js_value(buf_loc,NULL); + //struct js_value *js_cmd_v = get_js_value_of_key("cmd", js_header_v ); + //printf("debug: index=[%ld] \n BEGIN file ***\n%s\n END\n",local_current->index,buf_loc); + if(js_header_v){ + + + struct js_value *js_seq_v = get_js_value_of_key("seq", js_header_v ); + char eof=0; + if(js_seq_v){ + if(js_seq_v->type.object.value->code_type == jstype_number){ + size_t seq_local = (long)(js_seq_v->type.object.value->type.number); + printf("debug: \n*********seq_local=%ld ***\n\n",seq_local); + + struct js_value *js_eof_v = get_js_value_of_key("EOF", js_header_v ); + if(js_eof_v){ + // size_m_str = seq_local; + eof=1; + printf("debug: \n****************************end of file ***\n\n"); + //printf("debug: \n****************************end of file ***\n%s\n**********************************\n",buf_loc); + } + + struct js_value *js_dst_v = get_js_value_of_key("dst", js_header_v ); + if(js_dst_v){ + struct js_value *js_tm_v = get_js_value_of_key("tm", js_header_v ); + if(js_tm_v){ + size_t length_js_header = js_org_str_length(js_header_v); + char *content = buf_loc+ length_js_header; + size_t size_content = strlen(content);// js_header_v->length - length_js_header; + enum cmd_type cmd_t = cmd_post_file; + size_nameid = sprintf(nameid, "%s_%s_%s_%s",filename,srcAddr, value_of_(js_dst_v)->type.string,value_of_(js_tm_v)->type.string); + printf("debug: nameid = %s\n", nameid); + y_ptr_MSG_CONTENT_T y_msg_cnt=create_y_ptr_MSG_CONTENT_T(nameid, size_nameid, content, size_content, cmd_t, seq_local,eof); + long ret_app = y_append_content_to_header_l(m_head_l_t,y_msg_cnt); + if(ret_app != -2){ + struct list_y_ptr_HEADER_T * local_header = check_if_all_contents_done_from_headers(m_head_l_t, y_msg_cnt); + if(local_header){ + struct main_list_y_ptr_MSG_CONTENT_T *m_content_l = local_header->value->m_content_l; + struct list_y_ptr_MSG_CONTENT_T * tmpCnt_l = m_content_l->begin_list; + while(tmpCnt_l){ + printf("debug: nameid:%s seq = %ld eof %d\n\n%s\n",tmpCnt_l->value->nameid, tmpCnt_l->value->seq, tmpCnt_l->value->eof, tmpCnt_l->value->content); + tmpCnt_l=tmpCnt_l->next; + } + struct list_y_ptr_HEADER_T * l_head_to_remove = pull_index_from_list_y_ptr_HEADER_T(m_head_l_t, local_header->index); + free_y_ptr_HEADER_T(l_head_to_remove->value); + free(l_head_to_remove); + + } + } + + }else{ + printf("debug: tm missing!"); + } + } + else{ + printf("debug: dst missing!"); + } + } + else{ + printf("debug: \n SSSSSSSSSSSSSSSEEEEEEEEEEEEEEQQQQQQQQQQQQQ type:%d \n",js_seq_v->type.object.value->code_type); + + } + + + }else{ + + printf("debug: \n NNNNNNNNNNNNNNNNOOOOOOOOOOOOOSSSSSSSSSSSSSSSEEEEEEEEEEEEEEQQQQQQQQQQQQQ :type header : %d \n",js_header_v->code_type); + } + free_js_value(js_header_v); + }else{ + printf("\ndebug NULLL JS___HHHEADER_V \n"); + } + } + //if(local_current){ + // printf("debug: getchar\n"); + // getchar(); + + //} + //local_current_no_rec = local_current_no_rec->next; + + + /*if(cur_index == size_m_str){ + printf("debug: all seq are received: %ld\n",cur_index); + } + else{ + printf("debug: some issue cur_index %ld vs size_m_str :%ld\n",cur_index,size_m_str); + + }*/ + + if(m_str){ + purge_ptr_type_list_y_ptr_STRING(m_str); + m_str = NULL; + } + +} + +#endif +/* char filename[500]; int fd_file; long int nread; @@ -262,6 +656,5 @@ void receve_from_node(struct pollfd *fds, char *msg, size_t count){ } printf("debug: close nread==%ld\n",nread); close(fd_file); +*/ - -} diff --git a/y_socket_t/src/y_socket_t/y_list_string.c b/y_socket_t/src/y_socket_t/y_list_string.c index d80fc8f..69b3d2f 100644 --- a/y_socket_t/src/y_socket_t/y_list_string.c +++ b/y_socket_t/src/y_socket_t/y_list_string.c @@ -7,7 +7,7 @@ struct y_string * create_y_ptr_STRING(const char *buf, size_t size){ if(buf){ //strncpy(string->buf, buf, size); //snprintf(string->buf, size, "%s", buf); - memcpy(string->buf, buf, size); + memcpy(string->buf, buf, size+1); //if(strlen(buf)>=size) if(buf[size]!='\0') string->buf[size]='\0'; diff --git a/y_socket_t/src/y_socket_t/y_socket_t.c b/y_socket_t/src/y_socket_t/y_socket_t.c index cbc7be3..7bfc1d6 100644 --- a/y_socket_t/src/y_socket_t/y_socket_t.c +++ b/y_socket_t/src/y_socket_t/y_socket_t.c @@ -1,6 +1,7 @@ /*file: src/y_socket_t/y_socket_t.c */ #include "y_socket_t/y_socket_t.h" + //#include "y_socket_t/y_list_string.h" //#include "json_t/json_t.h" @@ -72,8 +73,8 @@ struct y_socket_t * y_socket_create(char *port, size_t size_fds, int nb_workers) sock_temp->port=port; sock_temp->nodes = create_var_list_y_NODE_T(); - sock_temp->mut_nodes = malloc(sizeof(pthread_mutex_t)); - pthread_mutex_init(sock_temp->mut_nodes, NULL); +// sock_temp->mut_nodes = malloc(sizeof(pthread_mutex_t)); +// pthread_mutex_init(sock_temp->mut_nodes, NULL); sock_temp->go_on = 1; sock_temp->mut_go_on = malloc(sizeof(pthread_mutex_t)); pthread_mutex_init(sock_temp->mut_go_on, NULL); @@ -87,8 +88,8 @@ struct y_socket_t * y_socket_create_(char * port){ void y_socket_free(struct y_socket_t *socket){ free(socket->fds); free_all_var_list_y_NODE_T(socket->nodes); - pthread_mutex_destroy(socket->mut_nodes); - free(socket->mut_nodes); +// pthread_mutex_destroy(socket->mut_nodes); +// free(socket->mut_nodes); pthread_mutex_destroy(socket->mut_go_on); free(socket->mut_go_on); free(socket); @@ -303,6 +304,22 @@ void y_socket_get_fds(struct pollfd * fds, char * port, char * addrDistant){ if(bind(fds[af].fd, rp->ai_addr, rp->ai_addrlen)==-1){ close(fds[af].fd); fds[af].fd=-1; + }else{ + char tempAddr[BUF_SIZE]={0}; + if(af_array[af]==AF_INET){ + if(NULL == inet_ntop(AF_INET, + &(GET_IN_type_ADDR(rp->ai_addr,)), + tempAddr, BUF_SIZE)){ + fprintf(stderr, "error inet_ntop v4\n"); + } + }else if(af_array[af]==AF_INET6){ + if(NULL == inet_ntop(AF_INET6, + &(GET_IN_type_ADDR(rp->ai_addr,6)), + tempAddr, BUF_SIZE )){ + fprintf(stderr, "error inet_ntop v6 :errno=%d\n",errno); + } + } + printf("\n\ndebug: ADDR_LOCAL v%d:%s\n\n", 2*af+4,tempAddr); } #if 0 int flags = fcntl(fds[af].fd, F_GETFL); @@ -322,17 +339,19 @@ int flags = fcntl(fds[af].fd, F_GETFL); } struct arg_handler_{ - char *buf; + struct main_list_y_ptr_STRING *m_str; + //char *buf; struct pollfd *fds; y_NODE_T node; struct y_socket_t *sock; struct argWorker *argw ; + struct main_list_y_ptr_HEADER_T *m_head_l_t; }; //void y_socket_handler_(char * buf, struct pollfd *fds, struct y_socket_t *sock) void* y_socket_handler_(void *arg){ struct arg_handler_ *argH = (struct arg_handler_ *)arg; - char *buf=argH->buf; + struct main_list_y_ptr_STRING *m_str =argH->m_str; struct pollfd *fds=argH->fds; struct y_socket_t *sock=argH->sock; struct argWorker *argw=argH->argw; @@ -340,56 +359,139 @@ void* y_socket_handler_(void *arg){ struct main_list_y_NODE_T *nodes = sock->nodes; update_nodes(argH->node, nodes); - printf("\n\n:::::::::::::::::::::::::::handler: : \n\n%s\n\n::::::::::::::::::::::::::\n",buf); - if(strncmp(buf, "get", 3)==0){ - if(strncmp(buf+4,"file",4)==0){ - char *filename = buf + 9; - struct arg_send_file *argS=malloc(sizeof(struct arg_send_file)); - argS->fds=fds; - argS->nodes=nodes; - argS->filename=filename; - push_back_list_TYPE_PTR(argw->list_arg, argS); - struct y_task_t task_send={ - .func=y_socket_send_file_for_all_nodes, - .arg=argS, - .status=TASK_PENDING, - }; - push_tasQ(argw->argx->tasQ, task_send); - //y_socket_send_file_for_all_nodes(fds, nodes, filename) ; - } - } - else if(strncmp(buf, "update", 6)==0){ - if(strncmp(buf+7,"kill",4)==0){ - pthread_mutex_lock(sock->mut_go_on); - sock->go_on = 0; - pthread_mutex_unlock(sock->mut_go_on); -// kill_all_workers(argw); -// printf("debug: kill_all\n"); + char *buf_org = m_str->begin_list->value->buf; + //printf("\n\n:::::::::::::::::::::::::::handler: : \n\n%s\n\n::::::::::::::::::::::::::\n",buf_org); + struct js_value *js_header = create_js_value(buf_org, NULL); + if(js_header && js_header->code_type == jstype_object){ + struct js_value *js_cmd = get_js_value_of_key("cmd", js_header ); + if(js_cmd && js_cmd->type.object.value->code_type == jstype_string){ + /** */ + struct js_value *js_seq = get_js_value_of_key("seq", js_header ); + if(js_seq){ + size_t seq_local = (long)(js_seq->type.object.value->type.number); + printf("debug: \n HANDLER header seq_local=%ld \n",seq_local); + }else{ + printf("debug: \n HANDLER header no seq\n"); + + } + /* */ + char * buf = js_cmd->type.object.value->type.string; + + if(strncmp(buf, "get", 3)==0){ + if(strncmp(buf+4,"file",4)==0){ + size_t len_filename = strlen(buf + 9); + char *filename = malloc(len_filename+1); + memcpy(filename, buf + 9, len_filename ); + filename[len_filename]='\0'; + //printf("debug: filename: %s \n\n",filename); + struct arg_send_file *argS=malloc(sizeof(struct arg_send_file)); + argS->fds=fds; + argS->nodes=nodes; + argS->filename=filename; + push_back_list_TYPE_PTR(argw->list_arg, argS); + push_back_list_TYPE_PTR(argw->list_arg, filename); + struct y_task_t task_send={ + .func=y_socket_send_file_for_all_nodes, + .arg=argS, + .status=TASK_PENDING, + }; + push_tasQ(argw->argx->tasQ, task_send); + //y_socket_send_file_for_all_nodes(fds, nodes, filename) ; + } + } + else if(strncmp(buf, "update", 6)==0){ + if(strncmp(buf+7,"kill",4)==0){ + pthread_mutex_lock(sock->mut_go_on); + sock->go_on = 0; + pthread_mutex_unlock(sock->mut_go_on); + // kill_all_workers(argw); + // printf("debug: kill_all\n"); + } + + } + else if(strncmp(buf, "post", 4)==0){ + if(strncmp(buf+5,"file",4)==0){ + char *filename = buf+10; + //index_f=strcpy(filename, buf + 10); + int index_f = strlen(filename); + printf("debug: receve_from_node : file: %s\n",filename); + for(--index_f; index_f>=0;--index_f){ + if(filename[index_f]=='/') { + ++index_f; + break; + } + } + +#if 0 + //struct list_y_ptr_STRING * last_record_=NULL; + for(struct list_y_ptr_STRING * local_current = m_str->begin_list; local_current; local_current = local_current->next){ + char *buf_loc = local_current->value->buf; + struct js_value * js_header_v = create_js_value(buf_loc,NULL); + //struct js_value *js_cmd_v = get_js_value_of_key("cmd", js_header_v ); + //printf("debug: index=[%ld] \n BBBBBEGINNNNNN file ***\n%s\n EEEENDDDDD\n",local_current->index,buf_loc); + printf("debug: index=[%ld] \n",local_current->index); + if(js_header_v){ + + + struct js_value *js_seq_v = get_js_value_of_key("seq", js_header_v ); + if(js_seq_v){ + if(js_seq_v->type.object.value->code_type == jstype_number){ + printf("debug: receve : \n################################# seq : %ld ###################################\n",(long)(js_seq_v->type.object.value->type.number)); + } + else{ + printf("debug: \n SSSSSSSSSSSSSSSEEEEEEEEEEEEEEQQQQQQQQQQQQQ type:%d \n",js_seq_v->type.object.value->code_type); + + } + }else{ + + printf("debug: \n NNNNNNNNNNNNNNNNOOOOOOOOOOOOOSSSSSSSSSSSSSSSEEEEEEEEEEEEEEQQQQQQQQQQQQQ :type header : %d \n",js_header_v->code_type); + } + struct js_value *js_eof_v = get_js_value_of_key("EOF", js_header_v ); + if(js_eof_v){ + printf("debug: \n****************************end of file ***\n%s\n**********************************\n",buf_loc); + } + else{ + //printf("debug: \n*******************************\n%s\n**********************************\n",buf_loc+js_org_str_length(js_header_v)); + } + free_js_value(js_header_v); + }else{ + printf("\ndebug NULLL JS___HHHEADER_V \n"); + } + } + + +#else + struct main_list_y_ptr_HEADER_T *m_head_l_t = argH->m_head_l_t; + char srcAddr[BUF_SIZE]; + set_tempAddr_from_node(srcAddr, argH->node); + receve_from_node(m_head_l_t, m_str,srcAddr, filename + index_f); + m_str = NULL; +#endif + /* + pthread_mutex_lock(sock->mut_go_on); + sock->go_on = 0; + pthread_mutex_unlock(sock->mut_go_on); + */ + // kill_all_workers(argw); + // printf("debug: kill_all\n"); + } + + } + } } - else if(strncmp(buf, "post", 4)==0){ - if(strncmp(buf+5,"file",4)==0){ - char filename[BUF_SIZE]; - strcpy(filename, buf + 10); - - printf("debug: receve_from_node : file: %s\n",filename); - receve_from_node(fds, filename, strlen(filename)); - /* - pthread_mutex_lock(sock->mut_go_on); - sock->go_on = 0; - pthread_mutex_unlock(sock->mut_go_on); - */ -// kill_all_workers(argw); -// printf("debug: kill_all\n"); - } - + free_js_value(js_header); + if(m_str){ + purge_ptr_type_list_y_ptr_STRING(m_str); + printf("debug: purge_ptr_type_list_y_ptr_STRING in y_socket_handler_\n"); } - return NULL; } + +#if 0 void handle_input_kbd(char *buf, ssize_t buf_len ,void *arg){ struct y_socket_t * argSock = (struct y_socket_t*)arg; struct pollfd *fds = argSock->fds; @@ -463,26 +565,67 @@ void handle_input_kbd(char *buf, ssize_t buf_len ,void *arg){ } } } +#endif -void handle_buf_socket_rec(char *temp_all_buf, y_NODE_T node, struct main_list_ptr_y_WORKER_T * workers, struct argExecTasQ *argx, struct main_list_TYPE_PTR * list_arg, void * arg){ +void handle_buf_socket_rec(struct main_list_y_ptr_HEADER_T *m_head_l_t,struct main_list_y_ptr_STRING *m_str, y_NODE_T node, struct main_list_ptr_y_WORKER_T * workers, struct argExecTasQ *argx, struct main_list_TYPE_PTR * list_arg, void * arg){ struct y_socket_t * argSock = (struct y_socket_t*)arg; struct pollfd *fds = argSock->fds; + + struct js_value *js_header = create_js_value(m_str->begin_list->value->buf, NULL); + if(js_header && js_header->code_type == jstype_object){ + struct js_value *js_cmd = get_js_value_of_key("cmd", js_header ); + if(js_cmd && js_cmd->type.object.value->code_type == jstype_string){ + if(strncmp(js_cmd->type.object.value->type.string,"update standby",14)==0){ + //pthread_mutex_lock(sock->mut_go_on); + //sock->go_on = 0; + //pthread_mutex_unlock(sock->mut_go_on); + standby_all_workers(workers->begin_list->value->arg); + // printf("debug: kill_all\n"); + } + else if(strncmp(js_cmd->type.object.value->type.string,"update wakeup",13)==0){ + //pthread_mutex_lock(sock->mut_go_on); + //sock->go_on = 0; + //pthread_mutex_unlock(sock->mut_go_on); + wakeup_all_workers(workers->begin_list->value->arg); + // printf("debug: kill_all\n"); + } + else{ + struct arg_handler_ *ptr_argHandl = malloc(sizeof(struct arg_handler_)); + ptr_argHandl->m_str = m_str; + ptr_argHandl->fds=fds; + ptr_argHandl->sock=argSock; + ptr_argHandl->node=node; + ptr_argHandl->argw=workers->begin_list->value->arg; + ptr_argHandl->m_head_l_t=m_head_l_t; + + push_back_list_TYPE_PTR(list_arg, ptr_argHandl); + struct y_task_t task_handl = { + .func=y_socket_handler_, + .arg=ptr_argHandl, + .status=TASK_PENDING, + }; + push_tasQ(argx->tasQ, task_handl); + } + } + } + +#if 0 if(strncmp(temp_all_buf,"update standby",14)==0){ //pthread_mutex_lock(sock->mut_go_on); //sock->go_on = 0; //pthread_mutex_unlock(sock->mut_go_on); standby_all_workers(workers->begin_list->value->arg); // printf("debug: kill_all\n"); - } - else if(strncmp(temp_all_buf,"update wakeup",13)==0){ + } + else if(strncmp(temp_all_buf,"update wakeup",13)==0){ //pthread_mutex_lock(sock->mut_go_on); //sock->go_on = 0; //pthread_mutex_unlock(sock->mut_go_on); wakeup_all_workers(workers->begin_list->value->arg); // printf("debug: kill_all\n"); - } - else{ + } + else{ struct arg_handler_ *ptr_argHandl = malloc(sizeof(struct arg_handler_)); ptr_argHandl->buf = temp_all_buf; ptr_argHandl->fds=fds; @@ -497,7 +640,9 @@ void handle_buf_socket_rec(char *temp_all_buf, y_NODE_T node, struct main_list_p .status=TASK_PENDING, }; push_tasQ(argx->tasQ, task_handl); - } + } +#endif + free_js_value(js_header); } void *y_socket_poll_fds(void *arg){ @@ -533,9 +678,9 @@ void *y_socket_poll_fds(void *arg){ y_NODE_T node; int af, status; ssize_t nread, buf_len; - char buf[BUF_SIZE]; - struct main_list_y_ptr_STRING *m_str=create_var_list_y_ptr_STRING(); - + char buf[BUF_SIZE+1]; + struct main_list_y_ptr_STRING *m_str=NULL;//=create_var_list_y_ptr_STRING(); + struct main_list_y_ptr_HEADER_T *m_head_l_t = create_var_list_y_ptr_HEADER_T(); // char *temp_all_buf=NULL; // char msgRet[BUF_SIZE + NI_MAXHOST + NI_MAXSERV + 100]; @@ -556,24 +701,33 @@ void *y_socket_poll_fds(void *arg){ } for(af = v4; af<=v6;++af){ if(fds[af].revents && POLLIN){ - remove_all_ptr_type_list_y_ptr_STRING(m_str); - memset(buf, 0, BUF_SIZE); + //remove_all_ptr_type_list_y_ptr_STRING(m_str); + if(m_str == NULL) + m_str=create_var_list_y_ptr_STRING(); + memset(buf, 0, BUF_SIZE+1); + +#if 1 while((nread = recvfrom(fds[af].fd, buf, BUF_SIZE, 0, (struct sockaddr *)&(node.addr), &(node.addr_len))) == BUF_SIZE){ - if(buf[nread-1]=='\n') buf[nread-1]='\0'; + + if(buf[nread-1]=='\n') + buf[nread-1]='\0'; buf[nread]='\0'; + y_ptr_STRING y_buf = create_y_ptr_STRING(buf, nread); push_back_list_y_ptr_STRING(m_str, y_buf); - ///printf("debug: push_back_list_y_ptr_STRING of <%s>\n",buf); + //printf("debug: push_back_list_y_ptr_STRING of <%s>\n",buf); + //memset(buf, 0, BUF_SIZE+1); //printf("debug: nread: %ld vs BUF_SIZE :%d \n",nread, BUF_SIZE); } //printf("debug: out nread: %ld vs BUF_SIZE :%d \n",nread, BUF_SIZE); if(nread == -1) fprintf(stderr,"error recvfrom\n"); - else if(nread >= 0 && nread < BUF_SIZE){ - if(nread && buf[nread-1]=='\n') buf[nread-1]='\0'; + else if(nread >= 0 && nread < BUF_SIZE){ + if(nread && buf[nread-1]=='\n' + ) buf[nread-1]='\0'; buf[nread]='\0'; //printf("msg: %s\n",buf); y_ptr_STRING y_buf = create_y_ptr_STRING(buf, nread); @@ -581,6 +735,7 @@ void *y_socket_poll_fds(void *arg){ //printf("debug: out push_back_list_y_ptr_STRING of <%s>\n",buf); } +#endif #if 0 struct arg_update_nodes *argUP_N=malloc(sizeof(struct arg_update_nodes)); @@ -601,17 +756,22 @@ void *y_socket_poll_fds(void *arg){ temp_all_buf=NULL; }*/ /*size_t total_buf = */ - char * temp_all_buf = NULL; + /*char * temp_all_buf = NULL; copy_list_y_ptr_STRING_to_one_string(&temp_all_buf , m_str); push_back_list_TYPE_PTR(list_arg, temp_all_buf); - + */ - handle_buf_socket_rec(temp_all_buf, node, workers, argx, list_arg, arg); /// //y_socket_handler_(temp_all_buf, fds, argSock); /// } + if(m_str){ + printf("debug: call handle_buf_socket_rec\n"); + handle_buf_socket_rec(m_head_l_t,m_str, node, workers, argx, list_arg, arg); + + m_str=NULL; + } } // stdin poll if(fds[2].revents){// && POLLIN @@ -639,12 +799,13 @@ void *y_socket_poll_fds(void *arg){ printf("debug : index_str= %d; cmd=[%s]\n",index_str, cmd); index_str=0; - while(buf[index_buf]==' '){++index_buf;} - for(; buf[index_buf]!=' '; ++index_buf){ + while((index_buf < buf_len) && (buf[index_buf]==' ')){++index_buf;} + for(; (index_buf < buf_len) && (buf[index_buf]!=' '); ++index_buf){ dst_addr[index_str++]=buf[index_buf]; } dst_addr[index_str]='\0'; - while(buf[index_buf]==' '){++index_buf;} + //while(buf[index_buf]==' '){++index_buf;} + while((index_buf < buf_len) && (buf[index_buf]==' ')){++index_buf;} /*index_str=0; for(; buf[index_buf]!='\n'; ++index_buf) msg_buf[index_str++]=buf[index_buf]; @@ -699,8 +860,11 @@ void *y_socket_poll_fds(void *arg){ } - - purge_ptr_type_list_y_ptr_STRING(m_str); + if(m_str){ + purge_ptr_type_list_y_ptr_STRING(m_str); + + printf("debug: m_str!=NULL -> purge_ptr_type_list_y_ptr_STRING done\n"); + } /* if(temp_all_buf){ free(temp_all_buf); @@ -711,7 +875,8 @@ void *y_socket_poll_fds(void *arg){ kill_all_workers(workers->begin_list->value->arg); printf("debug: kill all done\n"); - + purge_ptr_type_list_y_ptr_HEADER_T(m_head_l_t); + printf("debug: purge_ptr_type_list_y_ptr_HEADER_T done\n"); ///// ///// ///// diff --git a/y_socket_t/test/Makefile b/y_socket_t/test/Makefile index 823d2cb..ca875d6 100644 --- a/y_socket_t/test/Makefile +++ b/y_socket_t/test/Makefile @@ -11,7 +11,8 @@ YJSONDIR=$(PWD)/../../yjson_t ROOT_DIR=$(PWD)/.. INCLUDE_DIR=$(ROOT_DIR)/include -CFLAGS=-I$(INCLUDE_DIR) -I$(YTESTDIR)/include_ytest/include -I$(YLISTDIR)/src -I$(YWORKDIR)/include -I$(YJSONDIR)/src +INCLUDE=-I$(INCLUDE_DIR) -I$(YTESTDIR)/include_ytest/include -I$(YLISTDIR)/src -I$(YWORKDIR)/include -I$(YJSONDIR)/src +CFLAGS=-Wall -Werror -fpic $(INCLUDE) LDFLAGS=-L$(YTESTDIR) -lytest -lpthread -lm -lOpenCL #SRC_DIR=$(ROOT_DIR)/src diff --git a/y_worker_t/src/y_worker_t/y_task_t.c b/y_worker_t/src/y_worker_t/y_task_t.c index ff6cb34..2e36105 100644 --- a/y_worker_t/src/y_worker_t/y_task_t.c +++ b/y_worker_t/src/y_worker_t/y_task_t.c @@ -40,8 +40,8 @@ struct list_y_TASK_T* pull_tasQ(struct y_tasQ *tasQ){ while(tasQ->list_tasQ->end_list == NULL){ pthread_cond_wait(tasQ->cond_tasQ, tasQ->mut_tasQ); } - printf("debug: call pull_begin_from_list_y_TASK_T debut\n"); valueRet = pull_begin_from_list_y_TASK_T(tasQ->list_tasQ); + printf("debug: call pull_begin_from_list_y_TASK_T debut\n"); // valueRet = pull_end_from_list_y_TASK_T(tasQ->list_tasQ); //printf("debug: call pull_begin_from_list_y_TASK_T fin, is tasQ NULL? : %d\nis tasQ->list_tasQ NULL?:%d\n", tasQ==NULL, tasQ->list_tasQ == NULL); pthread_mutex_unlock(tasQ->mut_tasQ);