Merge "Rewrite virtio message handler for guest heartbeat"

This commit is contained in:
Zuul 2018-06-25 13:01:17 +00:00 committed by Gerrit Code Review
commit bd36605a6d
3 changed files with 191 additions and 126 deletions

View File

@ -2,7 +2,7 @@
#define __INCLUDE_INSTBASECLASS_H__
/*
* Copyright (c) 2013-2016 Wind River Systems, Inc.
* Copyright (c) 2013-2018 Wind River Systems, Inc.
*
* SPDX-License-Identifier: Apache-2.0
*
@ -13,6 +13,7 @@
* Wind River CGTS Platform Guest Services "Instances Base Class Header"
*/
#include <json-c/json.h>
#include "guestBase.h" /* for ... instInfo */
typedef enum
@ -77,6 +78,10 @@ class guestInstClass
#define INST_TIMER_VOTE (4)
#define INST_TIMER_MAX (5)
// number of continuous reads for an instance to deal with
// potential message burst
#define INST_MSG_READ_COUNT 5
/** General Purpose instance timer */
// struct mtc_timer timer;
struct mtc_timer vote_timer;
@ -155,6 +160,9 @@ class guestInstClass
void manage_comm_loss ( void );
void mem_log_info ( void );
void process_msg(json_object *jobj_msg, struct guestInstClass::inst * inst_ptr);
void parser(char *buf, ssize_t len, json_tokener* tok, int newline_found, struct guestInstClass::inst * inst_ptr);
void handle_virtio_serial_msg(char *buf, ssize_t len, json_tokener* tok, struct guestInstClass::inst * inst_ptr);
public:

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015-2016 Wind River Systems, Inc.
* Copyright (c) 2015-2018 Wind River Systems, Inc.
*
* SPDX-License-Identifier: Apache-2.0
*
@ -683,26 +683,27 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
if ( inst_ptr->message_list.size() )
{
struct json_object *jobj_msg = inst_ptr->message_list.front();
inst_ptr->message_list.pop_front();
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_VERSION, &inst_ptr->instance.version) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_VERSION, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_REVISION, &inst_ptr->instance.revision) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_REVISION, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_MSG_TYPE, &inst_ptr->instance.msg_type) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_MSG_TYPE, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_SEQUENCE, &inst_ptr->instance.sequence) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_SEQUENCE, jobj_msg);
break;
return FAIL;
}
mlog1 ("%s:%s message - Seq:%x Ver:%d.%d Fd:%d\n",
@ -730,22 +731,22 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_HEARTBEAT_RESPONSE, &heartbeat_response) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_HEARTBEAT_RESPONSE, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_HEARTBEAT_HEALTH, &heartbeat_health) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_HEARTBEAT_HEALTH, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_CORRECTIVE_ACTION, &corrective_action) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_CORRECTIVE_ACTION, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_LOG_MSG, &log_msg) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_LOG_MSG, jobj_msg);
break;
return FAIL;
}
if ( heartbeat_response != inst_ptr->instance.heartbeat_challenge)
@ -848,9 +849,6 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
hb_get_state_name(inst_ptr->instance.hbState),
inst_ptr->instance.sequence);
}
inst_ptr->message_list.pop_front();
json_object_put(jobj_msg);
break ;
}
else if ( !inst_ptr->instance.msg_type.compare(GUEST_HEARTBEAT_MSG_INIT) )
@ -869,8 +867,6 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
/* Allow the heartbeat challenge response message log */
inst_ptr->instance.message_count = 0 ;
beatStateChange ( &inst_ptr->instance, hbs_server_waiting_init ) ;
inst_ptr->message_list.pop_front();
json_object_put(jobj_msg);
}
else
{
@ -880,55 +876,55 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_INVOCATION_ID, &inst_ptr->instance.invocation_id) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_INVOCATION_ID, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_NAME, &instance_name) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_NAME, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_CORRECTIVE_ACTION, &inst_ptr->instance.corrective_action) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_CORRECTIVE_ACTION, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_HEARTBEAT_INTERVAL_MS, &inst_ptr->instance.heartbeat_interval_ms) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_HEARTBEAT_INTERVAL_MS, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_VOTE_SECS, &inst_ptr->instance.vote_secs) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_VOTE_SECS, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_SHUTDOWN_NOTICE_SECS, &inst_ptr->instance.shutdown_notice_secs) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_SHUTDOWN_NOTICE_SECS, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_SUSPEND_NOTICE_SECS, &inst_ptr->instance.suspend_notice_secs) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_SUSPEND_NOTICE_SECS, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_RESUME_NOTICE_SECS, &inst_ptr->instance.resume_notice_secs) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_RESUME_NOTICE_SECS, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_RESTART_SECS, &inst_ptr->instance.restart_secs) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_RESTART_SECS, jobj_msg);
break;
return FAIL;
}
inst_ptr->instance.name = instance_name;
@ -1019,9 +1015,6 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
inst_ptr->instance.heartbeat.b2b_misses = 0 ;
inst_ptr->instance.heartbeat.failed = false ;
send_challenge ( inst_ptr ) ;
/* send_challenge() will clear the message_list so no need to pop the msg here */
json_object_put(jobj_msg);
inst_ptr->messageStage = INST_MESSAGE__RECEIVE ;
}
}
@ -1035,7 +1028,7 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_INVOCATION_ID, &invocation_id) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_INVOCATION_ID, jobj_msg);
break;
return FAIL;
}
if ( invocation_id != inst_ptr->instance.invocation_id )
@ -1060,25 +1053,24 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
if(jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_EVENT_TYPE, &event_type) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_EVENT_TYPE, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_NOTIFICATION_TYPE, &notification_type) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_NOTIFICATION_TYPE, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_VOTE_RESULT, &vote_result) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_VOTE_RESULT, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_LOG_MSG, &reject_reason) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_LOG_MSG, jobj_msg);
break;
return FAIL;
}
send_vote_notify_resp (get_ctrl_ptr()->hostname,
inst_ptr->instance.uuid,
notification_type,
@ -1104,9 +1096,6 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
mtcTimer_stop ( inst_ptr->vote_timer );
inst_ptr->vote_timer.ring = false ;
}
/* Delete the message */
inst_ptr->message_list.pop_front();
json_object_put(jobj_msg);
}
else if ( !inst_ptr->instance.msg_type.compare(GUEST_HEARTBEAT_MSG_EXIT) )
{
@ -1119,9 +1108,6 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
beatStateChange ( &inst_ptr->instance, hbs_server_waiting_init ) ;
hbStatusChange ( &inst_ptr->instance, false );
inst_ptr->message_list.pop_front();
json_object_put(jobj_msg);
}
else
{
@ -1132,14 +1118,9 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
string log_err = "unsupported message type: ";
log_err.append(inst_ptr->instance.msg_type);
send_client_msg_nack(&inst_ptr->instance, log_err);
/* Delete the message */
inst_ptr->message_list.pop_front();
json_object_put(jobj_msg);
}
json_object_put(jobj_msg);
}
/* Global case break */
break ;
}
@ -1188,9 +1169,6 @@ int guestInstClass::send_challenge ( struct guestInstClass::inst * inst_ptr )
bytes_sent, message.length() );
}
/* Clear the message queue and wait for the challenge response */
inst_ptr->message_list.clear ();
/* Waiting on a response now */
inst_ptr->instance.heartbeat.waiting = true ;
@ -1290,9 +1268,6 @@ int guestInstClass::send_vote_notify ( string uuid )
bytes_sent, message.length() );
}
/* Clear the message queue and wait for the vote response */
inst_ptr->message_list.clear ();
if ( inst_ptr->vote_timer.tid )
mtcTimer_stop ( inst_ptr->vote_timer );
mtcTimer_start ( inst_ptr->vote_timer, guestTimer_handler, inst_ptr->instance.vote_secs );
@ -1439,7 +1414,6 @@ void guestInstClass::handle_parse_failure ( struct guestInstClass::inst * inst_p
log_err.append(key);
elog("%s %s\n", log_prefix(&inst_ptr->instance).c_str(), log_err.c_str());
send_client_msg_nack(&inst_ptr->instance, log_err);
inst_ptr->message_list.pop_front();
/* pop_front() only deletes the internal copy of jobj_msg in the message_list.
The original object still needs to be released here */
json_object_put(jobj_msg);

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2013-2016 Wind River Systems, Inc.
* Copyright (c) 2013-2018 Wind River Systems, Inc.
*
* SPDX-License-Identifier: Apache-2.0
*
@ -485,6 +485,151 @@ ssize_t guestInstClass::write_inst ( instInfo * instInfo_ptr,
return (len);
}
/*********************************************************************************
*
* Name : process_msg (guestInstClass::private)
*
* Purpose : process delimited message
*
*********************************************************************************/
void guestInstClass::process_msg(json_object *jobj_msg,
struct guestInstClass::inst * inst_ptr)
{
int version;
string msg_type;
string log_err = "failed to parse ";
guestInstClass * obj_ptr = get_instInv_ptr();
//parse incoming msg
if (jobj_msg == NULL)
{
wlog("%s\n", log_err.c_str());
return;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_VERSION, &version) != PASS)
{
// fail to parse the version
log_err.append(GUEST_HEARTBEAT_MSG_VERSION);
elog("%s\n", log_err.c_str());
obj_ptr->send_client_msg_nack(&inst_ptr->instance, log_err);
json_object_put(jobj_msg);
return;
}
if ( version < GUEST_HEARTBEAT_MSG_VERSION_CURRENT)
{
char log_err_str[100];
sprintf(log_err_str, "Bad version: %d, expect version: %d",
version, GUEST_HEARTBEAT_MSG_VERSION_CURRENT);
elog("%s\n", log_err_str);
log_err = log_err_str;
obj_ptr->send_client_msg_nack(&inst_ptr->instance, log_err);
json_object_put(jobj_msg);
return;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_MSG_TYPE, &msg_type) != PASS)
{
// fail to parse the msg_type
log_err.append(GUEST_HEARTBEAT_MSG_MSG_TYPE);
elog("%s\n", log_err.c_str());
obj_ptr->send_client_msg_nack(&inst_ptr->instance, log_err);
json_object_put(jobj_msg);
return;
}
/* Enqueue the message to its instance message list */
inst_ptr->message_list.push_back(jobj_msg);
}
/*********************************************************************************
*
* Name : parser (guestInstClass::private)
*
* Purpose : parse message segments and feed valid message to process_msg
*
*********************************************************************************/
void guestInstClass::parser(char *buf,
ssize_t len,
json_tokener* tok,
int newline_found,
struct guestInstClass::inst * inst_ptr)
{
json_object *jobj = json_tokener_parse_ex(tok, buf, len);
enum json_tokener_error jerr = json_tokener_get_error(tok);
if (jerr == json_tokener_success) {
process_msg(jobj, inst_ptr);
return;
}
else if (jerr == json_tokener_continue) {
// partial JSON is parsed , continue to read from socket.
if (newline_found) {
// if newline was found in the middle of the buffer, the message
// should be completed at this point. Throw out incomplete message
// by resetting tokener.
json_tokener_reset(tok);
}
}
else
{
// parsing error
json_tokener_reset(tok);
}
}
/*********************************************************************************
*
* Name : handle_virtio_serial_msg (guestInstClass::private)
*
* Purpose : handle delimitation and assembly of message stream
*
* Description: Multiple messages from the host can be bundled together into a
* single "read" so we need to check message boundaries and handle
* breaking the message apart. Assume a valid message does not
* contain newline '\n', and newline is added to the beginning and
* end of each message by the sender to delimit the boundaries.
*
*********************************************************************************/
void guestInstClass::handle_virtio_serial_msg(
char *buf,
ssize_t len,
json_tokener* tok,
struct guestInstClass::inst * inst_ptr)
{
char *newline;
ssize_t len_head;
next:
if (len <= 0)
return;
// search for newline as delimiter
newline = (char *)memchr((char *)buf, '\n', len);
if (newline) {
// split buffer to head and tail at the location of newline.
// feed the head to the parser and recursively process the tail.
len_head = newline-buf;
// parse head
if (len_head > 0)
parser(buf, len_head, tok, 1, inst_ptr);
// start of the tail: skip newline
buf += len_head+1;
// length of the tail: deduct 1 for the newline character
len -= len_head+1;
// continue to process the tail.
goto next;
}
else {
parser(buf, len, tok, 0, inst_ptr);
}
}
/*********************************************************************************
*
@ -506,8 +651,6 @@ void guestInstClass::readInst ( void )
waitd.tv_sec = 0;
waitd.tv_usec = GUEST_SOCKET_TO;
struct json_object *jobj_msg = NULL;
/* Initialize the master fd_set */
FD_ZERO(&instance_readfds);
@ -560,16 +703,13 @@ void guestInstClass::readInst ( void )
/* Search through all the instances for watched channels */
for ( struct inst * inst_ptr = inst_head ; inst_ptr != NULL ; inst_ptr = inst_ptr->next )
{
mlog1 ("%s monitoring %d\n", inst_ptr->instance.inst.c_str(),
mlog2 ("%s monitoring %d\n", inst_ptr->instance.inst.c_str(),
inst_ptr->instance.chan_fd );
/* Service guestServer messages towards the local IP */
if (FD_ISSET(inst_ptr->instance.chan_fd, &instance_readfds) )
{
bool message_present ;
int count ;
string last_message_type ;
char vm_message[GUEST_HEARTBEAT_MSG_MAX_MSG_SIZE] ;
char buf[GUEST_HEARTBEAT_MSG_MAX_MSG_SIZE] ;
string name ;
if( inst_ptr->instance.inst.empty() )
@ -577,14 +717,12 @@ void guestInstClass::readInst ( void )
else
name = inst_ptr->instance.inst ;
count = 0 ;
last_message_type = GUEST_HEARTBEAT_MSG_INIT_ACK ;
do
struct json_tokener* tok = json_tokener_new();
for ( int i = 0; i < INST_MSG_READ_COUNT; i++ )
{
message_present = false ;
rc = read ( inst_ptr->instance.chan_fd, vm_message, GUEST_HEARTBEAT_MSG_MAX_MSG_SIZE);
mlog3 ("%s read channel: bytes:%d, fd:%d\n", name.c_str(), rc,inst_ptr->instance.chan_fd );
rc = read ( inst_ptr->instance.chan_fd, buf, GUEST_HEARTBEAT_MSG_MAX_MSG_SIZE);
mlog2 ("%s read channel: bytes:%d, fd:%d\n", name.c_str(), rc,inst_ptr->instance.chan_fd );
if ( rc < 0 )
{
if ( errno == EINTR )
@ -632,67 +770,12 @@ void guestInstClass::readInst ( void )
else
{
inst_ptr->instance.failure_count = 0 ;
jobj_msg = json_tokener_parse(vm_message);
int version;
string msg_type;
string log_err = "failed to parse ";
guestInstClass * obj_ptr = get_instInv_ptr();
//parse incoming msg
if (jobj_msg == NULL)
{
wlog("failed to parse msg\n");
continue;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_VERSION, &version) != PASS)
{
// fail to parse the version
log_err.append(GUEST_HEARTBEAT_MSG_VERSION);
elog("%s\n", log_err.c_str());
obj_ptr->send_client_msg_nack(&inst_ptr->instance, log_err);
json_object_put(jobj_msg);
continue;
}
if ( version != GUEST_HEARTBEAT_MSG_VERSION_CURRENT)
{
char log_err_str[100];
sprintf(log_err_str, "Bad version: %d, expect version: %d",
version, GUEST_HEARTBEAT_MSG_VERSION_CURRENT);
elog("%s\n", log_err_str);
log_err = log_err_str;
obj_ptr->send_client_msg_nack(&inst_ptr->instance, log_err);
json_object_put(jobj_msg);
continue;
}
message_present = true ;
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_MSG_TYPE, &msg_type) != PASS)
{
// fail to parse the msg_type
log_err.append(GUEST_HEARTBEAT_MSG_MSG_TYPE);
elog("%s\n", log_err.c_str());
obj_ptr->send_client_msg_nack(&inst_ptr->instance, log_err);
json_object_put(jobj_msg);
continue;
}
mlog2 ("%s '%s' message\n", name.c_str(), msg_type.c_str());
/* Try and purge out old init messages */
if (!msg_type.compare(GUEST_HEARTBEAT_MSG_INIT) &&
!msg_type.compare(last_message_type) )
{
inst_ptr->message_list.pop_back();
ilog ("%s deleting stale init message\n", name.c_str());
}
/* Enqueue the message to its instance message list */
inst_ptr->message_list.push_back(jobj_msg);
last_message_type = msg_type ;
mlog2 ("%s handling message buf: %s\n", name.c_str(), buf );
handle_virtio_serial_msg(buf, rc, tok, inst_ptr);
}
}
} while ( ( message_present == true ) && ( ++count<10 ) ) ;
}
json_tokener_free(tok);
}
if (( inst_ptr->next == NULL ) || ( inst_ptr == inst_tail ))
break ;