y_socket: refactor and try to debug kill all

This commit is contained in:
2025-10-15 22:56:16 +02:00
parent 4c1fc03d0c
commit 1ca0eeaf69
15 changed files with 461 additions and 254 deletions
+1 -1
View File
@@ -22,7 +22,7 @@ YSOCKSRC_O=$(YSOCKSRC:.c=.o)
YNODESRC=$(PWD)/src/y_socket_t/y_node_t.c YNODESRC=$(PWD)/src/y_socket_t/y_node_t.c
YNODESRC_O=$(YNODESRC:.c=.o) YNODESRC_O=$(YNODESRC:.c=.o)
YY_STRINGSRC=$(PWD)/src/y_socket_t/y_list_string.c YY_STRINGSRC=$(PWD)/src/y_socket_t/y_list_var_tool.c
YY_STRINGSRC_O=$(YY_STRINGSRC:.c=.o) YY_STRINGSRC_O=$(YY_STRINGSRC:.c=.o)
FILEHANDLERSRC=$(PWD)/src/y_socket_t/y_file_handler.c FILEHANDLERSRC=$(PWD)/src/y_socket_t/y_file_handler.c
@@ -4,7 +4,7 @@
#include "y_socket_t/y_socket_t.h" #include "y_socket_t/y_socket_t.h"
#include "y_socket_t/y_node_t.h" #include "y_socket_t/y_node_t.h"
#include "y_socket_t/y_list_string.h" #include "y_socket_t/y_list_var_tool.h"
#include "y_worker_t/y_worker_t.h" #include "y_worker_t/y_worker_t.h"
#include "y_worker_t/y_task_t.h" #include "y_worker_t/y_task_t.h"
@@ -21,6 +21,7 @@ struct arg_send_file{
y_NODE_T node; y_NODE_T node;
char * filename; char * filename;
struct main_list_y_ptr_HEADER_T *m_ok_head_l_t; struct main_list_y_ptr_HEADER_T *m_ok_head_l_t;
struct main_list_y_ptr_VARIABLE *m_var;
}; };
void* y_socket_send_file_for_all_nodes(void* arg); void* y_socket_send_file_for_all_nodes(void* arg);
@@ -73,7 +74,7 @@ GEN_HEAD_PTR_LIST(y_ptr_HEADER_T)
size_t set_tempAddr_from_node(char *tempAddr, y_NODE_T node); size_t set_tempAddr_from_node(char *tempAddr, y_NODE_T node);
//void receve_from_node(struct pollfd *fds, char *msg, size_t count); //void receve_from_node(struct pollfd *fds, char *msg, size_t count);
void receve_from_node(struct pollfd *fds, struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_list_y_ptr_STRING *m_str, y_NODE_T node /* char * srcAddr*/, char *filename); void receve_from_node(struct pollfd *fds, struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_list_y_ptr_VARIABLE *m_var, struct main_list_y_ptr_STRING *m_str, y_NODE_T node /* char * srcAddr*/, char *filename);
long y_append_to_ok_header_l_(struct main_list_y_ptr_HEADER_T *m_ok_head_l_t, char *nameid ); long y_append_to_ok_header_l_(struct main_list_y_ptr_HEADER_T *m_ok_head_l_t, char *nameid );
#endif /*Y_FILE_HANDLER_T_H__C*/ #endif /*Y_FILE_HANDLER_T_H__C*/
@@ -1,26 +0,0 @@
/*file: include/y_socket_t/y_list_string.h */
#ifndef Y_PTR_STRING_T_H__C
#define Y_PTR_STRING_T_H__C
#include <string.h>
#include "list_t/list_t.h"
struct y_string{
char * buf;
size_t size;
};
typedef struct y_string * y_ptr_STRING;
struct y_string * create_y_ptr_STRING(const char *buf, size_t size);
GENERATE_LIST_ALL(y_ptr_STRING)
GEN_HEAD_PTR_LIST(y_ptr_STRING)
size_t total_size_list_y_ptr_STRING(struct main_list_y_ptr_STRING *mstr);
size_t copy_list_y_ptr_STRING_to_one_string(char **p_dst_str, struct main_list_y_ptr_STRING *mstr);
struct main_list_y_ptr_STRING * split_str_to_main_list_y_ptr_STRING(char *str_org, char sep, size_t limit_size_str_org);
#endif /* Y_PTR_STRING_T_H__C */
@@ -0,0 +1,44 @@
/*file: include/y_socket_t/y_list_var_tool.h */
#ifndef Y_PTR_VAR_TOOL_H__C
#define Y_PTR_VAR_TOOL_H__C
#include <string.h>
#include "list_t/list_t.h"
long long_time_id();
struct y_variable{
char * name;
size_t size_name;
void * value;
size_t size_value;
long time_l;
char src[64];/* */
};
typedef struct y_variable * y_ptr_VARIABLE;
struct y_variable * create_y_ptr_VARIABLE(const char *name, size_t size_name, void * value, size_t size_value, char * src);
GENERATE_LIST_ALL(y_ptr_VARIABLE)
GEN_HEAD_PTR_LIST(y_ptr_VARIABLE)
void update_list_y_ptr_VARIABLE(struct main_list_y_ptr_VARIABLE *listVariables, y_ptr_VARIABLE var);
struct y_string{
char * buf;
size_t size;
};
typedef struct y_string * y_ptr_STRING;
struct y_string * create_y_ptr_STRING(const char *buf, size_t size);
GENERATE_LIST_ALL(y_ptr_STRING)
GEN_HEAD_PTR_LIST(y_ptr_STRING)
size_t total_size_list_y_ptr_STRING(struct main_list_y_ptr_STRING *mstr);
size_t copy_list_y_ptr_STRING_to_one_string(char **p_dst_str, struct main_list_y_ptr_STRING *mstr);
struct main_list_y_ptr_STRING * split_str_to_main_list_y_ptr_STRING(char *str_org, char sep, size_t limit_size_str_org);
#endif /* Y_PTR_VAR_TOOL_H__C */
+6 -3
View File
@@ -27,13 +27,16 @@ void init_len_list_y_NODE_T(struct main_list_y_NODE_T *listNodes);
int y_NODE_T_cmp(y_NODE_T nodeA, y_NODE_T nodeB); int y_NODE_T_cmp(y_NODE_T nodeA, y_NODE_T nodeB);
struct list_y_NODE_T * search_node_in_list_y_NODE_T(struct main_list_y_NODE_T *listNodes, y_NODE_T node); struct list_y_NODE_T * search_node_in_list_y_NODE_T(struct main_list_y_NODE_T *listNodes, y_NODE_T node);
int set_addr_y_NODE_T(y_NODE_T *node, char * addrStr); int set_addr_y_NODE_T_from_str_addr(y_NODE_T *node, char * addrStr);
void set_port_y_NODE_T(y_NODE_T *node, int port); void set_port_y_NODE_T_from_int_port(y_NODE_T *node, int port);
void set_str_port_y_NODE_T(y_NODE_T *node, char *str_port); void set_port_y_NODE_T_from_str_port(y_NODE_T *node, char *str_port);
size_t set_addr_str_from_node(char *tempAddr, y_NODE_T node);
const char * put_y_NODE_T_in_string(y_NODE_T *node, char * dst); const char * put_y_NODE_T_in_string(y_NODE_T *node, char * dst);
void update_nodes(y_NODE_T node, struct main_list_y_NODE_T *nodes); void update_nodes(y_NODE_T node, struct main_list_y_NODE_T *nodes);
void * remove_node_from_nodes(void* arg); void * remove_node_from_nodes(void* arg);
void * add_node_to_nodes(void* arg); void * add_node_to_nodes(void* arg);
int export_nodes_to_file(char * file_nodes_name, struct main_list_y_NODE_T *nodes);
int import_nodes_from_file(char * file_nodes_name, int int_port, struct main_list_y_NODE_T *nodes);
#endif /* __Y_NODE_T_H__C */ #endif /* __Y_NODE_T_H__C */
+33 -32
View File
@@ -31,7 +31,7 @@
#include "y_worker_t/y_worker_t.h" #include "y_worker_t/y_worker_t.h"
#include "y_worker_t/y_task_t.h" #include "y_worker_t/y_task_t.h"
#include "y_socket_t/y_list_string.h" #include "y_socket_t/y_list_var_tool.h"
#include "json_t/json_t.h" #include "json_t/json_t.h"
@@ -47,37 +47,6 @@ enum ipVersions{
extern const int af_array[nbIpVersion];//={AF_INET, AF_INET6}; extern const int af_array[nbIpVersion];//={AF_INET, AF_INET6};
/* y_ptr_STRING */
#if 0
struct y_string{
char * buf;
size_t size;
};
typedef struct y_string * y_ptr_STRING;
struct y_string * create_y_ptr_STRING(const char *buf, size_t size);
GENERATE_LIST_ALL(y_ptr_STRING)
GEN_HEAD_PTR_LIST(y_ptr_STRING)
size_t total_size_list_y_ptr_STRING(struct main_list_y_ptr_STRING *mstr);
size_t copy_list_y_ptr_STRING_to_one_string(char **p_dst_str, struct main_list_y_ptr_STRING *mstr);
#endif /* y_ptr_STRING */
struct y_variable{
char * name;
void * value;
};
typedef struct y_variable * y_ptr_VARIABLE;
struct y_variable * create_y_ptr_VARIABLE(const char *name, size_t size_value);
GENERATE_LIST_ALL(y_ptr_VARIABLE)
GEN_HEAD_PTR_LIST(y_ptr_VARIABLE)
struct y_socket_t{ struct y_socket_t{
struct pollfd *fds; struct pollfd *fds;
size_t size_fds; size_t size_fds;
@@ -111,6 +80,38 @@ void *threadFuncSend(void *arg);
#define GET_IN_type_ADDR(PointerSockAddr,type) \ #define GET_IN_type_ADDR(PointerSockAddr,type) \
((struct sockaddr_in##type *)(PointerSockAddr))->sin##type##_addr.s##type##_addr ((struct sockaddr_in##type *)(PointerSockAddr))->sin##type##_addr.s##type##_addr
#if 0
/* y_ptr_STRING */
struct y_string{
char * buf;
size_t size;
};
typedef struct y_string * y_ptr_STRING;
struct y_string * create_y_ptr_STRING(const char *buf, size_t size);
GENERATE_LIST_ALL(y_ptr_STRING)
GEN_HEAD_PTR_LIST(y_ptr_STRING)
size_t total_size_list_y_ptr_STRING(struct main_list_y_ptr_STRING *mstr);
size_t copy_list_y_ptr_STRING_to_one_string(char **p_dst_str, struct main_list_y_ptr_STRING *mstr);
#endif /* y_ptr_STRING */
/* y_ptr_VARIABLE */
#if 0
struct y_variable{
char * name;
void * value;
};
typedef struct y_variable * y_ptr_VARIABLE;
struct y_variable * create_y_ptr_VARIABLE(const char *name, size_t size_value);
GENERATE_LIST_ALL(y_ptr_VARIABLE)
GEN_HEAD_PTR_LIST(y_ptr_VARIABLE)
#endif /* y_ptr_VARIABLE */
#endif /* Y_SOCKET_T_H__C */ #endif /* Y_SOCKET_T_H__C */
+48 -83
View File
@@ -125,7 +125,7 @@ int funcCmp_y_ptr_HEADER_T_fn_nameid_mask(y_ptr_HEADER_T h1, y_ptr_HEADER_T h2){
long check_if_in_ok_header_l_(struct main_list_y_ptr_HEADER_T *m_ok_head_l_t, char *nameid ){ long check_if_in_ok_header_l_(struct main_list_y_ptr_HEADER_T *m_ok_head_l_t, char *nameid ){
y_ptr_HEADER_T current_header = create_y_ptr_HEADER_T(nameid, strlen(nameid), cmd_post_ok ); y_ptr_HEADER_T current_header = create_y_ptr_HEADER_T(nameid, strlen(nameid), cmd_post_ok );
struct list_y_ptr_HEADER_T * l_ocate_header = search_first_occ_from_begin_in_list_y_ptr_HEADER_T(m_ok_head_l_t, current_header, funcCmp_y_ptr_HEADER_T_fn_nameid_mask); struct list_y_ptr_HEADER_T * l_ocate_header = search_first_occ_from_end_in_list_y_ptr_HEADER_T(m_ok_head_l_t, current_header, funcCmp_y_ptr_HEADER_T_fn_nameid_mask);
free_y_ptr_HEADER_T(current_header); free_y_ptr_HEADER_T(current_header);
if(l_ocate_header){ if(l_ocate_header){
@@ -137,7 +137,7 @@ long check_if_in_ok_header_l_(struct main_list_y_ptr_HEADER_T *m_ok_head_l_t, ch
long y_append_to_ok_header_l_(struct main_list_y_ptr_HEADER_T *m_ok_head_l_t, char *nameid ){ long y_append_to_ok_header_l_(struct main_list_y_ptr_HEADER_T *m_ok_head_l_t, char *nameid ){
y_ptr_HEADER_T current_header = create_y_ptr_HEADER_T(nameid, strlen(nameid), cmd_post_ok ); y_ptr_HEADER_T current_header = create_y_ptr_HEADER_T(nameid, strlen(nameid), cmd_post_ok );
struct list_y_ptr_HEADER_T * l_ocate_header = search_first_occ_from_begin_in_list_y_ptr_HEADER_T(m_ok_head_l_t, current_header, funcCmp_y_ptr_HEADER_T); struct list_y_ptr_HEADER_T * l_ocate_header = search_first_occ_from_end_in_list_y_ptr_HEADER_T(m_ok_head_l_t, current_header, funcCmp_y_ptr_HEADER_T);
if(l_ocate_header){ if(l_ocate_header){
free_y_ptr_HEADER_T(current_header); free_y_ptr_HEADER_T(current_header);
printf("debug: already in m_ok_head_l_t"); printf("debug: already in m_ok_head_l_t");
@@ -162,7 +162,7 @@ long y_append_to_ok_header_l_(struct main_list_y_ptr_HEADER_T *m_ok_head_l_t, ch
long y_append_content_to_header_l(struct main_list_y_ptr_HEADER_T *m_head_l_t, y_ptr_MSG_CONTENT_T cnt){ long y_append_content_to_header_l(struct main_list_y_ptr_HEADER_T *m_head_l_t, y_ptr_MSG_CONTENT_T cnt){
y_ptr_HEADER_T current_header = create_y_ptr_HEADER_T(cnt->nameid, cnt->size_nameid, cnt->cmd_t); y_ptr_HEADER_T current_header = create_y_ptr_HEADER_T(cnt->nameid, cnt->size_nameid, cnt->cmd_t);
struct list_y_ptr_HEADER_T * l_ocate_header = search_first_occ_from_begin_in_list_y_ptr_HEADER_T(m_head_l_t, current_header, funcCmp_y_ptr_HEADER_T); struct list_y_ptr_HEADER_T * l_ocate_header = search_first_occ_from_end_in_list_y_ptr_HEADER_T(m_head_l_t, current_header, funcCmp_y_ptr_HEADER_T);
printf("debug: search done, nameid:%s, #%ld\n",cnt->nameid, cnt->size_nameid); printf("debug: search done, nameid:%s, #%ld\n",cnt->nameid, cnt->size_nameid);
if(l_ocate_header){ if(l_ocate_header){
free_y_ptr_HEADER_T(current_header); free_y_ptr_HEADER_T(current_header);
@@ -442,18 +442,33 @@ void fileNameDateScore(char* filename, char * pre, char* post,size_t score){
//return filename; //return filename;
} }
/*
long long_time_id(){
// char *filename=malloc(256);
//char timeid[64];//="20251011215824";
time_t t = time(NULL);
struct tm tm = *localtime(&t);
//sprintf(timeid,"%d%02d%02d%02d%02d%02d", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
long long_tm = (tm.tm_year + 1900)*10000000000+ (tm.tm_mon + 1)*100000000+ tm.tm_mday*1000000 + tm.tm_hour*10000+ tm.tm_min*100+ tm.tm_sec ;
//return filename;
///printf("debug: timeid=%s, vs tm=%ld\n",timeid, intm);
//printf("debug: timeof=%ld, vs tm=%ld, tm_zone=%s\n",tm.tm_gmtoff, long_tm, tm.tm_zone);
return long_tm;
}
char * time_id(){ char * time_id(){
// char *filename=malloc(256); // char *filename=malloc(256);
char *timeid=malloc(128); char *timeid=malloc(128);
time_t t = time(NULL); time_t t = time(NULL);
struct tm tm = *localtime(&t); struct tm tm = *localtime(&t);
sprintf(timeid,"%d%02d%02d%02d%02d%02d", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); sprintf(timeid,"%d%02d%02d%02d%02d%02d", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
//printf("debug: TTT \n%ld\n",int_time_id());
return timeid; return timeid;
//return filename; //return filename;
} }
*/
#if 0 #if 0
struct arg_send_file{ struct arg_send_file{
@@ -463,8 +478,8 @@ struct arg_send_file{
}; };
#endif #endif
/* */ /* */
#if 0
size_t set_tempAddr_from_node(char *tempAddr, y_NODE_T node) { size_t set_addr_str_from_node(char *tempAddr, y_NODE_T node) {
int c_af=(node).addr.ss_family; int c_af=(node).addr.ss_family;
if(c_af==AF_INET){ if(c_af==AF_INET){
if(NULL == inet_ntop(c_af, if(NULL == inet_ntop(c_af,
@@ -482,7 +497,7 @@ size_t set_tempAddr_from_node(char *tempAddr, y_NODE_T node) {
size_t ret_len = strlen(tempAddr); size_t ret_len = strlen(tempAddr);
return ret_len; return ret_len;
} }
#endif
/// ///
/// ///
/// ///
@@ -520,23 +535,25 @@ void* y_socket_send_file_for_node(void* arg){
size_t seq = 0;//, len_buf_header=0, size_t seq = 0;//, len_buf_header=0,
size_t len_local_header_=0; size_t len_local_header_=0;
char * timeid = time_id(); //char * timeid = time_id();
long timeid = long_time_id();
char nameid[BUF_SIZE/2]; char nameid[BUF_SIZE/2];
struct main_list_y_ptr_STRING *m_str_name_f=split_str_to_main_list_y_ptr_STRING(filename, '/', -1); struct main_list_y_ptr_STRING *m_str_name_f=split_str_to_main_list_y_ptr_STRING(filename, '/', -1);
char * name_f=m_str_name_f->end_list->value->buf; char * name_f=m_str_name_f->end_list->value->buf;
// char srcAddr[64]; // char srcAddr[64];
// set_tempAddr_from_node(srcAddr, node); // set_addr_str_from_node(srcAddr, node);
set_tempAddr_from_node(tempAddr, node); set_addr_str_from_node(tempAddr, node);
c_af=(node).addr.ss_family; c_af=(node).addr.ss_family;
sprintf(nameid, "%s_%s_%s_%s",name_f, tempAddr, tempAddr, timeid); sprintf(nameid, "%s_%s_%s_%ld",name_f, tempAddr, tempAddr, timeid);
for(int tour_i=0;(tour_i<4) && (check_if_in_ok_header_l_(argS->m_ok_head_l_t, nameid) == 0); ++tour_i){ for(int tour_i=0;(tour_i<4) && (check_if_in_ok_header_l_(argS->m_ok_head_l_t, nameid) == 0); ++tour_i){
fd_file = open( filename , O_RDONLY); fd_file = open( filename , O_RDONLY);
if(fd_file == -1){ if(fd_file == -1){
fprintf(stderr,"error opening file |%s| for reading\n",filename); fprintf(stderr,"error opening file |%s| for reading\n",filename);
purge_ptr_type_list_y_ptr_STRING(m_str_name_f);
return NULL; return NULL;
} }
/* /*
@@ -544,7 +561,7 @@ for(int tour_i=0;(tour_i<4) && (check_if_in_ok_header_l_(argS->m_ok_head_l_t, na
else */ else */
seq=0; seq=0;
len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"dst\" : \"%s\" , \"tm\" : \"%s\" }",filename, seq,tempAddr, timeid); len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"dst\" : \"%s\" , \"tm\" : \"%ld\" }",filename, seq,tempAddr, timeid);
while((retread = read(fd_file, buf_send+len_local_header_, BUF_SIZE - len_local_header_) ) > 0 ){ while((retread = read(fd_file, buf_send+len_local_header_, BUF_SIZE - len_local_header_) ) > 0 ){
buf_send[len_local_header_ + retread]='\0'; buf_send[len_local_header_ + retread]='\0';
if(sendto(fds[(c_af==AF_INET6)].fd, if(sendto(fds[(c_af==AF_INET6)].fd,
@@ -560,9 +577,9 @@ for(int tour_i=0;(tour_i<4) && (check_if_in_ok_header_l_(argS->m_ok_head_l_t, na
} }
++seq; ++seq;
len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"dst\" : \"%s\" , \"tm\" : \"%s\" }",filename, seq,tempAddr, timeid); len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"dst\" : \"%s\" , \"tm\" : \"%ld\" }",filename, seq,tempAddr, timeid);
} }
len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"EOF\" : true , \"dst\" : \"%s\" , \"tm\" : \"%s\" }",filename, seq, tempAddr,timeid); len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"EOF\" : true , \"dst\" : \"%s\" , \"tm\" : \"%ld\" }",filename, seq, tempAddr,timeid);
if(sendto(fds[(c_af==AF_INET6)].fd, if(sendto(fds[(c_af==AF_INET6)].fd,
buf_send, len_local_header_, buf_send, len_local_header_,
0, 0,
@@ -588,7 +605,7 @@ for(int tour_i=0;(tour_i<4) && (check_if_in_ok_header_l_(argS->m_ok_head_l_t, na
free(timeid); //free(timeid);
purge_ptr_type_list_y_ptr_STRING(m_str_name_f); purge_ptr_type_list_y_ptr_STRING(m_str_name_f);
return NULL; return NULL;
} }
@@ -601,6 +618,10 @@ void* y_send_buf_for_all_(void* arg){
struct pollfd *fds=argS->fds; struct pollfd *fds=argS->fds;
struct main_list_y_NODE_T *nodes=argS->nodes; struct main_list_y_NODE_T *nodes=argS->nodes;
// if(export_nodes_to_file("file_nodes_name", nodes)==-1){
// fprintf(stderr, "error export_nodes_to_file\n");
// }
char * buf_send=argS->filename; char * buf_send=argS->filename;
#if TEMP_ADDR #if TEMP_ADDR
char tempAddr[64]; char tempAddr[64];
@@ -623,7 +644,7 @@ void* y_send_buf_for_all_(void* arg){
size_t len_buf_send=strlen(buf_send); size_t len_buf_send=strlen(buf_send);
for(struct list_y_NODE_T *local_list_current = nodes->begin_list; local_list_current; local_list_current=local_list_current->next ){ for(struct list_y_NODE_T *local_list_current = nodes->begin_list; local_list_current; local_list_current=local_list_current->next ){
set_tempAddr_from_node(tempAddr, local_list_current->value); set_addr_str_from_node(tempAddr, local_list_current->value);
c_af=(local_list_current->value).addr.ss_family; c_af=(local_list_current->value).addr.ss_family;
@@ -652,6 +673,7 @@ void* y_send_buf_for_all_(void* arg){
/* */ /* */
#if 0
//void y_socket_send_file_for_all_nodes(struct pollfd *fds, struct main_list_y_NODE_T *nodes, char * filename) //void y_socket_send_file_for_all_nodes(struct pollfd *fds, struct main_list_y_NODE_T *nodes, char * filename)
void* y_socket_send_file_for_all_nodes(void* arg){ void* y_socket_send_file_for_all_nodes(void* arg){
struct arg_send_file *argS=(struct arg_send_file*)arg; struct arg_send_file *argS=(struct arg_send_file*)arg;
@@ -682,7 +704,8 @@ void* y_socket_send_file_for_all_nodes(void* arg){
size_t seq = 0;//, len_buf_header=0, size_t seq = 0;//, len_buf_header=0,
size_t len_local_header_=0; size_t len_local_header_=0;
char * timeid = time_id(); //char * timeid = time_id();
long timeid = long_time_id();
for(struct list_y_NODE_T *local_list_current = nodes->begin_list; local_list_current; local_list_current=local_list_current->next ){ for(struct list_y_NODE_T *local_list_current = nodes->begin_list; local_list_current; local_list_current=local_list_current->next ){
fd_file = open( filename , O_RDONLY); fd_file = open( filename , O_RDONLY);
@@ -690,12 +713,12 @@ void* y_socket_send_file_for_all_nodes(void* arg){
fprintf(stderr,"error opening file |%s| for reading\n",filename); fprintf(stderr,"error opening file |%s| for reading\n",filename);
return NULL; return NULL;
} }
set_tempAddr_from_node(tempAddr, local_list_current->value); set_addr_str_from_node(tempAddr, local_list_current->value);
c_af=(local_list_current->value).addr.ss_family; c_af=(local_list_current->value).addr.ss_family;
seq=0; seq=0;
len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"dst\" : \"%s\" , \"tm\" : \"%s\" }",filename, seq,tempAddr, timeid); len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"dst\" : \"%s\" , \"tm\" : \"%ld\" }",filename, seq,tempAddr, timeid);
while((retread = read(fd_file, buf_send+len_local_header_, BUF_SIZE - len_local_header_) ) > 0 ){ while((retread = read(fd_file, buf_send+len_local_header_, BUF_SIZE - len_local_header_) ) > 0 ){
buf_send[len_local_header_ + retread]='\0'; buf_send[len_local_header_ + retread]='\0';
if(sendto(fds[(c_af==AF_INET6)].fd, if(sendto(fds[(c_af==AF_INET6)].fd,
@@ -711,9 +734,9 @@ void* y_socket_send_file_for_all_nodes(void* arg){
} }
++seq; ++seq;
len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"dst\" : \"%s\" , \"tm\" : \"%s\" }",filename, seq,tempAddr, timeid); len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"dst\" : \"%s\" , \"tm\" : \"%ld\" }",filename, seq,tempAddr, timeid);
} }
len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"EOF\" : true , \"dst\" : \"%s\" , \"tm\" : \"%s\" }",filename, seq, tempAddr,timeid); len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"EOF\" : true , \"dst\" : \"%s\" , \"tm\" : \"%ld\" }",filename, seq, tempAddr,timeid);
if(sendto(fds[(c_af==AF_INET6)].fd, if(sendto(fds[(c_af==AF_INET6)].fd,
buf_send, len_local_header_, buf_send, len_local_header_,
0, 0,
@@ -733,30 +756,15 @@ void* y_socket_send_file_for_all_nodes(void* arg){
free(timeid); //free(timeid);
return NULL; return NULL;
} }
#endif
void receve_from_node(struct pollfd *fds, struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_list_y_ptr_VARIABLE *m_var, struct main_list_y_ptr_STRING *m_str, y_NODE_T node /*char * srcAddr*/, char *filename ){
//main_list_y_ptr_HEADER_T
/*
struct arg_record_to_file{
int *is_file_to_record,
char * filename,
char *buf
};
void record_buffer_to_file(void *arg){
}
*/
void receve_from_node(struct pollfd *fds, struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_list_y_ptr_STRING *m_str, y_NODE_T node /*char * srcAddr*/, char *filename ){
//printf("\ndebug: <<<< receve_from_node %s %ld\n\n",msg,count); //printf("\ndebug: <<<< receve_from_node %s %ld\n\n",msg,count);
char srcAddr[64]; char srcAddr[64];
set_tempAddr_from_node(srcAddr, node); set_addr_str_from_node(srcAddr, node);
struct main_list_y_ptr_STRING *m_str_name_f=split_str_to_main_list_y_ptr_STRING(filename, '/', -1); struct main_list_y_ptr_STRING *m_str_name_f=split_str_to_main_list_y_ptr_STRING(filename, '/', -1);
char * name_f=m_str_name_f->end_list->value->buf; char * name_f=m_str_name_f->end_list->value->buf;
@@ -905,46 +913,3 @@ void receve_from_node(struct pollfd *fds, struct main_list_y_ptr_HEADER_T *m_hea
purge_ptr_type_list_y_ptr_STRING(m_str_name_f); purge_ptr_type_list_y_ptr_STRING(m_str_name_f);
//free(timeNow); //free(timeNow);
} }
/*
char filename[500];
int fd_file;
long int nread;
char buf[BUF_SIZE];
struct sockaddr_storage peer_addr;
socklen_t peer_addr_len = sizeof(struct sockaddr_storage);
//update_nodes(y_NODE_T node, struct main_list_y_NODE_T *nodes);
fileNameDateScore(filename, "__",msg,count);
if((fd_file = open(filename, O_WRONLY | O_CREAT ,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)) == -1){
fprintf(stderr,"erreur write %s\n",filename);
return ; //NULL;
}
memset(buf,0, BUF_SIZE);
while((nread = recvfrom(fds[1].fd, buf, BUF_SIZE, 0,
(struct sockaddr *)&peer_addr, &peer_addr_len
//(struct sockaddr *)&(node.addr), &(node.addr_len)
)) == BUF_SIZE){
if(nread == -1)
{
fprintf(stderr,"error nread\n");
}
else {
printf("msg: %s\n",buf);
write(fd_file, buf, nread);
}
printf("nread==%ld\n",nread);
}
if(nread >0 && nread <BUF_SIZE){
printf("msg: %s\n",buf);
write(fd_file, buf, nread);
}
printf("debug: <receve_from_node> close nread==%ld\n",nread);
close(fd_file);
*/
@@ -1,5 +1,83 @@
/*file: "src/y_socket_t/y_list_string.c" */ /*file: "src/y_socket_t/y_list_var_tool.c" */
#include "y_socket_t/y_list_string.h" #include "y_socket_t/y_list_var_tool.h"
long long_time_id(){
// char *filename=malloc(256);
//char timeid[64];//="20251011215824";
time_t t = time(NULL);
struct tm tm = *localtime(&t);
//sprintf(timeid,"%d%02d%02d%02d%02d%02d", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
long long_tm = (tm.tm_year + 1900)*10000000000+ (tm.tm_mon + 1)*100000000+ tm.tm_mday*1000000 + tm.tm_hour*10000+ tm.tm_min*100+ tm.tm_sec ;
//return filename;
///printf("debug: timeid=%s, vs tm=%ld\n",timeid, intm);
//printf("debug: timeof=%ld, vs tm=%ld, tm_zone=%s\n",tm.tm_gmtoff, long_tm, tm.tm_zone);
return long_tm;
}
char * time_id(){
// char *filename=malloc(256);
char *timeid=malloc(128);
time_t t = time(NULL);
struct tm tm = *localtime(&t);
sprintf(timeid,"%d%02d%02d%02d%02d%02d", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
//printf("debug: TTT \n%ld\n",int_time_id());
return timeid;
//return filename;
}
struct y_variable * create_y_ptr_VARIABLE(const char *name, size_t size_name, void * value, size_t size_value, char *src){
struct y_variable *variable=malloc(sizeof(struct y_variable));
variable->size_name = size_name;
variable->name=malloc(size_name+1);
if(name){
memcpy(variable->name, name, size_name+1);
if(name[size_name]!='\0')
variable->name[size_name]='\0';
}
variable->size_value = size_value;
variable->value=malloc(size_value);
memcpy(variable->value, value, size_value);
variable->time_l = long_time_id();
if(src){
memcpy(variable->src, src, 64);
}else{
memset(variable->src, 0, 64);
}
return variable;
}
GEN_LIST_ALL(y_ptr_VARIABLE)
GEN_FUNC_PTR_LIST_FREE(y_ptr_VARIABLE){
free(arg->name);
free(arg->value);
free(arg);
}
int y_ptr_VARIABLE_cmp(y_ptr_VARIABLE varA, y_ptr_VARIABLE varB){
return strcmp(varA->name, varB->name);
}
struct list_y_ptr_VARIABLE * search_variable_in_list_y_ptr_VARIABLE(struct main_list_y_ptr_VARIABLE *listVariables, y_ptr_VARIABLE var){
return search_first_occ_from_end_in_list_y_ptr_VARIABLE(listVariables, var, y_ptr_VARIABLE_cmp);
//return search_first_occ_from_begin_in_list_y_NODE_T(listNodes, node, y_NODE_T_cmp);
}
void update_list_y_ptr_VARIABLE_then_free_if_needed(struct main_list_y_ptr_VARIABLE *listVariables, y_ptr_VARIABLE var){
struct list_y_ptr_VARIABLE * l_y_ptr_var = search_variable_in_list_y_ptr_VARIABLE(listVariables, var);
if(l_y_ptr_var){
memcpy(l_y_ptr_var->value->value, var->value, var->size_value);
l_y_ptr_var->value->time_l = var->time_l;
free_y_ptr_VARIABLE(var);
}
else{
push_back_list_y_ptr_VARIABLE(listVariables, var);
}
}
struct y_string * create_y_ptr_STRING(const char *buf, size_t size){ struct y_string * create_y_ptr_STRING(const char *buf, size_t size){
struct y_string *string=malloc(sizeof(struct y_string)); struct y_string *string=malloc(sizeof(struct y_string));
+137 -18
View File
@@ -21,23 +21,37 @@ void init_len_list_y_NODE_T(struct main_list_y_NODE_T *listNodes){
} }
int y_NODE_T_cmp(y_NODE_T nodeA, y_NODE_T nodeB){ int y_NODE_T_cmp(y_NODE_T nodeA, y_NODE_T nodeB){
int ret = nodeA.addr_len - nodeB.addr_len; #if 0
char addrA[65];
char addrB[65];
set_addr_str_from_node(addrA, nodeA);
set_addr_str_from_node(addrB, nodeB);
printf("nodeAstr=%s nodeBstr=%s ",addrA,addrB);
// return strcmp(addrA,addrB);
#endif
int ret;
ret = nodeA.addr_len - nodeB.addr_len;
if(ret == 0){ if(ret == 0){
//printf("debug: ++++++++++++ lenA=%d, lenB=%d\n", nodeA.addr_len , nodeB.addr_len);
ret = nodeA.addr.ss_family - nodeB.addr.ss_family; ret = nodeA.addr.ss_family - nodeB.addr.ss_family;
if(ret == 0){ if(ret == 0){
if(nodeA.addr.ss_family == AF_INET){ if(nodeA.addr.ss_family == AF_INET){
//ret = memcmp((struct sockaddr_in*)&(nodeA.addr), (struct sockaddr_in*)&(nodeB.addr), sizeof(struct sockaddr_in)); //ret = memcmp((struct sockaddr_in*)&(nodeA.addr), (struct sockaddr_in*)&(nodeB.addr), sizeof(struct sockaddr_in));
ret = ((struct sockaddr_in*)&(nodeA.addr))->sin_port - ((struct sockaddr_in*)&(nodeB.addr))->sin_port; ret = ((struct sockaddr_in*)&(nodeA.addr))->sin_port - ((struct sockaddr_in*)&(nodeB.addr))->sin_port;
if(ret==0) if(ret==0){
ret = GET_IN_type_ADDR(&(nodeA.addr),) - GET_IN_type_ADDR(&(nodeB.addr),); //ret = GET_IN_type_ADDR(&(nodeA.addr),) - GET_IN_type_ADDR(&(nodeB.addr),);
//ret = ((struct sockaddr_in*)&(nodeA.addr))->sin_addr.s_addr - ((struct sockaddr_in*)&(nodeB.addr))->sin_addr.s_addr; ret = ((struct sockaddr_in*)&(nodeA.addr))->sin_addr.s_addr - ((struct sockaddr_in*)&(nodeB.addr))->sin_addr.s_addr;
//printf("debug --->ret=%d; nodeAint=%d vs nodeBint=%d",ret, ((struct sockaddr_in*)&(nodeA.addr))->sin_addr.s_addr , ((struct sockaddr_in*)&(nodeB.addr))->sin_addr.s_addr);
}
}else }else
if(nodeA.addr.ss_family == AF_INET6){ if(nodeA.addr.ss_family == AF_INET6){
ret = ((struct sockaddr_in6*)&(nodeA.addr))->sin6_port - ((struct sockaddr_in6*)&(nodeB.addr))->sin6_port; ret = ((struct sockaddr_in6*)&(nodeA.addr))->sin6_port - ((struct sockaddr_in6*)&(nodeB.addr))->sin6_port;
if(ret ==0) if(ret ==0)
//ret = memcmp((struct sockaddr_in6*)&(nodeA.addr), (struct sockaddr_in6*)&(nodeB.addr), sizeof(struct sockaddr_in6)); //ret = memcmp((struct sockaddr_in6*)&(nodeA.addr), (struct sockaddr_in6*)&(nodeB.addr), sizeof(struct sockaddr_in6));
ret = memcmp(GET_IN_type_ADDR(&(nodeA.addr),6), GET_IN_type_ADDR(&(nodeB.addr),6), 8); //ret = memcmp(GET_IN_type_ADDR(&(nodeA.addr),6), GET_IN_type_ADDR(&(nodeB.addr),6), 8);
//ret = memcmp(((struct sockaddr_in6*)&(nodeA.addr))->sin6_addr.s6_addr , ((struct sockaddr_in6*)&(nodeB.addr))->sin6_addr.s6_addr, 8); ret = memcmp(((struct sockaddr_in6*)&(nodeA.addr))->sin6_addr.s6_addr , ((struct sockaddr_in6*)&(nodeB.addr))->sin6_addr.s6_addr, 8);
} }
} }
} }
@@ -52,10 +66,11 @@ struct list_y_NODE_T * search_node_in_list_y_NODE_T(struct main_list_y_NODE_T *l
} }
return NULL; return NULL;
*/ */
return search_first_occ_from_begin_in_list_y_NODE_T(listNodes, node, y_NODE_T_cmp); return search_first_occ_from_end_in_list_y_NODE_T(listNodes, node, y_NODE_T_cmp);
//return search_first_occ_from_begin_in_list_y_NODE_T(listNodes, node, y_NODE_T_cmp);
} }
int set_addr_y_NODE_T(y_NODE_T *node, char * addrStr){ int set_addr_y_NODE_T_from_str_addr(y_NODE_T *node, char * addrStr){
//memset(&(node->addr), 0, sizeof(struct sockaddr_storage)); //memset(&(node->addr), 0, sizeof(struct sockaddr_storage));
int af = AF_INET, ret = -2; int af = AF_INET, ret = -2;
for(int i=0; i<strlen(addrStr); ++i){ for(int i=0; i<strlen(addrStr); ++i){
@@ -67,11 +82,13 @@ int set_addr_y_NODE_T(y_NODE_T *node, char * addrStr){
} }
} }
node->addr.ss_family = af; node->addr.ss_family = af;
if(af==AF_INET) if(af==AF_INET){
ret = inet_pton(af, addrStr, &(GET_IN_type_ADDR(&(node->addr),))); ret = inet_pton(af, addrStr, &(GET_IN_type_ADDR(&(node->addr),)));
else if(af == AF_INET6){ node->addr_len = sizeof(struct sockaddr_in);
}else if(af == AF_INET6){
//((struct sockaddr_in6*)(&(node->addr)))->sin6_flowinfo = 0; //((struct sockaddr_in6*)(&(node->addr)))->sin6_flowinfo = 0;
ret = inet_pton(af, addrStr, (GET_IN_type_ADDR(&(node->addr), 6))); ret = inet_pton(af, addrStr, (GET_IN_type_ADDR(&(node->addr), 6)));
node->addr_len = sizeof(struct sockaddr_in6);
} }
return ret; return ret;
@@ -79,7 +96,7 @@ int set_addr_y_NODE_T(y_NODE_T *node, char * addrStr){
} }
void set_port_y_NODE_T(y_NODE_T *node, int port){ void set_port_y_NODE_T_from_int_port(y_NODE_T *node, int port){
int af = node->addr.ss_family; int af = node->addr.ss_family;
if(af==AF_INET) if(af==AF_INET)
((struct sockaddr_in*)(&(node->addr)))->sin_port = htons(port); ((struct sockaddr_in*)(&(node->addr)))->sin_port = htons(port);
@@ -87,9 +104,9 @@ void set_port_y_NODE_T(y_NODE_T *node, int port){
((struct sockaddr_in6*)(&(node->addr)))->sin6_port = htons(port); ((struct sockaddr_in6*)(&(node->addr)))->sin6_port = htons(port);
} }
void set_str_port_y_NODE_T(y_NODE_T *node, char *str_port){ void set_port_y_NODE_T_from_str_port(y_NODE_T *node, char *str_port){
int port = atoi(str_port); int port = atoi(str_port);
set_port_y_NODE_T(node, port); set_port_y_NODE_T_from_int_port(node, port);
} }
const char * put_y_NODE_T_in_string(y_NODE_T *node, char * dst){ const char * put_y_NODE_T_in_string(y_NODE_T *node, char * dst){
@@ -103,22 +120,22 @@ const char * put_y_NODE_T_in_string(y_NODE_T *node, char * dst){
printf("getnameinfo: %s\n", gai_strerror(status)); printf("getnameinfo: %s\n", gai_strerror(status));
sprintf(dst,"%s:[%s]",host,service); sprintf(dst,"%s:[%s]",host,service);
#endif #endif
char temp_addr[INET6_ADDRSTRLEN]; char buffer_addr[INET6_ADDRSTRLEN];
if(node->addr.ss_family == AF_INET){ if(node->addr.ss_family == AF_INET){
struct sockaddr_in *sinaddrv4 = ((struct sockaddr_in*)&(node->addr)); struct sockaddr_in *sinaddrv4 = ((struct sockaddr_in*)&(node->addr));
if(inet_ntop(node->addr.ss_family, &(sinaddrv4->sin_addr), if(inet_ntop(node->addr.ss_family, &(sinaddrv4->sin_addr),
temp_addr, INET6_ADDRSTRLEN) == NULL){ buffer_addr, INET6_ADDRSTRLEN) == NULL){
return NULL; return NULL;
} }
sprintf(dst, "%s:[%d]",temp_addr, ntohs(sinaddrv4->sin_port )); sprintf(dst, "%s:[%d]",buffer_addr, ntohs(sinaddrv4->sin_port ));
}else if(node->addr.ss_family == AF_INET6){ }else if(node->addr.ss_family == AF_INET6){
struct sockaddr_in6 *sinaddrv6 = ((struct sockaddr_in6*)&(node->addr)); struct sockaddr_in6 *sinaddrv6 = ((struct sockaddr_in6*)&(node->addr));
if(inet_ntop(node->addr.ss_family, &(sinaddrv6->sin6_addr), if(inet_ntop(node->addr.ss_family, &(sinaddrv6->sin6_addr),
temp_addr, INET6_ADDRSTRLEN) == NULL){ buffer_addr, INET6_ADDRSTRLEN) == NULL){
return NULL; return NULL;
} }
sprintf(dst, "%s:[%d]",temp_addr, ntohs(((struct sockaddr_in6*)&(node->addr))->sin6_port )); sprintf(dst, "%s:[%d]",buffer_addr, ntohs(((struct sockaddr_in6*)&(node->addr))->sin6_port ));
} }
return dst; return dst;
} }
@@ -176,3 +193,105 @@ void* update_nodes(void* arg)
// return NULL; // return NULL;
} }
size_t set_addr_str_from_node(char *tempAddr, y_NODE_T node) {
int c_af=(node).addr.ss_family;
if(c_af==AF_INET){
if(NULL == inet_ntop(c_af,
&(GET_IN_type_ADDR(&(node),)),
tempAddr, INET6_ADDRSTRLEN/*(argSock->local_list_current->value).addr_len*/)){
fprintf(stderr, "error inet_ntop v4\n");
}
}else if(c_af==AF_INET6){
if(NULL == inet_ntop(c_af,
&(GET_IN_type_ADDR(&(node),6)),
tempAddr, INET6_ADDRSTRLEN /* node.addr_len BUF_SIZE (argSock->local_list_current->value).addr_len*/)){
fprintf(stderr, "error inet_ntop v6 :errno=%d\n",errno);
}
}
size_t ret_len = strlen(tempAddr);
printf("debug: c_af=%d, ret_len=%ld, addr=%s\n",c_af, ret_len,tempAddr);
return ret_len;
}
int export_nodes_to_file(char * file_nodes_name, struct main_list_y_NODE_T *nodes){
int fd_file ;
if((fd_file = open(file_nodes_name, O_WRONLY | O_CREAT | O_TRUNC,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)) == -1){
fprintf(stderr,"erreur write %s\n",file_nodes_name);
return -1;
}
char buffer_addr[65];
struct list_y_NODE_T *tmp_nd=nodes->begin_list;
int len_addr=0;
while(tmp_nd){
len_addr=set_addr_str_from_node(buffer_addr,tmp_nd->value);
buffer_addr[len_addr++]='\n';
buffer_addr[len_addr]='\0';
write(fd_file, buffer_addr, len_addr);
printf("debug: [#%d] buffer_addr=|%s|[index:%ld]\n",len_addr,buffer_addr,tmp_nd->index);
tmp_nd=tmp_nd->next;
memset(buffer_addr,0,65);
}
close(fd_file);
return fd_file;
}
int import_nodes_from_file(char * file_nodes_name, int int_port, struct main_list_y_NODE_T *nodes){
int fd_file ;
fd_file = open( file_nodes_name , O_RDONLY);
if(fd_file == -1){
fprintf(stderr,"error opening file |%s| for reading\n",file_nodes_name);
return -1;
}
#define SIZE_ADDR_LOC 28
char buffer_addr[SIZE_ADDR_LOC+1], current_addr[SIZE_ADDR_LOC+1],*begin_addr,*cur_Str=NULL;
int retread;
int offset=0;
memset(buffer_addr,0,SIZE_ADDR_LOC+1);
//memset(current_addr,0,SIZE_ADDR_LOC+1);
while((retread = read(fd_file, buffer_addr+offset, SIZE_ADDR_LOC-offset) ) > 0 ){
buffer_addr[offset+retread]='\0';
//printf("debug: --++buffer_addr:|%s|, retread #%d#\n",buffer_addr,retread);
begin_addr=buffer_addr;
for(cur_Str=buffer_addr;*cur_Str ;++cur_Str){
if(*cur_Str=='\n'){
memset(current_addr,0,SIZE_ADDR_LOC+1);
strncpy(current_addr,begin_addr,cur_Str-begin_addr);
//printf("debug: current_addr:[%s]\n",current_addr);
y_NODE_T node;
if(set_addr_y_NODE_T_from_str_addr(&node,current_addr)){
set_port_y_NODE_T_from_int_port(&node, int_port);
update_nodes(node,nodes);
begin_addr = cur_Str+1;
}else{
//printf("debug: something wrong, perhaps format!! current_addr:[%s]\n",current_addr);
}
}
}
offset=cur_Str-begin_addr;
//memset(current_addr,0,SIZE_ADDR_LOC+1);
//strncpy(current_addr,begin_addr,offset);
strncpy(buffer_addr,begin_addr,offset);
//printf("debug: offset=%d current_addr:[%s]\n",offset,current_addr);
//memcpy(buffer_addr,current_addr,SIZE_ADDR_LOC+1);
//printf("debug: **buffer_addr:|%s|, strlen #%ld#\n",buffer_addr,strlen(buffer_addr));
}
//printf("debug: >>> buffer_addr:|%s|\n",buffer_addr);
if(offset>1){
y_NODE_T node;
if(set_addr_y_NODE_T_from_str_addr(&node,buffer_addr)){
set_port_y_NODE_T_from_int_port(&node, int_port);
update_nodes(node,nodes);
}
}
close(fd_file);
return fd_file;
}
+48 -43
View File
@@ -2,31 +2,9 @@
#include "y_socket_t/y_socket_t.h" #include "y_socket_t/y_socket_t.h"
//#include "y_socket_t/y_list_string.h" //#include "y_socket_t/y_list_var_tool.h"
//#include "json_t/json_t.h" //#include "json_t/json_t.h"
struct y_variable * create_y_ptr_VARIABLE(const char *name, size_t size_value){
struct y_variable *variable=malloc(sizeof(struct y_variable));
size_t len_name = strlen(name);
variable->name=malloc(len_name+1);
variable->value=malloc(size_value);
if(name){
memcpy(variable->name, name, len_name+1);
if(name[len_name]!='\0')
variable->name[len_name]='\0';
}
return variable;
}
GEN_LIST_ALL(y_ptr_VARIABLE)
GEN_FUNC_PTR_LIST_FREE(y_ptr_VARIABLE){
free(arg->name);
free(arg->value);
free(arg);
}
const int af_array[nbIpVersion]={AF_INET, AF_INET6}; const int af_array[nbIpVersion]={AF_INET, AF_INET6};
struct y_socket_t * y_socket_create(char *port, size_t size_fds, int nb_workers){ struct y_socket_t * y_socket_create(char *port, size_t size_fds, int nb_workers){
@@ -196,6 +174,7 @@ int flags = fcntl(fds[af].fd, F_GETFL);
} }
struct arg_handler_{ struct arg_handler_{
struct main_list_y_ptr_STRING *m_str; struct main_list_y_ptr_STRING *m_str;
struct main_list_y_ptr_VARIABLE *m_var;
//char *buf; //char *buf;
struct pollfd *fds; struct pollfd *fds;
y_NODE_T node; y_NODE_T node;
@@ -233,9 +212,10 @@ void* y_socket_handler_(void *arg){
} }
/* */ /* */
char * buf = js_cmd->type.object.value->type.string; char * buf = js_cmd->type.object.value->type.string;
size_t len_buf=strlen(buf);
if(strncmp(buf, "get", 3)==0){ if(strncmp(buf, "get", 3)==0){
if(strncmp(buf+4,"file",4)==0){ if(len_buf > 4 && strncmp(buf+4,"file",4)==0){
if(len_buf > 9){
size_t len_filename = strlen(buf + 9); size_t len_filename = strlen(buf + 9);
char *filename = malloc(len_filename+1); char *filename = malloc(len_filename+1);
memcpy(filename, buf + 9, len_filename ); memcpy(filename, buf + 9, len_filename );
@@ -259,16 +239,24 @@ void* y_socket_handler_(void *arg){
//y_socket_send_file_for_all_nodes(fds, nodes, filename) ; //y_socket_send_file_for_all_nodes(fds, nodes, filename) ;
} }
} }
else if(strncmp(buf, "update", 6)==0){ }
else if(len_buf >7 && strncmp(buf, "update", 6)==0){
if(strncmp(buf+7,"kill",4)==0){ if(strncmp(buf+7,"kill",4)==0){
pthread_mutex_lock(sock->mut_go_on); pthread_mutex_lock(sock->mut_go_on);
sock->go_on = 0; sock->go_on = 0;
pthread_mutex_unlock(sock->mut_go_on); pthread_mutex_unlock(sock->mut_go_on);
// kill_all_workers(argw); // kill_all_workers(argw);
// printf("debug: kill_all\n");
}else if(strncmp(buf+7,"file nodes",10)==0){
if(export_nodes_to_file(".file_nodes_name", nodes)==-1){
fprintf(stderr, "error export_nodes_to_file\n");
}
// kill_all_workers(argw);
// printf("debug: kill_all\n"); // printf("debug: kill_all\n");
}else if(strncmp(buf+7,"remove node",11)==0){ }else if(strncmp(buf+7,"remove node",11)==0){
if(set_addr_y_NODE_T(&(argH->node), buf + 19)){ if(len_buf>19 && set_addr_y_NODE_T_from_str_addr(&(argH->node), buf + 19)){
set_str_port_y_NODE_T(&(argH->node), argH->sock->port); set_port_y_NODE_T_from_str_port(&(argH->node), argH->sock->port);
struct arg_send_file *argS=malloc(sizeof(struct arg_send_file)); struct arg_send_file *argS=malloc(sizeof(struct arg_send_file));
argS->fds=fds; argS->fds=fds;
@@ -286,8 +274,8 @@ void* y_socket_handler_(void *arg){
push_tasQ(argw->argx->tasQ, task_send); push_tasQ(argw->argx->tasQ, task_send);
} }
}else if(strncmp(buf+7,"add node",8)==0){ }else if(strncmp(buf+7,"add node",8)==0){
if(set_addr_y_NODE_T(&(argH->node), buf + 16)){ if(len_buf >16 && set_addr_y_NODE_T_from_str_addr(&(argH->node), buf + 16)){
set_str_port_y_NODE_T(&(argH->node), argH->sock->port); set_port_y_NODE_T_from_str_port(&(argH->node), argH->sock->port);
struct arg_send_file *argS=malloc(sizeof(struct arg_send_file)); struct arg_send_file *argS=malloc(sizeof(struct arg_send_file));
argS->fds=fds; argS->fds=fds;
@@ -307,19 +295,25 @@ void* y_socket_handler_(void *arg){
} }
} }
else if(strncmp(buf, "post", 4)==0){ else if(len_buf > 5 && strncmp(buf, "post", 4)==0){
if(strncmp(buf+5,"file",4)==0){ if(strncmp(buf+5,"file",4)==0){
if(len_buf > 10){
char *filename = buf+10; char *filename = buf+10;
receve_from_node(fds, argH->m_head_l_t, m_str,argH->node, filename ); receve_from_node(fds, argH->m_head_l_t, argH->m_var, m_str,argH->node, filename );
m_str = NULL; m_str = NULL;
}
}else if(strncmp(buf+5,"ok",2)==0){ }else if(strncmp(buf+5,"ok",2)==0){
if(len_buf>8){
char *nameid = buf+8; char *nameid = buf+8;
y_append_to_ok_header_l_(argH->m_ok_head_l_t,nameid ); y_append_to_ok_header_l_(argH->m_ok_head_l_t,nameid );
}
}else if(strncmp(buf+5,"var",3)==0){ }else if(strncmp(buf+5,"var",3)==0){
if(len_buf>9){
char *var_nameid = buf+9; char *var_nameid = buf+9;
receve_from_node(fds, argH->m_head_l_t, m_str,argH->node, var_nameid ); receve_from_node(fds, argH->m_head_l_t, argH->m_var, m_str,argH->node, var_nameid );
m_str = NULL; m_str = NULL;
} }
}
} }
@@ -338,7 +332,7 @@ void* y_socket_handler_(void *arg){
} }
void handle_buf_socket_rec(struct main_list_y_ptr_HEADER_T *m_ok_head_l_t, struct main_list_y_ptr_HEADER_T *m_head_l_t,struct main_list_y_ptr_STRING *m_str, y_NODE_T node, struct main_list_ptr_y_WORKER_T * workers, struct argExecTasQ *argx, struct main_list_TYPE_PTR * list_arg, void * arg){ void handle_buf_socket_rec(struct main_list_y_ptr_HEADER_T *m_ok_head_l_t, struct main_list_y_ptr_HEADER_T *m_head_l_t,struct main_list_y_ptr_VARIABLE *m_var,struct main_list_y_ptr_STRING *m_str, y_NODE_T node, struct main_list_ptr_y_WORKER_T * workers, struct argExecTasQ *argx, struct main_list_TYPE_PTR * list_arg, void * arg){
struct y_socket_t * argSock = (struct y_socket_t*)arg; struct y_socket_t * argSock = (struct y_socket_t*)arg;
struct pollfd *fds = argSock->fds; struct pollfd *fds = argSock->fds;
@@ -363,6 +357,7 @@ void handle_buf_socket_rec(struct main_list_y_ptr_HEADER_T *m_ok_head_l_t, struc
else{ else{
struct arg_handler_ *ptr_argHandl = malloc(sizeof(struct arg_handler_)); struct arg_handler_ *ptr_argHandl = malloc(sizeof(struct arg_handler_));
ptr_argHandl->m_str = m_str; ptr_argHandl->m_str = m_str;
ptr_argHandl->m_var = m_var;
ptr_argHandl->fds=fds; ptr_argHandl->fds=fds;
ptr_argHandl->sock=argSock; ptr_argHandl->sock=argSock;
ptr_argHandl->node=node; ptr_argHandl->node=node;
@@ -422,6 +417,7 @@ void *y_socket_poll_fds(void *arg){
struct main_list_y_ptr_STRING *m_str=NULL;//=create_var_list_y_ptr_STRING(); struct main_list_y_ptr_STRING *m_str=NULL;//=create_var_list_y_ptr_STRING();
struct main_list_y_ptr_HEADER_T *m_head_l_t = create_var_list_y_ptr_HEADER_T(); struct main_list_y_ptr_HEADER_T *m_head_l_t = create_var_list_y_ptr_HEADER_T();
struct main_list_y_ptr_HEADER_T *m_ok_head_l_t = create_var_list_y_ptr_HEADER_T(); struct main_list_y_ptr_HEADER_T *m_ok_head_l_t = create_var_list_y_ptr_HEADER_T();
struct main_list_y_ptr_VARIABLE *m_var = create_var_list_y_ptr_VARIABLE();
// char *temp_all_buf=NULL; // char *temp_all_buf=NULL;
@@ -429,9 +425,10 @@ void *y_socket_poll_fds(void *arg){
// int len_msgRet; // int len_msgRet;
// I had to initialize all attribute of addr to avoid error uninitialized value with valgrind, for example "sin6_flowinfo" in sockaddr_in6 // I had to initialize all attribute of addr to avoid error uninitialized value with valgrind, for example "sin6_flowinfo" in sockaddr_in6
memset(&(node.addr), 0, sizeof(struct sockaddr_storage)); memset(&(node.addr), 0, sizeof(struct sockaddr_storage));
size_t len_sockaddr_storage = sizeof(struct sockaddr_storage); //size_t len_sockaddr_storage = sizeof(struct sockaddr_storage);
node.addr_len = len_sockaddr_storage; // sizeof(struct sockaddr_storage); //node.addr_len = len_sockaddr_storage; // sizeof(struct sockaddr_storage);
node.addr_len = 0;/* init here to be sure it will have the appropriate value */
// printf("debug: ------ //// node.addr_len = %d\n",node.addr_len);
for(;check_y_socket_go_on(argSock);){ for(;check_y_socket_go_on(argSock);){
printf("poll: wait events\n"); printf("poll: wait events\n");
status = poll(fds, nbIpVersion + 1, -1); status = poll(fds, nbIpVersion + 1, -1);
@@ -451,6 +448,7 @@ void *y_socket_poll_fds(void *arg){
while((nread = recvfrom(fds[af].fd, buf, BUF_SIZE, 0, while((nread = recvfrom(fds[af].fd, buf, BUF_SIZE, 0,
(struct sockaddr *)&(node.addr), &(node.addr_len))) == BUF_SIZE){ (struct sockaddr *)&(node.addr), &(node.addr_len))) == BUF_SIZE){
// printf("debug: ------ //RCVFR// node.addr_len = %d\n",node.addr_len);
//if(buf[nread-1]=='\n') buf[nread-1]='\0'; //if(buf[nread-1]=='\n') buf[nread-1]='\0';
buf[nread]='\0'; buf[nread]='\0';
@@ -478,9 +476,9 @@ void *y_socket_poll_fds(void *arg){
/// ///
} }
if(argSock->go_on && m_str){ if(check_y_socket_go_on(argSock) && m_str){
printf("debug: call handle_buf_socket_rec\n"); printf("debug: call handle_buf_socket_rec\n");
handle_buf_socket_rec(m_ok_head_l_t,m_head_l_t,m_str, node, workers, argx, list_arg, arg); handle_buf_socket_rec(m_ok_head_l_t,m_head_l_t, m_var, m_str, node, workers, argx, list_arg, arg);
m_str=NULL; m_str=NULL;
} }
@@ -527,7 +525,7 @@ void *y_socket_poll_fds(void *arg){
#endif #endif
if(strncmp(cmd, "sendto", 6)==0){ if(check_y_socket_go_on(argSock) && strncmp(cmd, "sendto", 6)==0){
printf("debug : sendto match, dst_addr=[%s]\n", dst_addr); printf("debug : sendto match, dst_addr=[%s]\n", dst_addr);
if(strcmp(dst_addr, "all" ) == 0){ if(strcmp(dst_addr, "all" ) == 0){
struct arg_send_file *argS = malloc(sizeof(struct arg_send_file)); struct arg_send_file *argS = malloc(sizeof(struct arg_send_file));
@@ -550,9 +548,9 @@ void *y_socket_poll_fds(void *arg){
} }
else if(set_addr_y_NODE_T(&node, dst_addr)){ else if(set_addr_y_NODE_T_from_str_addr(&node, dst_addr)){
printf("debug : set_addr_y_NODE_T done\n"); printf("debug : set_addr_y_NODE_T done\n");
set_str_port_y_NODE_T(&node, argSock->port); set_port_y_NODE_T_from_str_port(&node, argSock->port);
update_nodes(node, argSock->nodes); update_nodes(node, argSock->nodes);
af=(node.addr.ss_family == AF_INET6); af=(node.addr.ss_family == AF_INET6);
@@ -560,8 +558,9 @@ void *y_socket_poll_fds(void *arg){
printf("debug : af = AF_INET=%d, af = AF_INET6=%d, vs af=[%d]\n",AF_INET, AF_INET6, af); printf("debug : af = AF_INET=%d, af = AF_INET6=%d, vs af=[%d]\n",AF_INET, AF_INET6, af);
//node.addr_len = sizeof(struct sockaddr_storage); //node.addr_len = sizeof(struct sockaddr_storage);
//node.addr_len = sizeof(node.addr);
if(sendto(fds[af].fd, buf+index_buf , buf_len-index_buf, 0, if(sendto(fds[af].fd, buf+index_buf , buf_len-index_buf, 0,
(struct sockaddr*)(&(node.addr)), len_sockaddr_storage //node.addr_len (struct sockaddr*)(&(node.addr)), node.addr_len
) == -1){ ) == -1){
printf("message erreur sendto : %s, error :%d\n\n",buf,errno); printf("message erreur sendto : %s, error :%d\n\n",buf,errno);
perror("sendto:"); perror("sendto:");
@@ -598,6 +597,12 @@ void *y_socket_poll_fds(void *arg){
printf("debug: m_str!=NULL -> purge_ptr_type_list_y_ptr_STRING done\n"); printf("debug: m_str!=NULL -> purge_ptr_type_list_y_ptr_STRING done\n");
} }
if(m_var){
purge_ptr_type_list_y_ptr_VARIABLE(m_var);
printf("debug: m_var!=NULL -> purge_ptr_type_list_y_ptr_VARIABLE done\n");
}
/* /*
if(temp_all_buf){ if(temp_all_buf){
free(temp_all_buf); free(temp_all_buf);
+1 -1
View File
@@ -36,7 +36,7 @@ YFILEHANDLSRC_O=$(YFILEHANDLSRC:.c=.o)
YNODESRC=$(ROOT_DIR)/src/y_socket_t/y_node_t.c YNODESRC=$(ROOT_DIR)/src/y_socket_t/y_node_t.c
YNODESRC_O=$(YNODESRC:.c=.o) YNODESRC_O=$(YNODESRC:.c=.o)
YY_STRINGSRC=$(ROOT_DIR)/src/y_socket_t/y_list_string.c YY_STRINGSRC=$(ROOT_DIR)/src/y_socket_t/y_list_var_tool.c
YY_STRINGSRC_O=$(YY_STRINGSRC:.c=.o) YY_STRINGSRC_O=$(YY_STRINGSRC:.c=.o)
YLISTSRC=$(YLISTDIR)/src/list_t/list_t.c YLISTSRC=$(YLISTDIR)/src/list_t/list_t.c
+27 -10
View File
@@ -62,8 +62,8 @@ TEST(equalNode){
//((struct sockaddr_in*)(&(nA.addr)))->sin_port = 22; //((struct sockaddr_in*)(&(nA.addr)))->sin_port = 22;
//((struct sockaddr_in*)(&(nB.addr)))->sin_port = 22; //((struct sockaddr_in*)(&(nB.addr)))->sin_port = 22;
set_port_y_NODE_T(&nA, 22); set_port_y_NODE_T_from_int_port(&nA, 22);
set_port_y_NODE_T(&nB, 22); set_port_y_NODE_T_from_int_port(&nB, 22);
//((struct sockaddr_in*)&(nA.addr))->sin_addr.s_addr = inet_addr("192.168.1.2"); //((struct sockaddr_in*)&(nA.addr))->sin_addr.s_addr = inet_addr("192.168.1.2");
//((struct sockaddr_in*)&(nB.addr))->sin_addr.s_addr = inet_addr("192.168.1.2"); //((struct sockaddr_in*)&(nB.addr))->sin_addr.s_addr = inet_addr("192.168.1.2");
@@ -87,8 +87,8 @@ TEST(equalNode6){
nA.addr.ss_family=AF_INET6; nA.addr.ss_family=AF_INET6;
nB.addr.ss_family=AF_INET6; nB.addr.ss_family=AF_INET6;
set_port_y_NODE_T(&nA, 22); set_port_y_NODE_T_from_int_port(&nA, 22);
set_port_y_NODE_T(&nB, 22); set_port_y_NODE_T_from_int_port(&nB, 22);
//((struct sockaddr_in6*)(&(nA.addr)))->sin6_port = 22; //((struct sockaddr_in6*)(&(nA.addr)))->sin6_port = 22;
//((struct sockaddr_in6*)(&(nB.addr)))->sin6_port = 22; //((struct sockaddr_in6*)(&(nB.addr)))->sin6_port = 22;
@@ -116,8 +116,8 @@ TEST(searchNode){
nA.addr.ss_family=AF_INET; nA.addr.ss_family=AF_INET;
nB.addr.ss_family=AF_INET; nB.addr.ss_family=AF_INET;
set_port_y_NODE_T(&nA, 22); set_port_y_NODE_T_from_int_port(&nA, 22);
set_port_y_NODE_T(&nB, 22); set_port_y_NODE_T_from_int_port(&nB, 22);
//((struct sockaddr_in*)(&(nA.addr)))->sin_port = 22; //((struct sockaddr_in*)(&(nA.addr)))->sin_port = 22;
//((struct sockaddr_in*)(&(nB.addr)))->sin_port = 22; //((struct sockaddr_in*)(&(nB.addr)))->sin_port = 22;
@@ -137,7 +137,7 @@ TEST(searchNode){
push_back_list_y_NODE_T(listNodes, nA); push_back_list_y_NODE_T(listNodes, nA);
//GET_IN_type_ADDR(&(nB.addr),) = inet_addr("0.1.1.1"); //GET_IN_type_ADDR(&(nB.addr),) = inet_addr("0.1.1.1");
int ret = set_addr_y_NODE_T(&nB, "0.1.1.1"); int ret = set_addr_y_NODE_T_from_str_addr(&nB, "0.1.1.1");
LOG("return of set =%d\n", ret); LOG("return of set =%d\n", ret);
EXPECT_TRUE(NULL == search_node_in_list_y_NODE_T(listNodes, nB)); EXPECT_TRUE(NULL == search_node_in_list_y_NODE_T(listNodes, nB));
@@ -147,13 +147,13 @@ TEST(searchNode){
//inet_pton(AF_INET6, "::1", GET_IN_type_ADDR(&(nB.addr),6)); //inet_pton(AF_INET6, "::1", GET_IN_type_ADDR(&(nB.addr),6));
//((struct sockaddr_in6*)(&(nB.addr)))->sin6_port = 22; //((struct sockaddr_in6*)(&(nB.addr)))->sin6_port = 22;
ret = set_addr_y_NODE_T(&nB, "::1"); ret = set_addr_y_NODE_T_from_str_addr(&nB, "::1");
set_port_y_NODE_T(&nB, 22); set_port_y_NODE_T_from_int_port(&nB, 22);
LOG("return of set =%d\n", ret); LOG("return of set =%d\n", ret);
push_back_list_y_NODE_T(listNodes, nB); push_back_list_y_NODE_T(listNodes, nB);
//inet_pton(AF_INET6, "::", GET_IN_type_ADDR(&(nA.addr),6)); //inet_pton(AF_INET6, "::", GET_IN_type_ADDR(&(nA.addr),6));
ret = set_addr_y_NODE_T(&nB, "::"); ret = set_addr_y_NODE_T_from_str_addr(&nB, "::");
LOG("return of set =%d\n", ret); LOG("return of set =%d\n", ret);
EXPECT_FALSE(NULL == search_node_in_list_y_NODE_T(listNodes, nA)); EXPECT_FALSE(NULL == search_node_in_list_y_NODE_T(listNodes, nA));
@@ -163,6 +163,23 @@ TEST(searchNode){
} }
TEST(import_nodes){
struct main_list_y_NODE_T * listNodes = create_var_list_y_NODE_T();
char *file_nodes_name = "FILE_NODES";
if(import_nodes_from_file(file_nodes_name, 1600, listNodes)==-1){
LOG("something wrong check file %s\n",file_nodes_name);
}
export_nodes_to_file("CPY_file_nodes_name", listNodes);
free_all_var_list_y_NODE_T(listNodes);
}
TEST(pollThread){ TEST(pollThread){
struct y_socket_t *argS=y_socket_create("1600", 2, 3); struct y_socket_t *argS=y_socket_create("1600", 2, 3);
+1 -1
View File
@@ -12,7 +12,7 @@ LISTSRC=$(LISTDIR)/src/list_t/list_t.c
LISTSRC_O=$(LISTSRC:.c=.o) LISTSRC_O=$(LISTSRC:.c=.o)
YSOCKETDIR=$(PWD)/../y_socket_t/ YSOCKETDIR=$(PWD)/../y_socket_t/
YSTRINGLISTSRC=$(YSOCKETDIR)/src/y_socket_t/y_list_string.c YSTRINGLISTSRC=$(YSOCKETDIR)/src/y_socket_t/y_list_var_tool.c
YSTRINGLISTSRC_O=$(YSOCKETSRC:.c=.o) YSTRINGLISTSRC_O=$(YSOCKETSRC:.c=.o)
#SETTSRC=src/set_theoric_t/set_theoric_t.c #SETTSRC=src/set_theoric_t/set_theoric_t.c
+1 -1
View File
@@ -7,7 +7,7 @@
#include <stdbool.h> #include <stdbool.h>
#include "tools_t/tools_t.h" #include "tools_t/tools_t.h"
#include "y_socket_t/y_list_string.h" #include "y_socket_t/y_list_var_tool.h"
#define ITERATOR__(type) \ #define ITERATOR__(type) \
struct iterator_##type {\ struct iterator_##type {\
+1 -1
View File
@@ -28,7 +28,7 @@ LISTSRC_O=$(LISTDIR)/src/list_t/list_t.o
JSONDIR=$(PWD)/.. JSONDIR=$(PWD)/..
JSONSRC=$(JSONDIR)/src/json_t/json_t.c JSONSRC=$(JSONDIR)/src/json_t/json_t.c
JSONSRC_O=$(JSONDIR)/src/json_t/json_t.o JSONSRC_O=$(JSONDIR)/src/json_t/json_t.o
YSTRINGLIST_O=$(YSOCKETDIR)/src/y_socket_t/y_list_string.o YSTRINGLIST_O=$(YSOCKETDIR)/src/y_socket_t/y_list_var_tool.o
TOPTARGETS := all clean TOPTARGETS := all clean