y_socket: add header(json_t) & content list, send and receive functions

This commit is contained in:
2025-10-04 00:02:03 +02:00
parent ec8cc86d8e
commit 61103dd60e
8 changed files with 824 additions and 208 deletions
+2 -2
View File
@@ -59,7 +59,7 @@ $(PROJECT_LIB): $(OBJ)
$(CC) -shared -o $@ $^ $(CFLAGS)
$(YSOCKSRC_O): $(YSOCKSRC) $(YNODESRC_O) $(YFILEHANDLERSRC_O)
$(YSOCKSRC_O): $(YSOCKSRC) $(YNODESRC_O) $(YFILEHANDLERSRC_O) $(WORKSRC_0) $(YTASKSRC_0) $(YJSONSRC_O)
$(CC) -o $@ -c $< $(CFLAGS)
$(YNODESRC_O): $(YNODESRC) $(YLISTSRC_O)
@@ -68,7 +68,7 @@ $(YNODESRC_O): $(YNODESRC) $(YLISTSRC_O)
$(YY_STRINGSRC_O): $(YY_STRINGSRC) $(YLISTSRC_O)
$(CC) -o $@ -c $< $(CFLAGS)
$(YFILEHANDLERSRC_O): $(YFILEHANDLERSRC) $(YSOCKSRC_O)
$(YFILEHANDLERSRC_O): $(YFILEHANDLERSRC) $(YSOCKSRC_O) $(YNODESRC_O) $(WORKSRC_0) $(YTASKSRC_0) $(YY_STRINGSRC_O) $(YJSONSRC_O)
$(CC) -o $@ -c $< $(CFLAGS)
$(DEPS):
+55 -1
View File
@@ -1,7 +1,17 @@
/*file: include/y_socket_t/y_file_handler.h */
#ifndef Y_FILE_HANDLER_T_H__C
#define Y_FILE_HANDLER_T_H__C
#include "y_socket_t/y_socket_t.h"
#include "y_socket_t/y_node_t.h"
#include "y_socket_t/y_list_string.h"
#include "y_worker_t/y_worker_t.h"
#include "y_worker_t/y_task_t.h"
#include "json_t/json_t.h"
void fileNameDateScore(char* filename, char * pre, char* post,size_t score);
@@ -12,7 +22,51 @@ struct arg_send_file{
};
void* y_socket_send_file_for_all_nodes(void* arg);
void receve_from_node(struct pollfd *fds, char *msg, size_t count);
enum cmd_type {
cmd_update_kill,
cmd_update_standby,
cmd_update_wakeup,
cmd_post_file,
cmd_post_var,
cmd_get_file,
cmd_get_var,
};
typedef struct msg_content_t {
enum cmd_type cmd_t;
size_t seq;
char eof;
size_t size_content;
char *content;
size_t size_nameid;
char * nameid;/* containerid: filename_src_dst_tm */
} y_MSG_CONTENT_T;
typedef struct msg_content_t * y_ptr_MSG_CONTENT_T;
GENERATE_LIST_ALL(y_ptr_MSG_CONTENT_T)
GEN_HEAD_PTR_LIST(y_ptr_MSG_CONTENT_T)
typedef struct header_t {
enum cmd_type cmd_t;
// size_t seq;
char eof;
// void *content;
size_t size_nameid;
char * nameid;/* containerid: filename_src_dst_tm */
struct main_list_y_ptr_MSG_CONTENT_T * m_content_l;
} y_HEADER_T;
typedef struct header_t * y_ptr_HEADER_T;
GENERATE_LIST_ALL(y_ptr_HEADER_T)
GEN_HEAD_PTR_LIST(y_ptr_HEADER_T)
size_t set_tempAddr_from_node(char *tempAddr, y_NODE_T node);
//void receve_from_node(struct pollfd *fds, char *msg, size_t count);
void receve_from_node(struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_list_y_ptr_STRING *m_str, char * srcAddr, char *filename);
#endif /*Y_FILE_HANDLER_T_H__C*/
+4 -1
View File
@@ -9,6 +9,8 @@
#include <fcntl.h>
#include <string.h>
#include <signal.h>
//#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
@@ -46,6 +48,7 @@ enum ipVersions{
extern const int af_array[nbIpVersion];//={AF_INET, AF_INET6};
/* y_ptr_STRING */
#if 0
struct y_string{
char * buf;
@@ -68,7 +71,7 @@ struct y_socket_t{
size_t size_fds;
char * port;
struct main_list_y_NODE_T *nodes;
pthread_mutex_t *mut_nodes;
// pthread_mutex_t *mut_nodes;
int go_on;
pthread_mutex_t *mut_go_on;
int nb_workers;
+517 -124
View File
@@ -4,7 +4,357 @@
//#include "y_socket_t/y_node_t.h"
GEN_LIST_ALL(y_ptr_MSG_CONTENT_T)
GEN_FUNC_PTR_LIST_FREE(y_ptr_MSG_CONTENT_T){
free(arg->content);
free(arg->nameid);
free(arg);
}
GEN_LIST_ALL(y_ptr_HEADER_T)
GEN_FUNC_PTR_LIST_FREE(y_ptr_HEADER_T){
// free(arg->content);
free(arg->nameid);
purge_ptr_type_list_y_ptr_MSG_CONTENT_T(arg->m_content_l);
free(arg);
}
y_ptr_MSG_CONTENT_T create_y_ptr_MSG_CONTENT_T(char *nameid, size_t size_nameid, char* content, size_t size_content,
enum cmd_type cmd_t,
size_t seq,
char eof
){
y_ptr_MSG_CONTENT_T new_p_content = malloc(sizeof(struct msg_content_t));
new_p_content->size_content = size_content;
new_p_content->content = malloc(size_content+1);
memcpy(new_p_content->content, content, size_content);
new_p_content->content[size_content]='\0';
new_p_content->size_nameid = size_nameid;
new_p_content->nameid = malloc(size_nameid+1);
memcpy(new_p_content->nameid, nameid, size_nameid);
new_p_content->nameid[size_nameid]='\0';
new_p_content->cmd_t = cmd_t;
new_p_content->seq = seq;
new_p_content->eof = eof;
return new_p_content;
}
y_ptr_HEADER_T create_y_ptr_HEADER_T(char *nameid, size_t size_nameid, enum cmd_type cmd_t ){
y_ptr_HEADER_T new_header_t = malloc(sizeof(struct header_t));
new_header_t->cmd_t = cmd_t;
new_header_t->size_nameid = size_nameid;
new_header_t->nameid = malloc(size_nameid+1);
memcpy(new_header_t->nameid, nameid, size_nameid);
new_header_t->nameid[size_nameid]='\0';
new_header_t->m_content_l = create_var_list_y_ptr_MSG_CONTENT_T();
return new_header_t;
}
int funcCmp_y_ptr_HEADER_T(y_ptr_HEADER_T h1, y_ptr_HEADER_T h2){
if(h1==NULL || h2==NULL) return -1;
if(h1->size_nameid == h2->size_nameid){
if(h1->cmd_t == h2->cmd_t){
return strcmp(h1->nameid, h2->nameid);
}else return (h1->cmd_t - h2->cmd_t);
}else if(h1->size_nameid < h2->size_nameid){
return -1;
}else return 1;
}
#define TEST_DUPLICATE_SEQ()\
if(temp_curr->value->seq == cnt->seq){\
printf("debug: index_nearest_seq=%ld: seq equal: doublon problem ? seq:%ld appuyer sur une touche\n", index_nearest_seq, cnt->seq);\
free_y_ptr_MSG_CONTENT_T(cnt); \
getchar();\
return -2;\
}
long y_append_content_to_header_l(struct main_list_y_ptr_HEADER_T *m_head_l_t, y_ptr_MSG_CONTENT_T cnt){
y_ptr_HEADER_T current_header = create_y_ptr_HEADER_T(cnt->nameid, cnt->size_nameid, cnt->cmd_t);
struct list_y_ptr_HEADER_T * l_ocate_header = search_first_occ_from_begin_in_list_y_ptr_HEADER_T(m_head_l_t, current_header, funcCmp_y_ptr_HEADER_T);
printf("debug: search done, nameid:%s, #%ld\n",cnt->nameid, cnt->size_nameid);
if(l_ocate_header){
free_y_ptr_HEADER_T(current_header);
pthread_mutex_t *mut_m_content_l = l_ocate_header->value->m_content_l->mut_list;
pthread_mutex_lock(mut_m_content_l);
struct list_y_ptr_MSG_CONTENT_T * current_cnt = l_ocate_header->value->m_content_l->current_list;
struct list_y_ptr_MSG_CONTENT_T * end_cnt = l_ocate_header->value->m_content_l->end_list;
struct list_y_ptr_MSG_CONTENT_T * begin_cnt = l_ocate_header->value->m_content_l->begin_list;
if(begin_cnt == NULL){
pthread_mutex_unlock(mut_m_content_l);
printf("debug: current_cnt==NULL, size_ m_content_l=%ld cnt->seq=%ld, push_back_list_y_ptr_MSG_CONTENT_T\n", l_ocate_header->value->m_content_l->size, cnt->seq);
push_back_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt);
printf("debug: after current_cnt==NULL, size_ m_content_l=%ld cnt->seq=%ld, push_back_list_y_ptr_MSG_CONTENT_T\n", l_ocate_header->value->m_content_l->size, cnt->seq);
return 0;
}
long last_seq = end_cnt->value->seq;
if(cnt->seq > last_seq){
pthread_mutex_unlock(mut_m_content_l);
push_back_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt);
return 0;
}
struct list_y_ptr_MSG_CONTENT_T *temp_curr = NULL;
long from_current_seq = current_cnt->value->seq - cnt->seq; \
size_t abs_cur_diff_seq = abs(from_current_seq); \
size_t array_diff_seq[3] = {cnt->seq - begin_cnt->value->seq, abs_cur_diff_seq , end_cnt->value->seq - cnt->seq}; \
size_t index_nearest_seq = ARG_MIN_ARRAY_TYPE_SIZE_T(array_diff_seq, 3);\
if(index_nearest_seq == 0){\
for(temp_curr = begin_cnt ;temp_curr && (temp_curr->value->seq < cnt->seq); temp_curr = temp_curr->next){}
if(temp_curr){
l_ocate_header->value->m_content_l->current_list = temp_curr;
pthread_mutex_unlock(mut_m_content_l);
TEST_DUPLICATE_SEQ()
return insert_into_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, temp_curr->index, cnt);
}else{
pthread_mutex_unlock(mut_m_content_l);
push_back_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt);
return 0;
}
}\
else if(index_nearest_seq == 2){
for(temp_curr = end_cnt; temp_curr && temp_curr->value->seq > cnt->seq; temp_curr = temp_curr->preview) {}
if(temp_curr){
l_ocate_header->value->m_content_l->current_list = temp_curr;
pthread_mutex_unlock(mut_m_content_l);
TEST_DUPLICATE_SEQ()
return insert_into_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, temp_curr->index + 1, cnt);
}else{
pthread_mutex_unlock(mut_m_content_l);
push_front_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt);
return 0;
}
}else if(from_current_seq >= 0) {
for(temp_curr = current_cnt; temp_curr && temp_curr->value->seq > cnt->seq; temp_curr = temp_curr->preview) {}
if(temp_curr){
l_ocate_header->value->m_content_l->current_list = temp_curr;
pthread_mutex_unlock(mut_m_content_l);
TEST_DUPLICATE_SEQ()
return insert_into_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, temp_curr->index + 1, cnt);
}else{
pthread_mutex_unlock(mut_m_content_l);
push_front_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt);
return 0;
}
}else{
for(temp_curr = current_cnt ;temp_curr && (temp_curr->value->seq < cnt->seq); temp_curr = temp_curr->next){}
if(temp_curr){
l_ocate_header->value->m_content_l->current_list = temp_curr;
pthread_mutex_unlock(mut_m_content_l);
TEST_DUPLICATE_SEQ()
return insert_into_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, temp_curr->index, cnt);
}else{
pthread_mutex_unlock(mut_m_content_l);
push_back_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt);
return 0;
}
}
#if 0
// cnt->seq < last_seq
while(current_cnt){
printf("debug: last_seq: %ld, cnt->seq=%ld, current_cnt->value->seq:%ld\n", last_seq, cnt->seq,current_cnt->value->seq);
if(cnt->seq == current_cnt->value->seq){
printf("debug: equal last_seq: %ld, cnt->seq=%ld, current_cnt->value->seq:%ld\n", last_seq, cnt->seq,current_cnt->value->seq);
//return -2;
}
if(cnt->seq < last_seq && cnt->seq > current_cnt->value->seq){
pthread_mutex_unlock(mut_m_content_l);
/*if(cnt->seq > last_seq && cnt->seq < current_cnt->value->seq)*/
return insert_into_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, current_cnt->index + 1, cnt);
}else if(current_cnt->preview==NULL){
pthread_mutex_unlock(mut_m_content_l);
push_front_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt);
return 0;
}
last_seq = current_cnt->value->seq;
current_cnt = current_cnt->preview;
}
printf("debug: some thing wrong here! last_seq: %ld, cnt->seq=%ld\n", last_seq, cnt->seq);
return -1;
#endif
}
else{
push_back_list_y_ptr_MSG_CONTENT_T(current_header->m_content_l, cnt);
push_back_list_y_ptr_HEADER_T(m_head_l_t, current_header);
printf("debug: push_back_list_y_ptr_HEADER_T when l_ocate_header == NULL, m_head_l_t->size=%ld\n",m_head_l_t->size);
return 0;
}
}
long y_append_content_to_header_l_from_end(struct main_list_y_ptr_HEADER_T *m_head_l_t, y_ptr_MSG_CONTENT_T cnt){
y_ptr_HEADER_T current_header = create_y_ptr_HEADER_T(cnt->nameid, cnt->size_nameid, cnt->cmd_t);
struct list_y_ptr_HEADER_T * l_ocate_header = search_first_occ_from_begin_in_list_y_ptr_HEADER_T(m_head_l_t, current_header, funcCmp_y_ptr_HEADER_T);
printf("debug: search done, nameid:%s, #%ld\n",cnt->nameid, cnt->size_nameid);
if(l_ocate_header){
free_y_ptr_HEADER_T(current_header);
struct list_y_ptr_MSG_CONTENT_T * current_cnt = l_ocate_header->value->m_content_l->end_list;
if(current_cnt == NULL){
printf("debug: current_cnt==NULL, size_ m_content_l=%ld cnt->seq=%ld, push_back_list_y_ptr_MSG_CONTENT_T\n", l_ocate_header->value->m_content_l->size, cnt->seq);
push_back_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt);
printf("debug: after current_cnt==NULL, size_ m_content_l=%ld cnt->seq=%ld, push_back_list_y_ptr_MSG_CONTENT_T\n", l_ocate_header->value->m_content_l->size, cnt->seq);
return 0;
}
long last_seq = current_cnt->value->seq + 1;
if(cnt->seq >= last_seq){
push_back_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt);
return 0;
}
// cnt->seq < last_seq
while(current_cnt){
printf("debug: last_seq: %ld, cnt->seq=%ld, current_cnt->value->seq:%ld\n", last_seq, cnt->seq,current_cnt->value->seq);
if(cnt->seq == current_cnt->value->seq){
printf("debug: equal last_seq: %ld, cnt->seq=%ld, current_cnt->value->seq:%ld\n", last_seq, cnt->seq,current_cnt->value->seq);
//return -2;
}
if(cnt->seq < last_seq && cnt->seq > current_cnt->value->seq){
/*if(cnt->seq > last_seq && cnt->seq < current_cnt->value->seq)*/
return insert_into_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, current_cnt->index + 1, cnt);
}else if(current_cnt->preview==NULL){
push_front_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt);
return 0;
}
last_seq = current_cnt->value->seq;
current_cnt = current_cnt->preview;
}
printf("debug: some thing wrong here! last_seq: %ld, cnt->seq=%ld\n", last_seq, cnt->seq);
return -1;
}
else{
push_back_list_y_ptr_MSG_CONTENT_T(current_header->m_content_l, cnt);
push_back_list_y_ptr_HEADER_T(m_head_l_t, current_header);
printf("debug: push_back_list_y_ptr_HEADER_T when l_ocate_header == NULL, m_head_l_t->size=%ld\n",m_head_l_t->size);
return 0;
}
}
long y_append_content_to_header_l_from_begin(struct main_list_y_ptr_HEADER_T *m_head_l_t, y_ptr_MSG_CONTENT_T cnt){
y_ptr_HEADER_T current_header = create_y_ptr_HEADER_T(cnt->nameid, cnt->size_nameid, cnt->cmd_t);
struct list_y_ptr_HEADER_T * l_ocate_header = search_first_occ_from_begin_in_list_y_ptr_HEADER_T(m_head_l_t, current_header, funcCmp_y_ptr_HEADER_T);
printf("debug: search done, nameid:%s, #%ld\n",cnt->nameid, cnt->size_nameid);
if(l_ocate_header){
free_y_ptr_HEADER_T(current_header);
struct list_y_ptr_MSG_CONTENT_T * current_cnt = l_ocate_header->value->m_content_l->begin_list;
if(current_cnt == NULL){
printf("debug: current_cnt==NULL, size_ m_content_l=%ld cnt->seq=%ld, push_back_list_y_ptr_MSG_CONTENT_T\n", l_ocate_header->value->m_content_l->size, cnt->seq);
push_back_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt);
printf("debug: after current_cnt==NULL, size_ m_content_l=%ld cnt->seq=%ld, push_back_list_y_ptr_MSG_CONTENT_T\n", l_ocate_header->value->m_content_l->size, cnt->seq);
return 0;
}
long last_seq = -1;
while(current_cnt){
printf("debug: last_seq: %ld, cnt->seq=%ld, current_cnt->value->seq:%ld\n", last_seq, cnt->seq,current_cnt->value->seq);
if(cnt->seq == current_cnt->value->seq){
printf("debug: equal last_seq: %ld, cnt->seq=%ld, current_cnt->value->seq:%ld\n", last_seq, cnt->seq,current_cnt->value->seq);
//return -2;
}
if(cnt->seq > last_seq && cnt->seq < current_cnt->value->seq){
return insert_into_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, current_cnt->index, cnt);
}else if(current_cnt->next==NULL){
push_back_list_y_ptr_MSG_CONTENT_T(l_ocate_header->value->m_content_l, cnt);
return 0;
}
last_seq = current_cnt->value->seq;
current_cnt = current_cnt->next;
}
printf("debug: some thing wrong here! last_seq: %ld, cnt->seq=%ld\n", last_seq, cnt->seq);
return -1;
}
else{
push_back_list_y_ptr_MSG_CONTENT_T(current_header->m_content_l, cnt);
push_back_list_y_ptr_HEADER_T(m_head_l_t, current_header);
printf("debug: push_back_list_y_ptr_HEADER_T when l_ocate_header == NULL, m_head_l_t->size=%ld\n",m_head_l_t->size);
return 0;
}
}
struct list_y_ptr_HEADER_T * check_if_all_contents_done_from_headers(struct main_list_y_ptr_HEADER_T *m_head_l_t, y_ptr_MSG_CONTENT_T cnt){
y_ptr_HEADER_T current_header = create_y_ptr_HEADER_T(cnt->nameid, cnt->size_nameid, cnt->cmd_t);
printf("debug: check_if_all_contents_done_from_headers, begin search\n");
struct list_y_ptr_HEADER_T * l_ocate_header = search_first_occ_from_begin_in_list_y_ptr_HEADER_T (m_head_l_t, current_header, funcCmp_y_ptr_HEADER_T);
printf("debug: check_if_all_contents_done_from_headers, search done\n");
free_y_ptr_HEADER_T(current_header);
if(l_ocate_header){
printf("debug: check_if_all_contents_done_from_headers, l_ocate_header->index=%ld\n",l_ocate_header->index);
if(l_ocate_header->value->m_content_l){
struct list_y_ptr_MSG_CONTENT_T *end_list = l_ocate_header->value->m_content_l->end_list;
printf("debug: check_if_all_contents_done_from_headers, l_ocate_header->m_content_l->size=%ld,\n",l_ocate_header->value->m_content_l->size);
if(end_list){
printf("debug: check_if_all_contents_done_from_headers, end->eof=%d, end->seq=%ld, end->index = %ld\n",end_list->value->eof, end_list->value->seq, end_list->index);
// check if all contents are done!
if(end_list->value->eof && end_list->value->seq == end_list->index){
return l_ocate_header;
}
}else{
printf("debug: check_if_all_contents_done_from_headers, end_list==NULL\n");
}
}else{
printf("debug: check_if_all_contents_done_from_headers, l_ocate_header->value->m_content_l==NULL\n");
}
}
return NULL;
}
int remove_content_from_headers(struct main_list_y_ptr_HEADER_T *m_head_l_t, y_ptr_MSG_CONTENT_T cnt){
y_ptr_HEADER_T current_header = create_y_ptr_HEADER_T(cnt->nameid, cnt->size_nameid, cnt->cmd_t);
struct list_y_ptr_HEADER_T * l_ocate_header = search_first_occ_from_begin_in_list_y_ptr_HEADER_T (m_head_l_t, current_header, funcCmp_y_ptr_HEADER_T);
free_y_ptr_HEADER_T(current_header);
if(l_ocate_header){
struct list_y_ptr_MSG_CONTENT_T *end_list = l_ocate_header->value->m_content_l->end_list;
// check if all contents are done!
if(end_list->value->eof && end_list->value->seq == end_list->index){ //l_ocate_header->value->m_content_l->size - 1
struct list_y_ptr_HEADER_T *current_list = pull_index_from_list_y_ptr_HEADER_T(m_head_l_t, l_ocate_header->index);
if(current_list == l_ocate_header){
free_y_ptr_HEADER_T(l_ocate_header->value);
free(l_ocate_header);
}
return 0;
}
else{
printf("some thing wrong here: EOF:%d seq_end:%ld, size m_content_l :%ld\n",end_list->value->eof, end_list->value->seq, l_ocate_header->value->m_content_l->size);
return 1;
}
}
else{
printf("\n%s is not in header list\n",cnt->nameid);
return -1;
}
}
void fileNameDateScore(char* filename, char * pre, char* post,size_t score){
// char *filename=malloc(256);
@@ -15,6 +365,16 @@ void fileNameDateScore(char* filename, char * pre, char* post,size_t score){
//return filename;
}
char * time_id(){
// char *filename=malloc(256);
char *timeid=malloc(128);
time_t t = time(NULL);
struct tm tm = *localtime(&t);
sprintf(timeid,"%d%02d%02d%02d%02d%02d", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
return timeid;
//return filename;
}
#if 0
struct arg_send_file{
@@ -24,68 +384,26 @@ struct arg_send_file{
};
#endif
/* */
void y_send_post_file_to_all_nodes(void *arg){
struct arg_send_file *argS=(struct arg_send_file*)arg;
struct pollfd *fds=argS->fds;
struct main_list_y_NODE_T *nodes=argS->nodes;
char * filename=argS->filename;
#if TEMP_ADDR
char tempAddr[BUF_SIZE+1];
#endif
int c_af;
// char host[NI_MAXHOST], service[NI_MAXSERV];
char buf_send[BUF_SIZE+1]={0};
int fd_file;
int retsprintf = snprintf(buf_send, 50,"post file %s", filename );
printf("debug: buf_send=%s, size=%d\n",buf_send, retsprintf);
for(struct list_y_NODE_T *local_list_current = nodes->begin_list; local_list_current; local_list_current=local_list_current->next ){
//memset(tempAddr, 0, BUF_SIZE+1);
c_af=(local_list_current->value).addr.ss_family;
#if TEMP_ADDR
if(c_af==AF_INET){
size_t set_tempAddr_from_node(char *tempAddr, y_NODE_T node) {
int c_af=(node).addr.ss_family;
if(c_af==AF_INET){
if(NULL == inet_ntop(c_af,
&(GET_IN_type_ADDR(&(local_list_current->value),)),
&(GET_IN_type_ADDR(&(node),)),
tempAddr, BUF_SIZE/*(argSock->local_list_current->value).addr_len*/)){
fprintf(stderr, "error inet_ntop v4\n");
}
}else if(c_af==AF_INET6){
if(NULL == inet_ntop(c_af,
&(GET_IN_type_ADDR(&(local_list_current->value),6)),
&(GET_IN_type_ADDR(&(node),6)),
tempAddr, BUF_SIZE /*(argSock->local_list_current->value).addr_len*/)){
fprintf(stderr, "error inet_ntop v6 :errno=%d\n",errno);
}
}
#endif
#if 0
off_t offset = 0;
ssize_t ret_sendfile ;
while((ret_sendfile = sendfile(fds[(c_af==AF_INET6)].fd ,fd_file, &offset, BUF_SIZE))>0){
size_t ret_len = strlen(tempAddr);
return ret_len;
}
}
#endif
/// printf("debug: destination %s :\n",tempAddr);
if(sendto(fds[(c_af==AF_INET6)].fd,
buf_send, retsprintf,
/*msgRet, len_msgRet,*/
0,
(struct sockaddr*)&((local_list_current->value).addr),
(local_list_current->value).addr_len) !=
retsprintf
/*len_msgRet*/
){
#if TEMP_ADDR
fprintf(stderr, "Error sending response to %s\n",tempAddr);
#endif
}else{
#if TEMP_ADDR
printf("debug: sending %s to < %s >",buf_send,tempAddr);
#endif
}
}
}
/* */
#define TEMP_ADDR 1
@@ -98,7 +416,7 @@ void* y_socket_send_file_for_all_nodes(void* arg){
struct main_list_y_NODE_T *nodes=argS->nodes;
char * filename=argS->filename;
#if TEMP_ADDR
char tempAddr[BUF_SIZE+1];
char tempAddr[64];
#endif
int c_af;
// char host[NI_MAXHOST], service[NI_MAXSERV];
@@ -117,99 +435,68 @@ void* y_socket_send_file_for_all_nodes(void* arg){
push_back_list_y_NODE_T(nodes, node);
#endif
fd_file = open( filename , O_RDONLY);
size_t seq = 0;//, len_buf_header=0,
size_t len_local_header_=0;
char * timeid = time_id();
for(struct list_y_NODE_T *local_list_current = nodes->begin_list; local_list_current; local_list_current=local_list_current->next ){
fd_file = open( filename , O_RDONLY);
if(fd_file == -1){
fprintf(stderr,"error opening file |%s| for reading\n",filename);
return NULL;
}
set_tempAddr_from_node(tempAddr, local_list_current->value);
c_af=(local_list_current->value).addr.ss_family;
y_send_post_file_to_all_nodes(arg);
usleep(1);
//for(struct list_y_NODE_T *local_list_current = nodes->begin_list; local_list_current; local_list_current=local_list_current->next )
seq=0;
//memset(buf_send, 0, BUF_SIZE+1);
while((retread = read(fd_file, buf_send, BUF_SIZE) ) > 0 ){
buf_send[retread]='\0';
//memset(msgRet, 0, BUF_SIZE + NI_MAXHOST + NI_MAXSERV + 100);
// sprintf(msgRet, "from %s:%s =%s",host, service, buf);
// len_msgRet = strlen(msgRet);
///printf("debug: sending response %s :\n",buf_send);
//FOR_LIST_FORM_BEGIN(y_NODE_T, nodes)
for(struct list_y_NODE_T *local_list_current = nodes->begin_list; local_list_current; local_list_current=local_list_current->next ){
//memset(tempAddr, 0, BUF_SIZE+1);
c_af=(local_list_current->value).addr.ss_family;
#if TEMP_ADDR
if(c_af==AF_INET){
if(NULL == inet_ntop(c_af,
&(GET_IN_type_ADDR(&(local_list_current->value),)),
tempAddr, BUF_SIZE/*(argSock->local_list_current->value).addr_len*/)){
fprintf(stderr, "error inet_ntop v4\n");
}
}else if(c_af==AF_INET6){
if(NULL == inet_ntop(c_af,
&(GET_IN_type_ADDR(&(local_list_current->value),6)),
tempAddr, BUF_SIZE /*(argSock->local_list_current->value).addr_len*/)){
fprintf(stderr, "error inet_ntop v6 :errno=%d\n",errno);
}
}
#endif
#if 0
off_t offset = 0;
ssize_t ret_sendfile ;
while((ret_sendfile = sendfile(fds[(c_af==AF_INET6)].fd ,fd_file, &offset, BUF_SIZE))>0){
}
#endif
/// printf("debug: destination %s :\n",tempAddr);
#if 1
len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"dst\" : \"%s\" , \"tm\" : \"%s\" }",filename, seq,tempAddr, timeid);
while((retread = read(fd_file, buf_send+len_local_header_, BUF_SIZE - len_local_header_) ) > 0 ){
buf_send[len_local_header_ + retread]='\0';
if(sendto(fds[(c_af==AF_INET6)].fd,
buf_send, retread,
/*msgRet, len_msgRet,*/
buf_send, retread+len_local_header_,
0,
(struct sockaddr*)&((local_list_current->value).addr),
(local_list_current->value).addr_len) !=
retread
/*len_msgRet*/
retread + len_local_header_
){
#if TEMP_ADDR
fprintf(stderr, "Error sending response to %s\n",tempAddr);
#endif
}else{
#if TEMP_ADDR
printf("debug: sending response to < %s >",tempAddr);
#endif
printf("debug: sending response to < %s > seq=[%ld] ",tempAddr,seq);
}
}
#endif
#if 0
//memset(buf_send, 0, BUF_SIZE+1);
retread = sprintf(buf_send, "post file %s", filename);
if(sendto(fds[(c_af==AF_INET6)].fd,
buf_send, retread,
/*msgRet, len_msgRet,*/
++seq;
len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"dst\" : \"%s\" , \"tm\" : \"%s\" }",filename, seq,tempAddr, timeid);
}
len_local_header_ = sprintf(buf_send, "{ \"cmd\" : \"post file %s\", \"seq\" : %ld , \"EOF\" : true , \"dst\" : \"%s\" , \"tm\" : \"%s\" }",filename, seq, tempAddr,timeid);
if(sendto(fds[(c_af==AF_INET6)].fd,
buf_send, len_local_header_,
0,
(struct sockaddr*)&((local_list_current->value).addr),
(local_list_current->value).addr_len) !=
retread
/*len_msgRet*/
len_local_header_
){
fprintf(stderr, "Error sending response to %s\n",tempAddr);
}else{
printf("debug: sending response to < %s >",tempAddr);
}
#endif
}
fprintf(stderr, "Error ending response to %s, len_buf_header=%ld\n", tempAddr, len_local_header_);
}else{
printf("debug: ending response to < %s > [%ld] EOF",tempAddr,seq);
}
close(fd_file);
printf("debug: fd=%d closed: filename=%s\n",fd_file,filename);
printf("debug: fd=%d closed: filename=%s, for index = %ld\n",fd_file,filename, local_list_current->index);
}
free(timeid);
return NULL;
}
//main_list_y_ptr_HEADER_T
/*
struct arg_record_to_file{
int *is_file_to_record,
@@ -221,8 +508,115 @@ void record_buffer_to_file(void *arg){
}
*/
void receve_from_node(struct pollfd *fds, char *msg, size_t count){
printf("\ndebug: <<<< receve_from_node %s %ld\n\n",msg,count);
#if 1
void receve_from_node(struct main_list_y_ptr_HEADER_T *m_head_l_t, struct main_list_y_ptr_STRING *m_str, char * srcAddr, char *filename ){
//printf("\ndebug: <<<< receve_from_node %s %ld\n\n",msg,count);
//size_t size_m_str = 0;
struct list_y_ptr_STRING * local_current_no_rec = m_str->begin_list;
struct list_y_ptr_STRING * local_current;
for(local_current = local_current_no_rec; local_current; local_current = local_current->next){
char *buf_loc = local_current->value->buf;
char nameid[BUF_SIZE]="";
size_t size_nameid=0;
struct js_value * js_header_v = create_js_value(buf_loc,NULL);
//struct js_value *js_cmd_v = get_js_value_of_key("cmd", js_header_v );
//printf("debug: index=[%ld] \n BEGIN file ***\n%s\n END\n",local_current->index,buf_loc);
if(js_header_v){
struct js_value *js_seq_v = get_js_value_of_key("seq", js_header_v );
char eof=0;
if(js_seq_v){
if(js_seq_v->type.object.value->code_type == jstype_number){
size_t seq_local = (long)(js_seq_v->type.object.value->type.number);
printf("debug: \n*********seq_local=%ld ***\n\n",seq_local);
struct js_value *js_eof_v = get_js_value_of_key("EOF", js_header_v );
if(js_eof_v){
// size_m_str = seq_local;
eof=1;
printf("debug: \n****************************end of file ***\n\n");
//printf("debug: \n****************************end of file ***\n%s\n**********************************\n",buf_loc);
}
struct js_value *js_dst_v = get_js_value_of_key("dst", js_header_v );
if(js_dst_v){
struct js_value *js_tm_v = get_js_value_of_key("tm", js_header_v );
if(js_tm_v){
size_t length_js_header = js_org_str_length(js_header_v);
char *content = buf_loc+ length_js_header;
size_t size_content = strlen(content);// js_header_v->length - length_js_header;
enum cmd_type cmd_t = cmd_post_file;
size_nameid = sprintf(nameid, "%s_%s_%s_%s",filename,srcAddr, value_of_(js_dst_v)->type.string,value_of_(js_tm_v)->type.string);
printf("debug: nameid = %s\n", nameid);
y_ptr_MSG_CONTENT_T y_msg_cnt=create_y_ptr_MSG_CONTENT_T(nameid, size_nameid, content, size_content, cmd_t, seq_local,eof);
long ret_app = y_append_content_to_header_l(m_head_l_t,y_msg_cnt);
if(ret_app != -2){
struct list_y_ptr_HEADER_T * local_header = check_if_all_contents_done_from_headers(m_head_l_t, y_msg_cnt);
if(local_header){
struct main_list_y_ptr_MSG_CONTENT_T *m_content_l = local_header->value->m_content_l;
struct list_y_ptr_MSG_CONTENT_T * tmpCnt_l = m_content_l->begin_list;
while(tmpCnt_l){
printf("debug: nameid:%s seq = %ld eof %d\n\n%s\n",tmpCnt_l->value->nameid, tmpCnt_l->value->seq, tmpCnt_l->value->eof, tmpCnt_l->value->content);
tmpCnt_l=tmpCnt_l->next;
}
struct list_y_ptr_HEADER_T * l_head_to_remove = pull_index_from_list_y_ptr_HEADER_T(m_head_l_t, local_header->index);
free_y_ptr_HEADER_T(l_head_to_remove->value);
free(l_head_to_remove);
}
}
}else{
printf("debug: tm missing!");
}
}
else{
printf("debug: dst missing!");
}
}
else{
printf("debug: \n SSSSSSSSSSSSSSSEEEEEEEEEEEEEEQQQQQQQQQQQQQ type:%d \n",js_seq_v->type.object.value->code_type);
}
}else{
printf("debug: \n NNNNNNNNNNNNNNNNOOOOOOOOOOOOOSSSSSSSSSSSSSSSEEEEEEEEEEEEEEQQQQQQQQQQQQQ :type header : %d \n",js_header_v->code_type);
}
free_js_value(js_header_v);
}else{
printf("\ndebug NULLL JS___HHHEADER_V \n");
}
}
//if(local_current){
// printf("debug: getchar\n");
// getchar();
//}
//local_current_no_rec = local_current_no_rec->next;
/*if(cur_index == size_m_str){
printf("debug: all seq are received: %ld\n",cur_index);
}
else{
printf("debug: some issue cur_index %ld vs size_m_str :%ld\n",cur_index,size_m_str);
}*/
if(m_str){
purge_ptr_type_list_y_ptr_STRING(m_str);
m_str = NULL;
}
}
#endif
/*
char filename[500];
int fd_file;
long int nread;
@@ -262,6 +656,5 @@ void receve_from_node(struct pollfd *fds, char *msg, size_t count){
}
printf("debug: <receve_from_node> close nread==%ld\n",nread);
close(fd_file);
*/
}
+1 -1
View File
@@ -7,7 +7,7 @@ struct y_string * create_y_ptr_STRING(const char *buf, size_t size){
if(buf){
//strncpy(string->buf, buf, size);
//snprintf(string->buf, size, "%s", buf);
memcpy(string->buf, buf, size);
memcpy(string->buf, buf, size+1);
//if(strlen(buf)>=size)
if(buf[size]!='\0')
string->buf[size]='\0';
+236 -71
View File
@@ -1,6 +1,7 @@
/*file: src/y_socket_t/y_socket_t.c */
#include "y_socket_t/y_socket_t.h"
//#include "y_socket_t/y_list_string.h"
//#include "json_t/json_t.h"
@@ -72,8 +73,8 @@ struct y_socket_t * y_socket_create(char *port, size_t size_fds, int nb_workers)
sock_temp->port=port;
sock_temp->nodes = create_var_list_y_NODE_T();
sock_temp->mut_nodes = malloc(sizeof(pthread_mutex_t));
pthread_mutex_init(sock_temp->mut_nodes, NULL);
// sock_temp->mut_nodes = malloc(sizeof(pthread_mutex_t));
// pthread_mutex_init(sock_temp->mut_nodes, NULL);
sock_temp->go_on = 1;
sock_temp->mut_go_on = malloc(sizeof(pthread_mutex_t));
pthread_mutex_init(sock_temp->mut_go_on, NULL);
@@ -87,8 +88,8 @@ struct y_socket_t * y_socket_create_(char * port){
void y_socket_free(struct y_socket_t *socket){
free(socket->fds);
free_all_var_list_y_NODE_T(socket->nodes);
pthread_mutex_destroy(socket->mut_nodes);
free(socket->mut_nodes);
// pthread_mutex_destroy(socket->mut_nodes);
// free(socket->mut_nodes);
pthread_mutex_destroy(socket->mut_go_on);
free(socket->mut_go_on);
free(socket);
@@ -303,6 +304,22 @@ void y_socket_get_fds(struct pollfd * fds, char * port, char * addrDistant){
if(bind(fds[af].fd, rp->ai_addr, rp->ai_addrlen)==-1){
close(fds[af].fd);
fds[af].fd=-1;
}else{
char tempAddr[BUF_SIZE]={0};
if(af_array[af]==AF_INET){
if(NULL == inet_ntop(AF_INET,
&(GET_IN_type_ADDR(rp->ai_addr,)),
tempAddr, BUF_SIZE)){
fprintf(stderr, "error inet_ntop v4\n");
}
}else if(af_array[af]==AF_INET6){
if(NULL == inet_ntop(AF_INET6,
&(GET_IN_type_ADDR(rp->ai_addr,6)),
tempAddr, BUF_SIZE )){
fprintf(stderr, "error inet_ntop v6 :errno=%d\n",errno);
}
}
printf("\n\ndebug: ADDR_LOCAL v%d:%s\n\n", 2*af+4,tempAddr);
}
#if 0
int flags = fcntl(fds[af].fd, F_GETFL);
@@ -322,17 +339,19 @@ int flags = fcntl(fds[af].fd, F_GETFL);
}
struct arg_handler_{
char *buf;
struct main_list_y_ptr_STRING *m_str;
//char *buf;
struct pollfd *fds;
y_NODE_T node;
struct y_socket_t *sock;
struct argWorker *argw ;
struct main_list_y_ptr_HEADER_T *m_head_l_t;
};
//void y_socket_handler_(char * buf, struct pollfd *fds, struct y_socket_t *sock)
void* y_socket_handler_(void *arg){
struct arg_handler_ *argH = (struct arg_handler_ *)arg;
char *buf=argH->buf;
struct main_list_y_ptr_STRING *m_str =argH->m_str;
struct pollfd *fds=argH->fds;
struct y_socket_t *sock=argH->sock;
struct argWorker *argw=argH->argw;
@@ -340,56 +359,139 @@ void* y_socket_handler_(void *arg){
struct main_list_y_NODE_T *nodes = sock->nodes;
update_nodes(argH->node, nodes);
printf("\n\n:::::::::::::::::::::::::::handler: : \n\n%s\n\n::::::::::::::::::::::::::\n",buf);
if(strncmp(buf, "get", 3)==0){
if(strncmp(buf+4,"file",4)==0){
char *filename = buf + 9;
struct arg_send_file *argS=malloc(sizeof(struct arg_send_file));
argS->fds=fds;
argS->nodes=nodes;
argS->filename=filename;
push_back_list_TYPE_PTR(argw->list_arg, argS);
struct y_task_t task_send={
.func=y_socket_send_file_for_all_nodes,
.arg=argS,
.status=TASK_PENDING,
};
push_tasQ(argw->argx->tasQ, task_send);
//y_socket_send_file_for_all_nodes(fds, nodes, filename) ;
}
}
else if(strncmp(buf, "update", 6)==0){
if(strncmp(buf+7,"kill",4)==0){
pthread_mutex_lock(sock->mut_go_on);
sock->go_on = 0;
pthread_mutex_unlock(sock->mut_go_on);
// kill_all_workers(argw);
// printf("debug: kill_all\n");
char *buf_org = m_str->begin_list->value->buf;
//printf("\n\n:::::::::::::::::::::::::::handler: : \n\n%s\n\n::::::::::::::::::::::::::\n",buf_org);
struct js_value *js_header = create_js_value(buf_org, NULL);
if(js_header && js_header->code_type == jstype_object){
struct js_value *js_cmd = get_js_value_of_key("cmd", js_header );
if(js_cmd && js_cmd->type.object.value->code_type == jstype_string){
/** */
struct js_value *js_seq = get_js_value_of_key("seq", js_header );
if(js_seq){
size_t seq_local = (long)(js_seq->type.object.value->type.number);
printf("debug: \n HANDLER header seq_local=%ld \n",seq_local);
}else{
printf("debug: \n HANDLER header no seq\n");
}
/* */
char * buf = js_cmd->type.object.value->type.string;
if(strncmp(buf, "get", 3)==0){
if(strncmp(buf+4,"file",4)==0){
size_t len_filename = strlen(buf + 9);
char *filename = malloc(len_filename+1);
memcpy(filename, buf + 9, len_filename );
filename[len_filename]='\0';
//printf("debug: filename: %s \n\n",filename);
struct arg_send_file *argS=malloc(sizeof(struct arg_send_file));
argS->fds=fds;
argS->nodes=nodes;
argS->filename=filename;
push_back_list_TYPE_PTR(argw->list_arg, argS);
push_back_list_TYPE_PTR(argw->list_arg, filename);
struct y_task_t task_send={
.func=y_socket_send_file_for_all_nodes,
.arg=argS,
.status=TASK_PENDING,
};
push_tasQ(argw->argx->tasQ, task_send);
//y_socket_send_file_for_all_nodes(fds, nodes, filename) ;
}
}
else if(strncmp(buf, "update", 6)==0){
if(strncmp(buf+7,"kill",4)==0){
pthread_mutex_lock(sock->mut_go_on);
sock->go_on = 0;
pthread_mutex_unlock(sock->mut_go_on);
// kill_all_workers(argw);
// printf("debug: kill_all\n");
}
}
else if(strncmp(buf, "post", 4)==0){
if(strncmp(buf+5,"file",4)==0){
char *filename = buf+10;
//index_f=strcpy(filename, buf + 10);
int index_f = strlen(filename);
printf("debug: receve_from_node : file: %s\n",filename);
for(--index_f; index_f>=0;--index_f){
if(filename[index_f]=='/') {
++index_f;
break;
}
}
#if 0
//struct list_y_ptr_STRING * last_record_=NULL;
for(struct list_y_ptr_STRING * local_current = m_str->begin_list; local_current; local_current = local_current->next){
char *buf_loc = local_current->value->buf;
struct js_value * js_header_v = create_js_value(buf_loc,NULL);
//struct js_value *js_cmd_v = get_js_value_of_key("cmd", js_header_v );
//printf("debug: index=[%ld] \n BBBBBEGINNNNNN file ***\n%s\n EEEENDDDDD\n",local_current->index,buf_loc);
printf("debug: index=[%ld] \n",local_current->index);
if(js_header_v){
struct js_value *js_seq_v = get_js_value_of_key("seq", js_header_v );
if(js_seq_v){
if(js_seq_v->type.object.value->code_type == jstype_number){
printf("debug: receve : \n################################# seq : %ld ###################################\n",(long)(js_seq_v->type.object.value->type.number));
}
else{
printf("debug: \n SSSSSSSSSSSSSSSEEEEEEEEEEEEEEQQQQQQQQQQQQQ type:%d \n",js_seq_v->type.object.value->code_type);
}
}else{
printf("debug: \n NNNNNNNNNNNNNNNNOOOOOOOOOOOOOSSSSSSSSSSSSSSSEEEEEEEEEEEEEEQQQQQQQQQQQQQ :type header : %d \n",js_header_v->code_type);
}
struct js_value *js_eof_v = get_js_value_of_key("EOF", js_header_v );
if(js_eof_v){
printf("debug: \n****************************end of file ***\n%s\n**********************************\n",buf_loc);
}
else{
//printf("debug: \n*******************************\n%s\n**********************************\n",buf_loc+js_org_str_length(js_header_v));
}
free_js_value(js_header_v);
}else{
printf("\ndebug NULLL JS___HHHEADER_V \n");
}
}
#else
struct main_list_y_ptr_HEADER_T *m_head_l_t = argH->m_head_l_t;
char srcAddr[BUF_SIZE];
set_tempAddr_from_node(srcAddr, argH->node);
receve_from_node(m_head_l_t, m_str,srcAddr, filename + index_f);
m_str = NULL;
#endif
/*
pthread_mutex_lock(sock->mut_go_on);
sock->go_on = 0;
pthread_mutex_unlock(sock->mut_go_on);
*/
// kill_all_workers(argw);
// printf("debug: kill_all\n");
}
}
}
}
else if(strncmp(buf, "post", 4)==0){
if(strncmp(buf+5,"file",4)==0){
char filename[BUF_SIZE];
strcpy(filename, buf + 10);
printf("debug: receve_from_node : file: %s\n",filename);
receve_from_node(fds, filename, strlen(filename));
/*
pthread_mutex_lock(sock->mut_go_on);
sock->go_on = 0;
pthread_mutex_unlock(sock->mut_go_on);
*/
// kill_all_workers(argw);
// printf("debug: kill_all\n");
}
free_js_value(js_header);
if(m_str){
purge_ptr_type_list_y_ptr_STRING(m_str);
printf("debug: purge_ptr_type_list_y_ptr_STRING in y_socket_handler_\n");
}
return NULL;
}
#if 0
void handle_input_kbd(char *buf, ssize_t buf_len ,void *arg){
struct y_socket_t * argSock = (struct y_socket_t*)arg;
struct pollfd *fds = argSock->fds;
@@ -463,26 +565,67 @@ void handle_input_kbd(char *buf, ssize_t buf_len ,void *arg){
}
}
}
#endif
void handle_buf_socket_rec(char *temp_all_buf, y_NODE_T node, struct main_list_ptr_y_WORKER_T * workers, struct argExecTasQ *argx, struct main_list_TYPE_PTR * list_arg, void * arg){
void handle_buf_socket_rec(struct main_list_y_ptr_HEADER_T *m_head_l_t,struct main_list_y_ptr_STRING *m_str, y_NODE_T node, struct main_list_ptr_y_WORKER_T * workers, struct argExecTasQ *argx, struct main_list_TYPE_PTR * list_arg, void * arg){
struct y_socket_t * argSock = (struct y_socket_t*)arg;
struct pollfd *fds = argSock->fds;
struct js_value *js_header = create_js_value(m_str->begin_list->value->buf, NULL);
if(js_header && js_header->code_type == jstype_object){
struct js_value *js_cmd = get_js_value_of_key("cmd", js_header );
if(js_cmd && js_cmd->type.object.value->code_type == jstype_string){
if(strncmp(js_cmd->type.object.value->type.string,"update standby",14)==0){
//pthread_mutex_lock(sock->mut_go_on);
//sock->go_on = 0;
//pthread_mutex_unlock(sock->mut_go_on);
standby_all_workers(workers->begin_list->value->arg);
// printf("debug: kill_all\n");
}
else if(strncmp(js_cmd->type.object.value->type.string,"update wakeup",13)==0){
//pthread_mutex_lock(sock->mut_go_on);
//sock->go_on = 0;
//pthread_mutex_unlock(sock->mut_go_on);
wakeup_all_workers(workers->begin_list->value->arg);
// printf("debug: kill_all\n");
}
else{
struct arg_handler_ *ptr_argHandl = malloc(sizeof(struct arg_handler_));
ptr_argHandl->m_str = m_str;
ptr_argHandl->fds=fds;
ptr_argHandl->sock=argSock;
ptr_argHandl->node=node;
ptr_argHandl->argw=workers->begin_list->value->arg;
ptr_argHandl->m_head_l_t=m_head_l_t;
push_back_list_TYPE_PTR(list_arg, ptr_argHandl);
struct y_task_t task_handl = {
.func=y_socket_handler_,
.arg=ptr_argHandl,
.status=TASK_PENDING,
};
push_tasQ(argx->tasQ, task_handl);
}
}
}
#if 0
if(strncmp(temp_all_buf,"update standby",14)==0){
//pthread_mutex_lock(sock->mut_go_on);
//sock->go_on = 0;
//pthread_mutex_unlock(sock->mut_go_on);
standby_all_workers(workers->begin_list->value->arg);
// printf("debug: kill_all\n");
}
else if(strncmp(temp_all_buf,"update wakeup",13)==0){
}
else if(strncmp(temp_all_buf,"update wakeup",13)==0){
//pthread_mutex_lock(sock->mut_go_on);
//sock->go_on = 0;
//pthread_mutex_unlock(sock->mut_go_on);
wakeup_all_workers(workers->begin_list->value->arg);
// printf("debug: kill_all\n");
}
else{
}
else{
struct arg_handler_ *ptr_argHandl = malloc(sizeof(struct arg_handler_));
ptr_argHandl->buf = temp_all_buf;
ptr_argHandl->fds=fds;
@@ -497,7 +640,9 @@ void handle_buf_socket_rec(char *temp_all_buf, y_NODE_T node, struct main_list_p
.status=TASK_PENDING,
};
push_tasQ(argx->tasQ, task_handl);
}
}
#endif
free_js_value(js_header);
}
void *y_socket_poll_fds(void *arg){
@@ -533,9 +678,9 @@ void *y_socket_poll_fds(void *arg){
y_NODE_T node;
int af, status;
ssize_t nread, buf_len;
char buf[BUF_SIZE];
struct main_list_y_ptr_STRING *m_str=create_var_list_y_ptr_STRING();
char buf[BUF_SIZE+1];
struct main_list_y_ptr_STRING *m_str=NULL;//=create_var_list_y_ptr_STRING();
struct main_list_y_ptr_HEADER_T *m_head_l_t = create_var_list_y_ptr_HEADER_T();
// char *temp_all_buf=NULL;
// char msgRet[BUF_SIZE + NI_MAXHOST + NI_MAXSERV + 100];
@@ -556,24 +701,33 @@ void *y_socket_poll_fds(void *arg){
}
for(af = v4; af<=v6;++af){
if(fds[af].revents && POLLIN){
remove_all_ptr_type_list_y_ptr_STRING(m_str);
memset(buf, 0, BUF_SIZE);
//remove_all_ptr_type_list_y_ptr_STRING(m_str);
if(m_str == NULL)
m_str=create_var_list_y_ptr_STRING();
memset(buf, 0, BUF_SIZE+1);
#if 1
while((nread = recvfrom(fds[af].fd, buf, BUF_SIZE, 0,
(struct sockaddr *)&(node.addr), &(node.addr_len))) == BUF_SIZE){
if(buf[nread-1]=='\n') buf[nread-1]='\0';
if(buf[nread-1]=='\n')
buf[nread-1]='\0';
buf[nread]='\0';
y_ptr_STRING y_buf = create_y_ptr_STRING(buf, nread);
push_back_list_y_ptr_STRING(m_str, y_buf);
///printf("debug: push_back_list_y_ptr_STRING of <%s>\n",buf);
//printf("debug: push_back_list_y_ptr_STRING of <%s>\n",buf);
//memset(buf, 0, BUF_SIZE+1);
//printf("debug: nread: %ld vs BUF_SIZE :%d \n",nread, BUF_SIZE);
}
//printf("debug: out nread: %ld vs BUF_SIZE :%d \n",nread, BUF_SIZE);
if(nread == -1)
fprintf(stderr,"error recvfrom\n");
else if(nread >= 0 && nread < BUF_SIZE){
if(nread && buf[nread-1]=='\n') buf[nread-1]='\0';
else if(nread >= 0 && nread < BUF_SIZE){
if(nread && buf[nread-1]=='\n'
) buf[nread-1]='\0';
buf[nread]='\0';
//printf("msg: %s\n",buf);
y_ptr_STRING y_buf = create_y_ptr_STRING(buf, nread);
@@ -581,6 +735,7 @@ void *y_socket_poll_fds(void *arg){
//printf("debug: out push_back_list_y_ptr_STRING of <%s>\n",buf);
}
#endif
#if 0
struct arg_update_nodes *argUP_N=malloc(sizeof(struct arg_update_nodes));
@@ -601,17 +756,22 @@ void *y_socket_poll_fds(void *arg){
temp_all_buf=NULL;
}*/
/*size_t total_buf = */
char * temp_all_buf = NULL;
/*char * temp_all_buf = NULL;
copy_list_y_ptr_STRING_to_one_string(&temp_all_buf , m_str);
push_back_list_TYPE_PTR(list_arg, temp_all_buf);
*/
handle_buf_socket_rec(temp_all_buf, node, workers, argx, list_arg, arg);
///
//y_socket_handler_(temp_all_buf, fds, argSock);
///
}
if(m_str){
printf("debug: call handle_buf_socket_rec\n");
handle_buf_socket_rec(m_head_l_t,m_str, node, workers, argx, list_arg, arg);
m_str=NULL;
}
}
// stdin poll
if(fds[2].revents){// && POLLIN
@@ -639,12 +799,13 @@ void *y_socket_poll_fds(void *arg){
printf("debug : index_str= %d; cmd=[%s]\n",index_str, cmd);
index_str=0;
while(buf[index_buf]==' '){++index_buf;}
for(; buf[index_buf]!=' '; ++index_buf){
while((index_buf < buf_len) && (buf[index_buf]==' ')){++index_buf;}
for(; (index_buf < buf_len) && (buf[index_buf]!=' '); ++index_buf){
dst_addr[index_str++]=buf[index_buf];
}
dst_addr[index_str]='\0';
while(buf[index_buf]==' '){++index_buf;}
//while(buf[index_buf]==' '){++index_buf;}
while((index_buf < buf_len) && (buf[index_buf]==' ')){++index_buf;}
/*index_str=0;
for(; buf[index_buf]!='\n'; ++index_buf)
msg_buf[index_str++]=buf[index_buf];
@@ -699,8 +860,11 @@ void *y_socket_poll_fds(void *arg){
}
if(m_str){
purge_ptr_type_list_y_ptr_STRING(m_str);
purge_ptr_type_list_y_ptr_STRING(m_str);
printf("debug: m_str!=NULL -> purge_ptr_type_list_y_ptr_STRING done\n");
}
/*
if(temp_all_buf){
free(temp_all_buf);
@@ -711,7 +875,8 @@ void *y_socket_poll_fds(void *arg){
kill_all_workers(workers->begin_list->value->arg);
printf("debug: kill all done\n");
purge_ptr_type_list_y_ptr_HEADER_T(m_head_l_t);
printf("debug: purge_ptr_type_list_y_ptr_HEADER_T done\n");
///// ///// /////
+2 -1
View File
@@ -11,7 +11,8 @@ YJSONDIR=$(PWD)/../../yjson_t
ROOT_DIR=$(PWD)/..
INCLUDE_DIR=$(ROOT_DIR)/include
CFLAGS=-I$(INCLUDE_DIR) -I$(YTESTDIR)/include_ytest/include -I$(YLISTDIR)/src -I$(YWORKDIR)/include -I$(YJSONDIR)/src
INCLUDE=-I$(INCLUDE_DIR) -I$(YTESTDIR)/include_ytest/include -I$(YLISTDIR)/src -I$(YWORKDIR)/include -I$(YJSONDIR)/src
CFLAGS=-Wall -Werror -fpic $(INCLUDE)
LDFLAGS=-L$(YTESTDIR) -lytest -lpthread -lm -lOpenCL
#SRC_DIR=$(ROOT_DIR)/src
+1 -1
View File
@@ -40,8 +40,8 @@ struct list_y_TASK_T* pull_tasQ(struct y_tasQ *tasQ){
while(tasQ->list_tasQ->end_list == NULL){
pthread_cond_wait(tasQ->cond_tasQ, tasQ->mut_tasQ);
}
printf("debug: call pull_begin_from_list_y_TASK_T debut\n");
valueRet = pull_begin_from_list_y_TASK_T(tasQ->list_tasQ);
printf("debug: call pull_begin_from_list_y_TASK_T debut\n");
// valueRet = pull_end_from_list_y_TASK_T(tasQ->list_tasQ);
//printf("debug: call pull_begin_from_list_y_TASK_T fin, is tasQ NULL? : %d\nis tasQ->list_tasQ NULL?:%d\n", tasQ==NULL, tasQ->list_tasQ == NULL);
pthread_mutex_unlock(tasQ->mut_tasQ);