Retrieve hbs cluster info

This change includes:
1. adds code to receive cluster info update from hbsAgent.
2. support of ondemand hbs cluster info query (asynchronous).

Depends-On: I7d294d40e84469df6b6a6f6dd490cf3c4557b711

Story: 2003577
Task: 27816

Change-Id: Idb65abc58b4afe9649aba442f0798c24d9fffb10
Signed-off-by: Bin Qian <bin.qian@windriver.com>
This commit is contained in:
Bin Qian 2018-10-29 13:51:04 -04:00
parent d7ba498da9
commit 28e293bda5
10 changed files with 568 additions and 6 deletions

View File

@ -2,4 +2,4 @@ SRC_DIR=$PKG_BASE
COPY_LIST="$PKG_BASE/LICENSE"
TAR_NAME=sm
VERSION=1.0.0
TIS_PATCH_VER=26
TIS_PATCH_VER=27

View File

@ -12,6 +12,7 @@ Source1: LICENSE
BuildRequires: fm-common-dev
BuildRequires: sm-db-dev
BuildRequires: sm-common-dev
BuildRequires: mtce-dev
BuildRequires: glib2-devel
BuildRequires: glibc
BuildRequires: sqlite-devel

View File

@ -116,6 +116,7 @@ SRCS+=sm_failover_ss.c
SRCS+=sm_service_domain_interface_not_in_use_state.c
SRCS+=sm_configuration_table.c
SRCS+=sm_failover_utils.c
SRCS+=sm_cluster_hbs_info_msg.cpp
OBJS= $(SRCS:.c=.o)
CCFLAGS= -g -O2 -Wall -Werror -Wformat -std=c++11

View File

@ -0,0 +1,451 @@
//
// Copyright (c) 2018 Wind River Systems, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
#include "sm_cluster_hbs_info_msg.h"
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <net/if.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include "sm_configuration_table.h"
#include "sm_debug.h"
#include "sm_limits.h"
#include "sm_selobj.h"
#include "sm_timer.h"
#define LOOPBACK_IP "127.0.0.1"
#define SM_CLIENT_PORT_KEY "sm_client_port"
#define SM_SERVER_PORT_KEY "sm_server_port"
const char json_fmt[] = "{\"origin\":\"sm\",\"service\":\"heartbeat\",\"request\":\"cluster_info\",\"reqid\":\"%d\"}";
const int request_size = sizeof(json_fmt) + 10;
static const unsigned int size_of_msg_header =
sizeof(mtce_hbs_cluster_type)
- sizeof(mtce_hbs_cluster_history_type) * MTCE_HBS_MAX_HISTORY_ELEMENTS;
bool operator==(const SmClusterHbsInfoT& lhs, const SmClusterHbsInfoT& rhs)
{
return lhs.storage0_responding == rhs.storage0_responding &&
lhs.number_of_node_reachable == rhs.number_of_node_reachable;
}
bool operator!=(const SmClusterHbsInfoT& lhs, const SmClusterHbsInfoT& rhs)
{
return !(lhs == rhs);
}
bool operator==(const SmClusterHbsStateT& lhs, const SmClusterHbsStateT& rhs)
{
if(lhs.storage0_enabled != rhs.storage0_enabled)
return false;
for(unsigned int i = 0; i < max_controllers; i ++)
{
if(lhs.controllers[i] != rhs.controllers[i])
{
return false;
}
}
return true;
}
bool operator!=(const SmClusterHbsStateT& lhs, const SmClusterHbsStateT& rhs)
{
return !(lhs == rhs);
}
pthread_mutex_t SmClusterHbsInfoMsg::_mutex;
const unsigned short Invalid_Req_Id = 0;
int SmClusterHbsInfoMsg::_sock = -1;
SmClusterHbsStateT SmClusterHbsInfoMsg::_cluster_hbs_state_current;
SmClusterHbsStateT SmClusterHbsInfoMsg::_cluster_hbs_state_previous;
SmClusterHbsInfoMsg::hbs_query_respond_callback SmClusterHbsInfoMsg::_callbacks;
const SmClusterHbsStateT& SmClusterHbsInfoMsg::get_current_state()
{
return _cluster_hbs_state_current;
}
const SmClusterHbsStateT& SmClusterHbsInfoMsg::get_previous_state()
{
return _cluster_hbs_state_previous;
}
bool SmClusterHbsInfoMsg::_process_cluster_hbs_history(mtce_hbs_cluster_history_type history, SmClusterHbsStateT& state)
{
if(history.controller >= max_controllers)
{
DPRINTFE("Invalid controller id %d", history.controller);
return false;
}
if(MTCE_HBS_NETWORKS <= history.network)
{
DPRINTFE("Invalid network id %d", history.network);
return false;
}
if(MTCE_HBS_HISTORY_ENTRIES < history.entries)
{
DPRINTFE("Invalid entries %d", history.entries);
return false;
}
if(MTCE_HBS_HISTORY_ENTRIES < history.oldest_entry_index)
{
DPRINTFE("Invalid oldest entry index %d", history.oldest_entry_index);
return false;
}
int newest_entry_index = (history.oldest_entry_index + history.entries) % MTCE_HBS_HISTORY_ENTRIES;
mtce_hbs_cluster_entry_type& entry = history.entry[newest_entry_index];
SmClusterHbsInfoT& controller_state = state.controllers[history.controller];
controller_state.storage0_responding = history.storage0_responding;
if(entry.hosts_responding > controller_state.number_of_node_reachable)
{
controller_state.number_of_node_reachable = entry.hosts_responding;
}
return true;
}
void SmClusterHbsInfoMsg::_cluster_hbs_info_msg_received( int selobj, int64_t user_data )
{
mtce_hbs_cluster_type msg = {0};
mutex_holder holder(&_mutex);
while(true)
{
int bytes_read = recv( selobj, &msg, sizeof(msg), MSG_NOSIGNAL | MSG_DONTWAIT );
DPRINTFD("msg received %d bytes. buffer size %d", bytes_read, sizeof(msg));
if(bytes_read < 0)
{
if(EAGAIN != errno)
{
DPRINTFE("Failed to read socket. error %s", strerror(errno));
}
return;
}
if(size_of_msg_header > (unsigned int)bytes_read)
{
DPRINTFE("size not right, msg size %d, expected not less than %d",
bytes_read, size_of_msg_header);
return;
}
DPRINTFD("msg version %d, revision %d, size %d, reqid %d",
msg.version, msg.revision, msg.bytes, msg.reqid);
DPRINTFD("period %d number of rec %d", msg.period_msec, msg.histories);
SmClusterHbsStateT state;
if(msg.histories > 0)
{
int expected_size = sizeof(mtce_hbs_cluster_history_type) * msg.histories
+ size_of_msg_header;
if(bytes_read != expected_size)
{
DPRINTFE("Received size %d not matching %d expected", bytes_read, expected_size);
return;
}
for(int i = 0; i < msg.histories; i ++)
{
if(!_process_cluster_hbs_history(msg.history[i], state))
{
return;
}
}
}else
{
DPRINTFD("No rbs cluster info history data is received");
}
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
state.last_update = ts.tv_sec;
state.storage0_enabled = (bool)msg.storage0_enabled;
if(state != _cluster_hbs_state_current)
{
_cluster_hbs_state_previous = _cluster_hbs_state_current;
_cluster_hbs_state_current = state;
}
else
{
DPRINTFD("cluster hbs state unchanged");
}
while(!_callbacks.empty())
{
cluster_hbs_query_ready_callback callback = _callbacks.front();
_callbacks.pop_front();
callback();
}
}
}
SmErrorT SmClusterHbsInfoMsg::_get_address(const char* port_key, struct sockaddr_in* addr)
{
struct addrinfo *address = NULL;
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET; // IPv4 only
hints.ai_socktype = SOCK_DGRAM;
hints.ai_flags = AI_PASSIVE;
hints.ai_protocol = 0;
hints.ai_canonname = NULL;
hints.ai_addr = NULL;
hints.ai_next = NULL;
char port[SM_CONFIGURATION_VALUE_MAX_CHAR + 1];
if( SM_OKAY != sm_configuration_table_get(port_key, port, sizeof(port) - 1) )
{
DPRINTFE("Runtime error: system configuration %s undefined", port_key);
return SM_FAILED;
}
int result = getaddrinfo(LOOPBACK_IP, port, &hints, &address);
if(result != 0)
{
DPRINTFE("Failed to get addrinfo %s:%s", LOOPBACK_IP, port);
return SM_FAILED;
}
memcpy(addr, address->ai_addr, sizeof(struct sockaddr_in));
freeaddrinfo(address);
return SM_OKAY;
}
// ****************************************************************************
// SmClusterHbsInfoMsg::cluster_hbs_info_query -
// trigger a query of cluster hbs info.
// return true if request sent successfully, false otherwise.
// ========================
bool SmClusterHbsInfoMsg::cluster_hbs_info_query(cluster_hbs_query_ready_callback callback)
{
char server_port[SM_CONFIGURATION_VALUE_MAX_CHAR + 1];
if( SM_OKAY != sm_configuration_table_get(SM_SERVER_PORT_KEY, server_port, sizeof(server_port) - 1) )
{
DPRINTFE("Runtime error: system configuration %s undefined", SM_SERVER_PORT_KEY);
return false;
}
int port = atoi(server_port);
if(0 > port)
{
DPRINTFE("Runtime error: Invalid configuration %s: %s", SM_SERVER_PORT_KEY, server_port);
return false;
}
char query[request_size];
unsigned short reqid;
struct timespec ts;
mutex_holder holder(&_mutex);
if(0 != clock_gettime(CLOCK_REALTIME, &ts))
{
DPRINTFE("Failed to get realtime");
reqid = (unsigned short)1;
}else
{
unsigned short* v = (unsigned short*)(&ts.tv_nsec);
reqid = (*v) % 0xFFFE + 1;
}
struct sockaddr_in addr;
if(SM_OKAY != _get_address(SM_SERVER_PORT_KEY, &addr))
{
DPRINTFE("Failed to get address");
return false;
}
int msg_size = snprintf(query, sizeof(query), json_fmt, reqid);
DPRINTFD("msg (%d:%d) to send %s", msg_size + 1, strlen(query), query);
if(0 > sendto(_sock, query, msg_size + 1, 0, (sockaddr*)&addr, sizeof(addr)))
{
DPRINTFE("Failed to send msg. Error %s", strerror(errno));
return false;
}
if(NULL != callback)
{
_callbacks.push_back(callback);
}
return true;
}
SmErrorT SmClusterHbsInfoMsg::open_socket()
{
struct addrinfo *address = NULL;
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET; // IPv4 only
hints.ai_socktype = SOCK_DGRAM;
hints.ai_flags = AI_PASSIVE;
hints.ai_protocol = 0;
hints.ai_canonname = NULL;
hints.ai_addr = NULL;
hints.ai_next = NULL;
struct sockaddr_in addr;
char client_port[SM_CONFIGURATION_VALUE_MAX_CHAR + 1];
char server_port[SM_CONFIGURATION_VALUE_MAX_CHAR + 1];
if( SM_OKAY != sm_configuration_table_get(SM_CLIENT_PORT_KEY, client_port, sizeof(client_port) - 1) )
{
DPRINTFE("Runtime error: system configuration %s undefined", SM_CLIENT_PORT_KEY);
return SM_FAILED;
}
if( SM_OKAY != sm_configuration_table_get(SM_SERVER_PORT_KEY, server_port, sizeof(server_port) - 1) )
{
DPRINTFE("Runtime error: system configuration %s undefined", SM_SERVER_PORT_KEY);
return SM_FAILED;
}else
{
int port = atoi(server_port);
if(0 > port)
{
DPRINTFE("Invalid configuration %s: %s", SM_SERVER_PORT_KEY, server_port);
return SM_FAILED;
}
}
int result = getaddrinfo(LOOPBACK_IP, client_port, &hints, &address);
if(result != 0)
{
DPRINTFE("Failed to get addrinfo %s:%s", LOOPBACK_IP, client_port);
return SM_FAILED;
}
memcpy(&addr, address->ai_addr, sizeof(addr));
freeaddrinfo(address);
address = NULL;
int sock = socket( AF_INET, SOCK_DGRAM, IPPROTO_UDP );
if( 0 > sock )
{
DPRINTFE("Failed to create sock. Error %s", strerror(errno));
return SM_FAILED;
}
int flags = fcntl( sock, F_GETFL, 0 );
if( 0 > flags )
{
DPRINTFE("Failed to get flags, error=%s.", strerror(errno));
close( sock );
return SM_FAILED;
}
if( 0 > fcntl( sock, F_SETFL, flags | O_NONBLOCK ) )
{
DPRINTFE("Failed to set flags, error=%s.", strerror(errno));
close( sock );
return SM_FAILED;
}
result = bind( sock, (struct sockaddr *) &addr, sizeof(struct sockaddr_in));
if(0 > result)
{
DPRINTFE("Failed to bind. error=%s", strerror( errno));
close( sock );
return SM_FAILED;
}
SmErrorT error = sm_selobj_register(sock, _cluster_hbs_info_msg_received, 0);
if(SM_OKAY != error)
{
DPRINTFE("Failed to register selobj");
close( sock );
return SM_FAILED;
}
_sock = sock;
return SM_OKAY;
}
SmErrorT SmClusterHbsInfoMsg::initialize()
{
SmErrorT error;
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
int res = pthread_mutex_init(&_mutex, &attr);
if( 0 != res )
{
DPRINTFE("Failed to initialize mutex, error %d", res);
return SM_FAILED;
}
error = open_socket();
if(SM_OKAY != error)
{
DPRINTFE("Failed to open sock");
return SM_FAILED;
}
return SM_OKAY;
}
SmErrorT SmClusterHbsInfoMsg::finalize()
{
mutex_holder holder(&_mutex);
if(_sock > 0)
{
close(_sock);
_sock = -1;
}
pthread_mutex_destroy(&_mutex);
return SM_OKAY;
}
void SmClusterHbsInfoMsg::dump_hbs_record(FILE* fp)
{
struct timespec ts;
time_t t;
clock_gettime(CLOCK_REALTIME, &ts);
t = ts.tv_sec - _cluster_hbs_state_current.last_update;
fprintf(fp, "\ncluster hbs info\n");
if(0 == _cluster_hbs_state_current.last_update)
{
fprintf(fp, " Current state, no data received yet\n");
}else
{
fprintf(fp, " Current state, last updated %d seconds ago\n", (int)t);
fprintf(fp, " storage-0 is %s configured\n", _cluster_hbs_state_current.storage0_enabled ? "" : "not");
fprintf(fp, " From controller-0\n");
if(_cluster_hbs_state_current.storage0_enabled)
{
fprintf(fp, " storage-0 is %s responding\n", _cluster_hbs_state_current.controllers[0].storage0_responding ? "" : "not");
}
fprintf(fp, " %d nodes are responding\n", _cluster_hbs_state_current.controllers[0].number_of_node_reachable);
fprintf(fp, " From controller-1\n");
if(_cluster_hbs_state_current.storage0_enabled)
{
fprintf(fp, " storage-0 is %s responding\n", _cluster_hbs_state_current.controllers[1].storage0_responding ? "" : "not");
}
fprintf(fp, " %d nodes are responding\n", _cluster_hbs_state_current.controllers[1].number_of_node_reachable);
}
if(0 != _cluster_hbs_state_previous.last_update)
{
fprintf(fp, "\n Previous state, since %d seconds ago\n", (int)t);
fprintf(fp, " storage-0 is %s configured\n", _cluster_hbs_state_previous.storage0_enabled ? "" : "not");
fprintf(fp, " From controller-0\n");
if(_cluster_hbs_state_previous.storage0_enabled)
{
fprintf(fp, " storage-0 is %s responding\n", _cluster_hbs_state_previous.controllers[0].storage0_responding ? "" : "not");
}
fprintf(fp, " %d nodes are responding\n", _cluster_hbs_state_previous.controllers[0].number_of_node_reachable);
fprintf(fp, " From controller-1\n");
if(_cluster_hbs_state_previous.storage0_enabled)
{
fprintf(fp, " storage-0 is %s responding\n", _cluster_hbs_state_previous.controllers[1].storage0_responding ? "" : "not");
}
fprintf(fp, " %d nodes are responding\n", _cluster_hbs_state_previous.controllers[1].number_of_node_reachable);
}
}

View File

@ -0,0 +1,86 @@
//
// Copyright (c) 2018 Wind River Systems, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
#ifndef __SM_CLUSTER_HBS_INFO_MSG_H__
#define __SM_CLUSTER_HBS_INFO_MSG_H__
#include <list>
#include <pthread.h>
#include <stdio.h>
#include "mtceHbsCluster.h"
#include "sm_types.h"
#include "sm_timer.h"
#include "sm_util_types.h"
// ****************************************************************************
// struct SmClusterHbsInfoT
// Store cluster hbs info
// ========================
struct _SmClusterHbsInfoT
{
bool storage0_responding;
int number_of_node_reachable;
_SmClusterHbsInfoT() : storage0_responding(false), number_of_node_reachable(0)
{
}
};
typedef struct _SmClusterHbsInfoT SmClusterHbsInfoT;
bool operator==(const SmClusterHbsInfoT& lhs, const SmClusterHbsInfoT& rhs);
bool operator!=(const SmClusterHbsInfoT& lhs, const SmClusterHbsInfoT& rhs);
const unsigned int max_controllers = 2;
// ****************************************************************************
// struct SmClusterHbsInfoT
// Store cluster hbs info aggregate data from all (max_controllers)
// controllers
// ========================
typedef struct
{
SmClusterHbsInfoT controllers[max_controllers];
bool storage0_enabled;
time_t last_update;
}SmClusterHbsStateT;
bool operator==(const SmClusterHbsStateT& lhs, const SmClusterHbsStateT& rhs);
bool operator!=(const SmClusterHbsStateT& lhs, const SmClusterHbsStateT& rhs);
typedef void(*cluster_hbs_query_ready_callback)();
// ****************************************************************************
// class SmClusterHbsInfoMsg -
// handle requesting/receiving and processing cluster hbs info from hbsAgent.
//
// hbsAgent sends hbs cluster info when there is a change in the cluster.
// Keep track of most up-to-date info prior to a failure occurs.
// Provide async query with callback when hbsAgent response is received
// ========================
class SmClusterHbsInfoMsg
{
public:
typedef std::list<cluster_hbs_query_ready_callback> hbs_query_respond_callback;
static SmErrorT initialize();
static SmErrorT finalize();
static const SmClusterHbsStateT& get_current_state();
static const SmClusterHbsStateT& get_previous_state();
static bool cluster_hbs_info_query(cluster_hbs_query_ready_callback callback = NULL);
static void dump_hbs_record(FILE* fp);
private:
static int _sock;
static unsigned short _last_reqid;
static pthread_mutex_t _mutex;
static SmClusterHbsStateT _cluster_hbs_state_current;
static SmClusterHbsStateT _cluster_hbs_state_previous;
static hbs_query_respond_callback _callbacks;
static SmErrorT open_socket();
static SmErrorT _get_address(const char* port_key, struct sockaddr_in* addr);
static void _cluster_hbs_info_msg_received( int selobj, int64_t user_data );
static bool _process_cluster_hbs_history(mtce_hbs_cluster_history_type history,
SmClusterHbsStateT& state);
};
#endif // __SM_CLUSTER_HBS_INFO_MSG_H__

View File

@ -36,6 +36,7 @@
#include "sm_failover_utils.h"
#include "sm_failover_fsm.h"
#include "sm_api.h"
#include "sm_cluster_hbs_info_msg.h"
#define SM_FAILOVER_STATE_TRANSITION_TIME_IN_MS 2000
#define SM_FAILOVER_MULTI_FAILURE_WAIT_TIMER_IN_MS 2000
@ -244,7 +245,7 @@ void sm_failover_lost_hello_msg()
}
// ****************************************************************************
// Failover - Hello msg restor
// Failover - Hello msg restore
// ==================
void sm_failover_hello_msg_restore()
{
@ -1291,6 +1292,19 @@ SmErrorT sm_failover_initialize( void )
sm_failover_audit_timeout,
0, &failover_audit_timer_id );
if(SM_OKAY != error)
{
DPRINTFE("Failed to register failover audit timer, error %s",
sm_error_str(error));
return SM_FAILED;
}
error = SmClusterHbsInfoMsg::initialize();
if(SM_OKAY != error)
{
DPRINTFE("Failed to initialize cluster hbs info messaging");
}
return SM_OKAY;
}
// ****************************************************************************
@ -1303,6 +1317,12 @@ SmErrorT sm_failover_finalize( void )
_total_interfaces = 0;
SmErrorT error;
error = SmClusterHbsInfoMsg::finalize();
if(SM_OKAY != error)
{
DPRINTFE("Failed to finalize cluster hbs info messaging");
}
sm_timer_deregister( failover_audit_timer_id );
if( NULL != _sm_db_handle )
{
@ -1323,6 +1343,7 @@ SmErrorT sm_failover_finalize( void )
return error;
}
pthread_mutex_destroy(&_mutex);
return SM_OKAY;
}
// ****************************************************************************

View File

@ -607,7 +607,7 @@ SmErrorT sm_process_main( int argc, char *argv[], char *envp[] )
sm_process_setup_signal_handler();
DPRINTFI( "Starting" );
DPRINTFI( "Starting. SW built at %s %s", __DATE__, __TIME__ );
if( sm_utils_process_running( SM_PROCESS_PID_FILENAME ) )
{

View File

@ -588,7 +588,7 @@ static int get_initial_throttle()
return MAX_SERVICE_EXPECTED;
}
char buf[SM_CONFIGURATION_KEY_MAX_CHAR + 1];
char buf[SM_CONFIGURATION_VALUE_MAX_CHAR + 1];
int size;
if( SM_OKAY == sm_configuration_table_get("ENABLING_THROTTLE", buf, sizeof(buf) - 1) )
{

View File

@ -27,6 +27,7 @@
#include "sm_failover.h"
#include "sm_service_domain_neighbor_fsm.h"
#include "sm_service_domain_fsm.h"
#include "sm_cluster_hbs_info_msg.h"
#define SM_TROUBLESHOOT_NAME "sm_troubleshoot"
@ -125,6 +126,7 @@ SmErrorT sm_troubleshoot_dump_data( const char reason[] )
sm_service_domain_dump_state( log ); fprintf( log, "\n" );
sm_service_domain_interface_dump_state( log ); fprintf( log, "\n" );
sm_domain_neighbor_fsm_dump( log );
SmClusterHbsInfoMsg::dump_hbs_record(log);
sm_timer_dump_data( log ); fprintf( log, "\n" );
sm_msg_dump_data( log ); fprintf( log, "\n" );

View File

@ -26,10 +26,10 @@ SRCS+=sm_eru_db.c
SRCS+=sm_util_types.c
OBJS = $(SRCS:.c=.o)
CCFLAGS= -fPIC -g -O2 -Wall -Werror
CCFLAGS= -fPIC -g -O2 -Wall -Werror -std=c++11
EXTRACCFLAGS= -D__STDC_FORMAT_MACROS -DSW_VERSION=\"$(SW_VERSION)\"
LDLIBS= -lsqlite3 -lglib-2.0 -lgmodule-2.0 -luuid -lrt -lpthread -std=c++11
LDLIBS= -lsqlite3 -lglib-2.0 -lgmodule-2.0 -luuid -lrt -lpthread
LDFLAGS = -shared -rdynamic
build: libsm_common.so libsm_watchdog_nfs.so sm_watchdog sm_eru sm_eru_dump