Create worker thread

This change list is to create worker thread mechanism for the
upcoming one time tasks to run in separated thread.

Story: 2003577
Task: 24898
Change-Id: I5378b80763b104bcf0af95cb083de0cf61463788
Signed-off-by: Bin Qian <bin.qian@windriver.com>
This commit is contained in:
Bin Qian 2018-08-29 10:33:13 -04:00
parent 41a543c346
commit a0c243b4a3
4 changed files with 278 additions and 6 deletions

View File

@ -104,6 +104,7 @@ SRCS+=fm_api_wrapper.c
SRCS+=sm_failover.c
SRCS+=sm_failover_thread.c
SRCS+=sm_swact_state.c
SRCS+=sm_worker_thread.cpp
SRCS+=sm_task_affining_thread.c
SRCS+=sm_node_swact_monitor.cpp
SRCS+=sm_service_domain_interface_not_in_use_state.c

View File

@ -53,6 +53,7 @@
#include "sm_failover.h"
#include "sm_failover_thread.h"
#include "sm_task_affining_thread.h"
#include "sm_worker_thread.h"
#define SM_PROCESS_DB_CHECKPOINT_INTERVAL_IN_MS 30000
#define SM_PROCESS_TICK_INTERVAL_IN_MS 200
@ -192,6 +193,14 @@ static SmErrorT sm_process_initialize( void )
return( SM_FAILED );
}
error = SmWorkerThread::initialize();
if( SM_OKAY != error )
{
DPRINTFE( "Failed to initialize worker thread, error=%s.",
sm_error_str( error ) );
return( SM_FAILED );
}
error = sm_msg_initialize();
if( SM_OKAY != error )
{
@ -317,7 +326,7 @@ static SmErrorT sm_process_initialize( void )
DPRINTFE( "Failed to initialize service heartbeat api module, "
"error=%s.", sm_error_str( error ) );
return( SM_FAILED );
}
}
error = sm_service_heartbeat_thread_start();
if( SM_OKAY != error )
@ -326,7 +335,7 @@ static SmErrorT sm_process_initialize( void )
sm_error_str(error) );
return( error );
}
error = sm_main_event_handler_initialize();
if( SM_OKAY != error )
{
@ -360,7 +369,7 @@ static SmErrorT sm_process_initialize( void )
DPRINTFE( "Failed to start the task affining thread, error=%s.",
sm_error_str( error ) );
return( SM_FAILED );
}
}
}
return( SM_OKAY );
@ -418,7 +427,7 @@ static SmErrorT sm_process_finalize( void )
{
DPRINTFE( "Failed to finalize service heartbeat api module, "
"error=%s.", sm_error_str( error ) );
}
}
error = sm_service_api_finalize();
if( SM_OKAY != error )
@ -511,6 +520,13 @@ static SmErrorT sm_process_finalize( void )
sm_error_str( error ) );
}
error = SmWorkerThread::finalize();
if( SM_OKAY != error )
{
DPRINTFE( "Failed to finalize worker thread, error=%s.",
sm_error_str( error ) );
}
error = sm_hw_finalize();
if( SM_OKAY != error )
{
@ -720,7 +736,7 @@ SmErrorT sm_process_main( int argc, char *argv[], char *envp[] )
error = sm_process_initialize();
if( SM_OKAY != error )
{
DPRINTFE( "Failed initialize process, error=%s.",
DPRINTFE( "Failed initialize process, error=%s.",
sm_error_str(error) );
return( error );
}
@ -843,7 +859,7 @@ SmErrorT sm_process_main( int argc, char *argv[], char *envp[] )
error = sm_process_finalize();
if( SM_OKAY != error )
{
DPRINTFE( "Failed finalize process, error=%s.",
DPRINTFE( "Failed finalize process, error=%s.",
sm_error_str(error) );
}

View File

@ -0,0 +1,174 @@
//
// Copyright (c) 2018 Wind River Systems, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
#include "sm_worker_thread.h"
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include "sm_util_types.h"
#include "sm_debug.h"
SmWorkerThread SmWorkerThread::_the_worker;
// ****************************************************************************
// SmWorkerThread::get_worker get the thread singleton object
// ****************************************************************************
SmWorkerThread& SmWorkerThread::get_worker()
{
return SmWorkerThread::_the_worker;
}
// ****************************************************************************
// ****************************************************************************
// SmWorkerThread::initialize initialize the worker thread
// ****************************************************************************
SmErrorT SmWorkerThread::initialize()
{
SmWorkerThread::get_worker().go();
return SM_OKAY;
}
// ****************************************************************************
// ****************************************************************************
// SmWorkerThread::finalize finalize the worker thread
// ****************************************************************************
SmErrorT SmWorkerThread::finalize()
{
SmWorkerThread::get_worker().stop();
return SM_OKAY;
}
// ****************************************************************************
// ****************************************************************************
// SmWorkerThread::SmWorkerThread
// ****************************************************************************
SmWorkerThread::SmWorkerThread() : _priority_queue(), _regular_queue()
{
this->_mutex = PTHREAD_MUTEX_INITIALIZER;
this->_goon = true;
this->_thread_created = false;
}
// ****************************************************************************
// ****************************************************************************
// SmWorkerThread::~SmWorkerThread
// ****************************************************************************
SmWorkerThread::~SmWorkerThread()
{
sem_destroy(&this->_sem);
}
// ****************************************************************************
// ****************************************************************************
// SmWorkerThread::thread_helper helper function for the thread entry
// ****************************************************************************
void* SmWorkerThread::thread_helper(SmWorkerThread* workerThread)
{
workerThread->thread_run();
return 0;
}
// ****************************************************************************
// ****************************************************************************
// SmWorkerThread::go prepare and start the thread
// ****************************************************************************
SmErrorT SmWorkerThread::go()
{
if(this->_thread_created)
{
DPRINTFE("Worker thread has already been created");
return SM_FAILED;
}
this->_thread_created = true;
if( 0 != sem_init(&this->_sem, 0, MAX_QUEUED_ACTIONS) )
{
DPRINTFE("Cannot init semaphore");
return SM_FAILED;
}
int result = pthread_create( &this->_thread, NULL,
(void*(*)(void*))SmWorkerThread::thread_helper, (void*)this );
if( 0 != result )
{
DPRINTFE("Failed to create thread. Error %d", errno);
return SM_FAILED;
}
return SM_OKAY;
}
// ****************************************************************************
// ****************************************************************************
// SmWorkerThread::stop stop the worker thread
// ****************************************************************************
SmErrorT SmWorkerThread::stop()
{
this->_goon = false;
void* result = NULL;
int res = pthread_join(this->_thread, &result);
if(0 != res)
{
DPRINTFE("pthread_join failed. Error %d", res);
}
if(NULL != result)
{
free(result);
}
return SM_OKAY;
}
// ****************************************************************************
// ****************************************************************************
// SmWorkerThread::add_action add a regular priority action
// ****************************************************************************
void SmWorkerThread::add_action(SmAction& action)
{
mutex_holder(&this->_mutex);
this->_regular_queue.push(&action);
}
// ****************************************************************************
// ****************************************************************************
// SmWorkerThread::add_priority_action add a high priority action
// ****************************************************************************
void SmWorkerThread::add_priority_action(SmAction& action)
{
mutex_holder(&this->_mutex);
this->_priority_queue.push(&action);
}
// ****************************************************************************
// ****************************************************************************
// SmWorkerThread::thread_run main loop of the worker thread
// ****************************************************************************
void SmWorkerThread::thread_run()
{
while(this->_goon)
{
if(0 == sem_trywait(&this->_sem))
{
SmAction* action = NULL;
if(!this->_priority_queue.empty())
{
action = this->_priority_queue.front();
this->_priority_queue.pop();
}else if(!this->_regular_queue.empty())
{
action = this->_regular_queue.front();
this->_regular_queue.pop();
}
if( NULL != action )
{
action->action();
}
}else if(EAGAIN != errno)
{
DPRINTFE("Semaphore wait failed. Error %d", errno);
}
}
}
// ****************************************************************************

View File

@ -0,0 +1,81 @@
//
// Copyright (c) 2018 Wind River Systems, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
#ifndef __SM_WORKER_THREAD_H__
#define __SM_WORKER_THREAD_H__
#include "sm_types.h"
#include <queue>
#include <pthread.h>
#include <semaphore.h>
#define MAX_QUEUED_ACTIONS 24
// ****************************************************************************
// SmAction interface, action to process in a worker thread
// ****************************************************************************
class SmAction
{
public:
virtual void action() = 0;
};
typedef std::queue<SmAction*> SmActionQueueT;
class SmWorkerThread;
// ****************************************************************************
// SmWorkerThread work thread class
// ****************************************************************************
class SmWorkerThread
{
public:
SmWorkerThread();
virtual ~SmWorkerThread();
/* Add an action to the normal priority FCFS queue.
A normal priority action will be scheduled after all
high priority actions.
*/
void add_action(SmAction& action);
/*
Add an action to high priority FCFS queue.
A high priority action is nonpreemptive. It will
be scheduled after the current action.
*/
void add_priority_action(SmAction& action);
// retrieve singleton object
static SmWorkerThread& get_worker();
// initialize worker thread
static SmErrorT initialize();
// stop worker thread
static SmErrorT finalize();
private:
pthread_mutex_t _mutex;
SmActionQueueT _priority_queue;
SmActionQueueT _regular_queue;
sem_t _sem;
pthread_t _thread;
bool _thread_created;
bool _goon;
// create worker thread and run tasks
SmErrorT go();
// stop processing tasks and stop the worker thread
SmErrorT stop();
// run the thread
void thread_run();
/* help function to provide function pointer for
creating the worker thread
*/
static void* thread_helper(SmWorkerThread* me);
static SmWorkerThread _the_worker;
};
#endif //__SM_WORKER_THREAD_H__