Rewrite virtio message handler for guest heartbeat

Due to race conditions, multiple messages might be received from a
single read by guestServer. guestServer in this case would only handle
the first message and discard the remaining ones.

In this particular issue, guestServer received a heartbeat challenge
response message and a vote notification response (reject) message from
a single read, and the latter message was discarded.

This fix rewrites message handler for virtio serial channel to handle
segmented and multiple messages. It uses newline character to deliminate
messages so it assumes any newline characters in client log message are
removed.

Change-Id: Ic6f0509c98fcedf3631f4d210f753c32c37aa442
Signed-off-by: Jack Ding <jack.ding@windriver.com>
This commit is contained in:
Jack Ding 2018-05-25 12:52:51 -04:00
parent 89e4e574e8
commit 94cdbb73d4
3 changed files with 191 additions and 126 deletions

View File

@ -2,7 +2,7 @@
#define __INCLUDE_INSTBASECLASS_H__
/*
* Copyright (c) 2013-2016 Wind River Systems, Inc.
* Copyright (c) 2013-2018 Wind River Systems, Inc.
*
* SPDX-License-Identifier: Apache-2.0
*
@ -13,6 +13,7 @@
* Wind River CGTS Platform Guest Services "Instances Base Class Header"
*/
#include <json-c/json.h>
#include "guestBase.h" /* for ... instInfo */
typedef enum
@ -77,6 +78,10 @@ class guestInstClass
#define INST_TIMER_VOTE (4)
#define INST_TIMER_MAX (5)
// number of continuous reads for an instance to deal with
// potential message burst
#define INST_MSG_READ_COUNT 5
/** General Purpose instance timer */
// struct mtc_timer timer;
struct mtc_timer vote_timer;
@ -155,6 +160,9 @@ class guestInstClass
void manage_comm_loss ( void );
void mem_log_info ( void );
void process_msg(json_object *jobj_msg, struct guestInstClass::inst * inst_ptr);
void parser(char *buf, ssize_t len, json_tokener* tok, int newline_found, struct guestInstClass::inst * inst_ptr);
void handle_virtio_serial_msg(char *buf, ssize_t len, json_tokener* tok, struct guestInstClass::inst * inst_ptr);
public:

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015-2016 Wind River Systems, Inc.
* Copyright (c) 2015-2018 Wind River Systems, Inc.
*
* SPDX-License-Identifier: Apache-2.0
*
@ -683,26 +683,27 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
if ( inst_ptr->message_list.size() )
{
struct json_object *jobj_msg = inst_ptr->message_list.front();
inst_ptr->message_list.pop_front();
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_VERSION, &inst_ptr->instance.version) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_VERSION, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_REVISION, &inst_ptr->instance.revision) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_REVISION, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_MSG_TYPE, &inst_ptr->instance.msg_type) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_MSG_TYPE, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_SEQUENCE, &inst_ptr->instance.sequence) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_SEQUENCE, jobj_msg);
break;
return FAIL;
}
mlog1 ("%s:%s message - Seq:%x Ver:%d.%d Fd:%d\n",
@ -730,22 +731,22 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_HEARTBEAT_RESPONSE, &heartbeat_response) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_HEARTBEAT_RESPONSE, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_HEARTBEAT_HEALTH, &heartbeat_health) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_HEARTBEAT_HEALTH, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_CORRECTIVE_ACTION, &corrective_action) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_CORRECTIVE_ACTION, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_LOG_MSG, &log_msg) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_LOG_MSG, jobj_msg);
break;
return FAIL;
}
if ( heartbeat_response != inst_ptr->instance.heartbeat_challenge)
@ -848,9 +849,6 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
hb_get_state_name(inst_ptr->instance.hbState),
inst_ptr->instance.sequence);
}
inst_ptr->message_list.pop_front();
json_object_put(jobj_msg);
break ;
}
else if ( !inst_ptr->instance.msg_type.compare(GUEST_HEARTBEAT_MSG_INIT) )
@ -869,8 +867,6 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
/* Allow the heartbeat challenge response message log */
inst_ptr->instance.message_count = 0 ;
beatStateChange ( &inst_ptr->instance, hbs_server_waiting_init ) ;
inst_ptr->message_list.pop_front();
json_object_put(jobj_msg);
}
else
{
@ -880,55 +876,55 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_INVOCATION_ID, &inst_ptr->instance.invocation_id) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_INVOCATION_ID, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_NAME, &instance_name) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_NAME, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_CORRECTIVE_ACTION, &inst_ptr->instance.corrective_action) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_CORRECTIVE_ACTION, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_HEARTBEAT_INTERVAL_MS, &inst_ptr->instance.heartbeat_interval_ms) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_HEARTBEAT_INTERVAL_MS, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_VOTE_SECS, &inst_ptr->instance.vote_secs) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_VOTE_SECS, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_SHUTDOWN_NOTICE_SECS, &inst_ptr->instance.shutdown_notice_secs) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_SHUTDOWN_NOTICE_SECS, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_SUSPEND_NOTICE_SECS, &inst_ptr->instance.suspend_notice_secs) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_SUSPEND_NOTICE_SECS, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_RESUME_NOTICE_SECS, &inst_ptr->instance.resume_notice_secs) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_RESUME_NOTICE_SECS, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_RESTART_SECS, &inst_ptr->instance.restart_secs) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_RESTART_SECS, jobj_msg);
break;
return FAIL;
}
inst_ptr->instance.name = instance_name;
@ -1019,9 +1015,6 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
inst_ptr->instance.heartbeat.b2b_misses = 0 ;
inst_ptr->instance.heartbeat.failed = false ;
send_challenge ( inst_ptr ) ;
/* send_challenge() will clear the message_list so no need to pop the msg here */
json_object_put(jobj_msg);
inst_ptr->messageStage = INST_MESSAGE__RECEIVE ;
}
}
@ -1035,7 +1028,7 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_INVOCATION_ID, &invocation_id) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_INVOCATION_ID, jobj_msg);
break;
return FAIL;
}
if ( invocation_id != inst_ptr->instance.invocation_id )
@ -1060,25 +1053,24 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
if(jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_EVENT_TYPE, &event_type) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_EVENT_TYPE, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_NOTIFICATION_TYPE, &notification_type) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_NOTIFICATION_TYPE, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_VOTE_RESULT, &vote_result) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_VOTE_RESULT, jobj_msg);
break;
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_LOG_MSG, &reject_reason) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_LOG_MSG, jobj_msg);
break;
return FAIL;
}
send_vote_notify_resp (get_ctrl_ptr()->hostname,
inst_ptr->instance.uuid,
notification_type,
@ -1104,9 +1096,6 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
mtcTimer_stop ( inst_ptr->vote_timer );
inst_ptr->vote_timer.ring = false ;
}
/* Delete the message */
inst_ptr->message_list.pop_front();
json_object_put(jobj_msg);
}
else if ( !inst_ptr->instance.msg_type.compare(GUEST_HEARTBEAT_MSG_EXIT) )
{
@ -1119,9 +1108,6 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
beatStateChange ( &inst_ptr->instance, hbs_server_waiting_init ) ;
hbStatusChange ( &inst_ptr->instance, false );
inst_ptr->message_list.pop_front();
json_object_put(jobj_msg);
}
else
{
@ -1132,14 +1118,9 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
string log_err = "unsupported message type: ";
log_err.append(inst_ptr->instance.msg_type);
send_client_msg_nack(&inst_ptr->instance, log_err);
/* Delete the message */
inst_ptr->message_list.pop_front();
json_object_put(jobj_msg);
}
json_object_put(jobj_msg);
}
/* Global case break */
break ;
}
@ -1188,9 +1169,6 @@ int guestInstClass::send_challenge ( struct guestInstClass::inst * inst_ptr )
bytes_sent, message.length() );
}
/* Clear the message queue and wait for the challenge response */
inst_ptr->message_list.clear ();
/* Waiting on a response now */
inst_ptr->instance.heartbeat.waiting = true ;
@ -1290,9 +1268,6 @@ int guestInstClass::send_vote_notify ( string uuid )
bytes_sent, message.length() );
}
/* Clear the message queue and wait for the vote response */
inst_ptr->message_list.clear ();
if ( inst_ptr->vote_timer.tid )
mtcTimer_stop ( inst_ptr->vote_timer );
mtcTimer_start ( inst_ptr->vote_timer, guestTimer_handler, inst_ptr->instance.vote_secs );
@ -1439,7 +1414,6 @@ void guestInstClass::handle_parse_failure ( struct guestInstClass::inst * inst_p
log_err.append(key);
elog("%s %s\n", log_prefix(&inst_ptr->instance).c_str(), log_err.c_str());
send_client_msg_nack(&inst_ptr->instance, log_err);
inst_ptr->message_list.pop_front();
/* pop_front() only deletes the internal copy of jobj_msg in the message_list.
The original object still needs to be released here */
json_object_put(jobj_msg);

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2013-2016 Wind River Systems, Inc.
* Copyright (c) 2013-2018 Wind River Systems, Inc.
*
* SPDX-License-Identifier: Apache-2.0
*
@ -485,6 +485,151 @@ ssize_t guestInstClass::write_inst ( instInfo * instInfo_ptr,
return (len);
}
/*********************************************************************************
*
* Name : process_msg (guestInstClass::private)
*
* Purpose : process delimited message
*
*********************************************************************************/
void guestInstClass::process_msg(json_object *jobj_msg,
struct guestInstClass::inst * inst_ptr)
{
int version;
string msg_type;
string log_err = "failed to parse ";
guestInstClass * obj_ptr = get_instInv_ptr();
//parse incoming msg
if (jobj_msg == NULL)
{
wlog("%s\n", log_err.c_str());
return;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_VERSION, &version) != PASS)
{
// fail to parse the version
log_err.append(GUEST_HEARTBEAT_MSG_VERSION);
elog("%s\n", log_err.c_str());
obj_ptr->send_client_msg_nack(&inst_ptr->instance, log_err);
json_object_put(jobj_msg);
return;
}
if ( version < GUEST_HEARTBEAT_MSG_VERSION_CURRENT)
{
char log_err_str[100];
sprintf(log_err_str, "Bad version: %d, expect version: %d",
version, GUEST_HEARTBEAT_MSG_VERSION_CURRENT);
elog("%s\n", log_err_str);
log_err = log_err_str;
obj_ptr->send_client_msg_nack(&inst_ptr->instance, log_err);
json_object_put(jobj_msg);
return;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_MSG_TYPE, &msg_type) != PASS)
{
// fail to parse the msg_type
log_err.append(GUEST_HEARTBEAT_MSG_MSG_TYPE);
elog("%s\n", log_err.c_str());
obj_ptr->send_client_msg_nack(&inst_ptr->instance, log_err);
json_object_put(jobj_msg);
return;
}
/* Enqueue the message to its instance message list */
inst_ptr->message_list.push_back(jobj_msg);
}
/*********************************************************************************
*
* Name : parser (guestInstClass::private)
*
* Purpose : parse message segments and feed valid message to process_msg
*
*********************************************************************************/
void guestInstClass::parser(char *buf,
ssize_t len,
json_tokener* tok,
int newline_found,
struct guestInstClass::inst * inst_ptr)
{
json_object *jobj = json_tokener_parse_ex(tok, buf, len);
enum json_tokener_error jerr = json_tokener_get_error(tok);
if (jerr == json_tokener_success) {
process_msg(jobj, inst_ptr);
return;
}
else if (jerr == json_tokener_continue) {
// partial JSON is parsed , continue to read from socket.
if (newline_found) {
// if newline was found in the middle of the buffer, the message
// should be completed at this point. Throw out incomplete message
// by resetting tokener.
json_tokener_reset(tok);
}
}
else
{
// parsing error
json_tokener_reset(tok);
}
}
/*********************************************************************************
*
* Name : handle_virtio_serial_msg (guestInstClass::private)
*
* Purpose : handle delimitation and assembly of message stream
*
* Description: Multiple messages from the host can be bundled together into a
* single "read" so we need to check message boundaries and handle
* breaking the message apart. Assume a valid message does not
* contain newline '\n', and newline is added to the beginning and
* end of each message by the sender to delimit the boundaries.
*
*********************************************************************************/
void guestInstClass::handle_virtio_serial_msg(
char *buf,
ssize_t len,
json_tokener* tok,
struct guestInstClass::inst * inst_ptr)
{
char *newline;
ssize_t len_head;
next:
if (len <= 0)
return;
// search for newline as delimiter
newline = (char *)memchr((char *)buf, '\n', len);
if (newline) {
// split buffer to head and tail at the location of newline.
// feed the head to the parser and recursively process the tail.
len_head = newline-buf;
// parse head
if (len_head > 0)
parser(buf, len_head, tok, 1, inst_ptr);
// start of the tail: skip newline
buf += len_head+1;
// length of the tail: deduct 1 for the newline character
len -= len_head+1;
// continue to process the tail.
goto next;
}
else {
parser(buf, len, tok, 0, inst_ptr);
}
}
/*********************************************************************************
*
@ -506,8 +651,6 @@ void guestInstClass::readInst ( void )
waitd.tv_sec = 0;
waitd.tv_usec = GUEST_SOCKET_TO;
struct json_object *jobj_msg = NULL;
/* Initialize the master fd_set */
FD_ZERO(&instance_readfds);
@ -560,16 +703,13 @@ void guestInstClass::readInst ( void )
/* Search through all the instances for watched channels */
for ( struct inst * inst_ptr = inst_head ; inst_ptr != NULL ; inst_ptr = inst_ptr->next )
{
mlog1 ("%s monitoring %d\n", inst_ptr->instance.inst.c_str(),
mlog2 ("%s monitoring %d\n", inst_ptr->instance.inst.c_str(),
inst_ptr->instance.chan_fd );
/* Service guestServer messages towards the local IP */
if (FD_ISSET(inst_ptr->instance.chan_fd, &instance_readfds) )
{
bool message_present ;
int count ;
string last_message_type ;
char vm_message[GUEST_HEARTBEAT_MSG_MAX_MSG_SIZE] ;
char buf[GUEST_HEARTBEAT_MSG_MAX_MSG_SIZE] ;
string name ;
if( inst_ptr->instance.inst.empty() )
@ -577,14 +717,12 @@ void guestInstClass::readInst ( void )
else
name = inst_ptr->instance.inst ;
count = 0 ;
last_message_type = GUEST_HEARTBEAT_MSG_INIT_ACK ;
do
struct json_tokener* tok = json_tokener_new();
for ( int i = 0; i < INST_MSG_READ_COUNT; i++ )
{
message_present = false ;
rc = read ( inst_ptr->instance.chan_fd, vm_message, GUEST_HEARTBEAT_MSG_MAX_MSG_SIZE);
mlog3 ("%s read channel: bytes:%d, fd:%d\n", name.c_str(), rc,inst_ptr->instance.chan_fd );
rc = read ( inst_ptr->instance.chan_fd, buf, GUEST_HEARTBEAT_MSG_MAX_MSG_SIZE);
mlog2 ("%s read channel: bytes:%d, fd:%d\n", name.c_str(), rc,inst_ptr->instance.chan_fd );
if ( rc < 0 )
{
if ( errno == EINTR )
@ -632,67 +770,12 @@ void guestInstClass::readInst ( void )
else
{
inst_ptr->instance.failure_count = 0 ;
jobj_msg = json_tokener_parse(vm_message);
int version;
string msg_type;
string log_err = "failed to parse ";
guestInstClass * obj_ptr = get_instInv_ptr();
//parse incoming msg
if (jobj_msg == NULL)
{
wlog("failed to parse msg\n");
continue;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_VERSION, &version) != PASS)
{
// fail to parse the version
log_err.append(GUEST_HEARTBEAT_MSG_VERSION);
elog("%s\n", log_err.c_str());
obj_ptr->send_client_msg_nack(&inst_ptr->instance, log_err);
json_object_put(jobj_msg);
continue;
}
if ( version != GUEST_HEARTBEAT_MSG_VERSION_CURRENT)
{
char log_err_str[100];
sprintf(log_err_str, "Bad version: %d, expect version: %d",
version, GUEST_HEARTBEAT_MSG_VERSION_CURRENT);
elog("%s\n", log_err_str);
log_err = log_err_str;
obj_ptr->send_client_msg_nack(&inst_ptr->instance, log_err);
json_object_put(jobj_msg);
continue;
}
message_present = true ;
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_MSG_TYPE, &msg_type) != PASS)
{
// fail to parse the msg_type
log_err.append(GUEST_HEARTBEAT_MSG_MSG_TYPE);
elog("%s\n", log_err.c_str());
obj_ptr->send_client_msg_nack(&inst_ptr->instance, log_err);
json_object_put(jobj_msg);
continue;
}
mlog2 ("%s '%s' message\n", name.c_str(), msg_type.c_str());
/* Try and purge out old init messages */
if (!msg_type.compare(GUEST_HEARTBEAT_MSG_INIT) &&
!msg_type.compare(last_message_type) )
{
inst_ptr->message_list.pop_back();
ilog ("%s deleting stale init message\n", name.c_str());
}
/* Enqueue the message to its instance message list */
inst_ptr->message_list.push_back(jobj_msg);
last_message_type = msg_type ;
mlog2 ("%s handling message buf: %s\n", name.c_str(), buf );
handle_virtio_serial_msg(buf, rc, tok, inst_ptr);
}
}
} while ( ( message_present == true ) && ( ++count<10 ) ) ;
}
json_tokener_free(tok);
}
if (( inst_ptr->next == NULL ) || ( inst_ptr == inst_tail ))
break ;