/* * Copyright (c) 2013, 2016 Wind River Systems, Inc. * * SPDX-License-Identifier: Apache-2.0 * */ /** * @file * Wind River CGTS Platform Controller Maintenance HTTP Utilities. * * Public Interfaces: tied to the nodeLinkClass * * nodeLinkClass::workQueue_enqueue * nodeLinkClass::doneQueue_dequeue * * * Private Helper Utilities: * * _get_work_state_str * _get_event_log_prefix_string * * */ using namespace std; #include "nodeClass.h" /* for ... maintenance class nodeLinkClass */ #include "mtcHttpUtil.h" /* this module header */ #include "mtcNodeHdlrs.h" /* for ... mtcTimer_handl */ #include "nodeUtil.h" /* for ... common Node Utilities */ #define QUEUE_OVERLOAD (40) string _get_work_state_str ( httpStages_enum state ) { if ( state == HTTP__TRANSMIT ) return ("Tx "); else if ( state == HTTP__RECEIVE ) return (" Rx"); else if ( state == HTTP__FAILURE ) return (" Er "); else if ( state == HTTP__RECEIVE_WAIT ) return ("Wait"); else { elog ("Invalid Http Work Queue State: %d\n", state ); return ("----"); } } void nodeLinkClass::workQueue_dump ( struct nodeLinkClass::node * node_ptr ) { if ( node_ptr->libEvent_work_fifo.size() ) { syslog ( LOG_INFO, "\n"); syslog ( LOG_INFO, "+------+-------+--------------+---------+---------------+----------------------+\n"); syslog ( LOG_INFO, "| Mode | Seq | Hostname | Service | Request | IP Address Port | Payload ...\n"); syslog ( LOG_INFO, "+------+-------+--------------+---------+---------------+----------------------+\n"); for ( node_ptr->libEvent_work_fifo_ptr = node_ptr->libEvent_work_fifo.begin(); node_ptr->libEvent_work_fifo_ptr != node_ptr->libEvent_work_fifo.end(); node_ptr->libEvent_work_fifo_ptr ++ ) { syslog ( LOG_INFO, "| %-4s | %5d | %-12s | %-7s | %-13s | %15s:%d | %s\n", _get_work_state_str(node_ptr->libEvent_work_fifo_ptr->state).c_str(), node_ptr->libEvent_work_fifo_ptr->sequence, node_ptr->libEvent_work_fifo_ptr->hostname.c_str(), node_ptr->libEvent_work_fifo_ptr->service.c_str(), node_ptr->libEvent_work_fifo_ptr->operation.c_str(), node_ptr->libEvent_work_fifo_ptr->ip.c_str(), node_ptr->libEvent_work_fifo_ptr->port, node_ptr->libEvent_work_fifo_ptr->payload.c_str()); } syslog ( LOG_INFO, "+------+-------+--------------+---------+--------------+----------------------+\n"); } else { dlog ("%s work queue is empty\n", node_ptr->hostname.c_str()); } } void nodeLinkClass::workQueue_dump_all ( void ) { struct node * ptr = static_cast(NULL) ; /* check for empty list condition */ if ( head == NULL ) { syslog ( LOG_INFO, "No inventory\n"); return ; } /* Now search the node list */ for ( ptr = head ; ptr != NULL ; ptr = ptr->next ) { workQueue_dump ( ptr ); } } void nodeLinkClass::doneQueue_dump ( struct nodeLinkClass::node * node_ptr ) { if ( node_ptr->libEvent_done_fifo.size() ) { for ( node_ptr->libEvent_done_fifo_ptr = node_ptr->libEvent_done_fifo.begin(); node_ptr->libEvent_done_fifo_ptr != node_ptr->libEvent_done_fifo.end(); node_ptr->libEvent_done_fifo_ptr ++ ) { syslog ( LOG_INFO, "%15s httpReq doneQueue:%5d - %s '%s' -> Status:%d\n", node_ptr->libEvent_done_fifo_ptr->hostname.c_str(), node_ptr->libEvent_done_fifo_ptr->sequence, node_ptr->libEvent_done_fifo_ptr->service.c_str(), node_ptr->libEvent_done_fifo_ptr->operation.c_str(), node_ptr->libEvent_done_fifo_ptr->status ); } } } void nodeLinkClass::doneQueue_dump_all ( void ) { struct node * ptr = static_cast(NULL) ; /* check for empty list condition */ if ( head == NULL ) { syslog ( LOG_INFO, "\nNo inventory\n"); return ; } /* Now search the node list */ for ( ptr = head ; ptr != NULL ; ptr = ptr->next ) { doneQueue_dump ( ptr ); doneQueue_purge ( ptr ); } } /* *********************************************************************** * * Name : nodeLinkClass::workQueue_enqueue * * Description: Adds the next sequence number to the supplied event * reference, creates a log prefix based on the event's * hostname, service, operation and sequence number * (to avoid repeated recreation) and then copies that * event to the work queue. * * @param event is a reference to the callers libEvent. * @return an integer with value of PASS. * * ************************************************************************/ int nodeLinkClass::workQueue_enqueue ( libEvent & event ) { char seq_str[64] ; memset ( &seq_str[0], 0 , 64 ); GET_NODE_PTR(event.hostname) ; event.sequence = node_ptr->oper_sequence++ ; sprintf ( &seq_str[0], "%d", event.sequence ); event.log_prefix = event.hostname ; event.log_prefix.append (" "); event.log_prefix.append (event.service) ; //event.log_prefix.append (" '"); //event.log_prefix.append (event.operation) ; //event.log_prefix.append ("' seq:"); event.log_prefix.append (" seq:"); event.log_prefix.append (seq_str) ; node_ptr->libEvent_work_fifo.push_back(event); qlog ("%s Enqueued\n", event.log_prefix.c_str()); return (PASS) ; } /* *********************************************************************** * * Name : nodeLinkClass::doneQueue_dequeue * * Description: Searches the done queue for the event matching the supplied * event reference , specifically the sequence number. If found * it pulls the execution status information and then proceeds * to remove it from the done queue. * * If the event is found then the event status is returned. * if not found then a RETRY is returned. * If the done event status is RETRY then a FAIL is returned since * it should not be on the done queue with a retry status. * * @param event is a reference to the callers libEvent. * @return an integer with values of PASS, FAIL, RETRY * * ************************************************************************/ int nodeLinkClass::doneQueue_dequeue ( libEvent & event ) { int rc = FAIL ; bool found = false ; GET_NODE_PTR(event.hostname) ; for ( node_ptr->libEvent_done_fifo_ptr = node_ptr->libEvent_done_fifo.begin(); node_ptr->libEvent_done_fifo_ptr != node_ptr->libEvent_done_fifo.end(); node_ptr->libEvent_done_fifo_ptr++ ) { if ( node_ptr->libEvent_done_fifo_ptr->sequence == event.sequence ) { ilog ("%s fetched from done queue\n", node_ptr->libEvent_done_fifo_ptr->log_prefix.c_str()); /* get on the response data */ event.http_status = node_ptr->libEvent_done_fifo_ptr->http_status ; event.status = node_ptr->libEvent_done_fifo_ptr->status ; event.active = node_ptr->libEvent_done_fifo_ptr->active ; event.value = node_ptr->libEvent_done_fifo_ptr->value ; event.result = node_ptr->libEvent_done_fifo_ptr->result ; event.response = node_ptr->libEvent_done_fifo_ptr->response ; event.response_len= node_ptr->libEvent_done_fifo_ptr->response_len ; node_ptr->libEvent_done_fifo.erase(node_ptr->libEvent_done_fifo_ptr); found = true ; if ( event.status == RETRY ) { slog ("%s over riding rety to fail\n", node_ptr->libEvent_done_fifo_ptr->log_prefix.c_str() ); event.status = FAIL ; } rc = event.status ; break ; } } if ( found == false ) { qlog ("%s not found in done queue\n", event.log_prefix.c_str()); rc = RETRY ; } return (rc); } /* *********************************************************************** * * Name : nodeLinkClass::workQueue_process * * Description: This is a Per Host Finite State Machine (FSM) that * processes the work queue for the supplied host's * node pointer. * * Constructs: * * node_ptr->libEvent_work_fifo - the current work queue/fifo * node_ptr->libEvent_done_fifo - queue/fifo of completed requests * * Operations: * * requests are added to the libEvent_work_fifo with workQueue_enqueue. * requests are removed from the libEvent_done_fifo with workQueue_dequeue. * * Behavior: * * In process libEvents are copied from the callers work queue to * its thisReq. * * Completed events including execution status are copied to the host's * done fifo. * * Failed events may be retried up to max_retries as specified by * the callers libEvent. * * @param event is a reference to the callers libEvent. * * @return an integer with values of PASS, FAIL, RETRY * * ************************************************************************/ int nodeLinkClass::workQueue_process ( struct nodeLinkClass::node * node_ptr ) { int rc = PASS ; /* handle purging the done queue on the last */ if ( node_ptr->libEvent_done_fifo.size() > 0 ) { /* Manage the done queue so that it does not grow forever * if command producers do not come back and dequeue their * responses */ if ( node_ptr->libEvent_done_fifo.size() > 10 ) { qlog ("%s Done Queue has %ld elements\n", node_ptr->hostname.c_str(), node_ptr->libEvent_done_fifo.size()); /* TODO: look at the status of the commands and print a log of those that failed */ /* Remove the first 8 - its a fifo the first ones at the front are the oldest */ for ( int i=0 ; i < 8 ; i++ ) { node_ptr->libEvent_done_fifo.pop_front(); } qlog ("%s Done Queue has %ld elements remaining\n", node_ptr->hostname.c_str(), node_ptr->libEvent_done_fifo.size()); } } if ( node_ptr->libEvent_work_fifo.empty() ) { // qlog_throttled ( node_ptr->no_work_log_throttle, 300, // "%s Idle ... \n", // node_ptr->hostname.c_str()); node_ptr->no_work_log_throttle = 0 ; return (PASS); } if ( daemon_get_cfg_ptr()->debug_work & 8 ) { // workQueue_print ( node_ptr ) ; syslog ( LOG_INFO, "\n"); syslog ( LOG_INFO, "+------+-------+--------------+---------+---------------+-----+----------------------+\n"); syslog ( LOG_INFO, "| Mode | Seq | Hostname | Service | Request | Tmo | IP Address Port | payload ...\n"); syslog ( LOG_INFO, "+------+-------+--------------+---------+---------------+-----+----------------------+\n"); for ( node_ptr->libEvent_work_fifo_ptr = node_ptr->libEvent_work_fifo.begin(); node_ptr->libEvent_work_fifo_ptr != node_ptr->libEvent_work_fifo.end(); node_ptr->libEvent_work_fifo_ptr ++ ) { syslog ( LOG_INFO, "| %-4s | %5d | %-12s | %-7s | %-13s | %3d | %15s:%d | %s\n", _get_work_state_str(node_ptr->libEvent_work_fifo_ptr->state).c_str(), node_ptr->libEvent_work_fifo_ptr->sequence, node_ptr->libEvent_work_fifo_ptr->hostname.c_str(), node_ptr->libEvent_work_fifo_ptr->service.c_str(), node_ptr->libEvent_work_fifo_ptr->operation.c_str(), node_ptr->libEvent_work_fifo_ptr->timeout, node_ptr->libEvent_work_fifo_ptr->ip.c_str(), node_ptr->libEvent_work_fifo_ptr->port, node_ptr->libEvent_work_fifo_ptr->payload.c_str()); } syslog ( LOG_INFO, "+------+-------+--------------+---------+--------------+-----+----------------------+\n"); } int size = node_ptr->libEvent_work_fifo.size() ; if ( size > QUEUE_OVERLOAD ) { elog ( "%s work queue overload ; clearing %d entries\n", node_ptr->hostname.c_str(), size ); workQueue_purge ( node_ptr ); return (FAIL); } if ( node_ptr->libEvent_work_fifo.empty() ) { slog ("%s unexpected empty 'libEvent_work_fifo_ptr' (should have %d elements)\n", node_ptr->hostname.c_str(), size ); workQueue_purge ( node_ptr ); return (FAIL_NULL_POINTER); } node_ptr->libEvent_work_fifo_ptr = node_ptr->libEvent_work_fifo.begin(); switch ( node_ptr->libEvent_work_fifo_ptr->state ) { case HTTP__TRANSMIT: { node_ptr->thisReq = node_ptr->libEvent_work_fifo.front(); qlog ("%s Transmitted\n", node_ptr->thisReq.log_prefix.c_str() ); rc = mtcHttpUtil_api_request ( node_ptr->thisReq ) ; if ( rc ) { node_ptr->libEvent_work_fifo_ptr->state = node_ptr->thisReq.state = HTTP__FAILURE ; } else { node_ptr->libEvent_work_fifo_ptr->state = node_ptr->thisReq.state = HTTP__RECEIVE_WAIT ; if ( node_ptr->http_timer.tid ) mtcTimer_stop ( node_ptr->http_timer ); rc = mtcTimer_start_msec ( node_ptr->http_timer, mtcTimer_handler, HTTP_RECEIVE_WAIT_MSEC ); if ( rc != PASS ) { elog ("%s failed to start http command timer ; failing command\n", node_ptr->thisReq.log_prefix.c_str()); node_ptr->libEvent_work_fifo_ptr->state = node_ptr->thisReq.state = HTTP__FAILURE ; } } break ; } case HTTP__RECEIVE_WAIT: { if ( node_ptr->http_timer.ring == true ) { if ( node_ptr->http_timer.error == true ) { slog ("%s timer handler ran while still in start utility ; handled ...\n", node_ptr->thisReq.log_prefix.c_str()); node_ptr->http_timer.error = false ; } if (( node_ptr->http_timer._guard != 0x12345678 ) || ( node_ptr->http_timer.guard_ != 0x77654321 )) { slog ("%s timer struct guard barrier detected corruption\n", node_ptr->thisReq.log_prefix.c_str()); } node_ptr->http_timer.ring = false ; node_ptr->libEvent_work_fifo_ptr->state = node_ptr->thisReq.state = HTTP__RECEIVE ; } break ; } case HTTP__RECEIVE: { /* Try and receive the response */ if ( node_ptr->thisReq.base == NULL ) { slog ("%s has unexpected null HTTP request base pointer\n", node_ptr->thisReq.log_prefix.c_str()); node_ptr->libEvent_work_fifo_ptr->state = node_ptr->thisReq.state = HTTP__FAILURE ; break ; } int msec_timeout = (node_ptr->thisReq.timeout*1000); int wait_time = (++node_ptr->thisReq.rx_retry_cnt)*HTTP_RECEIVE_WAIT_MSEC ; rc = mtcHttpUtil_receive ( node_ptr->thisReq ); if ( rc == RETRY ) { node_ptr->libEvent_work_fifo_ptr->state = node_ptr->thisReq.state = HTTP__RECEIVE_WAIT ; mtcTimer_start_msec ( node_ptr->http_timer, mtcTimer_handler, HTTP_RECEIVE_WAIT_MSEC ); if ((wait_time > (msec_timeout/4)) && ( node_ptr->thisReq.low_wm == false ) ) { qlog1 ("%s reached lower (1/4) timeout watermark (%d msec)\n", node_ptr->thisReq.log_prefix.c_str(), wait_time ); node_ptr->libEvent_work_fifo_ptr->low_wm = node_ptr->thisReq.low_wm = true ; break ; } else if ((wait_time > (msec_timeout/2)) && ( node_ptr->thisReq.med_wm == false )) { qlog ("%s reached mid (1/2) timeout watermark (%d msec)\n", node_ptr->thisReq.log_prefix.c_str(), wait_time); node_ptr->libEvent_work_fifo_ptr->med_wm = node_ptr->thisReq.med_wm = true ; break ; } else if (( wait_time > ((msec_timeout/4)*3)) && ( node_ptr->thisReq.high_wm == false )) { wlog ("%s reached high (3/4) timeout watermark (%d msec)\n", node_ptr->thisReq.log_prefix.c_str(), wait_time ); node_ptr->libEvent_work_fifo_ptr->high_wm = node_ptr->thisReq.high_wm = true ; break ; } else { /* Only print every 16 starting with 2 */ if ( (node_ptr->thisReq.rx_retry_cnt & 0xF) == 2 ) { qlog ("%s rx_retry_cnt:%d\n", node_ptr->thisReq.log_prefix.c_str(), node_ptr->thisReq.rx_retry_cnt ); } break ; } } if ( rc != PASS ) { node_ptr->libEvent_work_fifo_ptr->state = node_ptr->thisReq.state = HTTP__FAILURE ; } else { if ( node_ptr->thisReq.cur_retries ) { ilog ("%s Completed (after %d retries) (took %d of %d msecs)\n", node_ptr->thisReq.log_prefix.c_str(), node_ptr->thisReq.cur_retries, wait_time, node_ptr->thisReq.timeout*1000); } else { qlog ("%s Completed (took %d of %d msecs)\n", node_ptr->thisReq.log_prefix.c_str(), wait_time, node_ptr->thisReq.timeout*1000); } node_ptr->thisReq.exec_time_msec = wait_time ; node_ptr->thisReq.rx_retry_cnt = 0 ; mtcHttpUtil_free_conn ( node_ptr->thisReq ); mtcHttpUtil_free_base ( node_ptr->thisReq ); /* Don't add success responses to non-critical commands like * "update uptime" and "update task" to the done queue */ if ( !node_ptr->thisReq.noncritical ) { /* Copy done event to the done queue */ node_ptr->libEvent_done_fifo.push_back(node_ptr->thisReq); } /* Pop that done event off the work queue */ node_ptr->libEvent_work_fifo.pop_front(); } break ; } case HTTP__FAILURE: { bool want_retry = false ; mtcHttpUtil_free_conn ( node_ptr->thisReq ); mtcHttpUtil_free_base ( node_ptr->thisReq ); node_ptr->http_retries_cur++ ; node_ptr->thisReq.cur_retries++ ; if ( node_ptr->thisReq.noncritical == true ) { if ( node_ptr->thisReq.cur_retries > node_ptr->thisReq.max_retries ) { node_ptr->oper_failures++ ; wlog ("%s retry conjestion abort of non-critical command (%d:%d)\n", node_ptr->thisReq.log_prefix.c_str(), node_ptr->thisReq.cur_retries, node_ptr->thisReq.max_retries ); /* Pop this aborted event off the work queue */ node_ptr->libEvent_work_fifo.pop_front(); } else { want_retry = true ; } } /* other wise its critical and we are going for the retries */ else if ( node_ptr->thisReq.cur_retries >= node_ptr->thisReq.max_retries ) { node_ptr->oper_failures++ ; elog ("%s Failed (rc:%d) - (%d of %d) (work->%s) (Critical:%s) (Total Fails:%d)\n", node_ptr->thisReq.log_prefix.c_str(), node_ptr->thisReq.status, node_ptr->thisReq.cur_retries, node_ptr->thisReq.max_retries, node_ptr->thisReq.noncritical ? "drop" : "done", node_ptr->thisReq.noncritical ? "No" : "Yes", node_ptr->oper_failures ); if ( node_ptr->thisReq.noncritical == false ) { /* Copy done event to the done queue */ node_ptr->libEvent_done_fifo.push_back(node_ptr->thisReq); } /* Pop that done event off the work queue */ node_ptr->libEvent_work_fifo.pop_front(); } else { want_retry = true ; } if ( want_retry ) { wlog ("%s Failed (rc:%d) - (%d of %d) (Timeout=%d) (Critical:%s)\n", node_ptr->thisReq.log_prefix.c_str(), node_ptr->thisReq.status, node_ptr->thisReq.cur_retries, node_ptr->thisReq.max_retries, node_ptr->thisReq.timeout, node_ptr->thisReq.noncritical ? "No" : "Yes" ); node_ptr->thisReq.response.clear(); node_ptr->thisReq.status = PASS ; node_ptr->thisReq.http_status = 0 ; node_ptr->thisReq.active = false ; node_ptr->thisReq.response_len= 0 ; /* * If this is an inventory request ... * * 1. Init the inv struct * 2. increase the timeout if is a critical command * * */ if ( node_ptr->thisReq.service.find("mtcInvApi") != std::string::npos ) { node_inv_init ( node_ptr->thisReq.inv_info ) ; if ( node_ptr->thisReq.noncritical == false ) { int temp = node_ptr->libEvent_work_fifo_ptr->timeout ; /* * Increase and update the timeout value for critical commands * in hope that it will succeed on he next go around. */ node_ptr->libEvent_work_fifo_ptr->timeout += get_mtcInv_ptr()->sysinv_timeout ; dlog ("%s timeout extended from %d to %d secs\n", node_ptr->thisReq.log_prefix.c_str(), temp, node_ptr->libEvent_work_fifo_ptr->timeout ); } } /* Save the retry count */ node_ptr->libEvent_work_fifo_ptr->cur_retries = node_ptr->thisReq.cur_retries ; node_ptr->libEvent_work_fifo_ptr->state = node_ptr->thisReq.state = HTTP__TRANSMIT ; } break ; } default: { slog ("%s Bad libEvent work state (%d) ; clearing work/done queue\n", node_ptr->hostname.c_str(), node_ptr->libEvent_work_fifo_ptr->state ); node_ptr->libEvent_work_fifo.clear(); node_ptr->libEvent_done_fifo.clear(); rc = FAIL_BAD_CASE ; } } return (rc) ; } /* *********************************************************************** * * Name : nodeLinkClass::workQueue_del_cmd * * Description: To handle the pathalogical case where an event seems to * have timed out at the callers level then this interface * can be called to delete it from the work queue. * * @param node_ptr so that the hosts work queue can be found * @param sequence to specify the specific sequence number to remove * @return always PASS since there is nothing the caller can or needs * to do if the command is not present. * */ int nodeLinkClass::workQueue_del_cmd ( struct nodeLinkClass::node * node_ptr, int sequence ) { bool found = false ; for ( node_ptr->libEvent_work_fifo_ptr = node_ptr->libEvent_work_fifo.begin(); node_ptr->libEvent_work_fifo_ptr != node_ptr->libEvent_work_fifo.end(); node_ptr->libEvent_work_fifo_ptr++ ) { if ( node_ptr->libEvent_work_fifo_ptr->sequence == sequence ) { wlog ("%s force removed from work queue\n", node_ptr->libEvent_work_fifo_ptr->log_prefix.c_str()); node_ptr->libEvent_work_fifo.erase(node_ptr->libEvent_work_fifo_ptr); found = true ; break ; } } if ( found == false ) { wlog ("%s command Seq:%d not found in work queue\n", node_ptr->hostname.c_str(), sequence ); } return(PASS); } /* *********************************************************************** * * Name : nodeLinkClass::doneQueue_purge * * Description: Removes all items from the done queue. * * Returns a failure, the sequence number of the first command * in the done queue that did not PASS. * */ int nodeLinkClass::doneQueue_purge ( struct nodeLinkClass::node * node_ptr ) { int rc = PASS ; int size = node_ptr->libEvent_done_fifo.size() ; if ( size ) { int index = 0 ; for ( node_ptr->libEvent_done_fifo_ptr = node_ptr->libEvent_done_fifo.begin(); node_ptr->libEvent_done_fifo_ptr != node_ptr->libEvent_done_fifo.end(); node_ptr->libEvent_done_fifo_ptr++ ) { index++ ; if ( node_ptr->libEvent_done_fifo_ptr->status ) { /* Don't report noncritical command failure status. * Such commands might be "update uptime" and "update task" * and we don't want them to fail operations */ if ( !node_ptr->libEvent_done_fifo_ptr->noncritical ) { elog ("%s critical operation failed (rc:%d)\n", node_ptr->libEvent_done_fifo_ptr->log_prefix.c_str(), node_ptr->libEvent_done_fifo_ptr->status); if ( ! node_ptr->libEvent_done_fifo_ptr->payload.empty() ) { elog ("%s ... %s\n", node_ptr->hostname.c_str(), node_ptr->libEvent_done_fifo_ptr->payload.c_str()); } /* Save sequence of first failed priority command */ if ( rc == PASS ) { rc = node_ptr->libEvent_done_fifo_ptr->sequence ; } } else { wlog ("%s noncritical operation failed (rc:%d)\n", node_ptr->libEvent_done_fifo_ptr->log_prefix.c_str(), node_ptr->libEvent_done_fifo_ptr->status); if ( ! node_ptr->libEvent_done_fifo_ptr->payload.empty() ) { wlog ("%s ... %s\n", node_ptr->hostname.c_str(), node_ptr->libEvent_done_fifo_ptr->payload.c_str()); } } } } if ( rc == PASS ) { qlog ("%s all (%d) priority queued operations passed (qlog)\n", node_ptr->hostname.c_str(), size ); } qlog ("%s purging %d items from doneQueue\n", node_ptr->hostname.c_str(), size ); node_ptr->libEvent_done_fifo.clear(); } return (rc); } /* *********************************************************************** * * Name : nodeLinkClass::workQueue_purge * * Description: Removes all items from the work queue. * */ int nodeLinkClass::workQueue_purge ( struct nodeLinkClass::node * node_ptr ) { int size = node_ptr->libEvent_work_fifo.size() ; if ( size ) { /* TODO: find out how to force close a connection. * Don't free the connection if it is in the receiving state or * we might get a segfault * There is only ever one connection open at a time for a specific host * so its only 'thisReq' we need to worry about. */ if ( node_ptr->libEvent_work_fifo_ptr->state != HTTP__RECEIVE ) { mtcHttpUtil_free_conn ( node_ptr->thisReq ); mtcHttpUtil_free_base ( node_ptr->thisReq ); } wlog ("%s purging %d items from workQueue\n", node_ptr->hostname.c_str(), size ); for ( node_ptr->libEvent_work_fifo_ptr = node_ptr->libEvent_work_fifo.begin(); node_ptr->libEvent_work_fifo_ptr != node_ptr->libEvent_work_fifo.end(); node_ptr->libEvent_work_fifo_ptr++ ) { if ( node_ptr->libEvent_work_fifo_ptr->state == HTTP__TRANSMIT ) { wlog ("%s ... was not executed\n", node_ptr->libEvent_work_fifo_ptr->log_prefix.c_str()); } else { wlog ("%s ... did not complete (%s)\n", node_ptr->libEvent_work_fifo_ptr->log_prefix.c_str(), _get_work_state_str(node_ptr->libEvent_work_fifo_ptr->state).c_str()); } } node_ptr->libEvent_work_fifo.clear(); } else { qlog ("%s all work done\n", node_ptr->hostname.c_str()); } // node_ptr->libEvent_work_fifo_ptr->state = HTTP__TRANSMIT ; return (PASS); } int nodeLinkClass::workQueue_done ( struct nodeLinkClass::node * node_ptr ) { int rc = PASS ; /* have we timed out waiting the work queue to deplete */ if ( node_ptr->mtcTimer.ring == true ) { qlog ( "%s Ring handler\n" , node_ptr->hostname.c_str()); node_ptr->mtcTimer.ring = false ; /* Search through work queue and don't fail if the * only requests remaining is an uptime */ for ( node_ptr->libEvent_work_fifo_ptr = node_ptr->libEvent_work_fifo.begin(); node_ptr->libEvent_work_fifo_ptr != node_ptr->libEvent_work_fifo.end(); node_ptr->libEvent_work_fifo_ptr++ ) { /* Don't report work queue timeout if there are only noncritical * commands left in the work queue. Such commands might be * "update uptime" and "update task" */ if ( !node_ptr->libEvent_work_fifo_ptr->noncritical ) { rc = FAIL_WORKQ_TIMEOUT ; } } if ( rc != PASS ) { elog ("%s timeout on work queue complete\n", node_ptr->hostname.c_str()); } } /* We still doing enable work ? */ else if ( node_ptr->libEvent_work_fifo.size () == 0 ) { qlog ( "%s Empty Work Queue\n" , node_ptr->hostname.c_str()); /* O.K. the work queue is done - cancel the timer */ if ( node_ptr->mtcTimer.tid ) mtcTimer_stop ( node_ptr->mtcTimer ); /* Error logs are generated inside */ rc = doneQueue_purge ( node_ptr ); } else { qlog ( "%s Retry\n" , node_ptr->hostname.c_str()); rc = RETRY ; } #ifdef WANT_FIT_TESTING if (( rc == PASS ) && ( daemon_want_fit ( FIT_CODE__WORK_QUEUE, node_ptr->hostname ))) rc = FAIL_FIT ; #endif return (rc); } /* *********************************************************************** * * Name : nodeLinkClass::workQueue_present * * Description: Checks to see if this libEvent is in the work queue. * * @return true if present otherwise false * */ bool nodeLinkClass::workQueue_present ( libEvent & event ) { nodeLinkClass::node * node_ptr = this->getNode (event.hostname); if ( node_ptr != NULL ) { if ( node_ptr->libEvent_work_fifo.size() ) { for ( node_ptr->libEvent_work_fifo_ptr = node_ptr->libEvent_work_fifo.begin(); node_ptr->libEvent_work_fifo_ptr != node_ptr->libEvent_work_fifo.end(); node_ptr->libEvent_work_fifo_ptr++ ) { if ( node_ptr->libEvent_work_fifo_ptr->sequence == event.sequence ) { qlog ("%s ... found in work queue\n", node_ptr->libEvent_work_fifo_ptr->log_prefix.c_str()); return (true); } } } } wlog ("%s ... not found in work queue\n", event.log_prefix.c_str()); return (false); }