task & worker: add dependency task
This commit is contained in:
@@ -44,4 +44,16 @@ void free_argExecTasQ(struct argExecTasQ * arg);
|
||||
|
||||
void * execute_task(void *arg);
|
||||
|
||||
struct dependency_task{
|
||||
int done;
|
||||
pthread_mutex_t *mut_dep;
|
||||
pthread_cond_t *cond_dep;
|
||||
};
|
||||
|
||||
struct dependency_task * create_dependency_task();
|
||||
void free_dependency_task(struct dependency_task * dep_task);
|
||||
|
||||
void release_dependancy_task(struct dependency_task *dep);
|
||||
void wait_dependancy_task(struct dependency_task *dep);
|
||||
|
||||
#endif /* Y_TASK_T_H__C */
|
||||
|
||||
@@ -26,27 +26,27 @@ void free_y_tasQ(struct y_tasQ * tasQ){
|
||||
}
|
||||
|
||||
void push_tasQ(struct y_tasQ *tasQ, struct y_task_t task){
|
||||
printf("debug: push_tasQ debut\n");
|
||||
//printf("debug: push_tasQ debut\n");
|
||||
pthread_mutex_lock(tasQ->mut_tasQ);
|
||||
push_back_list_y_TASK_T(tasQ->list_tasQ, task);
|
||||
pthread_mutex_unlock(tasQ->mut_tasQ);
|
||||
pthread_cond_signal(tasQ->cond_tasQ);
|
||||
printf("debug: push_tasQ fin\n");
|
||||
//printf("debug: push_tasQ fin\n");
|
||||
}
|
||||
struct list_y_TASK_T* pull_tasQ(struct y_tasQ *tasQ){
|
||||
printf("debug: pull_tasQ debut id_th:%ld\n",pthread_self());
|
||||
//printf("debug: pull_tasQ debut id_th:%ld\n",pthread_self());
|
||||
struct list_y_TASK_T *valueRet = NULL;
|
||||
pthread_mutex_lock(tasQ->mut_tasQ);
|
||||
while(tasQ->list_tasQ->end_list == NULL){
|
||||
pthread_cond_wait(tasQ->cond_tasQ, tasQ->mut_tasQ);
|
||||
}
|
||||
printf("debug: call pull_end_from_list_y_TASK_T debut\n");
|
||||
//printf("debug: call pull_end_from_list_y_TASK_T debut\n");
|
||||
valueRet = pull_end_from_list_y_TASK_T(tasQ->list_tasQ);
|
||||
printf("debug: call pull_end_from_list_y_TASK_T fin, is tasQ NULL? : %d\nis tasQ->list_tasQ NULL?:%d\n", tasQ==NULL, tasQ->list_tasQ == NULL);
|
||||
//printf("debug: call pull_end_from_list_y_TASK_T fin, is tasQ NULL? : %d\nis tasQ->list_tasQ NULL?:%d\n", tasQ==NULL, tasQ->list_tasQ == NULL);
|
||||
pthread_mutex_unlock(tasQ->mut_tasQ);
|
||||
|
||||
|
||||
printf("debug: pull_tasQ fin : is valueRet NULL ? = %d, id_th:%ld\n", valueRet == NULL, pthread_self());
|
||||
//printf("debug: pull_tasQ fin : is valueRet NULL ? = %d, id_th:%ld\n", valueRet == NULL, pthread_self());
|
||||
return valueRet;
|
||||
}
|
||||
|
||||
@@ -79,7 +79,7 @@ void * execute_task(void *arg){
|
||||
while(check_go_on_tasQ(argx)){
|
||||
|
||||
l_task = pull_tasQ(tasQ);
|
||||
printf("debug: is l_task NULL? = %d\n", l_task==NULL);
|
||||
//printf("debug: is l_task NULL? = %d\n", l_task==NULL);
|
||||
if(l_task){
|
||||
if((l_task->value.status != TASK_DONE) && (l_task->value.func!=NULL))
|
||||
l_task->value.ret = l_task->value.func(l_task->value.arg);
|
||||
@@ -92,9 +92,45 @@ void * execute_task(void *arg){
|
||||
|
||||
|
||||
}
|
||||
printf("debug: -------------------> exit task exec \n");
|
||||
usleep(1000);
|
||||
//printf("debug: -------------------> exit task exec \n");
|
||||
// usleep(1000);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
struct dependency_task * create_dependency_task(){
|
||||
struct dependency_task *dep=malloc(sizeof(struct dependency_task));
|
||||
|
||||
dep->mut_dep = malloc(sizeof(pthread_mutex_t));
|
||||
dep->cond_dep = malloc(sizeof(pthread_cond_t));
|
||||
pthread_mutex_init(dep->mut_dep, NULL);
|
||||
pthread_cond_init(dep->cond_dep, NULL);
|
||||
dep->done = 0;
|
||||
|
||||
return dep;
|
||||
}
|
||||
|
||||
void free_dependency_task(struct dependency_task * dep_task){
|
||||
pthread_mutex_destroy(dep_task->mut_dep);
|
||||
pthread_cond_destroy(dep_task->cond_dep);
|
||||
free(dep_task->mut_dep);
|
||||
free(dep_task->cond_dep);
|
||||
|
||||
free(dep_task);
|
||||
dep_task= NULL;
|
||||
}
|
||||
|
||||
void release_dependancy_task(struct dependency_task *dep){
|
||||
pthread_mutex_lock(dep->mut_dep);
|
||||
dep->done = 1;
|
||||
pthread_mutex_unlock(dep->mut_dep);
|
||||
pthread_cond_signal(dep->cond_dep);
|
||||
}
|
||||
|
||||
void wait_dependancy_task(struct dependency_task *dep){
|
||||
pthread_mutex_lock(dep->mut_dep);
|
||||
while(!(dep->done)){
|
||||
pthread_cond_wait(dep->cond_dep, dep->mut_dep);
|
||||
}
|
||||
pthread_mutex_unlock(dep->mut_dep);
|
||||
}
|
||||
|
||||
|
||||
@@ -56,21 +56,21 @@ void* execute_work(void* arg){
|
||||
pworker->status=WORKER_ON;
|
||||
pworker->id_thread=id_thread;
|
||||
pthread_mutex_unlock(pworker->mut_worker);
|
||||
printf("debug: ############################ execute_task call : thread_id:%ld, self=%ld \n",pworker->id,id_thread);
|
||||
//printf("debug: ############################ execute_task call : thread_id:%ld, self=%ld \n",pworker->id,id_thread);
|
||||
do{
|
||||
printf("debug: execute_task call : thread_id:%ld, self=%ld \n",pworker->id,id_thread);
|
||||
//printf("debug: execute_task call : thread_id:%ld, self=%ld \n",pworker->id,id_thread);
|
||||
execute_task((void*)argx);
|
||||
printf("debug: <<<<>>>> execute_task end, worker exec=%d id:%ld self:%ld \n",exec,pworker->id, pworker->id_thread);
|
||||
//printf("debug: <<<<>>>> execute_task end, worker exec=%d id:%ld self:%ld \n",exec,pworker->id, pworker->id_thread);
|
||||
pthread_mutex_lock(pworker->mut_worker);
|
||||
exec=pworker->exec;
|
||||
pthread_mutex_unlock(pworker->mut_worker);
|
||||
printf("debug: execute_task end, worker exec=%d id:%ld self:%ld \n",exec,pworker->id, pworker->id_thread);
|
||||
//printf("debug: execute_task end, worker exec=%d id:%ld self:%ld \n",exec,pworker->id, pworker->id_thread);
|
||||
}while(exec);
|
||||
|
||||
|
||||
pthread_mutex_lock(pworker->mut_worker);
|
||||
pworker->status=WORKER_OFF;
|
||||
printf("debug: =========>>> execute_task end, worker OFF =%d, id=%ld self:%ld\n",pworker->status, pworker->id, pworker->id_thread);
|
||||
//printf("debug: =========>>> execute_task end, worker OFF =%d, id=%ld self:%ld\n",pworker->status, pworker->id, pworker->id_thread);
|
||||
pthread_mutex_unlock(pworker->mut_worker);
|
||||
pthread_cond_signal(pworker->cond_worker);
|
||||
//free_y_worker_t(worker);
|
||||
@@ -82,6 +82,7 @@ void kill_all_workers(
|
||||
struct main_list_ptr_y_WORKER_T * workers,
|
||||
struct argExecTasQ *argx
|
||||
){
|
||||
usleep(100); // need interruption to wait a little bit
|
||||
for(move_current_to_begin_list_ptr_y_WORKER_T(workers); workers->current_list;increment_list_ptr_y_WORKER_T(workers)){
|
||||
pthread_mutex_lock(workers->current_list->value->mut_worker);
|
||||
workers->current_list->value->exec=KILL_WORKER;
|
||||
@@ -118,17 +119,11 @@ void wait_and_free_workers(struct main_list_ptr_y_WORKER_T *workers){
|
||||
}
|
||||
reterror = pthread_join(*(workers->current_list->value->thread) ,NULL);
|
||||
if(0!=reterror){
|
||||
printf("debug: error %d pthread_join thread %ld\n",reterror,(workers->current_list->value->id));
|
||||
//printf("debug: error %d pthread_join thread %ld\n",reterror,(workers->current_list->value->id));
|
||||
}
|
||||
printf("debug: JOIN done, thread id:%ld id_thread:%ld\n",
|
||||
(workers->current_list->value->id),
|
||||
(workers->current_list->value->id_thread)
|
||||
);
|
||||
//printf("debug: JOIN done, thread id:%ld id_thread:%ld\n", (workers->current_list->value->id), (workers->current_list->value->id_thread) );
|
||||
}else{
|
||||
printf("debug: thread id:%ld id_thread:%ld already KILLED\n",
|
||||
(workers->current_list->value->id),
|
||||
(workers->current_list->value->id_thread)
|
||||
);
|
||||
//printf("debug: thread id:%ld id_thread:%ld already KILLED\n", (workers->current_list->value->id), (workers->current_list->value->id_thread) );
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -75,6 +75,25 @@ void* funcEnd(void* arg){
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void* funcDepRel(void* arg){
|
||||
LOG("func: begin func release\n");
|
||||
struct dependency_task *dep = (struct dependency_task*)arg;
|
||||
size_t thread_id=pthread_self();
|
||||
LOG("func: threadid=%ld\n", thread_id);
|
||||
usleep(300000);
|
||||
release_dependancy_task(dep);
|
||||
LOG("==============-------------------> func: end func release in thread=%ld\n",thread_id);
|
||||
return NULL;
|
||||
}
|
||||
void* funcDepCall(void* arg){
|
||||
LOG("func: begin func Call dep\n");
|
||||
struct dependency_task *dep = (struct dependency_task*)arg;
|
||||
size_t thread_id=pthread_self();
|
||||
LOG("func call: threadid=%ld\n", thread_id);
|
||||
wait_dependancy_task(dep);
|
||||
LOG("==============-----------------> func: end func call dep in thread=%ld\n",thread_id);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
TEST(threads){
|
||||
@@ -112,6 +131,17 @@ TEST(threads){
|
||||
}
|
||||
LOG("%d tasks created\n",40);
|
||||
|
||||
struct dependency_task * argDep = create_dependency_task();
|
||||
|
||||
struct y_task_t taskCal = {
|
||||
.func=funcDepCall,
|
||||
.arg=argDep,
|
||||
.status=TASK_PENDING,
|
||||
};
|
||||
push_tasQ(argx->tasQ, taskCal);
|
||||
|
||||
LOG(" +++++++++++++++++ task call dep created\n");
|
||||
|
||||
for(int i=4; i<8; ++i){
|
||||
struct y_worker_t *pw= create_ptr_y_WORKER_T(GO_ON_WORKER, i);
|
||||
assign_argWorker_of_ptr_y_WORKER_T(pw, argx, mut_workers);
|
||||
@@ -124,6 +154,15 @@ TEST(threads){
|
||||
//usleep(500);
|
||||
}
|
||||
LOG("another %d workers created\n",4);
|
||||
struct y_task_t taskR = {
|
||||
.func=funcDepRel,
|
||||
.arg=argDep,
|
||||
.status=TASK_PENDING,
|
||||
};
|
||||
push_tasQ(argx->tasQ, taskR);
|
||||
LOG(" +++++++++++++++++ task rel dep created\n");
|
||||
|
||||
//usleep(50);
|
||||
|
||||
kill_all_workers(workers, argx);
|
||||
|
||||
@@ -133,6 +172,7 @@ TEST(threads){
|
||||
pthread_mutex_destroy(mut_workers);
|
||||
free(mut_workers);
|
||||
|
||||
free_dependency_task(argDep);
|
||||
}
|
||||
|
||||
int main(int argc, char **argv){
|
||||
|
||||
Reference in New Issue
Block a user