diff --git a/y_worker_t/include/y_worker_t/y_task_t.h b/y_worker_t/include/y_worker_t/y_task_t.h index bd82c61..67961bb 100644 --- a/y_worker_t/include/y_worker_t/y_task_t.h +++ b/y_worker_t/include/y_worker_t/y_task_t.h @@ -44,4 +44,16 @@ void free_argExecTasQ(struct argExecTasQ * arg); void * execute_task(void *arg); +struct dependency_task{ + int done; + pthread_mutex_t *mut_dep; + pthread_cond_t *cond_dep; +}; + +struct dependency_task * create_dependency_task(); +void free_dependency_task(struct dependency_task * dep_task); + +void release_dependancy_task(struct dependency_task *dep); +void wait_dependancy_task(struct dependency_task *dep); + #endif /* Y_TASK_T_H__C */ diff --git a/y_worker_t/src/y_worker_t/y_task_t.c b/y_worker_t/src/y_worker_t/y_task_t.c index ce439fc..1e890c4 100644 --- a/y_worker_t/src/y_worker_t/y_task_t.c +++ b/y_worker_t/src/y_worker_t/y_task_t.c @@ -26,27 +26,27 @@ void free_y_tasQ(struct y_tasQ * tasQ){ } void push_tasQ(struct y_tasQ *tasQ, struct y_task_t task){ - printf("debug: push_tasQ debut\n"); + //printf("debug: push_tasQ debut\n"); pthread_mutex_lock(tasQ->mut_tasQ); push_back_list_y_TASK_T(tasQ->list_tasQ, task); pthread_mutex_unlock(tasQ->mut_tasQ); pthread_cond_signal(tasQ->cond_tasQ); - printf("debug: push_tasQ fin\n"); + //printf("debug: push_tasQ fin\n"); } struct list_y_TASK_T* pull_tasQ(struct y_tasQ *tasQ){ - printf("debug: pull_tasQ debut id_th:%ld\n",pthread_self()); + //printf("debug: pull_tasQ debut id_th:%ld\n",pthread_self()); struct list_y_TASK_T *valueRet = NULL; pthread_mutex_lock(tasQ->mut_tasQ); while(tasQ->list_tasQ->end_list == NULL){ pthread_cond_wait(tasQ->cond_tasQ, tasQ->mut_tasQ); } - printf("debug: call pull_end_from_list_y_TASK_T debut\n"); + //printf("debug: call pull_end_from_list_y_TASK_T debut\n"); valueRet = pull_end_from_list_y_TASK_T(tasQ->list_tasQ); - printf("debug: call pull_end_from_list_y_TASK_T fin, is tasQ NULL? : %d\nis tasQ->list_tasQ NULL?:%d\n", tasQ==NULL, tasQ->list_tasQ == NULL); + //printf("debug: call pull_end_from_list_y_TASK_T fin, is tasQ NULL? : %d\nis tasQ->list_tasQ NULL?:%d\n", tasQ==NULL, tasQ->list_tasQ == NULL); pthread_mutex_unlock(tasQ->mut_tasQ); - printf("debug: pull_tasQ fin : is valueRet NULL ? = %d, id_th:%ld\n", valueRet == NULL, pthread_self()); + //printf("debug: pull_tasQ fin : is valueRet NULL ? = %d, id_th:%ld\n", valueRet == NULL, pthread_self()); return valueRet; } @@ -79,7 +79,7 @@ void * execute_task(void *arg){ while(check_go_on_tasQ(argx)){ l_task = pull_tasQ(tasQ); - printf("debug: is l_task NULL? = %d\n", l_task==NULL); + //printf("debug: is l_task NULL? = %d\n", l_task==NULL); if(l_task){ if((l_task->value.status != TASK_DONE) && (l_task->value.func!=NULL)) l_task->value.ret = l_task->value.func(l_task->value.arg); @@ -92,9 +92,45 @@ void * execute_task(void *arg){ } - printf("debug: -------------------> exit task exec \n"); - usleep(1000); + //printf("debug: -------------------> exit task exec \n"); + // usleep(1000); return NULL; } +struct dependency_task * create_dependency_task(){ + struct dependency_task *dep=malloc(sizeof(struct dependency_task)); + + dep->mut_dep = malloc(sizeof(pthread_mutex_t)); + dep->cond_dep = malloc(sizeof(pthread_cond_t)); + pthread_mutex_init(dep->mut_dep, NULL); + pthread_cond_init(dep->cond_dep, NULL); + dep->done = 0; + + return dep; +} + +void free_dependency_task(struct dependency_task * dep_task){ + pthread_mutex_destroy(dep_task->mut_dep); + pthread_cond_destroy(dep_task->cond_dep); + free(dep_task->mut_dep); + free(dep_task->cond_dep); + + free(dep_task); + dep_task= NULL; +} + +void release_dependancy_task(struct dependency_task *dep){ + pthread_mutex_lock(dep->mut_dep); + dep->done = 1; + pthread_mutex_unlock(dep->mut_dep); + pthread_cond_signal(dep->cond_dep); +} + +void wait_dependancy_task(struct dependency_task *dep){ + pthread_mutex_lock(dep->mut_dep); + while(!(dep->done)){ + pthread_cond_wait(dep->cond_dep, dep->mut_dep); + } + pthread_mutex_unlock(dep->mut_dep); +} diff --git a/y_worker_t/src/y_worker_t/y_worker_t.c b/y_worker_t/src/y_worker_t/y_worker_t.c index 0f75274..1531865 100644 --- a/y_worker_t/src/y_worker_t/y_worker_t.c +++ b/y_worker_t/src/y_worker_t/y_worker_t.c @@ -56,21 +56,21 @@ void* execute_work(void* arg){ pworker->status=WORKER_ON; pworker->id_thread=id_thread; pthread_mutex_unlock(pworker->mut_worker); - printf("debug: ############################ execute_task call : thread_id:%ld, self=%ld \n",pworker->id,id_thread); + //printf("debug: ############################ execute_task call : thread_id:%ld, self=%ld \n",pworker->id,id_thread); do{ - printf("debug: execute_task call : thread_id:%ld, self=%ld \n",pworker->id,id_thread); + //printf("debug: execute_task call : thread_id:%ld, self=%ld \n",pworker->id,id_thread); execute_task((void*)argx); - printf("debug: <<<<>>>> execute_task end, worker exec=%d id:%ld self:%ld \n",exec,pworker->id, pworker->id_thread); + //printf("debug: <<<<>>>> execute_task end, worker exec=%d id:%ld self:%ld \n",exec,pworker->id, pworker->id_thread); pthread_mutex_lock(pworker->mut_worker); exec=pworker->exec; pthread_mutex_unlock(pworker->mut_worker); - printf("debug: execute_task end, worker exec=%d id:%ld self:%ld \n",exec,pworker->id, pworker->id_thread); + //printf("debug: execute_task end, worker exec=%d id:%ld self:%ld \n",exec,pworker->id, pworker->id_thread); }while(exec); pthread_mutex_lock(pworker->mut_worker); pworker->status=WORKER_OFF; - printf("debug: =========>>> execute_task end, worker OFF =%d, id=%ld self:%ld\n",pworker->status, pworker->id, pworker->id_thread); + //printf("debug: =========>>> execute_task end, worker OFF =%d, id=%ld self:%ld\n",pworker->status, pworker->id, pworker->id_thread); pthread_mutex_unlock(pworker->mut_worker); pthread_cond_signal(pworker->cond_worker); //free_y_worker_t(worker); @@ -82,6 +82,7 @@ void kill_all_workers( struct main_list_ptr_y_WORKER_T * workers, struct argExecTasQ *argx ){ + usleep(100); // need interruption to wait a little bit for(move_current_to_begin_list_ptr_y_WORKER_T(workers); workers->current_list;increment_list_ptr_y_WORKER_T(workers)){ pthread_mutex_lock(workers->current_list->value->mut_worker); workers->current_list->value->exec=KILL_WORKER; @@ -118,17 +119,11 @@ void wait_and_free_workers(struct main_list_ptr_y_WORKER_T *workers){ } reterror = pthread_join(*(workers->current_list->value->thread) ,NULL); if(0!=reterror){ - printf("debug: error %d pthread_join thread %ld\n",reterror,(workers->current_list->value->id)); + //printf("debug: error %d pthread_join thread %ld\n",reterror,(workers->current_list->value->id)); } - printf("debug: JOIN done, thread id:%ld id_thread:%ld\n", - (workers->current_list->value->id), - (workers->current_list->value->id_thread) - ); + //printf("debug: JOIN done, thread id:%ld id_thread:%ld\n", (workers->current_list->value->id), (workers->current_list->value->id_thread) ); }else{ - printf("debug: thread id:%ld id_thread:%ld already KILLED\n", - (workers->current_list->value->id), - (workers->current_list->value->id_thread) - ); + //printf("debug: thread id:%ld id_thread:%ld already KILLED\n", (workers->current_list->value->id), (workers->current_list->value->id_thread) ); } diff --git a/y_worker_t/test/is_good.c b/y_worker_t/test/is_good.c index 33394aa..a942f84 100644 --- a/y_worker_t/test/is_good.c +++ b/y_worker_t/test/is_good.c @@ -75,6 +75,25 @@ void* funcEnd(void* arg){ return NULL; } +void* funcDepRel(void* arg){ + LOG("func: begin func release\n"); + struct dependency_task *dep = (struct dependency_task*)arg; + size_t thread_id=pthread_self(); + LOG("func: threadid=%ld\n", thread_id); + usleep(300000); + release_dependancy_task(dep); + LOG("==============-------------------> func: end func release in thread=%ld\n",thread_id); + return NULL; +} +void* funcDepCall(void* arg){ + LOG("func: begin func Call dep\n"); + struct dependency_task *dep = (struct dependency_task*)arg; + size_t thread_id=pthread_self(); + LOG("func call: threadid=%ld\n", thread_id); + wait_dependancy_task(dep); + LOG("==============-----------------> func: end func call dep in thread=%ld\n",thread_id); + return NULL; +} TEST(threads){ @@ -111,6 +130,17 @@ TEST(threads){ // usleep(10000); } LOG("%d tasks created\n",40); + + struct dependency_task * argDep = create_dependency_task(); + + struct y_task_t taskCal = { + .func=funcDepCall, + .arg=argDep, + .status=TASK_PENDING, + }; + push_tasQ(argx->tasQ, taskCal); + + LOG(" +++++++++++++++++ task call dep created\n"); for(int i=4; i<8; ++i){ struct y_worker_t *pw= create_ptr_y_WORKER_T(GO_ON_WORKER, i); @@ -124,7 +154,16 @@ TEST(threads){ //usleep(500); } LOG("another %d workers created\n",4); + struct y_task_t taskR = { + .func=funcDepRel, + .arg=argDep, + .status=TASK_PENDING, + }; + push_tasQ(argx->tasQ, taskR); + LOG(" +++++++++++++++++ task rel dep created\n"); +//usleep(50); + kill_all_workers(workers, argx); wait_and_free_workers(workers); @@ -133,6 +172,7 @@ TEST(threads){ pthread_mutex_destroy(mut_workers); free(mut_workers); + free_dependency_task(argDep); } int main(int argc, char **argv){