diff --git a/service-mgmt/sm/src/sm_cluster_hbs_info_msg.cpp b/service-mgmt/sm/src/sm_cluster_hbs_info_msg.cpp index 678716de..61bcce72 100644 --- a/service-mgmt/sm/src/sm_cluster_hbs_info_msg.cpp +++ b/service-mgmt/sm/src/sm_cluster_hbs_info_msg.cpp @@ -5,6 +5,7 @@ // #include "sm_cluster_hbs_info_msg.h" #include +#include #include #include #include @@ -128,6 +129,8 @@ int SmClusterHbsInfoMsg::this_controller_index = -1; int SmClusterHbsInfoMsg::peer_controller_index = -1; char SmClusterHbsInfoMsg::server_port[SM_CONFIGURATION_VALUE_MAX_CHAR + 1] = {0}; char SmClusterHbsInfoMsg::client_port[SM_CONFIGURATION_VALUE_MAX_CHAR + 1] = {0}; +std::atomic_flag SmClusterHbsInfoMsg::_sending_query = ATOMIC_FLAG_INIT; +struct sockaddr_in SmClusterHbsInfoMsg::sock_addr = {0}; const SmClusterHbsStateT& SmClusterHbsInfoMsg::get_current_state() { @@ -300,7 +303,12 @@ void SmClusterHbsInfoMsg::_cluster_hbs_info_msg_received( int selobj, int64_t us } } -SmErrorT SmClusterHbsInfoMsg::_get_address(struct sockaddr_in* addr) +struct sockaddr_in* SmClusterHbsInfoMsg::_get_address() +{ + return &sock_addr; +} + +SmErrorT SmClusterHbsInfoMsg::set_address() { struct addrinfo *address = NULL; struct addrinfo hints; @@ -320,7 +328,7 @@ SmErrorT SmClusterHbsInfoMsg::_get_address(struct sockaddr_in* addr) return SM_FAILED; } - memcpy(addr, address->ai_addr, sizeof(struct sockaddr_in)); + memcpy(&sock_addr, address->ai_addr, sizeof(struct sockaddr_in)); freeaddrinfo(address); return SM_OKAY; } @@ -335,10 +343,13 @@ static SmSimpleAction _query_hbs_cluster_info_action("send hbs-cluster query", s // **************************************************************************** // SmClusterHbsInfoMsg::cluster_hbs_info_query - // trigger a query of cluster hbs info. +// by providing a none-null callback, it requires hbsAgent to respond the query +// otherwise, it just sends to hbsAgent as an alive pulse. // return true if request sent successfully, false otherwise. // ======================== -bool SmClusterHbsInfoMsg::cluster_hbs_info_query(cluster_hbs_query_ready_callback callback, bool alive_pulse) +bool SmClusterHbsInfoMsg::cluster_hbs_info_query(cluster_hbs_query_ready_callback callback) { + bool alive_pulse = (NULL == callback); int port = atoi(server_port); if(0 > port) { @@ -346,11 +357,21 @@ bool SmClusterHbsInfoMsg::cluster_hbs_info_query(cluster_hbs_query_ready_callbac return false; } + bool already_sending = false; + already_sending = _sending_query.test_and_set(); + if (already_sending && alive_pulse) + { + // an alive pulse happens when a query is being sent, return immediately. + // alive pulse is time interval based, so don't wait as long as one is sent. + return true; + } char query[request_size]; unsigned short reqid; struct timespec ts; { mutex_holder holder(&sm_cluster_hbs_mutex); + already_sending = _sending_query.test_and_set(); + if(alive_pulse) { reqid = 0; @@ -367,10 +388,12 @@ bool SmClusterHbsInfoMsg::cluster_hbs_info_query(cluster_hbs_query_ready_callbac } } - struct sockaddr_in addr; - if(SM_OKAY != _get_address(&addr)) + struct sockaddr_in *addr; + addr = _get_address(); + if (NULL == addr) { DPRINTFE("Failed to get address"); + _sending_query.clear(); return false; } @@ -380,22 +403,24 @@ bool SmClusterHbsInfoMsg::cluster_hbs_info_query(cluster_hbs_query_ready_callbac { DPRINTFI("send hbs cluster query [%d]", reqid); } - if(0 > sendto(_sock, query, msg_size, 0, (sockaddr*)&addr, sizeof(addr))) + if(0 > sendto(_sock, query, msg_size, 0, (sockaddr*)addr, sizeof(*addr))) { DPRINTFE("Failed to send msg. Error %s", strerror(errno)); + _sending_query.clear(); return false; } if(NULL != callback) { _callbacks.push_back(callback); } + _sending_query.clear(); } return true; } bool SmClusterHbsInfoMsg::send_alive_pulse() { - return cluster_hbs_info_query(NULL, true); + return cluster_hbs_info_query(NULL); } SmErrorT SmClusterHbsInfoMsg::open_socket() @@ -521,6 +546,13 @@ SmErrorT SmClusterHbsInfoMsg::initialize() return SM_FAILED; } + error = SmClusterHbsInfoMsg::set_address(); + if(SM_OKAY != error) + { + DPRINTFE("Failed to set sock address"); + return SM_FAILED; + } + SmWorkerThread::get_worker().add_action(&_query_hbs_cluster_info_action); return SM_OKAY; } diff --git a/service-mgmt/sm/src/sm_cluster_hbs_info_msg.h b/service-mgmt/sm/src/sm_cluster_hbs_info_msg.h index 6fa85686..b2ef17c2 100644 --- a/service-mgmt/sm/src/sm_cluster_hbs_info_msg.h +++ b/service-mgmt/sm/src/sm_cluster_hbs_info_msg.h @@ -5,6 +5,7 @@ // #ifndef __SM_CLUSTER_HBS_INFO_MSG_H__ #define __SM_CLUSTER_HBS_INFO_MSG_H__ +#include #include #include #include @@ -77,11 +78,12 @@ class SmClusterHbsInfoMsg static SmErrorT finalize(); static const SmClusterHbsStateT& get_current_state(); static const SmClusterHbsStateT& get_previous_state(); - static bool cluster_hbs_info_query(cluster_hbs_query_ready_callback callback = NULL, bool alive_pulse = false); + static bool cluster_hbs_info_query(cluster_hbs_query_ready_callback callback = NULL); static bool send_alive_pulse(); static void dump_hbs_record(FILE* fp); static int get_peer_controller_index(); static int get_this_controller_index(); + static SmErrorT set_address(); private: static int _sock; @@ -93,7 +95,7 @@ class SmClusterHbsInfoMsg static SmErrorT open_socket(); static SmErrorT get_controller_index(); - static SmErrorT _get_address(struct sockaddr_in* addr); + static struct sockaddr_in* _get_address(); static void _cluster_hbs_info_msg_received( int selobj, int64_t user_data ); static bool _process_cluster_hbs_history(mtce_hbs_cluster_history_type history, SmClusterHbsStateT& state); @@ -102,6 +104,8 @@ class SmClusterHbsInfoMsg static char client_port[SM_CONFIGURATION_VALUE_MAX_CHAR + 1]; static int peer_controller_index; static int this_controller_index; + static std::atomic_flag _sending_query; + static struct sockaddr_in sock_addr; }; #endif // __SM_CLUSTER_HBS_INFO_MSG_H__