/* * Copyright (c) 2013-2018 Wind River Systems, Inc. * * SPDX-License-Identifier: Apache-2.0 * */ /** * @file * Wind River CGTS Platform Guest Heartbeat Server Daemon on Compute */ #include #include #include #include #include #include #include #include #include #include #include /* for hostent */ #include #include #include #include #include #include #include /* for close and usleep */ #include /* for realtime scheduling api */ #include using namespace std; #include "nodeBase.h" #include "daemon_ini.h" /* Ini Parser Header */ #include "daemon_common.h" /* Common definitions and types for daemons */ #include "daemon_option.h" /* Common options for daemons */ #include "nodeUtil.h" /* for ... common utilities */ #include "jsonUtil.h" /* for ... jason utilities */ #include "nodeTimers.h" /* for ... maintenance timers */ #include "nodeMacro.h" /* for ... CREATE_NONBLOCK_INET_UDP_RX_SOCKET */ #include "nodeEvent.h" /* for ... set_inotify_watch, set_inotify_close */ #include "guestBase.h" #include "guestInstClass.h" /* for ... guestUtil_inst_init */ #include "guestUtil.h" /* for ... guestUtil_inst_init */ #include "guestSvrUtil.h" /* for ... hb_get_message_type_name */ #include "guestSvrMsg.h" /* for ... this module header */ extern void hbStatusChange ( instInfo * instInfo_ptr, bool status ); extern void beatStateChange ( instInfo * instInfo_ptr , hb_state_t newState ); /***************************************************************************** * * Name : guestSvrMsg_hdr_init * * Purpose: Initialize the message header. Example output: * {"version":2,"revision":1,"msg_type":"init","sequence":29, * The rest of the message should be appended to it. * *****************************************************************************/ string guestSvrMsg_hdr_init (string channel, string msg_type) { instInfo * instInfo_ptr = get_instInv_ptr()->get_inst (channel); string msg = "\n{\""; msg.append(GUEST_HEARTBEAT_MSG_VERSION); msg.append("\":"); msg.append(int_to_string(GUEST_HEARTBEAT_MSG_VERSION_CURRENT)); msg.append(",\""); msg.append(GUEST_HEARTBEAT_MSG_REVISION); msg.append("\":"); msg.append(int_to_string(GUEST_HEARTBEAT_MSG_REVISION_CURRENT)); msg.append(",\""); msg.append(GUEST_HEARTBEAT_MSG_MSG_TYPE); msg.append("\":\""); msg.append(msg_type); msg.append("\",\""); msg.append(GUEST_HEARTBEAT_MSG_SEQUENCE); msg.append("\":"); msg.append(int_to_string(++(instInfo_ptr->sequence))); msg.append(","); // store msg_type in instance structure so that it is available to handle timeout instInfo_ptr->msg_type = msg_type; return msg; } /** * Manages the fault reporting state * - returns current reporting state * */ bool manage_reporting_state ( instInfo * instInfo_ptr, string state) { if (!state.compare("enabled")) { if ( instInfo_ptr->heartbeat.reporting == false ) { ilog ("%s heartbeat reporting '%s' by guestAgent\n", log_prefix(instInfo_ptr).c_str(), state.c_str()); instInfo_ptr->heartbeat.reporting = true ; instInfo_ptr->message_count = 0 ; } } else { if ( instInfo_ptr->heartbeat.reporting == true ) { ilog ("%s heartbeat reporting '%s' by guestAgent\n", log_prefix(instInfo_ptr).c_str(), state.c_str()); instInfo_ptr->heartbeat.reporting = false ; instInfo_ptr->message_count = 0 ; hbStatusChange ( instInfo_ptr, false) ; /* heartbeating is now false */ beatStateChange ( instInfo_ptr, hbs_server_waiting_init ) ; } } return instInfo_ptr->heartbeat.reporting ; } /***************************************************************************** * * Name : guestAgent_qry_handler * * Purpose: Loop over all the instances and return their uuid, hostname, * reporting state, heartbneating status and timeout values. * * { "hostname":"compute-1", "instances": [{"uuid":"","heartbeat":"", status":"}, timeouts ...]} * *****************************************************************************/ int guestInstClass::guestAgent_qry_handler ( void ) { int rc = PASS ; /* check for empty list condition */ if ( inst_head ) { struct inst * inst_ptr = static_cast(NULL) ; for ( inst_ptr = inst_head ; ; inst_ptr = inst_ptr->next ) { string payload = guestUtil_set_inst_info ( get_ctrl_ptr()->hostname , &inst_ptr->instance ); jlog ("%s Query Instance Response:%ld:%s\n", log_prefix(&inst_ptr->instance).c_str(), payload.size(), payload.c_str() ); if (( rc=send_to_guestAgent ( MTC_CMD_QRY_INST, payload.data())) != PASS ) { wlog ("%s failed to send query instance response to guestAgent\n", log_prefix(&inst_ptr->instance).c_str()); } /* Deal with exit case */ if (( inst_ptr->next == NULL ) || ( inst_ptr == inst_tail )) { break ; } } } return (rc); } /***************************************************************************** * * Name : recv_from_guestAgent * * Purpose: Handle guestAgent commands * * MTC_EVENT_LOOPBACK * MTC_CMD_QRY_INST * MTC_CMD_DEL_INST * MTC_CMD_MOD_INST * MTC_CMD_ADD_INST * MTC_CMD_MOD_HOST * * ***************************************************************************/ int recv_from_guestAgent ( unsigned int cmd, char * buf_ptr ) { int rc = PASS ; mlog1 ("Cmd:%x - %s\n", cmd, buf_ptr); if ( cmd == MTC_EVENT_LOOPBACK ) { /* TODO: Send message back */ return (rc) ; } else if ( cmd == MTC_CMD_QRY_INST ) { if ( ( rc = get_instInv_ptr()->qry_inst ()) != PASS ) { elog ("failed to send hosts instance info\n"); } return (rc) ; } else if ( cmd == MTC_CMD_VOTE_INST || cmd == MTC_CMD_NOTIFY_INST ) { string source; string uuid; string event; rc = FAIL_KEY_VALUE_PARSE ; /* default to parse error */ if (( rc = jsonUtil_get_key_val ( buf_ptr, "source", source )) != PASS) { elog ("failed to extract 'source' (cmd:%x %s)\n", cmd , buf_ptr ); } else if (( rc = jsonUtil_get_key_val ( buf_ptr, "uuid", uuid )) != PASS) { elog ("failed to extract 'uuid' (cmd:%x %s)\n", cmd , buf_ptr); } else if (( rc = jsonUtil_get_key_val ( buf_ptr, "event", event )) != PASS) { elog ("failed to extract 'event' key (cmd:%x %s)\n", cmd , buf_ptr); } else { // send message to guest Client instInfo * instInfo_ptr = get_instInv_ptr()->get_inst(uuid); if ( instInfo_ptr ) { /* If this is a resume then we need to reconnect to the channel */ if ( !event.compare(GUEST_HEARTBEAT_MSG_EVENT_RESUME) ) { /* issue a reconnect if we are not connected the hartbeating has not started */ if (( instInfo_ptr->connected == false ) || ( instInfo_ptr->heartbeating == false )) { // instInfo_ptr->connect_wait_in_secs = 10 ; get_instInv_ptr()->reconnect_start ( instInfo_ptr->uuid.data() ); } } instInfo_ptr->event_type = event; if (MTC_CMD_VOTE_INST == cmd) { // for voting instInfo_ptr->notification_type = GUEST_HEARTBEAT_MSG_NOTIFY_REVOCABLE ; ilog ("%s sending revocable '%s' vote\n", log_prefix(instInfo_ptr).c_str(), event.c_str()); } else { // for notification instInfo_ptr->notification_type = GUEST_HEARTBEAT_MSG_NOTIFY_IRREVOCABLE ; ilog ("%s sending irrevocable '%s' notify\n", log_prefix(instInfo_ptr).c_str(), event.c_str()); } get_instInv_ptr()->send_vote_notify(uuid) ; rc = PASS ; } else { wlog ("%s is unknown\n", uuid.c_str()); } } } else { string source ; string uuid ; string service ; string state ; rc = FAIL_KEY_VALUE_PARSE ; /* default to parse error */ if (( rc = jsonUtil_get_key_val ( buf_ptr, "source", source )) != PASS) { elog ("failed to extract 'source' (cmd:%x %s)\n", cmd , buf_ptr ); } else if (( rc = jsonUtil_get_key_val ( buf_ptr, "uuid", uuid )) != PASS) { elog ("failed to extract 'uuid' (cmd:%x %s)\n", cmd , buf_ptr); } else if (( rc = jsonUtil_get_key_val ( buf_ptr, "service", service )) != PASS) { elog ("failed to extract 'service' key (cmd:%x %s)\n", cmd , buf_ptr); } else if (( rc = jsonUtil_get_key_val ( buf_ptr, "state", state )) != PASS) { elog ("failed to extract 'state' (cmd:%x %s)\n", cmd , buf_ptr ); } else { rc = RETRY ; switch ( cmd ) { case MTC_CMD_DEL_INST: { ilog ("%s delete\n", uuid.c_str()); if ( get_instInv_ptr()->del_inst( uuid ) == PASS ) { rc = PASS ; } else { dlog ("%s delete failed ; uuid lookup\n", uuid.c_str()); rc = FAIL_NOT_FOUND ; } if (daemon_get_cfg_ptr()->debug_level ) get_instInv_ptr()->print_instances (); break ; } case MTC_CMD_ADD_INST: case MTC_CMD_MOD_INST: { instInfo * instInfo_ptr = get_instInv_ptr()->get_inst ( uuid ); if ( instInfo_ptr ) { manage_reporting_state ( instInfo_ptr, state ); rc = PASS ; } /* if true then the current channel was not found and we need to add it */ if ( rc == RETRY ) { instInfo instance ; guestUtil_inst_init (&instance); instance.uuid = uuid ; ilog ("%s add with %s reporting %s\n", uuid.c_str(), service.c_str(), state.c_str()); get_instInv_ptr()->add_inst ( uuid, instance ); instInfo * instInfo_ptr = get_instInv_ptr()->get_inst ( uuid ); manage_reporting_state ( instInfo_ptr, state ); } if (daemon_get_cfg_ptr()->debug_level ) get_instInv_ptr()->print_instances(); break ; } case MTC_CMD_MOD_HOST: { guestInstClass * obj_ptr = get_instInv_ptr() ; string reporting_state = "" ; rc = jsonUtil_get_key_val ( buf_ptr, "heartbeat", reporting_state ) ; if ( rc != PASS) { elog ("failed to extract heartbeat reporting state (rc=%d)\n", rc ); wlog ("... disabling 'heartbeat' fault reporting due to error\n"); obj_ptr->reporting = false ; rc = FAIL_JSON_PARSE ; } else if ( !reporting_state.compare("enabled") ) { ilog ("Enabling host level 'heartbeat' fault reporting\n"); obj_ptr->reporting = true ; } else { ilog ("Disabling host level 'heartbeat' fault reporting\n"); obj_ptr->reporting = false ; } break ; } default: { elog ("unsupported command (%x)\n", cmd ); } } } } return (rc); } /**************************************************************************** * * Name : send_to_guestAgent * * Purpose : Send a command and buffer to the guestAgent * * Description: If the guestAgent IP is not known the message is dropped * and a retry is returned. Otherwise the supplied message is * sent to the guestAgent running on the controller. * * **************************************************************************/ int send_to_guestAgent ( unsigned int cmd, const char * buf_ptr ) { int bytes = 0; ctrl_type * ctrl_ptr = get_ctrl_ptr () ; int rc = PASS ; mtc_message_type mtc_cmd ; memset (&mtc_cmd,0,sizeof(mtc_message_type)); memcpy ( &mtc_cmd.buf[0], buf_ptr, strlen(buf_ptr)); bytes = sizeof(mtc_message_type) ; if ( ctrl_ptr->address_peer.empty()) { mlog2 ("controller address unknown ; dropping message (%x:%s)", cmd , buf_ptr ); return RETRY ; } mlog1 ("Sending: %s:%d Cmd:%x:%s\n", ctrl_ptr->address_peer.c_str(), ctrl_ptr->sock.agent_rx_port, cmd, buf_ptr ); mtc_cmd.cmd = cmd ; /* rc = message size */ rc = ctrl_ptr->sock.server_tx_sock->write((char *)&mtc_cmd, bytes,ctrl_ptr->address_peer.c_str()); if ( 0 > rc ) { elog("failed to send (%d:%m)\n", errno ); rc = FAIL_SOCKET_SENDTO ; } else { mlog1 ("Transmit to %14s port %d\n", ctrl_ptr->address_peer.c_str(), ctrl_ptr->sock.server_tx_sock->get_dst_addr()->getPort()); print_mtc_message ( &mtc_cmd ); rc = PASS ; } return (rc); } /********************************************************************************* * * Name : write_inst (guestInstClass::public) * * Purpose: Send a message to the specified VM instance. * *********************************************************************************/ ssize_t guestInstClass::write_inst ( instInfo * instInfo_ptr, const char * message, size_t size) { string name = log_prefix(instInfo_ptr); errno = 0 ; size_t len = write ( instInfo_ptr->chan_fd, message, size ); if ( len != size ) { if ( errno ) { wlog_throttled ( instInfo_ptr->failure_count, 100, "%s failed to send '%s' (seq:%x) (%d:%m)\n", name.c_str(), instInfo_ptr->msg_type.c_str(), instInfo_ptr->sequence, errno ); if ( errno == EPIPE ) { instInfo_ptr->connected = false ; instInfo_ptr->connect_wait_in_secs = DEFAULT_CONNECT_WAIT ; get_instInv_ptr()->reconnect_start ( instInfo_ptr->uuid.data() ); } len = 0 ; } else { wlog_throttled ( instInfo_ptr->failure_count, 100, "%s send '%s' (seq:%x) (len:%ld)\n", name.c_str(), instInfo_ptr->msg_type.c_str(), instInfo_ptr->sequence, len); } } else { instInfo_ptr->failure_count = 0 ; mlog("%s send '%s' (seq:%x)\n", name.c_str(), instInfo_ptr->msg_type.c_str(), instInfo_ptr->sequence ); } return (len); } /********************************************************************************* * * Name : process_msg (guestInstClass::private) * * Purpose : process delimited message * *********************************************************************************/ void guestInstClass::process_msg(json_object *jobj_msg, struct guestInstClass::inst * inst_ptr) { int version; string msg_type; string log_err = "failed to parse "; guestInstClass * obj_ptr = get_instInv_ptr(); //parse incoming msg if (jobj_msg == NULL) { wlog("%s\n", log_err.c_str()); return; } if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_VERSION, &version) != PASS) { // fail to parse the version log_err.append(GUEST_HEARTBEAT_MSG_VERSION); elog("%s\n", log_err.c_str()); obj_ptr->send_client_msg_nack(&inst_ptr->instance, log_err); json_object_put(jobj_msg); return; } if ( version < GUEST_HEARTBEAT_MSG_VERSION_CURRENT) { char log_err_str[100]; sprintf(log_err_str, "Bad version: %d, expect version: %d", version, GUEST_HEARTBEAT_MSG_VERSION_CURRENT); elog("%s\n", log_err_str); log_err = log_err_str; obj_ptr->send_client_msg_nack(&inst_ptr->instance, log_err); json_object_put(jobj_msg); return; } if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_MSG_TYPE, &msg_type) != PASS) { // fail to parse the msg_type log_err.append(GUEST_HEARTBEAT_MSG_MSG_TYPE); elog("%s\n", log_err.c_str()); obj_ptr->send_client_msg_nack(&inst_ptr->instance, log_err); json_object_put(jobj_msg); return; } /* Enqueue the message to its instance message list */ inst_ptr->message_list.push_back(jobj_msg); } /********************************************************************************* * * Name : parser (guestInstClass::private) * * Purpose : parse message segments and feed valid message to process_msg * *********************************************************************************/ void guestInstClass::parser(char *buf, ssize_t len, json_tokener* tok, int newline_found, struct guestInstClass::inst * inst_ptr) { json_object *jobj = json_tokener_parse_ex(tok, buf, len); enum json_tokener_error jerr = json_tokener_get_error(tok); if (jerr == json_tokener_success) { process_msg(jobj, inst_ptr); return; } else if (jerr == json_tokener_continue) { // partial JSON is parsed , continue to read from socket. if (newline_found) { // if newline was found in the middle of the buffer, the message // should be completed at this point. Throw out incomplete message // by resetting tokener. json_tokener_reset(tok); } } else { // parsing error json_tokener_reset(tok); } } /********************************************************************************* * * Name : handle_virtio_serial_msg (guestInstClass::private) * * Purpose : handle delimitation and assembly of message stream * * Description: Multiple messages from the host can be bundled together into a * single "read" so we need to check message boundaries and handle * breaking the message apart. Assume a valid message does not * contain newline '\n', and newline is added to the beginning and * end of each message by the sender to delimit the boundaries. * *********************************************************************************/ void guestInstClass::handle_virtio_serial_msg( char *buf, ssize_t len, json_tokener* tok, struct guestInstClass::inst * inst_ptr) { char *newline; ssize_t len_head; next: if (len <= 0) return; // search for newline as delimiter newline = (char *)memchr((char *)buf, '\n', len); if (newline) { // split buffer to head and tail at the location of newline. // feed the head to the parser and recursively process the tail. len_head = newline-buf; // parse head if (len_head > 0) parser(buf, len_head, tok, 1, inst_ptr); // start of the tail: skip newline buf += len_head+1; // length of the tail: deduct 1 for the newline character len -= len_head+1; // continue to process the tail. goto next; } else { parser(buf, len, tok, 0, inst_ptr); } } /********************************************************************************* * * Name : readInst (guestInstClass::private) * * Purpose : try to receive a single message from all instances. * * Description: Each received message is enqueued into the associated * instance's message queue. * *********************************************************************************/ int fail_count = 0 ; void guestInstClass::readInst ( void ) { int rc ; std::list socks ; waitd.tv_sec = 0; waitd.tv_usec = GUEST_SOCKET_TO; /* Initialize the master fd_set */ FD_ZERO(&instance_readfds); socks.clear(); for ( struct inst * inst_ptr = inst_head ; inst_ptr != NULL ; inst_ptr = inst_ptr->next ) { if ( inst_ptr->instance.connected ) { socks.push_front( inst_ptr->instance.chan_fd ); FD_SET(inst_ptr->instance.chan_fd, &instance_readfds); } if (( inst_ptr->next == NULL ) || ( inst_ptr == inst_tail )) break ; } /* if there are no connected instance channels then exit */ if ( socks.empty() ) { return ; } /* Call select() and wait only up to SOCKET_WAIT */ socks.sort(); rc = select( socks.back()+1, &instance_readfds, NULL, NULL, &waitd); if (( rc <= 0 ) || ( rc > (int)socks.size())) { /* Check to see if the select call failed. */ if ( rc > (int)socks.size()) { wlog_throttled ( fail_count, 100, "select return exceeds current file descriptors (%ld:%d)\n", socks.size(), rc ); } /* ... but filter Interrupt signal */ else if (( rc < 0 ) && ( errno != EINTR )) { wlog_throttled ( fail_count, 100, "socket select failed (%d:%m)\n", errno); } else { mlog3 ("nothing received from %ld instances; socket timeout (%d:%m)\n", socks.size(), errno ); } } else { fail_count = 0 ; mlog2 ("trying to receive for %ld instances\n", socks.size()); /* Search through all the instances for watched channels */ for ( struct inst * inst_ptr = inst_head ; inst_ptr != NULL ; inst_ptr = inst_ptr->next ) { mlog2 ("%s monitoring %d\n", inst_ptr->instance.inst.c_str(), inst_ptr->instance.chan_fd ); /* Service guestServer messages towards the local IP */ if (FD_ISSET(inst_ptr->instance.chan_fd, &instance_readfds) ) { char buf[GUEST_HEARTBEAT_MSG_MAX_MSG_SIZE] ; string name ; if( inst_ptr->instance.inst.empty() ) name = inst_ptr->instance.uuid ; else name = inst_ptr->instance.inst ; struct json_tokener* tok = json_tokener_new(); for ( int i = 0; i < INST_MSG_READ_COUNT; i++ ) { rc = read ( inst_ptr->instance.chan_fd, buf, GUEST_HEARTBEAT_MSG_MAX_MSG_SIZE); mlog2 ("%s read channel: bytes:%d, fd:%d\n", name.c_str(), rc,inst_ptr->instance.chan_fd ); if ( rc < 0 ) { if ( errno == EINTR ) { wlog_throttled ( inst_ptr->instance.failure_count, 100, "%s EINTR\n", name.c_str()); } else if ( errno == ECONNRESET ) { wlog ("%s connection reset ... closing\n", name.c_str()); /* Close the connection if we get a 'connection reset by peer' errno */ guestUtil_close_channel ( &inst_ptr->instance ); /* An element of the list is removed - need to break out */ } else if ( errno != EAGAIN ) { wlog_throttled ( inst_ptr->instance.failure_count, 100, "%s error (%d:%m)\n", name.c_str(), errno ); } else { mlog3 ("%s no more messages\n", name.c_str()); } break ; } else if ( rc == 0 ) { mlog3 ("%s no message\n" , name.c_str()); break ; } else { if ( rc < GUEST_HEARTBEAT_MSG_MIN_MSG_SIZE ) { wlog_throttled ( inst_ptr->instance.failure_count, 100, "%s message size %d is smaller than minimal %d; dropping\n", name.c_str(), rc, GUEST_HEARTBEAT_MSG_MIN_MSG_SIZE); } else if ( inst_ptr->message_list.size() > MAX_MESSAGES ) { wlog_throttled ( inst_ptr->instance.failure_count, 100, "%s message queue overflow (max:%d) ; dropping\n", name.c_str(), MAX_MESSAGES ); } else { inst_ptr->instance.failure_count = 0 ; mlog2 ("%s handling message buf: %s\n", name.c_str(), buf ); handle_virtio_serial_msg(buf, rc, tok, inst_ptr); } } } json_tokener_free(tok); } if (( inst_ptr->next == NULL ) || ( inst_ptr == inst_tail )) break ; } } }