diff --git a/mtce-common/cgts-mtce-common-1.0/guest/guestInstClass.h b/mtce-common/cgts-mtce-common-1.0/guest/guestInstClass.h index 308822d7..6b601e07 100644 --- a/mtce-common/cgts-mtce-common-1.0/guest/guestInstClass.h +++ b/mtce-common/cgts-mtce-common-1.0/guest/guestInstClass.h @@ -2,7 +2,7 @@ #define __INCLUDE_INSTBASECLASS_H__ /* - * Copyright (c) 2013-2016 Wind River Systems, Inc. + * Copyright (c) 2013-2018 Wind River Systems, Inc. * * SPDX-License-Identifier: Apache-2.0 * @@ -13,6 +13,7 @@ * Wind River CGTS Platform Guest Services "Instances Base Class Header" */ +#include #include "guestBase.h" /* for ... instInfo */ typedef enum @@ -77,6 +78,10 @@ class guestInstClass #define INST_TIMER_VOTE (4) #define INST_TIMER_MAX (5) + // number of continuous reads for an instance to deal with + // potential message burst + #define INST_MSG_READ_COUNT 5 + /** General Purpose instance timer */ // struct mtc_timer timer; struct mtc_timer vote_timer; @@ -155,6 +160,9 @@ class guestInstClass void manage_comm_loss ( void ); void mem_log_info ( void ); + void process_msg(json_object *jobj_msg, struct guestInstClass::inst * inst_ptr); + void parser(char *buf, ssize_t len, json_tokener* tok, int newline_found, struct guestInstClass::inst * inst_ptr); + void handle_virtio_serial_msg(char *buf, ssize_t len, json_tokener* tok, struct guestInstClass::inst * inst_ptr); public: diff --git a/mtce-common/cgts-mtce-common-1.0/guest/guestSvrHdlr.cpp b/mtce-common/cgts-mtce-common-1.0/guest/guestSvrHdlr.cpp index 6abdb8b4..c8ab31ca 100644 --- a/mtce-common/cgts-mtce-common-1.0/guest/guestSvrHdlr.cpp +++ b/mtce-common/cgts-mtce-common-1.0/guest/guestSvrHdlr.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015-2016 Wind River Systems, Inc. + * Copyright (c) 2015-2018 Wind River Systems, Inc. * * SPDX-License-Identifier: Apache-2.0 * @@ -683,26 +683,27 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr ) if ( inst_ptr->message_list.size() ) { struct json_object *jobj_msg = inst_ptr->message_list.front(); + inst_ptr->message_list.pop_front(); if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_VERSION, &inst_ptr->instance.version) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_VERSION, jobj_msg); - break; + return FAIL; } if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_REVISION, &inst_ptr->instance.revision) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_REVISION, jobj_msg); - break; + return FAIL; } if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_MSG_TYPE, &inst_ptr->instance.msg_type) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_MSG_TYPE, jobj_msg); - break; + return FAIL; } if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_SEQUENCE, &inst_ptr->instance.sequence) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_SEQUENCE, jobj_msg); - break; + return FAIL; } mlog1 ("%s:%s message - Seq:%x Ver:%d.%d Fd:%d\n", @@ -730,22 +731,22 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr ) if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_HEARTBEAT_RESPONSE, &heartbeat_response) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_HEARTBEAT_RESPONSE, jobj_msg); - break; + return FAIL; } if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_HEARTBEAT_HEALTH, &heartbeat_health) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_HEARTBEAT_HEALTH, jobj_msg); - break; + return FAIL; } if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_CORRECTIVE_ACTION, &corrective_action) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_CORRECTIVE_ACTION, jobj_msg); - break; + return FAIL; } if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_LOG_MSG, &log_msg) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_LOG_MSG, jobj_msg); - break; + return FAIL; } if ( heartbeat_response != inst_ptr->instance.heartbeat_challenge) @@ -848,9 +849,6 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr ) hb_get_state_name(inst_ptr->instance.hbState), inst_ptr->instance.sequence); } - inst_ptr->message_list.pop_front(); - json_object_put(jobj_msg); - break ; } else if ( !inst_ptr->instance.msg_type.compare(GUEST_HEARTBEAT_MSG_INIT) ) @@ -869,8 +867,6 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr ) /* Allow the heartbeat challenge response message log */ inst_ptr->instance.message_count = 0 ; beatStateChange ( &inst_ptr->instance, hbs_server_waiting_init ) ; - inst_ptr->message_list.pop_front(); - json_object_put(jobj_msg); } else { @@ -880,55 +876,55 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr ) if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_INVOCATION_ID, &inst_ptr->instance.invocation_id) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_INVOCATION_ID, jobj_msg); - break; + return FAIL; } if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_NAME, &instance_name) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_NAME, jobj_msg); - break; + return FAIL; } if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_CORRECTIVE_ACTION, &inst_ptr->instance.corrective_action) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_CORRECTIVE_ACTION, jobj_msg); - break; + return FAIL; } if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_HEARTBEAT_INTERVAL_MS, &inst_ptr->instance.heartbeat_interval_ms) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_HEARTBEAT_INTERVAL_MS, jobj_msg); - break; + return FAIL; } if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_VOTE_SECS, &inst_ptr->instance.vote_secs) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_VOTE_SECS, jobj_msg); - break; + return FAIL; } if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_SHUTDOWN_NOTICE_SECS, &inst_ptr->instance.shutdown_notice_secs) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_SHUTDOWN_NOTICE_SECS, jobj_msg); - break; + return FAIL; } if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_SUSPEND_NOTICE_SECS, &inst_ptr->instance.suspend_notice_secs) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_SUSPEND_NOTICE_SECS, jobj_msg); - break; + return FAIL; } if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_RESUME_NOTICE_SECS, &inst_ptr->instance.resume_notice_secs) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_RESUME_NOTICE_SECS, jobj_msg); - break; + return FAIL; } if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_RESTART_SECS, &inst_ptr->instance.restart_secs) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_RESTART_SECS, jobj_msg); - break; + return FAIL; } inst_ptr->instance.name = instance_name; @@ -1019,9 +1015,6 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr ) inst_ptr->instance.heartbeat.b2b_misses = 0 ; inst_ptr->instance.heartbeat.failed = false ; send_challenge ( inst_ptr ) ; - /* send_challenge() will clear the message_list so no need to pop the msg here */ - json_object_put(jobj_msg); - inst_ptr->messageStage = INST_MESSAGE__RECEIVE ; } } @@ -1035,7 +1028,7 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr ) if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_INVOCATION_ID, &invocation_id) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_INVOCATION_ID, jobj_msg); - break; + return FAIL; } if ( invocation_id != inst_ptr->instance.invocation_id ) @@ -1060,25 +1053,24 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr ) if(jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_EVENT_TYPE, &event_type) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_EVENT_TYPE, jobj_msg); - break; + return FAIL; } if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_NOTIFICATION_TYPE, ¬ification_type) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_NOTIFICATION_TYPE, jobj_msg); - break; + return FAIL; } if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_VOTE_RESULT, &vote_result) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_VOTE_RESULT, jobj_msg); - break; + return FAIL; } if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_LOG_MSG, &reject_reason) != PASS) { handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_LOG_MSG, jobj_msg); - break; + return FAIL; } - send_vote_notify_resp (get_ctrl_ptr()->hostname, inst_ptr->instance.uuid, notification_type, @@ -1104,9 +1096,6 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr ) mtcTimer_stop ( inst_ptr->vote_timer ); inst_ptr->vote_timer.ring = false ; } - /* Delete the message */ - inst_ptr->message_list.pop_front(); - json_object_put(jobj_msg); } else if ( !inst_ptr->instance.msg_type.compare(GUEST_HEARTBEAT_MSG_EXIT) ) { @@ -1119,9 +1108,6 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr ) beatStateChange ( &inst_ptr->instance, hbs_server_waiting_init ) ; hbStatusChange ( &inst_ptr->instance, false ); - - inst_ptr->message_list.pop_front(); - json_object_put(jobj_msg); } else { @@ -1132,14 +1118,9 @@ int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr ) string log_err = "unsupported message type: "; log_err.append(inst_ptr->instance.msg_type); send_client_msg_nack(&inst_ptr->instance, log_err); - - /* Delete the message */ - inst_ptr->message_list.pop_front(); - json_object_put(jobj_msg); } + json_object_put(jobj_msg); } - - /* Global case break */ break ; } @@ -1188,9 +1169,6 @@ int guestInstClass::send_challenge ( struct guestInstClass::inst * inst_ptr ) bytes_sent, message.length() ); } - /* Clear the message queue and wait for the challenge response */ - inst_ptr->message_list.clear (); - /* Waiting on a response now */ inst_ptr->instance.heartbeat.waiting = true ; @@ -1290,9 +1268,6 @@ int guestInstClass::send_vote_notify ( string uuid ) bytes_sent, message.length() ); } - /* Clear the message queue and wait for the vote response */ - inst_ptr->message_list.clear (); - if ( inst_ptr->vote_timer.tid ) mtcTimer_stop ( inst_ptr->vote_timer ); mtcTimer_start ( inst_ptr->vote_timer, guestTimer_handler, inst_ptr->instance.vote_secs ); @@ -1439,7 +1414,6 @@ void guestInstClass::handle_parse_failure ( struct guestInstClass::inst * inst_p log_err.append(key); elog("%s %s\n", log_prefix(&inst_ptr->instance).c_str(), log_err.c_str()); send_client_msg_nack(&inst_ptr->instance, log_err); - inst_ptr->message_list.pop_front(); /* pop_front() only deletes the internal copy of jobj_msg in the message_list. The original object still needs to be released here */ json_object_put(jobj_msg); diff --git a/mtce-common/cgts-mtce-common-1.0/guest/guestSvrMsg.cpp b/mtce-common/cgts-mtce-common-1.0/guest/guestSvrMsg.cpp index 032c2bde..c5b4fde3 100644 --- a/mtce-common/cgts-mtce-common-1.0/guest/guestSvrMsg.cpp +++ b/mtce-common/cgts-mtce-common-1.0/guest/guestSvrMsg.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013-2016 Wind River Systems, Inc. + * Copyright (c) 2013-2018 Wind River Systems, Inc. * * SPDX-License-Identifier: Apache-2.0 * @@ -485,6 +485,151 @@ ssize_t guestInstClass::write_inst ( instInfo * instInfo_ptr, return (len); } +/********************************************************************************* + * + * Name : process_msg (guestInstClass::private) + * + * Purpose : process delimited message + * + *********************************************************************************/ +void guestInstClass::process_msg(json_object *jobj_msg, + struct guestInstClass::inst * inst_ptr) +{ + int version; + string msg_type; + string log_err = "failed to parse "; + guestInstClass * obj_ptr = get_instInv_ptr(); + + //parse incoming msg + if (jobj_msg == NULL) + { + wlog("%s\n", log_err.c_str()); + return; + } + + if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_VERSION, &version) != PASS) + { + // fail to parse the version + log_err.append(GUEST_HEARTBEAT_MSG_VERSION); + elog("%s\n", log_err.c_str()); + obj_ptr->send_client_msg_nack(&inst_ptr->instance, log_err); + json_object_put(jobj_msg); + return; + } + + if ( version < GUEST_HEARTBEAT_MSG_VERSION_CURRENT) + { + char log_err_str[100]; + sprintf(log_err_str, "Bad version: %d, expect version: %d", + version, GUEST_HEARTBEAT_MSG_VERSION_CURRENT); + elog("%s\n", log_err_str); + log_err = log_err_str; + obj_ptr->send_client_msg_nack(&inst_ptr->instance, log_err); + json_object_put(jobj_msg); + return; + } + + if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_MSG_TYPE, &msg_type) != PASS) + { + // fail to parse the msg_type + log_err.append(GUEST_HEARTBEAT_MSG_MSG_TYPE); + elog("%s\n", log_err.c_str()); + obj_ptr->send_client_msg_nack(&inst_ptr->instance, log_err); + json_object_put(jobj_msg); + return; + } + + /* Enqueue the message to its instance message list */ + inst_ptr->message_list.push_back(jobj_msg); +} + +/********************************************************************************* + * + * Name : parser (guestInstClass::private) + * + * Purpose : parse message segments and feed valid message to process_msg + * + *********************************************************************************/ +void guestInstClass::parser(char *buf, + ssize_t len, + json_tokener* tok, + int newline_found, + struct guestInstClass::inst * inst_ptr) +{ + json_object *jobj = json_tokener_parse_ex(tok, buf, len); + enum json_tokener_error jerr = json_tokener_get_error(tok); + + if (jerr == json_tokener_success) { + process_msg(jobj, inst_ptr); + return; + } + + else if (jerr == json_tokener_continue) { + // partial JSON is parsed , continue to read from socket. + if (newline_found) { + // if newline was found in the middle of the buffer, the message + // should be completed at this point. Throw out incomplete message + // by resetting tokener. + json_tokener_reset(tok); + } + } + else + { + // parsing error + json_tokener_reset(tok); + } +} + +/********************************************************************************* + * + * Name : handle_virtio_serial_msg (guestInstClass::private) + * + * Purpose : handle delimitation and assembly of message stream + * + * Description: Multiple messages from the host can be bundled together into a + * single "read" so we need to check message boundaries and handle + * breaking the message apart. Assume a valid message does not + * contain newline '\n', and newline is added to the beginning and + * end of each message by the sender to delimit the boundaries. + * + *********************************************************************************/ +void guestInstClass::handle_virtio_serial_msg( + char *buf, + ssize_t len, + json_tokener* tok, + struct guestInstClass::inst * inst_ptr) +{ + char *newline; + ssize_t len_head; + +next: + if (len <= 0) + return; + + // search for newline as delimiter + newline = (char *)memchr((char *)buf, '\n', len); + + if (newline) { + // split buffer to head and tail at the location of newline. + // feed the head to the parser and recursively process the tail. + len_head = newline-buf; + + // parse head + if (len_head > 0) + parser(buf, len_head, tok, 1, inst_ptr); + + // start of the tail: skip newline + buf += len_head+1; + // length of the tail: deduct 1 for the newline character + len -= len_head+1; + + // continue to process the tail. + goto next; + } + else { + parser(buf, len, tok, 0, inst_ptr); + } +} /********************************************************************************* * @@ -506,8 +651,6 @@ void guestInstClass::readInst ( void ) waitd.tv_sec = 0; waitd.tv_usec = GUEST_SOCKET_TO; - struct json_object *jobj_msg = NULL; - /* Initialize the master fd_set */ FD_ZERO(&instance_readfds); @@ -560,16 +703,13 @@ void guestInstClass::readInst ( void ) /* Search through all the instances for watched channels */ for ( struct inst * inst_ptr = inst_head ; inst_ptr != NULL ; inst_ptr = inst_ptr->next ) { - mlog1 ("%s monitoring %d\n", inst_ptr->instance.inst.c_str(), + mlog2 ("%s monitoring %d\n", inst_ptr->instance.inst.c_str(), inst_ptr->instance.chan_fd ); /* Service guestServer messages towards the local IP */ if (FD_ISSET(inst_ptr->instance.chan_fd, &instance_readfds) ) { - bool message_present ; - int count ; - string last_message_type ; - char vm_message[GUEST_HEARTBEAT_MSG_MAX_MSG_SIZE] ; + char buf[GUEST_HEARTBEAT_MSG_MAX_MSG_SIZE] ; string name ; if( inst_ptr->instance.inst.empty() ) @@ -577,14 +717,12 @@ void guestInstClass::readInst ( void ) else name = inst_ptr->instance.inst ; - count = 0 ; - last_message_type = GUEST_HEARTBEAT_MSG_INIT_ACK ; - - do + struct json_tokener* tok = json_tokener_new(); + + for ( int i = 0; i < INST_MSG_READ_COUNT; i++ ) { - message_present = false ; - rc = read ( inst_ptr->instance.chan_fd, vm_message, GUEST_HEARTBEAT_MSG_MAX_MSG_SIZE); - mlog3 ("%s read channel: bytes:%d, fd:%d\n", name.c_str(), rc,inst_ptr->instance.chan_fd ); + rc = read ( inst_ptr->instance.chan_fd, buf, GUEST_HEARTBEAT_MSG_MAX_MSG_SIZE); + mlog2 ("%s read channel: bytes:%d, fd:%d\n", name.c_str(), rc,inst_ptr->instance.chan_fd ); if ( rc < 0 ) { if ( errno == EINTR ) @@ -632,67 +770,12 @@ void guestInstClass::readInst ( void ) else { inst_ptr->instance.failure_count = 0 ; - jobj_msg = json_tokener_parse(vm_message); - int version; - string msg_type; - string log_err = "failed to parse "; - guestInstClass * obj_ptr = get_instInv_ptr(); - - //parse incoming msg - if (jobj_msg == NULL) - { - wlog("failed to parse msg\n"); - continue; - } - - if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_VERSION, &version) != PASS) - { - // fail to parse the version - log_err.append(GUEST_HEARTBEAT_MSG_VERSION); - elog("%s\n", log_err.c_str()); - obj_ptr->send_client_msg_nack(&inst_ptr->instance, log_err); - json_object_put(jobj_msg); - continue; - } - - if ( version != GUEST_HEARTBEAT_MSG_VERSION_CURRENT) - { - char log_err_str[100]; - sprintf(log_err_str, "Bad version: %d, expect version: %d", - version, GUEST_HEARTBEAT_MSG_VERSION_CURRENT); - elog("%s\n", log_err_str); - log_err = log_err_str; - obj_ptr->send_client_msg_nack(&inst_ptr->instance, log_err); - json_object_put(jobj_msg); - continue; - } - - message_present = true ; - if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_MSG_TYPE, &msg_type) != PASS) - { - // fail to parse the msg_type - log_err.append(GUEST_HEARTBEAT_MSG_MSG_TYPE); - elog("%s\n", log_err.c_str()); - obj_ptr->send_client_msg_nack(&inst_ptr->instance, log_err); - json_object_put(jobj_msg); - continue; - } - - mlog2 ("%s '%s' message\n", name.c_str(), msg_type.c_str()); - - /* Try and purge out old init messages */ - if (!msg_type.compare(GUEST_HEARTBEAT_MSG_INIT) && - !msg_type.compare(last_message_type) ) - { - inst_ptr->message_list.pop_back(); - ilog ("%s deleting stale init message\n", name.c_str()); - } - /* Enqueue the message to its instance message list */ - inst_ptr->message_list.push_back(jobj_msg); - last_message_type = msg_type ; + mlog2 ("%s handling message buf: %s\n", name.c_str(), buf ); + handle_virtio_serial_msg(buf, rc, tok, inst_ptr); } } - } while ( ( message_present == true ) && ( ++count<10 ) ) ; + } + json_tokener_free(tok); } if (( inst_ptr->next == NULL ) || ( inst_ptr == inst_tail )) break ;