Added Marten's queueing for incoming packets
authorChris Tallon <chris@vomp.tv>
Fri, 13 Jun 2008 16:33:33 +0000 (16:33 +0000)
committerChris Tallon <chris@vomp.tv>
Fri, 13 Jun 2008 16:33:33 +0000 (16:33 +0000)
vompclientrrproc.c
vompclientrrproc.h

index 66a2a2e7ba33465b0a7b9b93aa726db6b9b11ab1..de107962f08080c4d9b8100511151608c21480f9 100644 (file)
@@ -59,25 +59,19 @@ bool VompClientRRProc::recvRequest(RequestPacket* newRequest)
 {
   /*
      Accept a new request
-     Currently only one at once is supported but this
-     could be upgraded to a queueing system
+     Now we have a queue system is used,
+     since on rare occasion the client fire two request at once
+     e.g. heavily channel switching 
+     then processing only a single request would cause a deadlock in the client
+     Marten
   */
 
   threadLock();
-
-  if (req)
-  {
-    log->log("RRProc", Log::ERR, "recvReq err 1");     
-    threadUnlock();
-    return false; 
-  }
-
-  req = newRequest;
+  req_queue.push(newRequest);
   threadSignalNoLock();
-
   log->log("RRProc", Log::DEBUG, "recvReq set req and signalled");     
-  
   threadUnlock();
+
   return true;
 }
 
@@ -86,27 +80,46 @@ void VompClientRRProc::threadMethod()
   threadLock();
   log->log("RRProc", Log::DEBUG, "threadMethod startup");     
 
-  while(1)  
+  if (req_queue.size() != 0)
   {
-    if (req)
-    {
-      log->log("RRProc", Log::ERR, "threadMethod err 1");     
-      threadUnlock();
-      return;
-    }
+    log->log("RRProc", Log::ERR, "threadMethod err 1");     
+    threadUnlock();
+    return;
+  }
     
+  while(1)  
+  {
     log->log("RRProc", Log::DEBUG, "threadMethod waiting");     
-    threadWaitForSignal();
-    if (!req)
+    threadWaitForSignal();  // unlocks, waits, relocks
+    if (req_queue.size() == 0)
     {
       log->log("RRProc", Log::INFO, "threadMethod err 2 or quit");     
       threadUnlock();
       return;
     }
     
-    log->log("RRProc", Log::DEBUG, "thread woken with req");     
-  
-    processPacket();
+    // signalled with something in queue
+    
+    log->log("RRProc", Log::DEBUG, "thread woken with req, queue size: %i", req_queue.size());
+
+    while (req_queue.size()) 
+    {
+      log->log("RRProc", Log::DEBUG, "thread while");
+      req = req_queue.front();
+      req_queue.pop();
+      
+      threadUnlock(); // allow recvRequest to be queuing packets while we are working on this one
+      
+      if (!processPacket())
+      {
+        log->log("RRProc", Log::ERR, "processPacket exited with fail");     
+        return;
+      }
+      
+      threadLock();
+    } 
+    
+    // locked and run out of packets to process
   }  
 }
 
@@ -117,6 +130,11 @@ bool VompClientRRProc::processPacket()
   {
     log->log("RRProc", Log::ERR, "response packet init fail");     
     delete resp; 
+    
+    if (req->data) free(req->data);
+    delete req;
+    req = NULL;
+
     return false;
   }
     
index 895f6af6557fce1ee3cce35f4d3cb84eb8e9cdda..31d0a14d5378130d87c3b26b3b5fbfedc22d49a8 100644 (file)
@@ -23,6 +23,9 @@
 
 #include "thread.h"
 #include "responsepacket.h"
+#include <queue>
+
+using namespace std;
 
 class VompClient;
 class Log;
@@ -39,6 +42,8 @@ class RequestPacket
     ULONG dataLength;
 };
 
+typedef queue<RequestPacket*> RequestPacketQueue;
+
 class VompClientRRProc : public Thread
 {
   public:
@@ -85,6 +90,7 @@ class VompClientRRProc : public Thread
 
     VompClient& x;
     RequestPacket* req;
+    RequestPacketQueue req_queue;
     ResponsePacket* resp;
     
     Log* log;