y_socket: send ok when all seqs done, if not, sender resend all msg, try 5 times
This commit is contained in:
@@ -11,23 +11,27 @@
|
|||||||
|
|
||||||
#include "json_t/json_t.h"
|
#include "json_t/json_t.h"
|
||||||
|
|
||||||
|
#include "list_t/list_t.h"
|
||||||
|
|
||||||
void fileNameDateScore(char* filename, char * pre, char* post,size_t score);
|
void fileNameDateScore(char* filename, char * pre, char* post,size_t score);
|
||||||
|
|
||||||
struct arg_send_file{
|
struct arg_send_file{
|
||||||
struct pollfd *fds;
|
struct pollfd *fds;
|
||||||
struct main_list_y_NODE_T *nodes;
|
struct main_list_y_NODE_T *nodes;
|
||||||
|
y_NODE_T node;
|
||||||
char * filename;
|
char * filename;
|
||||||
|
struct main_list_y_ptr_HEADER_T *m_ok_head_l_t;
|
||||||
};
|
};
|
||||||
|
|
||||||
void* y_socket_send_file_for_all_nodes(void* arg);
|
void* y_socket_send_file_for_all_nodes(void* arg);
|
||||||
|
void* y_socket_send_file_for_node(void* arg);
|
||||||
|
|
||||||
enum cmd_type {
|
enum cmd_type {
|
||||||
cmd_update_kill,
|
cmd_update_kill,
|
||||||
cmd_update_standby,
|
cmd_update_standby,
|
||||||
cmd_update_wakeup,
|
cmd_update_wakeup,
|
||||||
cmd_post_file,
|
cmd_post_file,
|
||||||
|
cmd_post_ok,
|
||||||
cmd_post_var,
|
cmd_post_var,
|
||||||
cmd_get_file,
|
cmd_get_file,
|
||||||
cmd_get_var,
|
cmd_get_var,
|
||||||
@@ -53,6 +57,7 @@ typedef struct header_t {
|
|||||||
enum cmd_type cmd_t;
|
enum cmd_type cmd_t;
|
||||||
// size_t seq;
|
// size_t seq;
|
||||||
char eof;
|
char eof;
|
||||||
|
// char ok;
|
||||||
// void *content;
|
// void *content;
|
||||||
size_t size_nameid;
|
size_t size_nameid;
|
||||||
char * nameid;/* containerid: filename_src_dst_tm */
|
char * nameid;/* containerid: filename_src_dst_tm */
|
||||||
@@ -67,6 +72,7 @@ GEN_HEAD_PTR_LIST(y_ptr_HEADER_T)
|
|||||||
|
|
||||||
size_t set_tempAddr_from_node(char *tempAddr, y_NODE_T node);
|
size_t set_tempAddr_from_node(char *tempAddr, y_NODE_T node);
|
||||||
//void receve_from_node(struct pollfd *fds, char *msg, size_t count);
|
//void receve_from_node(struct pollfd *fds, char *msg, size_t count);
|
||||||
void receve_from_node(struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_list_y_ptr_STRING *m_str, char * srcAddr, char *filename);
|
void receve_from_node(struct pollfd *fds, struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_list_y_ptr_STRING *m_str, y_NODE_T node /* char * srcAddr*/, char *filename);
|
||||||
|
long y_append_to_ok_header_l_(struct main_list_y_ptr_HEADER_T *m_ok_head_l_t, char *nameid );
|
||||||
|
|
||||||
#endif /*Y_FILE_HANDLER_T_H__C*/
|
#endif /*Y_FILE_HANDLER_T_H__C*/
|
||||||
|
|||||||
@@ -21,5 +21,6 @@ GEN_HEAD_PTR_LIST(y_ptr_STRING)
|
|||||||
|
|
||||||
size_t total_size_list_y_ptr_STRING(struct main_list_y_ptr_STRING *mstr);
|
size_t total_size_list_y_ptr_STRING(struct main_list_y_ptr_STRING *mstr);
|
||||||
size_t copy_list_y_ptr_STRING_to_one_string(char **p_dst_str, struct main_list_y_ptr_STRING *mstr);
|
size_t copy_list_y_ptr_STRING_to_one_string(char **p_dst_str, struct main_list_y_ptr_STRING *mstr);
|
||||||
|
struct main_list_y_ptr_STRING * split_str_to_main_list_y_ptr_STRING(char *str_org, char sep, size_t limit_size_str_org);
|
||||||
|
|
||||||
#endif /* Y_PTR_STRING_T_H__C */
|
#endif /* Y_PTR_STRING_T_H__C */
|
||||||
|
|||||||
@@ -2,6 +2,8 @@
|
|||||||
|
|
||||||
#include "y_socket_t/y_file_handler.h"
|
#include "y_socket_t/y_file_handler.h"
|
||||||
|
|
||||||
|
#define TTL_SOCKDRAM 10
|
||||||
|
|
||||||
//#include "y_socket_t/y_node_t.h"
|
//#include "y_socket_t/y_node_t.h"
|
||||||
|
|
||||||
GEN_LIST_ALL(y_ptr_MSG_CONTENT_T)
|
GEN_LIST_ALL(y_ptr_MSG_CONTENT_T)
|
||||||
@@ -73,11 +75,76 @@ int funcCmp_y_ptr_HEADER_T(y_ptr_HEADER_T h1, y_ptr_HEADER_T h2){
|
|||||||
}else return 1;
|
}else return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int funcCmp_y_ptr_HEADER_T_fn_nameid_mask(y_ptr_HEADER_T h1, y_ptr_HEADER_T h2){
|
||||||
|
if(h1==NULL || h2==NULL) return -1;
|
||||||
|
struct main_list_y_ptr_STRING * m_h1_nameid = split_str_to_main_list_y_ptr_STRING(h1->nameid,'_', h1->size_nameid);
|
||||||
|
struct main_list_y_ptr_STRING * m_h2_nameid = split_str_to_main_list_y_ptr_STRING(h2->nameid,'_', h2->size_nameid);
|
||||||
|
|
||||||
|
//int count_match = 0;
|
||||||
|
struct main_list_TYPE_SIZE_T * m_index_not_match = create_var_list_TYPE_SIZE_T();
|
||||||
|
int ret = 0;
|
||||||
|
if(m_h1_nameid->size != m_h2_nameid->size) {
|
||||||
|
ret = -2;
|
||||||
|
}else{
|
||||||
|
for(struct list_y_ptr_STRING *l_h1_ = m_h1_nameid->end_list, *l_h2_ = m_h2_nameid->end_list; l_h1_ && l_h2_; l_h1_ = l_h1_->preview, l_h2_=l_h2_->preview){
|
||||||
|
if((l_h1_->index >= m_h1_nameid->size - 2) || (l_h1_->index < m_h1_nameid->size - 3)){
|
||||||
|
if(strcmp(l_h1_->value->buf, l_h2_->value->buf )==0){
|
||||||
|
//++count_match ;
|
||||||
|
}
|
||||||
|
else{
|
||||||
|
push_back_list_TYPE_SIZE_T(m_index_not_match, l_h1_->index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(m_index_not_match->size) ret = -1;
|
||||||
|
else ret = 0;
|
||||||
|
|
||||||
|
|
||||||
|
purge_ptr_type_list_y_ptr_STRING(m_h1_nameid);
|
||||||
|
purge_ptr_type_list_y_ptr_STRING(m_h2_nameid);
|
||||||
|
free_all_var_list_TYPE_SIZE_T(m_index_not_match);
|
||||||
|
printf("check_if_in_ok_header_l_ ret=%d, %s ns %s\n",ret,h1->nameid, h2->nameid);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
long check_if_in_ok_header_l_(struct main_list_y_ptr_HEADER_T *m_ok_head_l_t, char *nameid ){
|
||||||
|
|
||||||
|
y_ptr_HEADER_T current_header = create_y_ptr_HEADER_T(nameid, strlen(nameid), cmd_post_ok );
|
||||||
|
struct list_y_ptr_HEADER_T * l_ocate_header = search_first_occ_from_begin_in_list_y_ptr_HEADER_T(m_ok_head_l_t, current_header, funcCmp_y_ptr_HEADER_T_fn_nameid_mask);
|
||||||
|
free_y_ptr_HEADER_T(current_header);
|
||||||
|
|
||||||
|
if(l_ocate_header){
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
long y_append_to_ok_header_l_(struct main_list_y_ptr_HEADER_T *m_ok_head_l_t, char *nameid ){
|
||||||
|
|
||||||
|
y_ptr_HEADER_T current_header = create_y_ptr_HEADER_T(nameid, strlen(nameid), cmd_post_ok );
|
||||||
|
struct list_y_ptr_HEADER_T * l_ocate_header = search_first_occ_from_begin_in_list_y_ptr_HEADER_T(m_ok_head_l_t, current_header, funcCmp_y_ptr_HEADER_T);
|
||||||
|
if(l_ocate_header){
|
||||||
|
free_y_ptr_HEADER_T(current_header);
|
||||||
|
printf("debug: already in m_ok_head_l_t");
|
||||||
|
return -1; // already in list
|
||||||
|
|
||||||
|
}
|
||||||
|
else{
|
||||||
|
push_back_list_y_ptr_HEADER_T(m_ok_head_l_t, current_header);
|
||||||
|
printf("debug: push_back_list_y_ptr_HEADER_T when l_ocate_header == NULL, m_head_l_t->size=%ld\n",m_ok_head_l_t->size);
|
||||||
|
return m_ok_head_l_t->size;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#define TEST_DUPLICATE_SEQ()\
|
#define TEST_DUPLICATE_SEQ()\
|
||||||
if(temp_curr->value->seq == cnt->seq){\
|
if(temp_curr->value->seq == cnt->seq){\
|
||||||
printf("debug: index_nearest_seq=%ld: seq equal: doublon problem ? seq:%ld appuyer sur une touche\n", index_nearest_seq, cnt->seq);\
|
printf("debug: index_nearest_seq=%ld: seq equal: doublon problem ? seq:%ld appuyer sur une touche\n", index_nearest_seq, cnt->seq);\
|
||||||
free_y_ptr_MSG_CONTENT_T(cnt); \
|
free_y_ptr_MSG_CONTENT_T(cnt); \
|
||||||
getchar();\
|
/*getchar();*/\
|
||||||
return -2;\
|
return -2;\
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -376,6 +443,7 @@ char * time_id(){
|
|||||||
//return filename;
|
//return filename;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
struct arg_send_file{
|
struct arg_send_file{
|
||||||
struct pollfd *fds;
|
struct pollfd *fds;
|
||||||
@@ -404,9 +472,118 @@ size_t set_tempAddr_from_node(char *tempAddr, y_NODE_T node) {
|
|||||||
return ret_len;
|
return ret_len;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
///
|
||||||
|
///
|
||||||
|
///
|
||||||
|
|
||||||
|
|
||||||
|
#define TEMP_ADDR 1
|
||||||
|
|
||||||
|
///
|
||||||
|
void* y_socket_send_file_for_node(void* arg){
|
||||||
|
struct arg_send_file *argS=(struct arg_send_file*)arg;
|
||||||
|
|
||||||
|
struct pollfd *fds=argS->fds;
|
||||||
|
y_NODE_T node=argS->node;
|
||||||
|
char * filename=argS->filename;
|
||||||
|
#if TEMP_ADDR
|
||||||
|
char tempAddr[64];
|
||||||
|
#endif
|
||||||
|
int c_af;
|
||||||
|
// char host[NI_MAXHOST], service[NI_MAXSERV];
|
||||||
|
char buf_send[BUF_SIZE+1];
|
||||||
|
int fd_file;
|
||||||
|
int retread;
|
||||||
|
#if 0
|
||||||
|
int status = getnameinfo((struct sockaddr*)&(node.addr), node.addr_len, host, NI_MAXHOST, service, NI_MAXSERV, NI_NUMERICHOST);
|
||||||
|
if(status)
|
||||||
|
// printf("debug: status ==0 : success: Received successfully from %s:%s\n", host,service);
|
||||||
|
// else
|
||||||
|
fprintf(stderr, "getnameinfo: %s\n", gai_strerror(status));
|
||||||
|
|
||||||
|
|
||||||
|
if(NULL == search_node_in_list_y_NODE_T(nodes, node))
|
||||||
|
push_back_list_y_NODE_T(nodes, node);
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
|
size_t seq = 0;//, len_buf_header=0,
|
||||||
|
size_t len_local_header_=0;
|
||||||
|
char * timeid = time_id();
|
||||||
|
char nameid[BUF_SIZE/2];
|
||||||
|
struct main_list_y_ptr_STRING *m_str_name_f=split_str_to_main_list_y_ptr_STRING(filename, '/', -1);
|
||||||
|
char * name_f=m_str_name_f->end_list->value->buf;
|
||||||
|
// char srcAddr[64];
|
||||||
|
// set_tempAddr_from_node(srcAddr, node);
|
||||||
|
|
||||||
|
set_tempAddr_from_node(tempAddr, node);
|
||||||
|
c_af=(node).addr.ss_family;
|
||||||
|
|
||||||
|
sprintf(nameid, "%s_%s_%s_%s",name_f, tempAddr, tempAddr, timeid);
|
||||||
|
|
||||||
|
for(int tour_i=0;(tour_i<4) && (check_if_in_ok_header_l_(argS->m_ok_head_l_t, nameid) == 0); ++tour_i){
|
||||||
|
|
||||||
|
fd_file = open( filename , O_RDONLY);
|
||||||
|
if(fd_file == -1){
|
||||||
|
fprintf(stderr,"error opening file |%s| for reading\n",filename);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
if(tour_i==0) seq = 1;
|
||||||
|
else */
|
||||||
|
seq=0;
|
||||||
|
|
||||||
|
len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"dst\" : \"%s\" , \"tm\" : \"%s\" }",filename, seq,tempAddr, timeid);
|
||||||
|
while((retread = read(fd_file, buf_send+len_local_header_, BUF_SIZE - len_local_header_) ) > 0 ){
|
||||||
|
buf_send[len_local_header_ + retread]='\0';
|
||||||
|
if(sendto(fds[(c_af==AF_INET6)].fd,
|
||||||
|
buf_send, retread+len_local_header_,
|
||||||
|
0,
|
||||||
|
(struct sockaddr*)&((node).addr),
|
||||||
|
(node).addr_len) !=
|
||||||
|
retread + len_local_header_
|
||||||
|
){
|
||||||
|
fprintf(stderr, "Error sending response to %s\n",tempAddr);
|
||||||
|
}else{
|
||||||
|
printf("debug: sending response to < %s > seq=[%ld] ",tempAddr,seq);
|
||||||
|
}
|
||||||
|
|
||||||
|
++seq;
|
||||||
|
len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"dst\" : \"%s\" , \"tm\" : \"%s\" }",filename, seq,tempAddr, timeid);
|
||||||
|
}
|
||||||
|
len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"EOF\" : true , \"dst\" : \"%s\" , \"tm\" : \"%s\" }",filename, seq, tempAddr,timeid);
|
||||||
|
if(sendto(fds[(c_af==AF_INET6)].fd,
|
||||||
|
buf_send, len_local_header_,
|
||||||
|
0,
|
||||||
|
(struct sockaddr*)&((node).addr),
|
||||||
|
(node).addr_len) !=
|
||||||
|
len_local_header_
|
||||||
|
){
|
||||||
|
fprintf(stderr, "Error ending response to %s, len_buf_header=%ld\n", tempAddr, len_local_header_);
|
||||||
|
}else{
|
||||||
|
printf("debug: ending response to < %s > [%ld] EOF",tempAddr,seq);
|
||||||
|
}
|
||||||
|
|
||||||
|
close(fd_file);
|
||||||
|
printf("debug: fd=%d closed: filename=%s, for %s\n",fd_file,filename, tempAddr);
|
||||||
|
|
||||||
|
size_t delay = 4000000;
|
||||||
|
printf("debug: wait %ld before checking, in tour:%d\n",delay, tour_i);
|
||||||
|
usleep(delay);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
free(timeid);
|
||||||
|
purge_ptr_type_list_y_ptr_STRING(m_str_name_f);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
///
|
||||||
|
|
||||||
/* */
|
/* */
|
||||||
#define TEMP_ADDR 1
|
|
||||||
|
|
||||||
//void y_socket_send_file_for_all_nodes(struct pollfd *fds, struct main_list_y_NODE_T *nodes, char * filename)
|
//void y_socket_send_file_for_all_nodes(struct pollfd *fds, struct main_list_y_NODE_T *nodes, char * filename)
|
||||||
void* y_socket_send_file_for_all_nodes(void* arg){
|
void* y_socket_send_file_for_all_nodes(void* arg){
|
||||||
@@ -509,16 +686,19 @@ void record_buffer_to_file(void *arg){
|
|||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
#if 1
|
#if 1
|
||||||
void receve_from_node(struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_list_y_ptr_STRING *m_str, char * srcAddr, char *filename ){
|
void receve_from_node(struct pollfd *fds, struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_list_y_ptr_STRING *m_str, y_NODE_T node /*char * srcAddr*/, char *filename ){
|
||||||
//printf("\ndebug: <<<< receve_from_node %s %ld\n\n",msg,count);
|
//printf("\ndebug: <<<< receve_from_node %s %ld\n\n",msg,count);
|
||||||
|
char srcAddr[64];
|
||||||
|
set_tempAddr_from_node(srcAddr, node);
|
||||||
|
|
||||||
|
struct main_list_y_ptr_STRING *m_str_name_f=split_str_to_main_list_y_ptr_STRING(filename, '/', -1);
|
||||||
|
char * name_f=m_str_name_f->end_list->value->buf;
|
||||||
//size_t size_m_str = 0;
|
//size_t size_m_str = 0;
|
||||||
struct list_y_ptr_STRING * local_current_no_rec = m_str->begin_list;
|
struct list_y_ptr_STRING * local_current_no_rec = m_str->begin_list;
|
||||||
struct list_y_ptr_STRING * local_current;
|
struct list_y_ptr_STRING * local_current;
|
||||||
for(local_current = local_current_no_rec; local_current; local_current = local_current->next){
|
for(local_current = local_current_no_rec; local_current; local_current = local_current->next){
|
||||||
char *buf_loc = local_current->value->buf;
|
char *buf_loc = local_current->value->buf;
|
||||||
char nameid[BUF_SIZE]="";
|
char nameid[BUF_SIZE/2]="";
|
||||||
size_t size_nameid=0;
|
size_t size_nameid=0;
|
||||||
struct js_value * js_header_v = create_js_value(buf_loc,NULL);
|
struct js_value * js_header_v = create_js_value(buf_loc,NULL);
|
||||||
//struct js_value *js_cmd_v = get_js_value_of_key("cmd", js_header_v );
|
//struct js_value *js_cmd_v = get_js_value_of_key("cmd", js_header_v );
|
||||||
@@ -549,8 +729,15 @@ void receve_from_node(struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_l
|
|||||||
char *content = buf_loc+ length_js_header;
|
char *content = buf_loc+ length_js_header;
|
||||||
size_t size_content = strlen(content);// js_header_v->length - length_js_header;
|
size_t size_content = strlen(content);// js_header_v->length - length_js_header;
|
||||||
enum cmd_type cmd_t = cmd_post_file;
|
enum cmd_type cmd_t = cmd_post_file;
|
||||||
size_nameid = sprintf(nameid, "%s_%s_%s_%s",filename,srcAddr, value_of_(js_dst_v)->type.string,value_of_(js_tm_v)->type.string);
|
//char *timeid = value_of_(js_tm_v)->type.string;
|
||||||
|
#if 0
|
||||||
|
size_nameid = sprintf(nameid, "%s_%s_%s_%s",name_f /*filename*/, srcAddr, value_of_(js_dst_v)->type.string, timeid/*value_of_(js_tm_v)->type.string*/);
|
||||||
|
#endif
|
||||||
|
size_nameid = sprintf(nameid, "%s_%s_%s_%s",name_f, srcAddr, value_of_(js_dst_v)->type.string, value_of_(js_tm_v)->type.string);
|
||||||
printf("debug: nameid = %s\n", nameid);
|
printf("debug: nameid = %s\n", nameid);
|
||||||
|
|
||||||
|
//int intTimeid = atoi(timeid);
|
||||||
|
|
||||||
y_ptr_MSG_CONTENT_T y_msg_cnt=create_y_ptr_MSG_CONTENT_T(nameid, size_nameid, content, size_content, cmd_t, seq_local,eof);
|
y_ptr_MSG_CONTENT_T y_msg_cnt=create_y_ptr_MSG_CONTENT_T(nameid, size_nameid, content, size_content, cmd_t, seq_local,eof);
|
||||||
long ret_app = y_append_content_to_header_l(m_head_l_t,y_msg_cnt);
|
long ret_app = y_append_content_to_header_l(m_head_l_t,y_msg_cnt);
|
||||||
if(ret_app != -2){
|
if(ret_app != -2){
|
||||||
@@ -565,9 +752,23 @@ void receve_from_node(struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_l
|
|||||||
struct list_y_ptr_HEADER_T * l_head_to_remove = pull_index_from_list_y_ptr_HEADER_T(m_head_l_t, local_header->index);
|
struct list_y_ptr_HEADER_T * l_head_to_remove = pull_index_from_list_y_ptr_HEADER_T(m_head_l_t, local_header->index);
|
||||||
free_y_ptr_HEADER_T(l_head_to_remove->value);
|
free_y_ptr_HEADER_T(l_head_to_remove->value);
|
||||||
free(l_head_to_remove);
|
free(l_head_to_remove);
|
||||||
|
// sendto srcAddr { "cmd" : "post ok nameid" } again !
|
||||||
|
char buf[BUF_SIZE];
|
||||||
|
size_t len_buf = sprintf(buf, "{ \"cmd\" : \"post ok %s\" }", nameid);
|
||||||
|
if(sendto(fds[(node.addr.ss_family==AF_INET6)].fd, buf, len_buf, 0, (struct sockaddr*)&((node).addr), (node).addr_len) != len_buf){
|
||||||
|
fprintf(stderr, "Error sending ok %s to %s\n", nameid,srcAddr);
|
||||||
|
}else{
|
||||||
|
printf("debug: sending OK %s to < %s > ",nameid,srcAddr);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}/*else if(intTimeNow-intTimeid > TTL_SOCKDRAM){
|
||||||
|
struct list_y_ptr_HEADER_T * l_head_to_remove = pull_index_from_list_y_ptr_HEADER_T(m_head_l_t, local_header->index);
|
||||||
|
free_y_ptr_HEADER_T(l_head_to_remove->value);
|
||||||
|
free(l_head_to_remove);
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
||||||
}else{
|
}else{
|
||||||
printf("debug: tm missing!");
|
printf("debug: tm missing!");
|
||||||
@@ -613,6 +814,8 @@ void receve_from_node(struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_l
|
|||||||
m_str = NULL;
|
m_str = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
purge_ptr_type_list_y_ptr_STRING(m_str_name_f);
|
||||||
|
//free(timeNow);
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@@ -67,7 +67,7 @@ struct main_list_y_ptr_STRING * split_str_to_main_list_y_ptr_STRING(char *str_or
|
|||||||
char *cur_str = str_org;
|
char *cur_str = str_org;
|
||||||
size_t size_buf=0;
|
size_t size_buf=0;
|
||||||
size_t size_org_cur = 0;
|
size_t size_org_cur = 0;
|
||||||
while(cur_str && (cur_str-str_org < limit_size_str_org)){
|
while(*cur_str && (cur_str-str_org < limit_size_str_org)){
|
||||||
if(*cur_str != sep) ++cur_str;
|
if(*cur_str != sep) ++cur_str;
|
||||||
else{
|
else{
|
||||||
size_buf = cur_str - buf;
|
size_buf = cur_str - buf;
|
||||||
|
|||||||
@@ -346,6 +346,7 @@ struct arg_handler_{
|
|||||||
struct y_socket_t *sock;
|
struct y_socket_t *sock;
|
||||||
struct argWorker *argw ;
|
struct argWorker *argw ;
|
||||||
struct main_list_y_ptr_HEADER_T *m_head_l_t;
|
struct main_list_y_ptr_HEADER_T *m_head_l_t;
|
||||||
|
struct main_list_y_ptr_HEADER_T *m_ok_head_l_t;
|
||||||
};
|
};
|
||||||
|
|
||||||
//void y_socket_handler_(char * buf, struct pollfd *fds, struct y_socket_t *sock)
|
//void y_socket_handler_(char * buf, struct pollfd *fds, struct y_socket_t *sock)
|
||||||
@@ -387,11 +388,14 @@ void* y_socket_handler_(void *arg){
|
|||||||
struct arg_send_file *argS=malloc(sizeof(struct arg_send_file));
|
struct arg_send_file *argS=malloc(sizeof(struct arg_send_file));
|
||||||
argS->fds=fds;
|
argS->fds=fds;
|
||||||
argS->nodes=nodes;
|
argS->nodes=nodes;
|
||||||
|
argS->node=argH->node;
|
||||||
argS->filename=filename;
|
argS->filename=filename;
|
||||||
|
argS->m_ok_head_l_t=argH->m_ok_head_l_t;
|
||||||
push_back_list_TYPE_PTR(argw->list_arg, argS);
|
push_back_list_TYPE_PTR(argw->list_arg, argS);
|
||||||
push_back_list_TYPE_PTR(argw->list_arg, filename);
|
push_back_list_TYPE_PTR(argw->list_arg, filename);
|
||||||
struct y_task_t task_send={
|
struct y_task_t task_send={
|
||||||
.func=y_socket_send_file_for_all_nodes,
|
//.func=y_socket_send_file_for_all_nodes,
|
||||||
|
.func=y_socket_send_file_for_node,
|
||||||
.arg=argS,
|
.arg=argS,
|
||||||
.status=TASK_PENDING,
|
.status=TASK_PENDING,
|
||||||
};
|
};
|
||||||
@@ -413,14 +417,15 @@ void* y_socket_handler_(void *arg){
|
|||||||
if(strncmp(buf+5,"file",4)==0){
|
if(strncmp(buf+5,"file",4)==0){
|
||||||
char *filename = buf+10;
|
char *filename = buf+10;
|
||||||
//index_f=strcpy(filename, buf + 10);
|
//index_f=strcpy(filename, buf + 10);
|
||||||
int index_f = strlen(filename);
|
/*
|
||||||
|
int index_f = strlen(filename);
|
||||||
printf("debug: receve_from_node : file: %s\n",filename);
|
printf("debug: receve_from_node : file: %s\n",filename);
|
||||||
for(--index_f; index_f>=0;--index_f){
|
for(--index_f; index_f>=0;--index_f){
|
||||||
if(filename[index_f]=='/') {
|
if(filename[index_f]=='/') {
|
||||||
++index_f;
|
++index_f;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}*/
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
//struct list_y_ptr_STRING * last_record_=NULL;
|
//struct list_y_ptr_STRING * last_record_=NULL;
|
||||||
@@ -462,9 +467,9 @@ void* y_socket_handler_(void *arg){
|
|||||||
|
|
||||||
#else
|
#else
|
||||||
struct main_list_y_ptr_HEADER_T *m_head_l_t = argH->m_head_l_t;
|
struct main_list_y_ptr_HEADER_T *m_head_l_t = argH->m_head_l_t;
|
||||||
char srcAddr[BUF_SIZE];
|
//char srcAddr[BUF_SIZE];
|
||||||
set_tempAddr_from_node(srcAddr, argH->node);
|
//set_tempAddr_from_node(srcAddr, argH->node);
|
||||||
receve_from_node(m_head_l_t, m_str,srcAddr, filename + index_f);
|
receve_from_node(fds, m_head_l_t, m_str,argH->node/* srcAddr*/, filename /*+ index_f*/);
|
||||||
m_str = NULL;
|
m_str = NULL;
|
||||||
#endif
|
#endif
|
||||||
/*
|
/*
|
||||||
@@ -474,7 +479,10 @@ void* y_socket_handler_(void *arg){
|
|||||||
*/
|
*/
|
||||||
// kill_all_workers(argw);
|
// kill_all_workers(argw);
|
||||||
// printf("debug: kill_all\n");
|
// printf("debug: kill_all\n");
|
||||||
}
|
}else if(strncmp(buf+5,"ok",2)==0){
|
||||||
|
char *nameid = buf+8;
|
||||||
|
y_append_to_ok_header_l_(argH->m_ok_head_l_t,nameid );
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -567,7 +575,7 @@ void handle_input_kbd(char *buf, ssize_t buf_len ,void *arg){
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
void handle_buf_socket_rec(struct main_list_y_ptr_HEADER_T *m_head_l_t,struct main_list_y_ptr_STRING *m_str, y_NODE_T node, struct main_list_ptr_y_WORKER_T * workers, struct argExecTasQ *argx, struct main_list_TYPE_PTR * list_arg, void * arg){
|
void handle_buf_socket_rec(struct main_list_y_ptr_HEADER_T *m_ok_head_l_t, struct main_list_y_ptr_HEADER_T *m_head_l_t,struct main_list_y_ptr_STRING *m_str, y_NODE_T node, struct main_list_ptr_y_WORKER_T * workers, struct argExecTasQ *argx, struct main_list_TYPE_PTR * list_arg, void * arg){
|
||||||
struct y_socket_t * argSock = (struct y_socket_t*)arg;
|
struct y_socket_t * argSock = (struct y_socket_t*)arg;
|
||||||
struct pollfd *fds = argSock->fds;
|
struct pollfd *fds = argSock->fds;
|
||||||
|
|
||||||
@@ -597,6 +605,7 @@ void handle_buf_socket_rec(struct main_list_y_ptr_HEADER_T *m_head_l_t,struct ma
|
|||||||
ptr_argHandl->node=node;
|
ptr_argHandl->node=node;
|
||||||
ptr_argHandl->argw=workers->begin_list->value->arg;
|
ptr_argHandl->argw=workers->begin_list->value->arg;
|
||||||
ptr_argHandl->m_head_l_t=m_head_l_t;
|
ptr_argHandl->m_head_l_t=m_head_l_t;
|
||||||
|
ptr_argHandl->m_ok_head_l_t=m_ok_head_l_t;
|
||||||
|
|
||||||
push_back_list_TYPE_PTR(list_arg, ptr_argHandl);
|
push_back_list_TYPE_PTR(list_arg, ptr_argHandl);
|
||||||
struct y_task_t task_handl = {
|
struct y_task_t task_handl = {
|
||||||
@@ -681,6 +690,8 @@ void *y_socket_poll_fds(void *arg){
|
|||||||
char buf[BUF_SIZE+1];
|
char buf[BUF_SIZE+1];
|
||||||
struct main_list_y_ptr_STRING *m_str=NULL;//=create_var_list_y_ptr_STRING();
|
struct main_list_y_ptr_STRING *m_str=NULL;//=create_var_list_y_ptr_STRING();
|
||||||
struct main_list_y_ptr_HEADER_T *m_head_l_t = create_var_list_y_ptr_HEADER_T();
|
struct main_list_y_ptr_HEADER_T *m_head_l_t = create_var_list_y_ptr_HEADER_T();
|
||||||
|
struct main_list_y_ptr_HEADER_T *m_ok_head_l_t = create_var_list_y_ptr_HEADER_T();
|
||||||
|
|
||||||
// char *temp_all_buf=NULL;
|
// char *temp_all_buf=NULL;
|
||||||
|
|
||||||
// char msgRet[BUF_SIZE + NI_MAXHOST + NI_MAXSERV + 100];
|
// char msgRet[BUF_SIZE + NI_MAXHOST + NI_MAXSERV + 100];
|
||||||
@@ -768,7 +779,7 @@ void *y_socket_poll_fds(void *arg){
|
|||||||
}
|
}
|
||||||
if(m_str){
|
if(m_str){
|
||||||
printf("debug: call handle_buf_socket_rec\n");
|
printf("debug: call handle_buf_socket_rec\n");
|
||||||
handle_buf_socket_rec(m_head_l_t,m_str, node, workers, argx, list_arg, arg);
|
handle_buf_socket_rec(m_ok_head_l_t,m_head_l_t,m_str, node, workers, argx, list_arg, arg);
|
||||||
|
|
||||||
m_str=NULL;
|
m_str=NULL;
|
||||||
}
|
}
|
||||||
@@ -876,7 +887,9 @@ void *y_socket_poll_fds(void *arg){
|
|||||||
kill_all_workers(workers->begin_list->value->arg);
|
kill_all_workers(workers->begin_list->value->arg);
|
||||||
printf("debug: kill all done\n");
|
printf("debug: kill all done\n");
|
||||||
purge_ptr_type_list_y_ptr_HEADER_T(m_head_l_t);
|
purge_ptr_type_list_y_ptr_HEADER_T(m_head_l_t);
|
||||||
printf("debug: purge_ptr_type_list_y_ptr_HEADER_T done\n");
|
printf("debug: purge_ptr_type_list_y_ptr_HEADER_T m_head_l_t done\n");
|
||||||
|
purge_ptr_type_list_y_ptr_HEADER_T(m_ok_head_l_t);
|
||||||
|
printf("debug: purge_ptr_type_list_y_ptr_HEADER_T m_ok_head_l_t done\n");
|
||||||
///// ///// /////
|
///// ///// /////
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user