Merge "Avoid potential blocking of heartbeat thread"

This commit is contained in:
Zuul 2024-02-14 21:35:21 +00:00 committed by Gerrit Code Review
commit 9367d45672
2 changed files with 45 additions and 9 deletions

View File

@ -5,6 +5,7 @@
// //
#include "sm_cluster_hbs_info_msg.h" #include "sm_cluster_hbs_info_msg.h"
#include <arpa/inet.h> #include <arpa/inet.h>
#include <atomic>
#include <errno.h> #include <errno.h>
#include <fcntl.h> #include <fcntl.h>
#include <net/if.h> #include <net/if.h>
@ -128,6 +129,8 @@ int SmClusterHbsInfoMsg::this_controller_index = -1;
int SmClusterHbsInfoMsg::peer_controller_index = -1; int SmClusterHbsInfoMsg::peer_controller_index = -1;
char SmClusterHbsInfoMsg::server_port[SM_CONFIGURATION_VALUE_MAX_CHAR + 1] = {0}; char SmClusterHbsInfoMsg::server_port[SM_CONFIGURATION_VALUE_MAX_CHAR + 1] = {0};
char SmClusterHbsInfoMsg::client_port[SM_CONFIGURATION_VALUE_MAX_CHAR + 1] = {0}; char SmClusterHbsInfoMsg::client_port[SM_CONFIGURATION_VALUE_MAX_CHAR + 1] = {0};
std::atomic_flag SmClusterHbsInfoMsg::_sending_query = ATOMIC_FLAG_INIT;
struct sockaddr_in SmClusterHbsInfoMsg::sock_addr = {0};
const SmClusterHbsStateT& SmClusterHbsInfoMsg::get_current_state() const SmClusterHbsStateT& SmClusterHbsInfoMsg::get_current_state()
{ {
@ -300,7 +303,12 @@ void SmClusterHbsInfoMsg::_cluster_hbs_info_msg_received( int selobj, int64_t us
} }
} }
SmErrorT SmClusterHbsInfoMsg::_get_address(struct sockaddr_in* addr) struct sockaddr_in* SmClusterHbsInfoMsg::_get_address()
{
return &sock_addr;
}
SmErrorT SmClusterHbsInfoMsg::set_address()
{ {
struct addrinfo *address = NULL; struct addrinfo *address = NULL;
struct addrinfo hints; struct addrinfo hints;
@ -320,7 +328,7 @@ SmErrorT SmClusterHbsInfoMsg::_get_address(struct sockaddr_in* addr)
return SM_FAILED; return SM_FAILED;
} }
memcpy(addr, address->ai_addr, sizeof(struct sockaddr_in)); memcpy(&sock_addr, address->ai_addr, sizeof(struct sockaddr_in));
freeaddrinfo(address); freeaddrinfo(address);
return SM_OKAY; return SM_OKAY;
} }
@ -335,10 +343,13 @@ static SmSimpleAction _query_hbs_cluster_info_action("send hbs-cluster query", s
// **************************************************************************** // ****************************************************************************
// SmClusterHbsInfoMsg::cluster_hbs_info_query - // SmClusterHbsInfoMsg::cluster_hbs_info_query -
// trigger a query of cluster hbs info. // trigger a query of cluster hbs info.
// by providing a none-null callback, it requires hbsAgent to respond the query
// otherwise, it just sends to hbsAgent as an alive pulse.
// return true if request sent successfully, false otherwise. // return true if request sent successfully, false otherwise.
// ======================== // ========================
bool SmClusterHbsInfoMsg::cluster_hbs_info_query(cluster_hbs_query_ready_callback callback, bool alive_pulse) bool SmClusterHbsInfoMsg::cluster_hbs_info_query(cluster_hbs_query_ready_callback callback)
{ {
bool alive_pulse = (NULL == callback);
int port = atoi(server_port); int port = atoi(server_port);
if(0 > port) if(0 > port)
{ {
@ -346,11 +357,21 @@ bool SmClusterHbsInfoMsg::cluster_hbs_info_query(cluster_hbs_query_ready_callbac
return false; return false;
} }
bool already_sending = false;
already_sending = _sending_query.test_and_set();
if (already_sending && alive_pulse)
{
// an alive pulse happens when a query is being sent, return immediately.
// alive pulse is time interval based, so don't wait as long as one is sent.
return true;
}
char query[request_size]; char query[request_size];
unsigned short reqid; unsigned short reqid;
struct timespec ts; struct timespec ts;
{ {
mutex_holder holder(&sm_cluster_hbs_mutex); mutex_holder holder(&sm_cluster_hbs_mutex);
already_sending = _sending_query.test_and_set();
if(alive_pulse) if(alive_pulse)
{ {
reqid = 0; reqid = 0;
@ -367,10 +388,12 @@ bool SmClusterHbsInfoMsg::cluster_hbs_info_query(cluster_hbs_query_ready_callbac
} }
} }
struct sockaddr_in addr; struct sockaddr_in *addr;
if(SM_OKAY != _get_address(&addr)) addr = _get_address();
if (NULL == addr)
{ {
DPRINTFE("Failed to get address"); DPRINTFE("Failed to get address");
_sending_query.clear();
return false; return false;
} }
@ -380,22 +403,24 @@ bool SmClusterHbsInfoMsg::cluster_hbs_info_query(cluster_hbs_query_ready_callbac
{ {
DPRINTFI("send hbs cluster query [%d]", reqid); DPRINTFI("send hbs cluster query [%d]", reqid);
} }
if(0 > sendto(_sock, query, msg_size, 0, (sockaddr*)&addr, sizeof(addr))) if(0 > sendto(_sock, query, msg_size, 0, (sockaddr*)addr, sizeof(*addr)))
{ {
DPRINTFE("Failed to send msg. Error %s", strerror(errno)); DPRINTFE("Failed to send msg. Error %s", strerror(errno));
_sending_query.clear();
return false; return false;
} }
if(NULL != callback) if(NULL != callback)
{ {
_callbacks.push_back(callback); _callbacks.push_back(callback);
} }
_sending_query.clear();
} }
return true; return true;
} }
bool SmClusterHbsInfoMsg::send_alive_pulse() bool SmClusterHbsInfoMsg::send_alive_pulse()
{ {
return cluster_hbs_info_query(NULL, true); return cluster_hbs_info_query(NULL);
} }
SmErrorT SmClusterHbsInfoMsg::open_socket() SmErrorT SmClusterHbsInfoMsg::open_socket()
@ -521,6 +546,13 @@ SmErrorT SmClusterHbsInfoMsg::initialize()
return SM_FAILED; return SM_FAILED;
} }
error = SmClusterHbsInfoMsg::set_address();
if(SM_OKAY != error)
{
DPRINTFE("Failed to set sock address");
return SM_FAILED;
}
SmWorkerThread::get_worker().add_action(&_query_hbs_cluster_info_action); SmWorkerThread::get_worker().add_action(&_query_hbs_cluster_info_action);
return SM_OKAY; return SM_OKAY;
} }

View File

@ -5,6 +5,7 @@
// //
#ifndef __SM_CLUSTER_HBS_INFO_MSG_H__ #ifndef __SM_CLUSTER_HBS_INFO_MSG_H__
#define __SM_CLUSTER_HBS_INFO_MSG_H__ #define __SM_CLUSTER_HBS_INFO_MSG_H__
#include <atomic>
#include <list> #include <list>
#include <pthread.h> #include <pthread.h>
#include <stdio.h> #include <stdio.h>
@ -77,11 +78,12 @@ class SmClusterHbsInfoMsg
static SmErrorT finalize(); static SmErrorT finalize();
static const SmClusterHbsStateT& get_current_state(); static const SmClusterHbsStateT& get_current_state();
static const SmClusterHbsStateT& get_previous_state(); static const SmClusterHbsStateT& get_previous_state();
static bool cluster_hbs_info_query(cluster_hbs_query_ready_callback callback = NULL, bool alive_pulse = false); static bool cluster_hbs_info_query(cluster_hbs_query_ready_callback callback = NULL);
static bool send_alive_pulse(); static bool send_alive_pulse();
static void dump_hbs_record(FILE* fp); static void dump_hbs_record(FILE* fp);
static int get_peer_controller_index(); static int get_peer_controller_index();
static int get_this_controller_index(); static int get_this_controller_index();
static SmErrorT set_address();
private: private:
static int _sock; static int _sock;
@ -93,7 +95,7 @@ class SmClusterHbsInfoMsg
static SmErrorT open_socket(); static SmErrorT open_socket();
static SmErrorT get_controller_index(); static SmErrorT get_controller_index();
static SmErrorT _get_address(struct sockaddr_in* addr); static struct sockaddr_in* _get_address();
static void _cluster_hbs_info_msg_received( int selobj, int64_t user_data ); static void _cluster_hbs_info_msg_received( int selobj, int64_t user_data );
static bool _process_cluster_hbs_history(mtce_hbs_cluster_history_type history, static bool _process_cluster_hbs_history(mtce_hbs_cluster_history_type history,
SmClusterHbsStateT& state); SmClusterHbsStateT& state);
@ -102,6 +104,8 @@ class SmClusterHbsInfoMsg
static char client_port[SM_CONFIGURATION_VALUE_MAX_CHAR + 1]; static char client_port[SM_CONFIGURATION_VALUE_MAX_CHAR + 1];
static int peer_controller_index; static int peer_controller_index;
static int this_controller_index; static int this_controller_index;
static std::atomic_flag _sending_query;
static struct sockaddr_in sock_addr;
}; };
#endif // __SM_CLUSTER_HBS_INFO_MSG_H__ #endif // __SM_CLUSTER_HBS_INFO_MSG_H__