diff --git a/list_t/src/list_t/list_t.h b/list_t/src/list_t/list_t.h index c2445b7..d42572d 100644 --- a/list_t/src/list_t/list_t.h +++ b/list_t/src/list_t/list_t.h @@ -33,6 +33,7 @@ struct list_##type * search_first_occ_with_mov_from_curr_in_list_##type(struct main_list_##type *var_list, type value, int (*funcCmp)(type, type), void (*incr_or_decr_mov)(struct main_list_##type *));\ struct list_##type * search_first_occ_from_begin_in_list_##type(struct main_list_##type *var_list, type value, int (*funcCmp)(type, type));\ struct list_##type * pull_end_from_list_##type(struct main_list_##type *var_list);\ + struct list_##type * pull_begin_from_list_##type(struct main_list_##type *var_list);\ void append_list_##type(struct main_list_##type *var_list, struct list_##type *list);\ @@ -78,6 +79,7 @@ GENERATE_LIST_ALL(TYPE_PTR) }else {\ var_list->begin_list = list_to_add;\ var_list->current_list= list_to_add;\ + var_list->current_index=0;\ }\ var_list->end_list = list_to_add;\ ++(var_list->size);\ @@ -93,6 +95,7 @@ GENERATE_LIST_ALL(TYPE_PTR) }else {\ var_list->end_list = list_to_add;\ var_list->current_list= list_to_add;\ + var_list->current_index=0;\ }\ var_list->begin_list = list_to_add;\ ++(var_list->size);\ @@ -230,6 +233,24 @@ GENERATE_LIST_ALL(TYPE_PTR) }\ var_list->end_list = prevL;\ --(var_list->size);\ + ret->preview = NULL;\ + }\ + return ret;\ + }\ + struct list_##type * pull_begin_from_list_##type(struct main_list_##type *var_list){\ + struct list_##type *ret = var_list->begin_list;\ + if(ret != NULL){\ + struct list_##type * nextL = ret->next;\ + if(nextL != NULL){\ + nextL->preview=NULL;\ + }else{\ + var_list->end_list=NULL;\ + var_list->current_index = 0;\ + var_list->current_list=NULL;\ + }\ + var_list->begin_list = nextL;\ + --(var_list->size);\ + ret->next = NULL;\ }\ return ret;\ }\ diff --git a/y_worker_t/include/y_worker_t/y_worker_t.h b/y_worker_t/include/y_worker_t/y_worker_t.h index 3a0c410..757daa6 100644 --- a/y_worker_t/include/y_worker_t/y_worker_t.h +++ b/y_worker_t/include/y_worker_t/y_worker_t.h @@ -14,6 +14,7 @@ #define WORKER_ON 1 #define WORKER_OFF 0 + struct argWorker; typedef struct y_worker_t{ @@ -29,29 +30,41 @@ typedef struct y_worker_t{ typedef struct y_worker_t * ptr_y_WORKER_T; + //GENERATE_LIST_ALL(y_WORKER_T) GENERATE_LIST_ALL(ptr_y_WORKER_T) +//GENERATE_PTR_type_SIG((ptr_y_WORKER_T) -ptr_y_WORKER_T create_ptr_y_WORKER_T(int exec, int id); -void free_ptr_y_WORKER_T(ptr_y_WORKER_T pworker); -void purge_ptr_y_WORKER_T_in_list(struct main_list_ptr_y_WORKER_T *list_workers); +//ptr_y_WORKER_T create_ptr_y_WORKER_T(int exec, int id); +ptr_y_WORKER_T create_ptr_y_WORKER_T(struct main_list_ptr_y_WORKER_T * workers, + struct main_list_TYPE_PTR *list_arg, + pthread_mutex_t *mut_workers, + struct argExecTasQ *argx, int exec, int id ); +//void free_ptr_y_WORKER_T(ptr_y_WORKER_T pworker); + + +void purge_list_ptr_y_WORKER_T(struct main_list_ptr_y_WORKER_T *list_workers); +void purge_list_TYPE_PTR(struct main_list_TYPE_PTR *list_voids); struct argWorker { struct argExecTasQ *argx; struct y_worker_t *pworker; + struct main_list_ptr_y_WORKER_T * workers; + struct main_list_TYPE_PTR * list_arg; + pthread_mutex_t *mut_workers; }; -void assign_argWorker_of_ptr_y_WORKER_T(ptr_y_WORKER_T pworker, struct argExecTasQ *argx, pthread_mutex_t *mut_worker); void* execute_work(void* arg); -void kill_all_workers( - struct main_list_ptr_y_WORKER_T * workers, - struct argExecTasQ *argx -); -void wait_and_free_workers(struct main_list_ptr_y_WORKER_T *workers); +void kill_all_workers(struct argWorker *argw); + +//void kill_all_workers(struct main_list_ptr_y_WORKER_T * workers, struct argExecTasQ *argx); +//void wait_workers(struct main_list_ptr_y_WORKER_T *workers); +//void free_workers(struct main_list_ptr_y_WORKER_T *workers); +void free_workers_and_argx(struct main_list_ptr_y_WORKER_T *workers, struct argExecTasQ *argx); #endif /* Y_WORKER_T_H__C */ diff --git a/y_worker_t/src/y_worker_t/y_task_t.c b/y_worker_t/src/y_worker_t/y_task_t.c index 1e890c4..0391361 100644 --- a/y_worker_t/src/y_worker_t/y_task_t.c +++ b/y_worker_t/src/y_worker_t/y_task_t.c @@ -26,27 +26,28 @@ void free_y_tasQ(struct y_tasQ * tasQ){ } void push_tasQ(struct y_tasQ *tasQ, struct y_task_t task){ - //printf("debug: push_tasQ debut\n"); + printf("debug: push_tasQ debut\n"); pthread_mutex_lock(tasQ->mut_tasQ); push_back_list_y_TASK_T(tasQ->list_tasQ, task); pthread_mutex_unlock(tasQ->mut_tasQ); pthread_cond_signal(tasQ->cond_tasQ); - //printf("debug: push_tasQ fin\n"); + printf("debug: push_tasQ fin\n"); } struct list_y_TASK_T* pull_tasQ(struct y_tasQ *tasQ){ - //printf("debug: pull_tasQ debut id_th:%ld\n",pthread_self()); + printf("debug: pull_tasQ debut id_th:%ld\n",pthread_self()); struct list_y_TASK_T *valueRet = NULL; pthread_mutex_lock(tasQ->mut_tasQ); while(tasQ->list_tasQ->end_list == NULL){ pthread_cond_wait(tasQ->cond_tasQ, tasQ->mut_tasQ); } - //printf("debug: call pull_end_from_list_y_TASK_T debut\n"); - valueRet = pull_end_from_list_y_TASK_T(tasQ->list_tasQ); - //printf("debug: call pull_end_from_list_y_TASK_T fin, is tasQ NULL? : %d\nis tasQ->list_tasQ NULL?:%d\n", tasQ==NULL, tasQ->list_tasQ == NULL); + printf("debug: call pull_begin_from_list_y_TASK_T debut\n"); + valueRet = pull_begin_from_list_y_TASK_T(tasQ->list_tasQ); +// valueRet = pull_end_from_list_y_TASK_T(tasQ->list_tasQ); + printf("debug: call pull_begin_from_list_y_TASK_T fin, is tasQ NULL? : %d\nis tasQ->list_tasQ NULL?:%d\n", tasQ==NULL, tasQ->list_tasQ == NULL); pthread_mutex_unlock(tasQ->mut_tasQ); - //printf("debug: pull_tasQ fin : is valueRet NULL ? = %d, id_th:%ld\n", valueRet == NULL, pthread_self()); + printf("debug: pull_tasQ fin : is valueRet NULL ? = %d, id_th:%ld\n", valueRet == NULL, pthread_self()); return valueRet; } @@ -79,7 +80,7 @@ void * execute_task(void *arg){ while(check_go_on_tasQ(argx)){ l_task = pull_tasQ(tasQ); - //printf("debug: is l_task NULL? = %d\n", l_task==NULL); + printf("debug: is l_task NULL? = %d\n", l_task==NULL); if(l_task){ if((l_task->value.status != TASK_DONE) && (l_task->value.func!=NULL)) l_task->value.ret = l_task->value.func(l_task->value.arg); @@ -92,7 +93,7 @@ void * execute_task(void *arg){ } - //printf("debug: -------------------> exit task exec \n"); + printf("debug: -------------------> exit task exec \n"); // usleep(1000); return NULL; } diff --git a/y_worker_t/src/y_worker_t/y_worker_t.c b/y_worker_t/src/y_worker_t/y_worker_t.c index 1531865..64705fc 100644 --- a/y_worker_t/src/y_worker_t/y_worker_t.c +++ b/y_worker_t/src/y_worker_t/y_worker_t.c @@ -4,44 +4,73 @@ GEN_LIST_ALL(ptr_y_WORKER_T) -ptr_y_WORKER_T create_ptr_y_WORKER_T(int exec, int id){ +//GENERATE_PTR_type_FUNC(ptr_y_WORKER_T) + + +void * execute_workNULL(void *arg){ + usleep(500); + return NULL; +} + +ptr_y_WORKER_T create_ptr_y_WORKER_T(struct main_list_ptr_y_WORKER_T * workers, + struct main_list_TYPE_PTR *list_arg, + pthread_mutex_t *mut_workers, + struct argExecTasQ *argx, int exec, int id ){ ptr_y_WORKER_T pworker = malloc(sizeof(y_WORKER_T)); pworker->exec=exec; + pworker->status=WORKER_OFF; pworker->mut_worker = malloc(sizeof(pthread_mutex_t)); pthread_mutex_init(pworker->mut_worker, NULL); pworker->cond_worker = malloc(sizeof(pthread_cond_t)); pthread_cond_init(pworker->cond_worker, NULL); pworker->id=id; + pworker->id_thread=id; pworker->thread = malloc(sizeof(pthread_t)); + - pworker->arg = NULL; + pworker->arg = malloc(sizeof( struct argWorker)); + pworker->arg->argx = argx; + pworker->arg->pworker = pworker; + pworker->arg->workers = workers; + pworker->arg->mut_workers = mut_workers; + pworker->arg->list_arg = list_arg; + + pthread_mutex_lock(mut_workers); + push_back_list_ptr_y_WORKER_T(workers, pworker); + pthread_mutex_unlock(mut_workers); + + pthread_create(pworker->thread,NULL,execute_work,(void*)(pworker->arg)); + return pworker; } -void free_ptr_y_WORKER_T(ptr_y_WORKER_T pworker){ - //ptr_y_WORKER_T pworker = *p_worker; +void free_ptr_y_WORKER_T(void* p_worker){ + ptr_y_WORKER_T pworker = (struct y_worker_t*)p_worker; pthread_mutex_destroy(pworker->mut_worker); free(pworker->mut_worker); pthread_cond_destroy(pworker->cond_worker); free(pworker->cond_worker); free(pworker->thread); - if(pworker->arg!=NULL) free(pworker->arg); + free(pworker->arg); free(pworker); } -void purge_ptr_y_WORKER_T_in_list(struct main_list_ptr_y_WORKER_T *list_workers){ + +void purge_list_ptr_y_WORKER_T(struct main_list_ptr_y_WORKER_T *list_workers){ for(move_current_to_begin_list_ptr_y_WORKER_T(list_workers); list_workers->current_list; increment_list_ptr_y_WORKER_T(list_workers)){ free_ptr_y_WORKER_T(list_workers->current_list->value); } + free_all_var_list_ptr_y_WORKER_T(list_workers); +} +void purge_list_TYPE_PTR(struct main_list_TYPE_PTR *list_voids){ + for(move_current_to_begin_list_TYPE_PTR(list_voids); list_voids->current_list; increment_list_TYPE_PTR(list_voids)){ + free(list_voids->current_list->value); + } + free_all_var_list_TYPE_PTR(list_voids); } -void assign_argWorker_of_ptr_y_WORKER_T(ptr_y_WORKER_T pworker, struct argExecTasQ *argx, pthread_mutex_t *mut_workers){ - pworker->arg = malloc(sizeof( struct argWorker)); - pworker->arg->argx = argx; - pworker->arg->pworker = pworker; - pworker->arg->mut_workers = mut_workers; -} + void* execute_work(void* arg){ struct argWorker *argw = (struct argWorker*)arg; @@ -56,33 +85,47 @@ void* execute_work(void* arg){ pworker->status=WORKER_ON; pworker->id_thread=id_thread; pthread_mutex_unlock(pworker->mut_worker); - //printf("debug: ############################ execute_task call : thread_id:%ld, self=%ld \n",pworker->id,id_thread); + pthread_cond_signal(pworker->cond_worker); + printf("debug: ############################ execute_task call : thread_id:%ld, self=%ld \n",pworker->id,id_thread); do{ - //printf("debug: execute_task call : thread_id:%ld, self=%ld \n",pworker->id,id_thread); + printf("debug: execute_task call : thread_id:%ld, self=%ld \n",pworker->id,id_thread); execute_task((void*)argx); - //printf("debug: <<<<>>>> execute_task end, worker exec=%d id:%ld self:%ld \n",exec,pworker->id, pworker->id_thread); + printf("debug: <<<<>>>> execute_task end, worker exec=%d id:%ld self:%ld \n",exec,pworker->id, pworker->id_thread); pthread_mutex_lock(pworker->mut_worker); exec=pworker->exec; pthread_mutex_unlock(pworker->mut_worker); - //printf("debug: execute_task end, worker exec=%d id:%ld self:%ld \n",exec,pworker->id, pworker->id_thread); + printf("debug: execute_task end, worker exec=%d id:%ld self:%ld \n",exec,pworker->id, pworker->id_thread); }while(exec); pthread_mutex_lock(pworker->mut_worker); pworker->status=WORKER_OFF; - //printf("debug: =========>>> execute_task end, worker OFF =%d, id=%ld self:%ld\n",pworker->status, pworker->id, pworker->id_thread); + printf("debug: =========>>> execute_task end, worker OFF =%d, id=%ld self:%ld\n",pworker->status, pworker->id, pworker->id_thread); pthread_mutex_unlock(pworker->mut_worker); pthread_cond_signal(pworker->cond_worker); - //free_y_worker_t(worker); // usleep(1000); return NULL; } -void kill_all_workers( - struct main_list_ptr_y_WORKER_T * workers, - struct argExecTasQ *argx -){ - usleep(100); // need interruption to wait a little bit +void wait_workers(struct main_list_ptr_y_WORKER_T *workers){ + for(move_current_to_begin_list_ptr_y_WORKER_T(workers); workers->current_list;increment_list_ptr_y_WORKER_T(workers)){ + pthread_join(*(workers->current_list->value->thread) ,NULL); + } +} + + +int check_worker_status(struct y_worker_t * pworker ){ + int ret; + pthread_mutex_lock(pworker->mut_worker); + ret = pworker->status; + pthread_mutex_unlock(pworker->mut_worker); + return ret; +} + +void kill_all_workers ( struct argWorker *argw){ + struct main_list_ptr_y_WORKER_T * workers = argw->workers; + struct argExecTasQ *argx = argw->argx; + for(move_current_to_begin_list_ptr_y_WORKER_T(workers); workers->current_list;increment_list_ptr_y_WORKER_T(workers)){ pthread_mutex_lock(workers->current_list->value->mut_worker); workers->current_list->value->exec=KILL_WORKER; @@ -93,50 +136,33 @@ void kill_all_workers( (argx->go_on)=0; pthread_mutex_unlock(argx->tasQ->mut_tasQ); - for(move_current_to_begin_list_ptr_y_WORKER_T(workers); workers->current_list;increment_list_ptr_y_WORKER_T(workers)){ - //if(workers->current_list->value->status == WORKER_ON) +/// if(check_worker_status(workers->current_list->value) == WORKER_ON) { struct y_task_t task = { .func=NULL, - .arg=argx, + .arg=NULL, .status=TASK_DONE, }; push_tasQ(argx->tasQ, task); } } +///} + -} + wait_workers(workers); + + + pthread_mutex_destroy(argw->mut_workers); + free(argw->mut_workers); + purge_list_TYPE_PTR(argw->list_arg); + purge_list_ptr_y_WORKER_T(workers); -void wait_and_free_workers(struct main_list_ptr_y_WORKER_T *workers){ - int reterror; - for(move_current_to_begin_list_ptr_y_WORKER_T(workers); workers->current_list;increment_list_ptr_y_WORKER_T(workers)){ - pthread_mutex_lock(workers->current_list->value->mut_worker); - if((workers->current_list->value->status) == WORKER_ON){ - while(workers->current_list->value->status == WORKER_ON){ - pthread_cond_wait(workers->current_list->value->cond_worker, workers->current_list->value->mut_worker); - } - reterror = pthread_join(*(workers->current_list->value->thread) ,NULL); - if(0!=reterror){ - //printf("debug: error %d pthread_join thread %ld\n",reterror,(workers->current_list->value->id)); - } - //printf("debug: JOIN done, thread id:%ld id_thread:%ld\n", (workers->current_list->value->id), (workers->current_list->value->id_thread) ); - }else{ - //printf("debug: thread id:%ld id_thread:%ld already KILLED\n", (workers->current_list->value->id), (workers->current_list->value->id_thread) ); - - } - - workers->current_list->value->status=WORKER_OFF; - pthread_mutex_unlock(workers->current_list->value->mut_worker); - } - -// free_argExecTasQ(argx); - purge_ptr_y_WORKER_T_in_list(workers); - free_all_var_list_ptr_y_WORKER_T(workers); - -// pthread_mutex_destroy(mut_workers); -// free(mut_workers); + free_argExecTasQ(argx); + } + + diff --git a/y_worker_t/test/is_good.c b/y_worker_t/test/is_good.c index a942f84..475969a 100644 --- a/y_worker_t/test/is_good.c +++ b/y_worker_t/test/is_good.c @@ -52,14 +52,14 @@ TEST(worker){ #endif void* funcPrintSelf(void* arg){ - LOG("func: begin func call\n"); - struct argExecTasQ *argx=(struct argExecTasQ*)arg; - static long int count = 0; //randomm=rand()%1000; + int *count = (int*)arg; //randomm=rand()%1000; + LOG("func: begin func call %d\n",*count); +// struct argExecTasQ *argx=(struct argExecTasQ*)arg; size_t thread_id=pthread_self(); - LOG("func: my status=%d, func number=%ld; threadid=%ld\n",(argx->go_on), count, thread_id); + LOG("func: number=%d; threadid=%ld\n", *count, thread_id); usleep(200000); - LOG("func: end func %ld; in thread=%ld\n",count,thread_id); - ++count; + LOG("func: end func %d; in thread=%ld\n",*count,thread_id); +// free(count); return NULL; } @@ -98,40 +98,51 @@ void* funcDepCall(void* arg){ TEST(threads){ + srand(time(NULL)); struct main_list_ptr_y_WORKER_T * workers = create_var_list_ptr_y_WORKER_T(); + + struct main_list_TYPE_PTR * list_arg = create_var_list_TYPE_PTR(); + struct argExecTasQ *argx = create_argExecTasQ(); pthread_mutex_t *mut_workers = malloc(sizeof(pthread_mutex_t)); pthread_mutex_init(mut_workers, NULL); - - for(int i=0; i<4; ++i){ - struct y_worker_t *pw= create_ptr_y_WORKER_T(GO_ON_WORKER, i); - assign_argWorker_of_ptr_y_WORKER_T(pw, argx, mut_workers); - pthread_mutex_lock(mut_workers); - //append_list_ptr_y_WORKER_T(workers, li_pw); - push_back_list_ptr_y_WORKER_T(workers, pw); - pthread_mutex_unlock(mut_workers); - LOG("%d workers %ld created\n",4,(pw->arg->pworker->id)); - pthread_create(pw->thread,NULL,execute_work,(void*)(pw->arg)); +int nb_worker=4; + for(int i=0; iarg->pworker->id)); //usleep(500); } -//usleep(500000); +usleep(50000); - - for(int i=0; i<40;++i) +int nb_task=9; + for(int i=0; itasQ, task); + push_back_list_TYPE_PTR(list_arg, j); // usleep(10000); } - LOG("%d tasks created\n",40); + LOG("%d tasks created\n",nb_task); struct dependency_task * argDep = create_dependency_task(); + struct y_task_t taskR = { + .func=funcDepRel, + .arg=argDep, + .status=TASK_PENDING, + }; + push_tasQ(argx->tasQ, taskR); + LOG(" +++++++++++++++++ task rel dep created\n"); + struct y_task_t taskCal = { .func=funcDepCall, @@ -141,38 +152,51 @@ TEST(threads){ push_tasQ(argx->tasQ, taskCal); LOG(" +++++++++++++++++ task call dep created\n"); - +/* for(int i=4; i<8; ++i){ - struct y_worker_t *pw= create_ptr_y_WORKER_T(GO_ON_WORKER, i); - assign_argWorker_of_ptr_y_WORKER_T(pw, argx, mut_workers); - pthread_mutex_lock(mut_workers); - //append_list_ptr_y_WORKER_T(workers, li_pw); - push_back_list_ptr_y_WORKER_T(workers, pw); - pthread_mutex_unlock(mut_workers); - LOG("%d workers %ld created\n",4,(pw->arg->pworker->id)); - pthread_create(pw->thread,NULL,execute_work,(void*)(pw->arg)); + struct y_worker_t *pw= create_ptr_y_WORKER_T(workers, mut_workers, argx, GO_ON_WORKER, i); + LOG("%d workers %ld created\n",4,(pw->arg->pworker->id)); //usleep(500); } + LOG("another %d workers created\n",4); - struct y_task_t taskR = { - .func=funcDepRel, - .arg=argDep, - .status=TASK_PENDING, - }; - push_tasQ(argx->tasQ, taskR); - LOG(" +++++++++++++++++ task rel dep created\n"); - -//usleep(50); +*/ + usleep(600000); - kill_all_workers(workers, argx); + kill_all_workers(workers->begin_list->value->arg); +// kill_all_workers(workers, argx); - wait_and_free_workers(workers); +// wait_workers(workers); +//usleep(50000); +// free_workers_and_argx(workers, argx); +// free_argExecTasQ(argx); +// pthread_mutex_destroy(mut_workers); +// free(mut_workers); + +/* + + purge_ptr_y_WORKER_T_in_list(workers); + free_all_var_list_ptr_y_WORKER_T(workers); + free_argExecTasQ(argx); - pthread_mutex_destroy(mut_workers); - free(mut_workers); +*/ + +// kill_all_workers(workers, argx); + +/* pthread_mutex_destroy(mut_workers); + free(mut_workers); +*/ - free_dependency_task(argDep); +free_dependency_task(argDep); + +// purge_TYPE_PTR_in_list(void_list_arg); +// free_all_var_list_TYPE_PTR(void_list_arg); + +/* purge_list_TYPE_PTR(list_arg); + purge_list_ptr_y_WORKER_T(workers); + free_argExecTasQ(argx); + */ } int main(int argc, char **argv){