From a0c243b4a3fecb0a3b725ed634b0faf8b63bd01b Mon Sep 17 00:00:00 2001 From: Bin Qian Date: Wed, 29 Aug 2018 10:33:13 -0400 Subject: [PATCH] Create worker thread This change list is to create worker thread mechanism for the upcoming one time tasks to run in separated thread. Story: 2003577 Task: 24898 Change-Id: I5378b80763b104bcf0af95cb083de0cf61463788 Signed-off-by: Bin Qian --- service-mgmt/sm-1.0.0/src/Makefile | 1 + service-mgmt/sm-1.0.0/src/sm_process.c | 28 ++- .../sm-1.0.0/src/sm_worker_thread.cpp | 174 ++++++++++++++++++ service-mgmt/sm-1.0.0/src/sm_worker_thread.h | 81 ++++++++ 4 files changed, 278 insertions(+), 6 deletions(-) create mode 100644 service-mgmt/sm-1.0.0/src/sm_worker_thread.cpp create mode 100644 service-mgmt/sm-1.0.0/src/sm_worker_thread.h diff --git a/service-mgmt/sm-1.0.0/src/Makefile b/service-mgmt/sm-1.0.0/src/Makefile index 243b52cd..fa36a522 100644 --- a/service-mgmt/sm-1.0.0/src/Makefile +++ b/service-mgmt/sm-1.0.0/src/Makefile @@ -104,6 +104,7 @@ SRCS+=fm_api_wrapper.c SRCS+=sm_failover.c SRCS+=sm_failover_thread.c SRCS+=sm_swact_state.c +SRCS+=sm_worker_thread.cpp SRCS+=sm_task_affining_thread.c SRCS+=sm_node_swact_monitor.cpp SRCS+=sm_service_domain_interface_not_in_use_state.c diff --git a/service-mgmt/sm-1.0.0/src/sm_process.c b/service-mgmt/sm-1.0.0/src/sm_process.c index 5a17a5ee..36cc40c0 100644 --- a/service-mgmt/sm-1.0.0/src/sm_process.c +++ b/service-mgmt/sm-1.0.0/src/sm_process.c @@ -53,6 +53,7 @@ #include "sm_failover.h" #include "sm_failover_thread.h" #include "sm_task_affining_thread.h" +#include "sm_worker_thread.h" #define SM_PROCESS_DB_CHECKPOINT_INTERVAL_IN_MS 30000 #define SM_PROCESS_TICK_INTERVAL_IN_MS 200 @@ -192,6 +193,14 @@ static SmErrorT sm_process_initialize( void ) return( SM_FAILED ); } + error = SmWorkerThread::initialize(); + if( SM_OKAY != error ) + { + DPRINTFE( "Failed to initialize worker thread, error=%s.", + sm_error_str( error ) ); + return( SM_FAILED ); + } + error = sm_msg_initialize(); if( SM_OKAY != error ) { @@ -317,7 +326,7 @@ static SmErrorT sm_process_initialize( void ) DPRINTFE( "Failed to initialize service heartbeat api module, " "error=%s.", sm_error_str( error ) ); return( SM_FAILED ); - } + } error = sm_service_heartbeat_thread_start(); if( SM_OKAY != error ) @@ -326,7 +335,7 @@ static SmErrorT sm_process_initialize( void ) sm_error_str(error) ); return( error ); } - + error = sm_main_event_handler_initialize(); if( SM_OKAY != error ) { @@ -360,7 +369,7 @@ static SmErrorT sm_process_initialize( void ) DPRINTFE( "Failed to start the task affining thread, error=%s.", sm_error_str( error ) ); return( SM_FAILED ); - } + } } return( SM_OKAY ); @@ -418,7 +427,7 @@ static SmErrorT sm_process_finalize( void ) { DPRINTFE( "Failed to finalize service heartbeat api module, " "error=%s.", sm_error_str( error ) ); - } + } error = sm_service_api_finalize(); if( SM_OKAY != error ) @@ -511,6 +520,13 @@ static SmErrorT sm_process_finalize( void ) sm_error_str( error ) ); } + error = SmWorkerThread::finalize(); + if( SM_OKAY != error ) + { + DPRINTFE( "Failed to finalize worker thread, error=%s.", + sm_error_str( error ) ); + } + error = sm_hw_finalize(); if( SM_OKAY != error ) { @@ -720,7 +736,7 @@ SmErrorT sm_process_main( int argc, char *argv[], char *envp[] ) error = sm_process_initialize(); if( SM_OKAY != error ) { - DPRINTFE( "Failed initialize process, error=%s.", + DPRINTFE( "Failed initialize process, error=%s.", sm_error_str(error) ); return( error ); } @@ -843,7 +859,7 @@ SmErrorT sm_process_main( int argc, char *argv[], char *envp[] ) error = sm_process_finalize(); if( SM_OKAY != error ) { - DPRINTFE( "Failed finalize process, error=%s.", + DPRINTFE( "Failed finalize process, error=%s.", sm_error_str(error) ); } diff --git a/service-mgmt/sm-1.0.0/src/sm_worker_thread.cpp b/service-mgmt/sm-1.0.0/src/sm_worker_thread.cpp new file mode 100644 index 00000000..8d5daeb0 --- /dev/null +++ b/service-mgmt/sm-1.0.0/src/sm_worker_thread.cpp @@ -0,0 +1,174 @@ +// +// Copyright (c) 2018 Wind River Systems, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +#include "sm_worker_thread.h" +#include +#include +#include +#include +#include "sm_util_types.h" +#include "sm_debug.h" + +SmWorkerThread SmWorkerThread::_the_worker; + +// **************************************************************************** +// SmWorkerThread::get_worker get the thread singleton object +// **************************************************************************** +SmWorkerThread& SmWorkerThread::get_worker() +{ + return SmWorkerThread::_the_worker; +} +// **************************************************************************** + +// **************************************************************************** +// SmWorkerThread::initialize initialize the worker thread +// **************************************************************************** +SmErrorT SmWorkerThread::initialize() +{ + SmWorkerThread::get_worker().go(); + return SM_OKAY; +} +// **************************************************************************** + +// **************************************************************************** +// SmWorkerThread::finalize finalize the worker thread +// **************************************************************************** +SmErrorT SmWorkerThread::finalize() +{ + SmWorkerThread::get_worker().stop(); + return SM_OKAY; +} +// **************************************************************************** + +// **************************************************************************** +// SmWorkerThread::SmWorkerThread +// **************************************************************************** +SmWorkerThread::SmWorkerThread() : _priority_queue(), _regular_queue() +{ + this->_mutex = PTHREAD_MUTEX_INITIALIZER; + this->_goon = true; + this->_thread_created = false; +} +// **************************************************************************** + +// **************************************************************************** +// SmWorkerThread::~SmWorkerThread +// **************************************************************************** +SmWorkerThread::~SmWorkerThread() +{ + sem_destroy(&this->_sem); +} +// **************************************************************************** + +// **************************************************************************** +// SmWorkerThread::thread_helper helper function for the thread entry +// **************************************************************************** +void* SmWorkerThread::thread_helper(SmWorkerThread* workerThread) +{ + workerThread->thread_run(); + return 0; +} +// **************************************************************************** + +// **************************************************************************** +// SmWorkerThread::go prepare and start the thread +// **************************************************************************** +SmErrorT SmWorkerThread::go() +{ + if(this->_thread_created) + { + DPRINTFE("Worker thread has already been created"); + return SM_FAILED; + } + + this->_thread_created = true; + if( 0 != sem_init(&this->_sem, 0, MAX_QUEUED_ACTIONS) ) + { + DPRINTFE("Cannot init semaphore"); + return SM_FAILED; + } + + int result = pthread_create( &this->_thread, NULL, + (void*(*)(void*))SmWorkerThread::thread_helper, (void*)this ); + if( 0 != result ) + { + DPRINTFE("Failed to create thread. Error %d", errno); + return SM_FAILED; + } + return SM_OKAY; +} +// **************************************************************************** + +// **************************************************************************** +// SmWorkerThread::stop stop the worker thread +// **************************************************************************** +SmErrorT SmWorkerThread::stop() +{ + this->_goon = false; + void* result = NULL; + int res = pthread_join(this->_thread, &result); + + if(0 != res) + { + DPRINTFE("pthread_join failed. Error %d", res); + } + if(NULL != result) + { + free(result); + } + return SM_OKAY; +} +// **************************************************************************** + +// **************************************************************************** +// SmWorkerThread::add_action add a regular priority action +// **************************************************************************** +void SmWorkerThread::add_action(SmAction& action) +{ + mutex_holder(&this->_mutex); + this->_regular_queue.push(&action); +} +// **************************************************************************** + +// **************************************************************************** +// SmWorkerThread::add_priority_action add a high priority action +// **************************************************************************** +void SmWorkerThread::add_priority_action(SmAction& action) +{ + mutex_holder(&this->_mutex); + this->_priority_queue.push(&action); +} +// **************************************************************************** + +// **************************************************************************** +// SmWorkerThread::thread_run main loop of the worker thread +// **************************************************************************** +void SmWorkerThread::thread_run() +{ + while(this->_goon) + { + if(0 == sem_trywait(&this->_sem)) + { + SmAction* action = NULL; + if(!this->_priority_queue.empty()) + { + action = this->_priority_queue.front(); + this->_priority_queue.pop(); + }else if(!this->_regular_queue.empty()) + { + action = this->_regular_queue.front(); + this->_regular_queue.pop(); + } + if( NULL != action ) + { + action->action(); + } + }else if(EAGAIN != errno) + { + DPRINTFE("Semaphore wait failed. Error %d", errno); + } + } +} +// **************************************************************************** diff --git a/service-mgmt/sm-1.0.0/src/sm_worker_thread.h b/service-mgmt/sm-1.0.0/src/sm_worker_thread.h new file mode 100644 index 00000000..49ed2547 --- /dev/null +++ b/service-mgmt/sm-1.0.0/src/sm_worker_thread.h @@ -0,0 +1,81 @@ +// +// Copyright (c) 2018 Wind River Systems, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +#ifndef __SM_WORKER_THREAD_H__ +#define __SM_WORKER_THREAD_H__ + +#include "sm_types.h" +#include +#include +#include + +#define MAX_QUEUED_ACTIONS 24 +// **************************************************************************** +// SmAction interface, action to process in a worker thread +// **************************************************************************** +class SmAction +{ + public: + virtual void action() = 0; +}; + +typedef std::queue SmActionQueueT; +class SmWorkerThread; + +// **************************************************************************** +// SmWorkerThread work thread class +// **************************************************************************** +class SmWorkerThread +{ + public: + SmWorkerThread(); + virtual ~SmWorkerThread(); + + /* Add an action to the normal priority FCFS queue. + A normal priority action will be scheduled after all + high priority actions. + */ + void add_action(SmAction& action); + /* + Add an action to high priority FCFS queue. + A high priority action is nonpreemptive. It will + be scheduled after the current action. + */ + void add_priority_action(SmAction& action); + + + // retrieve singleton object + static SmWorkerThread& get_worker(); + // initialize worker thread + static SmErrorT initialize(); + // stop worker thread + static SmErrorT finalize(); + + private: + pthread_mutex_t _mutex; + SmActionQueueT _priority_queue; + SmActionQueueT _regular_queue; + sem_t _sem; + pthread_t _thread; + bool _thread_created; + bool _goon; + + // create worker thread and run tasks + SmErrorT go(); + // stop processing tasks and stop the worker thread + SmErrorT stop(); + + // run the thread + void thread_run(); + /* help function to provide function pointer for + creating the worker thread + */ + static void* thread_helper(SmWorkerThread* me); + + static SmWorkerThread _the_worker; + +}; + +#endif //__SM_WORKER_THREAD_H__ \ No newline at end of file