]> git.vomp.tv Git - vompclient.git/commitdiff
A bit more on client side timeouts
authorChris Tallon <chris@vomp.tv>
Tue, 13 May 2008 17:13:32 +0000 (17:13 +0000)
committerChris Tallon <chris@vomp.tv>
Tue, 13 May 2008 17:13:32 +0000 (17:13 +0000)
vdr.cc
vdr.h

diff --git a/vdr.cc b/vdr.cc
index c228fd85d0e86efd211f39b4c33fba18685aacea..31f0d471415d79da34b31c20df5c849e38a6eee7 100644 (file)
--- a/vdr.cc
+++ b/vdr.cc
@@ -183,6 +183,8 @@ void VDR::setReceiveWindow(size_t size)
 
 void VDR::threadMethod()
 {
+  logger->log("VDR", Log::DEBUG, "VDR RUN");  
+
   threadSetKillable(); // FIXME - change this to deal with the EDRs
   
   ULONG channelID;
@@ -197,6 +199,8 @@ void VDR::threadMethod()
   VDR_ResponsePacket* vresp;
   
   int timeoutCount = 0;
+  ULONG lastKAsent = 0;
+  ULONG timeNow = 0;
   
   while(1) 
   {  
@@ -206,67 +210,46 @@ void VDR::threadMethod()
     
       // Error or timeout.
 
-      // Thsi is the simple version of timeout system until I work out how to make it better
-      // e.g. different timeout lengths for different requests, a decent keepalive system
-      // running in parallel etc etc.
+      /* This is the simple version of timeout system until I work out how to make it better
+         e.g. different timeout lengths for different requests, a decent keepalive system
+         running in parallel etc etc.
+        
+         Ignore the stream channel
+         Start with lastPacketReceived = 0
+         When a packet comes in, set lastPacketReceived = time(NULL)
+         loop
+         {
+           if last packet is over 20s ago
+           {
+             if lastKAsent is less than 20s ago, continue
+             
+             if lastKAsent = 0, send a KA, continue
+             
+             if lastKAsent is over 20s ago, connection is dead, kill it
+           }
+         }         
+      */
 
       logger->log("VDR", Log::DEBUG, "Net read timeout %i", timeoutCount);
 
-      if (!tcp->isConnected())// FIXME disabled for now until it works ... !! || (timeoutCount > 10)) // 20s
+      if (!tcp->isConnected()) { connectionDied(); return; } // return to stop this thread
+      
+      timeoutCount = 0; // disable it for now
+      
+      if (timeoutCount >= 10) //20s
       {
-        // timeout, kill it
-
-        connected = false; // though actually it could still be connected until someone calls vdr->disconnect
-
-        // Need to wake up any waiting channel 1 request-response threads
-        // Normally this is done by a packet coming in with channelid and requestid      
-        // Instead, go through the list and for each channel 1 edr, make an empty vresp
-        // An empty vresp will have userData == NULL, which means vresp->noResponse() == true
-
-        // If it's a stream receiver, generate a stream packet with flag == connection_lost
-
-        edLock();
-        VDR_PacketReceiver* vdrpr;
-        while(receivers.size())
+        timeNow = (ULONG)time(NULL);
+      
+        if (lastKAsent > (timeNow - 20)) continue;
+        
+        if (lastKAsent == 0)
         {
-          vdrpr = (VDR_PacketReceiver*) *(receivers.begin());
-          if (vdrpr->receiverChannel == CHANNEL_REQUEST_RESPONSE)
-          {
-            vresp = new VDR_ResponsePacket();
-            vresp->setResponse(vdrpr->requestSerialNumber, NULL, 0);
-            logger->log("VDR", Log::DEBUG, "Timeouts: created blank response packet for request serial %lu", vdrpr->requestSerialNumber);
-            edUnlock();
-            if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() )
-            {
-              // If edFindAndCall returns true, edr was called and vresp was handed off.
-              // else, delete vresp here.
-              logger->log("VDR", Log::ERR, "Timeouts: no waiting thread found for request serial %lu !!!", vdrpr->requestSerialNumber);
-              delete vresp;
-            }
-            edLock();
-          }
-          else if (vdrpr->receiverChannel == CHANNEL_STREAM)
-          {
-            vresp->setStream(vdrpr->streamID, 2 /* connection-lost flag */ , NULL, 0);
-            logger->log("VDR", Log::DEBUG, "Timeouts: created blank response packet for streamid %lu", vdrpr->streamID);
-            edUnlock();
-            if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() )
-            {
-              // If edFindAndCall returns true, edr was called and vresp was handed off.
-              // else, delete vresp here.
-              logger->log("VDR", Log::ERR, "Timeouts: no waiting stream receiver found for streamid %lu !!!", vdrpr->streamID);
-              delete vresp;
-            }
-            edLock();  
-          }
+          if (!sendKA(timeNow)) { connectionDied(); return; }
+          lastKAsent = timeNow;
+          continue;
         }
-        edUnlock();
-        // Ok, all event receviers should be dealt with. just in case there weren't any, inform command
-        
-        Command::getInstance()->connectionLost();
         
-        // return and stop this thread
-        return;
+        if (lastKAsent <= (timeNow - 20)) { connectionDied(); return; }
       }
      
       continue;      
@@ -277,9 +260,7 @@ void VDR::threadMethod()
     timeoutCount = 0;
     
     channelID = ntohl(channelID);
-
-    vresp = new VDR_ResponsePacket();  
-    
+   
     if (channelID == CHANNEL_REQUEST_RESPONSE)
     {
       if (!tcp->readData((UCHAR*)&requestID, sizeof(ULONG))) break;
@@ -287,18 +268,24 @@ void VDR::threadMethod()
       if (!tcp->readData((UCHAR*)&userDataLength, sizeof(ULONG))) break;
       userDataLength = ntohl(userDataLength);
       if (userDataLength > 5000000) break; // how big can these packets get?
+      userData = NULL;
       if (userDataLength > 0)
       {
         userData = (UCHAR*)malloc(userDataLength);
         if (!userData) break;
         if (!tcp->readData(userData, userDataLength)) break;
       }
-      else
-      {
-        userData = NULL;
-      }
+
+      vresp = new VDR_ResponsePacket();  
       vresp->setResponse(requestID, userData, userDataLength);
       logger->log("VDR", Log::DEBUG, "Rxd a response packet, requestID=%lu, len=%lu", requestID, userDataLength);
+
+      if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() )
+      {
+        // If edFindAndCall returns true, edr was called and vresp was handed off.
+        // else, delete vresp here.
+        delete vresp;
+      }
     }
     else if (channelID == CHANNEL_STREAM)
     {
@@ -310,39 +297,104 @@ void VDR::threadMethod()
 
       if (!tcp->readData((UCHAR*)&userDataLength, sizeof(ULONG))) break; 
       userDataLength = ntohl(userDataLength);
+      userData = NULL;
       if (userDataLength > 0)
       {
         userData = (UCHAR*)malloc(userDataLength);
         if (!userData) break;
         if (!tcp->readData(userData, userDataLength)) break;
       }
-      else
-      {
-        userData = NULL;
-      }
+
+      vresp = new VDR_ResponsePacket();    
       vresp->setStream(streamID, flag, userData, userDataLength);
 //      logger->log("VDR", Log::DEBUG, "Rxd a stream packet, streamID=%lu, flag=%lu, len=%lu", streamID, flag, userDataLength);
+
+      if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() )
+      {
+        // If edFindAndCall returns true, edr was called and vresp was handed off.
+        // else, delete vresp here.
+        delete vresp;
+      }
+    }
+    else if (channelID == CHANNEL_KEEPALIVE)
+    {
+      ULONG KAreply = 0;
+      if (!tcp->readData((UCHAR*)&KAreply, sizeof(ULONG))) break;
+      KAreply = (ULONG)ntohl(KAreply);
+      if (KAreply == lastKAsent) // successful KA response
+      {
+        lastKAsent = 0;
+        timeoutCount = 0;
+      }
     }
     else
     {
       logger->log("VDR", Log::ERR, "Rxd a response packet on channel %lu !!", channelID);
-      delete vresp;
       break;
     }
 
-    if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() )
-    {
-      // If edFindAndCall returns true, edr was called and vresp was handed off.
-      // else, delete vresp here.
-      delete vresp;
-    }
-    
     // Who deletes vresp?
     // If RR, the individual protocol functions must delete vresp.
     // If stream, the data and length is taken out in ed_cb_call and vresp is deleted there.
   }
 }
 
+void VDR::connectionDied()
+{
+  // Called from within threadMethod to do cleanup if it decides the connection has died
+
+  connected = false; // though actually it could still be connected until someone calls vdr->disconnect
+
+  // Need to wake up any waiting channel 1 request-response threads
+  // Normally this is done by a packet coming in with channelid and requestid      
+  // Instead, go through the list and for each channel 1 edr, make an empty vresp
+  // An empty vresp will have userData == NULL, which means vresp->noResponse() == true
+
+  // If it's a stream receiver, generate a stream packet with flag == connection_lost
+
+  edLock();
+  VDR_PacketReceiver* vdrpr;
+  VDR_ResponsePacket* vresp;
+  while(receivers.size())
+  {
+    vdrpr = (VDR_PacketReceiver*) *(receivers.begin());
+    if (vdrpr->receiverChannel == CHANNEL_REQUEST_RESPONSE)
+    {
+      vresp = new VDR_ResponsePacket();
+      vresp->setResponse(vdrpr->requestSerialNumber, NULL, 0);
+      logger->log("VDR", Log::DEBUG, "Timeouts: created blank response packet for request serial %lu", vdrpr->requestSerialNumber);
+      edUnlock();
+      if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() )
+      {
+        // If edFindAndCall returns true, edr was called and vresp was handed off.
+        // else, delete vresp here.
+        logger->log("VDR", Log::ERR, "Timeouts: no waiting thread found for request serial %lu !!!", vdrpr->requestSerialNumber);
+        delete vresp;
+      }
+      edLock();
+    }
+    else if (vdrpr->receiverChannel == CHANNEL_STREAM)
+    {
+      vresp = new VDR_ResponsePacket();
+      vresp->setStream(vdrpr->streamID, 2 /* connection-lost flag */ , NULL, 0);
+      logger->log("VDR", Log::DEBUG, "Timeouts: created blank response packet for streamid %lu", vdrpr->streamID);
+      edUnlock();
+      if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() )
+      {
+        // If edFindAndCall returns true, edr was called and vresp was handed off.
+        // else, delete vresp here.
+        logger->log("VDR", Log::ERR, "Timeouts: no waiting stream receiver found for streamid %lu !!!", vdrpr->streamID);
+        delete vresp;
+      }
+      edLock();  
+    }
+  }
+  edUnlock();
+  // Ok, all event receviers should be dealt with. just in case there weren't any, inform command
+  
+  Command::getInstance()->connectionLost();
+}
+
 bool VDR::ed_cb_find(EDReceiver* edr, void* userTag)
 {
   // edr is a VDR_PacketReceiver object made in VDR::RequestResponse
@@ -408,6 +460,15 @@ VDR_ResponsePacket* VDR::RequestResponse(VDR_RequestPacket* vrp)
   return vdrpr.save_vresp;
 }
 
+bool VDR::sendKA(ULONG timeStamp)
+{
+  char buffer[8];
+  *(ULONG*)&buffer[0] = htonl(CHANNEL_KEEPALIVE);
+  *(ULONG*)&buffer[4] = htonl(timeStamp);
+  if ((ULONG)tcp->sendData(buffer, 8) != 8) return false;
+  return true;
+}
+
 /////////////////////////////////////////////////////////////////////////////
 
 // Here VDR takes a break for the VDR_PacketReceiver helper class
diff --git a/vdr.h b/vdr.h
index 26c5b60b5a79337ffb5416aca0a37d52740f5c07..6c75dc4507ad399bed08bc04004c20adc2289f25 100644 (file)
--- a/vdr.h
+++ b/vdr.h
     Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 */
 
+
+// FIXME - This and the protocol are overly complicated now. Sorry.
+//         I'll clean it up in a couple of releases time...
+
+
 #ifndef VDR_H
 #define VDR_H
 
@@ -111,7 +116,8 @@ class VDR : public Thread_TYPE, public EventDispatcher
   
     const static ULONG CHANNEL_REQUEST_RESPONSE = 1;
     const static ULONG CHANNEL_STREAM = 2;
-  
+    const static ULONG CHANNEL_KEEPALIVE = 3;
+      
     VDR();
     ~VDR();
     static VDR* getInstance();
@@ -200,6 +206,9 @@ class VDR : public Thread_TYPE, public EventDispatcher
     VDR_ResponsePacket* RequestResponse(VDR_RequestPacket* request);
     UCHAR* getBlock(ULLONG position, UINT maxAmount, UINT* amountReceived, ULONG cmd);
     
+    void connectionDied();
+    bool sendKA(ULONG timeStamp);
+    
     Log* logger;
     int initted;
     int findingServer;