Worker pool on NetBurner

The NetBurner OS is a multitasking OS, thus it provides the possibility to use multiple tasks. For the WS4D-gSOAP stack it is interesting to support a worker pool to serve web service requests. There exist two possibilities with worker pools: (1) worker pool for a specific hosted service or (2) worker pool for all (hosting and hosted) services.

The main difference between these approaches lies in the soap data handling. For solution (2) the whole soap handle has to be copied (soap_copy_context) while for solution (1) it is sufficient to copy the socket from the soap handle to the worker's handle.

The following code snippets are for the solution (1) with one worker pool for one specific hosted service (mb_service) while the other hosted service (cnf_service) is handled in the main loop with the discovery and hosting handle:

struct soap hosting, discovery, mb_service, cnf_service;
struct dpws_s device;

#define MAX_SOAP_TASKS (4)
struct soap soap_task[MAX_SOAP_TASKS]; // each task needs a runtime context
OS_SEM soap_task_semaphore[MAX_SOAP_TASKS];

asm( " .align 4 " );
DWORD SoapTaskStack[MAX_SOAP_TASKS][USER_TASK_STK_SIZE] __attribute__( ( aligned( 4 ) ) );

void mb_serve_request_and_clean_up(void *pd) {
  int offset = (int) pd;
  struct soap *soap = &soap_task[offset];
  OS_SEM *my_sem = &soap_task_semaphore[offset];
  int (*serve_requests[]) (struct soap *soap) = SOAP_SERVE_SET(mb1_serve_request);
  int serve_requests_cnt = 1;
  int ret;
  for (;;) {
    debug(LOG_DEBUG, "waiting for requests (soap task %d)", offset);
    OSSemPend(my_sem, 0);
    debug(LOG_DEBUG, "serve of accepted handle in task (soap task %d)", offset);
    if ((ret = dpws_mserve(soap, serve_requests_cnt, serve_requests))) {
      if (ret == SOAP_EOF) {
        debug(LOG_INFO, "pipe broken (soap task %d)", offset);
      } else {
        debug(LOG_ERROR, "mb1_serve_request in task failed (soap task %d)", offset);
        soap_print_fault(soap, stderr);
        soap_print_fault_location(soap, stderr);
      }
    }
    soap_end(soap); // dealloc data and clean up
    soap->socket = SOAP_INVALID_SOCKET;
  }
}

void UserMain(void *pd) {

  // ...



  // init soap_task
  for (i = 0; i < MAX_SOAP_TASKS; i++) {
    struct soap *r = soap_copy_context(&soap_task[i], &mb_service);
    if (!r) {
      debug(LOG_ERROR, "soap_copy_context failed");
      return;
    }
    if (OSSemInit(&soap_task_semaphore[i], 0)) {
      debug(LOG_ERROR, "OSSemInet() failed");
      return;
    }
    if (OSTaskCreatewName(mb_serve_request_and_clean_up,
        (void *) i,
        &SoapTaskStack[i][USER_TASK_STK_SIZE], // stack top
        SoapTaskStack[i], // stack bottom
        MAIN_PRIO - i, // priority (50, 49, 48, ...)
        "MB service task")) {
          debug(LOG_ERROR, "Task with that prio %d already exists", MAIN_PRIO-i);
    }
  }


  debug(LOG_INFO, "ready to serve web service requests ...");
  for (;;) {
    struct soap *handle = NULL;
    int (*serve_requests[]) (struct soap *soap) = SOAP_SERVE_SET(cnf1_serve_request, NULL);
    int serve_requests_cnt = 1;
    struct soap *soap_set[] = SOAP_HANDLE_SET(&hosting, &discovery, &cnf_service, NULL);
    int soap_set_cnt = 3;
    long timeout_ms = 1000;
    int ret;
    int soap_task_offset = -1;

    // check for free soap_task (mb_client)
    for (i = 0; i < MAX_SOAP_TASKS; i++) {
      if (soap_task[i].socket == SOAP_INVALID_SOCKET) {
        soap_task_offset = i;
        serve_requests[serve_requests_cnt++] = mb1_serve_request;
        soap_set[soap_set_cnt++] = &mb_service;
        break;
      }
    }
    // waiting for new messages
    debug(LOG_DEBUG, "waiting for web service request");
    handle = dpws_maccept(&device, timeout_ms, soap_set_cnt, soap_set);

    // handle web services request
    if (handle == &mb_service) {
      debug(LOG_DEBUG, "mb request -> extra task (%d)", soap_task_offset);
      if (soap_valid_socket(mb_service.socket))  {
        debug(LOG_INFO, "Task %d accepts socket %d connection from IP %d.%d.%d.%d", soap_task_offset, mb_service.socket,
            (handle->ip >> 24)&0xFF, (handle->ip >> 16)&0xFF, (handle->ip >> 8)&0xFF, handle->ip&0xFF);
        if (soap_task[soap_task_offset].socket > 0) {
          debug(LOG_ERROR, "socket in use");
        } else {
          soap_task[soap_task_offset].socket = mb_service.socket; // new socket fd
          if (OSSemPost(&soap_task_semaphore[soap_task_offset])) {
            debug(LOG_ERROR, "OSSemPost() failed in offset %d", soap_task_offset);
          }
        }
      }
    } else if (handle) {
      debug(LOG_DEBUG, "in-loop service request");
      if ((ret = dpws_mserve(handle, serve_requests_cnt, serve_requests))) {
        if (ret == SOAP_EOF) {
          debug(LOG_INFO, "pipe broken in in-loop request");
        } else {
          debug(LOG_ERROR, "mserve failed for in-loop request");
          soap_print_fault(handle, stderr);
          soap_print_fault_location(handle, stderr);
        }
      }
      debug(LOG_DEBUG, "in-loop web service request served successfully");
      soap_end(handle); // deallocate data and clean up
    }
  } // end main loop

}

Hope, this code snippets helps.