diff --git a/y_worker_t/Makefile b/y_worker_t/Makefile index 67b2c99..88431f0 100644 --- a/y_worker_t/Makefile +++ b/y_worker_t/Makefile @@ -5,9 +5,11 @@ INCLUDE_DIRS=$(PWD)/include SOCDIR=$(PWD) #$(wildcard $(PWD)/**/include) +TOOLDIR=$(PWD)/../ytools_t YLISTDIR=$(PWD)/../list_t -INCLUDE=-I$(INCLUDE_DIRS) -I$(YLISTDIR)/src + +INCLUDE=-I$(INCLUDE_DIRS) -I$(YLISTDIR)/src -I$(TOOLDIR)/include CFLAGS=-g -lpthread -Wall -Werror -fpic $(INCLUDE) #"-D DEBUG=1" #LDFLAGS= @@ -21,13 +23,14 @@ YTASKSRC_O=$(YTASKSRC:.c=.o) YLISTSRC=$(YLISTDIR)/src/list_t/list_t.c YLISTSRC_O=$(YLISTSRC:.c=.o) +TOOLSRC_O=$(TOOLDIR)/src/tools_t/tools_t.o TOPTARGETS := all clean DEPS=$(YLISTDIR) -OBJ=$(YWORKSRC_O) $(YLISTSRC_O) +OBJ=$(YWORKSRC_O) $(YLISTSRC_O) $(TOPTARGETS): $(DEPS) diff --git a/y_worker_t/include/y_worker_t/y_task_t.h b/y_worker_t/include/y_worker_t/y_task_t.h index a988f5f..bd82c61 100644 --- a/y_worker_t/include/y_worker_t/y_task_t.h +++ b/y_worker_t/include/y_worker_t/y_task_t.h @@ -2,6 +2,7 @@ #define Y_TASK_T_H__C #include +#include #include "list_t/list_t.h" @@ -33,7 +34,7 @@ void push_tasQ(struct y_tasQ *tasQ, struct y_task_t task); struct list_y_TASK_T* pull_tasQ(struct y_tasQ *tasQ); struct argExecTasQ{ - int * go_on; + int go_on; struct y_tasQ *tasQ; struct y_tasQ *historytasQ; }; diff --git a/y_worker_t/include/y_worker_t/y_worker_t.h b/y_worker_t/include/y_worker_t/y_worker_t.h index 03b6384..3a0c410 100644 --- a/y_worker_t/include/y_worker_t/y_worker_t.h +++ b/y_worker_t/include/y_worker_t/y_worker_t.h @@ -11,23 +11,47 @@ #define KILL_WORKER 0 #define GO_ON_WORKER 1 +#define WORKER_ON 1 +#define WORKER_OFF 0 + +struct argWorker; + typedef struct y_worker_t{ int exec; -// pthread_mutex_t *mut_worker; -// pthread_cond_t *cond_worker; + int status; + pthread_mutex_t *mut_worker; + pthread_cond_t *cond_worker; size_t id; - pthread_t thread; + size_t id_thread; + pthread_t *thread; + struct argWorker *arg; } y_WORKER_T; +typedef struct y_worker_t * ptr_y_WORKER_T; -GENERATE_LIST_ALL(y_WORKER_T) +//GENERATE_LIST_ALL(y_WORKER_T) +GENERATE_LIST_ALL(ptr_y_WORKER_T) + +ptr_y_WORKER_T create_ptr_y_WORKER_T(int exec, int id); +void free_ptr_y_WORKER_T(ptr_y_WORKER_T pworker); + +void purge_ptr_y_WORKER_T_in_list(struct main_list_ptr_y_WORKER_T *list_workers); struct argWorker { struct argExecTasQ *argx; - struct list_y_WORKER_T *worker; + struct y_worker_t *pworker; + pthread_mutex_t *mut_workers; }; +void assign_argWorker_of_ptr_y_WORKER_T(ptr_y_WORKER_T pworker, struct argExecTasQ *argx, pthread_mutex_t *mut_worker); + void* execute_work(void* arg); +void kill_all_workers( + struct main_list_ptr_y_WORKER_T * workers, + struct argExecTasQ *argx +); +void wait_and_free_workers(struct main_list_ptr_y_WORKER_T *workers); + #endif /* Y_WORKER_T_H__C */ diff --git a/y_worker_t/src/y_worker_t/y_task_t.c b/y_worker_t/src/y_worker_t/y_task_t.c index 9a2d2b2..ce439fc 100644 --- a/y_worker_t/src/y_worker_t/y_task_t.c +++ b/y_worker_t/src/y_worker_t/y_task_t.c @@ -34,7 +34,7 @@ void push_tasQ(struct y_tasQ *tasQ, struct y_task_t task){ printf("debug: push_tasQ fin\n"); } struct list_y_TASK_T* pull_tasQ(struct y_tasQ *tasQ){ - printf("debug: pull_tasQ debut\n"); + printf("debug: pull_tasQ debut id_th:%ld\n",pthread_self()); struct list_y_TASK_T *valueRet = NULL; pthread_mutex_lock(tasQ->mut_tasQ); while(tasQ->list_tasQ->end_list == NULL){ @@ -46,31 +46,37 @@ struct list_y_TASK_T* pull_tasQ(struct y_tasQ *tasQ){ pthread_mutex_unlock(tasQ->mut_tasQ); - printf("debug: pull_tasQ fin : is valueRet NULL ? = %d\n", valueRet == NULL); + printf("debug: pull_tasQ fin : is valueRet NULL ? = %d, id_th:%ld\n", valueRet == NULL, pthread_self()); return valueRet; } struct argExecTasQ * create_argExecTasQ(){ struct argExecTasQ * retasQ = malloc(sizeof(struct argExecTasQ)); - retasQ->go_on = malloc(sizeof(int)); - *(retasQ->go_on)=1; + (retasQ->go_on)=1; retasQ->tasQ = create_y_tasQ(); retasQ->historytasQ = create_y_tasQ(); return retasQ; } void free_argExecTasQ(struct argExecTasQ * arg){ - free(arg->go_on); free_y_tasQ(arg->tasQ); free_y_tasQ(arg->historytasQ); free(arg); } +int check_go_on_tasQ(struct argExecTasQ *argx){ + int ret; + pthread_mutex_lock(argx->tasQ->mut_tasQ); + ret = (argx->go_on); + pthread_mutex_unlock(argx->tasQ->mut_tasQ); + return ret; +} + void * execute_task(void *arg){ struct argExecTasQ *argx = (struct argExecTasQ *)arg; struct y_tasQ *tasQ = argx->tasQ; struct y_tasQ *historytasQ = argx->historytasQ; struct list_y_TASK_T *l_task=NULL; - while(*(argx->go_on)){ + while(check_go_on_tasQ(argx)){ l_task = pull_tasQ(tasQ); printf("debug: is l_task NULL? = %d\n", l_task==NULL); @@ -86,6 +92,9 @@ void * execute_task(void *arg){ } + printf("debug: -------------------> exit task exec \n"); + usleep(1000); + return NULL; } diff --git a/y_worker_t/src/y_worker_t/y_worker_t.c b/y_worker_t/src/y_worker_t/y_worker_t.c index 8776acb..0f75274 100644 --- a/y_worker_t/src/y_worker_t/y_worker_t.c +++ b/y_worker_t/src/y_worker_t/y_worker_t.c @@ -1,18 +1,147 @@ #include "y_worker_t/y_worker_t.h" -GEN_LIST_ALL(y_WORKER_T) +//GEN_LIST_ALL(y_WORKER_T) + +GEN_LIST_ALL(ptr_y_WORKER_T) + +ptr_y_WORKER_T create_ptr_y_WORKER_T(int exec, int id){ + ptr_y_WORKER_T pworker = malloc(sizeof(y_WORKER_T)); + pworker->exec=exec; + pworker->mut_worker = malloc(sizeof(pthread_mutex_t)); + pthread_mutex_init(pworker->mut_worker, NULL); + pworker->cond_worker = malloc(sizeof(pthread_cond_t)); + pthread_cond_init(pworker->cond_worker, NULL); + pworker->id=id; + pworker->thread = malloc(sizeof(pthread_t)); + + pworker->arg = NULL; + + return pworker; +} +void free_ptr_y_WORKER_T(ptr_y_WORKER_T pworker){ + //ptr_y_WORKER_T pworker = *p_worker; + pthread_mutex_destroy(pworker->mut_worker); + free(pworker->mut_worker); + pthread_cond_destroy(pworker->cond_worker); + free(pworker->cond_worker); + free(pworker->thread); + if(pworker->arg!=NULL) free(pworker->arg); + free(pworker); +} + +void purge_ptr_y_WORKER_T_in_list(struct main_list_ptr_y_WORKER_T *list_workers){ + for(move_current_to_begin_list_ptr_y_WORKER_T(list_workers); list_workers->current_list; increment_list_ptr_y_WORKER_T(list_workers)){ + free_ptr_y_WORKER_T(list_workers->current_list->value); + } +} + +void assign_argWorker_of_ptr_y_WORKER_T(ptr_y_WORKER_T pworker, struct argExecTasQ *argx, pthread_mutex_t *mut_workers){ + pworker->arg = malloc(sizeof( struct argWorker)); + pworker->arg->argx = argx; + pworker->arg->pworker = pworker; + pworker->arg->mut_workers = mut_workers; + +} void* execute_work(void* arg){ struct argWorker *argw = (struct argWorker*)arg; struct argExecTasQ *argx=argw->argx; - struct list_y_WORKER_T * worker = argw->worker; +// pthread_mutex_t *mut_workers=argw->mut_workers; + struct y_worker_t * pworker = argw->pworker; + size_t id_thread=pthread_self(); + int exec; + pthread_mutex_lock(pworker->mut_worker); + exec=pworker->exec; + pworker->status=WORKER_ON; + pworker->id_thread=id_thread; + pthread_mutex_unlock(pworker->mut_worker); + printf("debug: ############################ execute_task call : thread_id:%ld, self=%ld \n",pworker->id,id_thread); do{ - printf("debug: execute_task call : thread_id:%ld, self=%ld \n",worker->value.id,id_thread); + printf("debug: execute_task call : thread_id:%ld, self=%ld \n",pworker->id,id_thread); execute_task((void*)argx); - printf("debug: execute_task end, worker exec=%d \n",worker->value.exec); - }while(worker->value.exec); + printf("debug: <<<<>>>> execute_task end, worker exec=%d id:%ld self:%ld \n",exec,pworker->id, pworker->id_thread); + pthread_mutex_lock(pworker->mut_worker); + exec=pworker->exec; + pthread_mutex_unlock(pworker->mut_worker); + printf("debug: execute_task end, worker exec=%d id:%ld self:%ld \n",exec,pworker->id, pworker->id_thread); + }while(exec); + pthread_mutex_lock(pworker->mut_worker); + pworker->status=WORKER_OFF; + printf("debug: =========>>> execute_task end, worker OFF =%d, id=%ld self:%ld\n",pworker->status, pworker->id, pworker->id_thread); + pthread_mutex_unlock(pworker->mut_worker); + pthread_cond_signal(pworker->cond_worker); //free_y_worker_t(worker); +// usleep(1000); + return NULL; +} + +void kill_all_workers( + struct main_list_ptr_y_WORKER_T * workers, + struct argExecTasQ *argx +){ + for(move_current_to_begin_list_ptr_y_WORKER_T(workers); workers->current_list;increment_list_ptr_y_WORKER_T(workers)){ + pthread_mutex_lock(workers->current_list->value->mut_worker); + workers->current_list->value->exec=KILL_WORKER; + pthread_mutex_unlock(workers->current_list->value->mut_worker); + } + + pthread_mutex_lock(argx->tasQ->mut_tasQ); + (argx->go_on)=0; + pthread_mutex_unlock(argx->tasQ->mut_tasQ); + + + for(move_current_to_begin_list_ptr_y_WORKER_T(workers); workers->current_list;increment_list_ptr_y_WORKER_T(workers)){ + //if(workers->current_list->value->status == WORKER_ON) + { + struct y_task_t task = { + .func=NULL, + .arg=argx, + .status=TASK_DONE, + }; + push_tasQ(argx->tasQ, task); + } + } + +} + + +void wait_and_free_workers(struct main_list_ptr_y_WORKER_T *workers){ + int reterror; + for(move_current_to_begin_list_ptr_y_WORKER_T(workers); workers->current_list;increment_list_ptr_y_WORKER_T(workers)){ + pthread_mutex_lock(workers->current_list->value->mut_worker); + if((workers->current_list->value->status) == WORKER_ON){ + while(workers->current_list->value->status == WORKER_ON){ + pthread_cond_wait(workers->current_list->value->cond_worker, workers->current_list->value->mut_worker); + } + reterror = pthread_join(*(workers->current_list->value->thread) ,NULL); + if(0!=reterror){ + printf("debug: error %d pthread_join thread %ld\n",reterror,(workers->current_list->value->id)); + } + printf("debug: JOIN done, thread id:%ld id_thread:%ld\n", + (workers->current_list->value->id), + (workers->current_list->value->id_thread) + ); + }else{ + printf("debug: thread id:%ld id_thread:%ld already KILLED\n", + (workers->current_list->value->id), + (workers->current_list->value->id_thread) + ); + + } + + workers->current_list->value->status=WORKER_OFF; + pthread_mutex_unlock(workers->current_list->value->mut_worker); + } + +// free_argExecTasQ(argx); + purge_ptr_y_WORKER_T_in_list(workers); + free_all_var_list_ptr_y_WORKER_T(workers); + +// pthread_mutex_destroy(mut_workers); +// free(mut_workers); + + } diff --git a/y_worker_t/test/Makefile b/y_worker_t/test/Makefile index f8f3aaf..2cf363d 100644 --- a/y_worker_t/test/Makefile +++ b/y_worker_t/test/Makefile @@ -10,7 +10,7 @@ YLISTDIR=$(PWD)/../../list_t ROOT_DIR=$(PWD)/.. INCLUDE_DIR=$(ROOT_DIR)/include CFLAGS=-I$(INCLUDE_DIR) -I$(YTESTDIR)/include_ytest/include -I$(YLISTDIR)/src -LDFLAGS=-L$(YTESTDIR) -lytest -lpthread -lm -lOpenCL +LDFLAGS=-L$(YTESTDIR) -lytest -lpthread -lm -lOpenCL -Wall -Werror #SRC_DIR=$(ROOT_DIR)/src #SRC=$(wildcard */*/*.c) diff --git a/y_worker_t/test/is_good.c b/y_worker_t/test/is_good.c index 77980ae..33394aa 100644 --- a/y_worker_t/test/is_good.c +++ b/y_worker_t/test/is_good.c @@ -25,8 +25,9 @@ #define VALGRIND_ 1 +#if 0 TEST(first){ - struct y_tasQ tasQ0 ; + //struct y_tasQ tasQ0 ; struct y_tasQ *tasQ = create_y_tasQ(); free_y_tasQ(tasQ); @@ -48,44 +49,58 @@ TEST(worker){ } +#endif void* funcPrintSelf(void* arg){ - LOG("begin fun call\n"); + LOG("func: begin func call\n"); struct argExecTasQ *argx=(struct argExecTasQ*)arg; - int randomm=rand()%1000; + static long int count = 0; //randomm=rand()%1000; size_t thread_id=pthread_self(); - LOG("my status=%d, idfunc=%d; threadid=%ld\n",*(argx->go_on), randomm, thread_id); + LOG("func: my status=%d, func number=%ld; threadid=%ld\n",(argx->go_on), count, thread_id); usleep(200000); - LOG("end func%d; in thread=%ld\n",randomm,thread_id); + LOG("func: end func %ld; in thread=%ld\n",count,thread_id); + ++count; + return NULL; } +void* funcEnd(void* arg){ + LOG("funcEnd: begin func End\n"); + struct argExecTasQ *argx=(struct argExecTasQ*)arg; + static long int count = 0; //randomm=rand()%1000; + size_t thread_id=pthread_self(); + LOG("funcEnd: my status=%d, func number=%ld; threadid=%ld\n",(argx->go_on), count, thread_id); + usleep(20000); + LOG("funcEnd: end func %ld; in thread=%ld\n",count,thread_id); + ++count; + return NULL; +} + + + TEST(threads){ srand(time(NULL)); - struct main_list_y_WORKER_T * workers = create_var_list_y_WORKER_T(); + struct main_list_ptr_y_WORKER_T * workers = create_var_list_ptr_y_WORKER_T(); struct argExecTasQ *argx = create_argExecTasQ(); - + pthread_mutex_t *mut_workers = malloc(sizeof(pthread_mutex_t)); + pthread_mutex_init(mut_workers, NULL); for(int i=0; i<4; ++i){ - struct y_worker_t w={.exec=GO_ON_WORKER, .id=i}; - struct list_y_WORKER_T * liwo=malloc(sizeof(struct list_y_WORKER_T)); - liwo->value=w; - //liwo->preview=NULL; - liwo->next=NULL; - append_list_y_WORKER_T(workers, liwo); - //push_back_list_y_WORKER_T(workers, w); - struct argWorker argw; - argw.argx = argx; - argw.worker=workers->end_list; - LOG("%d workers %ld created\n",4,argw.worker->value.id); - pthread_create(&(w.thread),NULL,execute_work,(void*)&argw); -//usleep(5000); + struct y_worker_t *pw= create_ptr_y_WORKER_T(GO_ON_WORKER, i); + assign_argWorker_of_ptr_y_WORKER_T(pw, argx, mut_workers); + pthread_mutex_lock(mut_workers); + //append_list_ptr_y_WORKER_T(workers, li_pw); + push_back_list_ptr_y_WORKER_T(workers, pw); + pthread_mutex_unlock(mut_workers); + LOG("%d workers %ld created\n",4,(pw->arg->pworker->id)); + pthread_create(pw->thread,NULL,execute_work,(void*)(pw->arg)); +//usleep(500); } -usleep(500000); +//usleep(500000); - for(int i=0; i<10;++i) + for(int i=0; i<40;++i) { struct y_task_t task = { .func=funcPrintSelf, @@ -93,58 +108,30 @@ usleep(500000); .status=TASK_PENDING, }; push_tasQ(argx->tasQ, task); - usleep(10000); +// usleep(10000); } LOG("%d tasks created\n",40); - for(int i=4; i<8; ++i){ - struct y_worker_t w={.exec=GO_ON_WORKER, .id=i}; - struct list_y_WORKER_T * liwo=malloc(sizeof(struct list_y_WORKER_T)); - liwo->value=w; - //liwo->preview=NULL; - liwo->next=NULL; - append_list_y_WORKER_T(workers, liwo); - //push_back_list_y_WORKER_T(workers, w); - struct argWorker argw; - argw.argx = argx; - argw.worker=workers->end_list; - pthread_create(&(w.thread),NULL,execute_work,(void*)&argw); -usleep(5000); + for(int i=4; i<8; ++i){ + struct y_worker_t *pw= create_ptr_y_WORKER_T(GO_ON_WORKER, i); + assign_argWorker_of_ptr_y_WORKER_T(pw, argx, mut_workers); + pthread_mutex_lock(mut_workers); + //append_list_ptr_y_WORKER_T(workers, li_pw); + push_back_list_ptr_y_WORKER_T(workers, pw); + pthread_mutex_unlock(mut_workers); + LOG("%d workers %ld created\n",4,(pw->arg->pworker->id)); + pthread_create(pw->thread,NULL,execute_work,(void*)(pw->arg)); +//usleep(500); } - LOG("another %d workers created\n",4); + LOG("another %d workers created\n",4); + kill_all_workers(workers, argx); + wait_and_free_workers(workers); - - - -// usleep(400000); - for(move_current_to_begin_list_y_WORKER_T(workers); workers->current_list;increment_list_y_WORKER_T(workers)){ - workers->current_list->value.exec=KILL_WORKER; - } - - *(argx->go_on)=0; - - for(move_current_to_begin_list_y_WORKER_T(workers); workers->current_list;increment_list_y_WORKER_T(workers)){ - struct y_task_t task = { - .func=NULL, - .arg=argx, - .status=TASK_DONE, - }; - push_tasQ(argx->tasQ, task); - - } - - - for(move_current_to_begin_list_y_WORKER_T(workers); workers->current_list;increment_list_y_WORKER_T(workers)){ - pthread_join(workers->current_list->value.thread ,NULL); - LOG("debug: JOIN donei, thread id:%ld\n",workers->current_list->value.id); - } - -// usleep(5000000); free_argExecTasQ(argx); - free_all_var_list_y_WORKER_T(workers); - + pthread_mutex_destroy(mut_workers); + free(mut_workers); }