From 216d4498be4c89420e6c9a55d81b91cee0f36c61 Mon Sep 17 00:00:00 2001 From: Chris Tallon Date: Tue, 13 May 2008 17:13:32 +0000 Subject: [PATCH] A bit more on client side timeouts --- vdr.cc | 207 +++++++++++++++++++++++++++++++++++++-------------------- vdr.h | 11 ++- 2 files changed, 144 insertions(+), 74 deletions(-) diff --git a/vdr.cc b/vdr.cc index c228fd8..31f0d47 100644 --- a/vdr.cc +++ b/vdr.cc @@ -183,6 +183,8 @@ void VDR::setReceiveWindow(size_t size) void VDR::threadMethod() { + logger->log("VDR", Log::DEBUG, "VDR RUN"); + threadSetKillable(); // FIXME - change this to deal with the EDRs ULONG channelID; @@ -197,6 +199,8 @@ void VDR::threadMethod() VDR_ResponsePacket* vresp; int timeoutCount = 0; + ULONG lastKAsent = 0; + ULONG timeNow = 0; while(1) { @@ -206,67 +210,46 @@ void VDR::threadMethod() // Error or timeout. - // Thsi is the simple version of timeout system until I work out how to make it better - // e.g. different timeout lengths for different requests, a decent keepalive system - // running in parallel etc etc. + /* This is the simple version of timeout system until I work out how to make it better + e.g. different timeout lengths for different requests, a decent keepalive system + running in parallel etc etc. + + Ignore the stream channel + Start with lastPacketReceived = 0 + When a packet comes in, set lastPacketReceived = time(NULL) + loop + { + if last packet is over 20s ago + { + if lastKAsent is less than 20s ago, continue + + if lastKAsent = 0, send a KA, continue + + if lastKAsent is over 20s ago, connection is dead, kill it + } + } + */ logger->log("VDR", Log::DEBUG, "Net read timeout %i", timeoutCount); - if (!tcp->isConnected())// FIXME disabled for now until it works ... !! || (timeoutCount > 10)) // 20s + if (!tcp->isConnected()) { connectionDied(); return; } // return to stop this thread + + timeoutCount = 0; // disable it for now + + if (timeoutCount >= 10) //20s { - // timeout, kill it - - connected = false; // though actually it could still be connected until someone calls vdr->disconnect - - // Need to wake up any waiting channel 1 request-response threads - // Normally this is done by a packet coming in with channelid and requestid - // Instead, go through the list and for each channel 1 edr, make an empty vresp - // An empty vresp will have userData == NULL, which means vresp->noResponse() == true - - // If it's a stream receiver, generate a stream packet with flag == connection_lost - - edLock(); - VDR_PacketReceiver* vdrpr; - while(receivers.size()) + timeNow = (ULONG)time(NULL); + + if (lastKAsent > (timeNow - 20)) continue; + + if (lastKAsent == 0) { - vdrpr = (VDR_PacketReceiver*) *(receivers.begin()); - if (vdrpr->receiverChannel == CHANNEL_REQUEST_RESPONSE) - { - vresp = new VDR_ResponsePacket(); - vresp->setResponse(vdrpr->requestSerialNumber, NULL, 0); - logger->log("VDR", Log::DEBUG, "Timeouts: created blank response packet for request serial %lu", vdrpr->requestSerialNumber); - edUnlock(); - if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() ) - { - // If edFindAndCall returns true, edr was called and vresp was handed off. - // else, delete vresp here. - logger->log("VDR", Log::ERR, "Timeouts: no waiting thread found for request serial %lu !!!", vdrpr->requestSerialNumber); - delete vresp; - } - edLock(); - } - else if (vdrpr->receiverChannel == CHANNEL_STREAM) - { - vresp->setStream(vdrpr->streamID, 2 /* connection-lost flag */ , NULL, 0); - logger->log("VDR", Log::DEBUG, "Timeouts: created blank response packet for streamid %lu", vdrpr->streamID); - edUnlock(); - if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() ) - { - // If edFindAndCall returns true, edr was called and vresp was handed off. - // else, delete vresp here. - logger->log("VDR", Log::ERR, "Timeouts: no waiting stream receiver found for streamid %lu !!!", vdrpr->streamID); - delete vresp; - } - edLock(); - } + if (!sendKA(timeNow)) { connectionDied(); return; } + lastKAsent = timeNow; + continue; } - edUnlock(); - // Ok, all event receviers should be dealt with. just in case there weren't any, inform command - - Command::getInstance()->connectionLost(); - // return and stop this thread - return; + if (lastKAsent <= (timeNow - 20)) { connectionDied(); return; } } continue; @@ -277,9 +260,7 @@ void VDR::threadMethod() timeoutCount = 0; channelID = ntohl(channelID); - - vresp = new VDR_ResponsePacket(); - + if (channelID == CHANNEL_REQUEST_RESPONSE) { if (!tcp->readData((UCHAR*)&requestID, sizeof(ULONG))) break; @@ -287,18 +268,24 @@ void VDR::threadMethod() if (!tcp->readData((UCHAR*)&userDataLength, sizeof(ULONG))) break; userDataLength = ntohl(userDataLength); if (userDataLength > 5000000) break; // how big can these packets get? + userData = NULL; if (userDataLength > 0) { userData = (UCHAR*)malloc(userDataLength); if (!userData) break; if (!tcp->readData(userData, userDataLength)) break; } - else - { - userData = NULL; - } + + vresp = new VDR_ResponsePacket(); vresp->setResponse(requestID, userData, userDataLength); logger->log("VDR", Log::DEBUG, "Rxd a response packet, requestID=%lu, len=%lu", requestID, userDataLength); + + if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() ) + { + // If edFindAndCall returns true, edr was called and vresp was handed off. + // else, delete vresp here. + delete vresp; + } } else if (channelID == CHANNEL_STREAM) { @@ -310,39 +297,104 @@ void VDR::threadMethod() if (!tcp->readData((UCHAR*)&userDataLength, sizeof(ULONG))) break; userDataLength = ntohl(userDataLength); + userData = NULL; if (userDataLength > 0) { userData = (UCHAR*)malloc(userDataLength); if (!userData) break; if (!tcp->readData(userData, userDataLength)) break; } - else - { - userData = NULL; - } + + vresp = new VDR_ResponsePacket(); vresp->setStream(streamID, flag, userData, userDataLength); // logger->log("VDR", Log::DEBUG, "Rxd a stream packet, streamID=%lu, flag=%lu, len=%lu", streamID, flag, userDataLength); + + if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() ) + { + // If edFindAndCall returns true, edr was called and vresp was handed off. + // else, delete vresp here. + delete vresp; + } + } + else if (channelID == CHANNEL_KEEPALIVE) + { + ULONG KAreply = 0; + if (!tcp->readData((UCHAR*)&KAreply, sizeof(ULONG))) break; + KAreply = (ULONG)ntohl(KAreply); + if (KAreply == lastKAsent) // successful KA response + { + lastKAsent = 0; + timeoutCount = 0; + } } else { logger->log("VDR", Log::ERR, "Rxd a response packet on channel %lu !!", channelID); - delete vresp; break; } - if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() ) - { - // If edFindAndCall returns true, edr was called and vresp was handed off. - // else, delete vresp here. - delete vresp; - } - // Who deletes vresp? // If RR, the individual protocol functions must delete vresp. // If stream, the data and length is taken out in ed_cb_call and vresp is deleted there. } } +void VDR::connectionDied() +{ + // Called from within threadMethod to do cleanup if it decides the connection has died + + connected = false; // though actually it could still be connected until someone calls vdr->disconnect + + // Need to wake up any waiting channel 1 request-response threads + // Normally this is done by a packet coming in with channelid and requestid + // Instead, go through the list and for each channel 1 edr, make an empty vresp + // An empty vresp will have userData == NULL, which means vresp->noResponse() == true + + // If it's a stream receiver, generate a stream packet with flag == connection_lost + + edLock(); + VDR_PacketReceiver* vdrpr; + VDR_ResponsePacket* vresp; + while(receivers.size()) + { + vdrpr = (VDR_PacketReceiver*) *(receivers.begin()); + if (vdrpr->receiverChannel == CHANNEL_REQUEST_RESPONSE) + { + vresp = new VDR_ResponsePacket(); + vresp->setResponse(vdrpr->requestSerialNumber, NULL, 0); + logger->log("VDR", Log::DEBUG, "Timeouts: created blank response packet for request serial %lu", vdrpr->requestSerialNumber); + edUnlock(); + if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() ) + { + // If edFindAndCall returns true, edr was called and vresp was handed off. + // else, delete vresp here. + logger->log("VDR", Log::ERR, "Timeouts: no waiting thread found for request serial %lu !!!", vdrpr->requestSerialNumber); + delete vresp; + } + edLock(); + } + else if (vdrpr->receiverChannel == CHANNEL_STREAM) + { + vresp = new VDR_ResponsePacket(); + vresp->setStream(vdrpr->streamID, 2 /* connection-lost flag */ , NULL, 0); + logger->log("VDR", Log::DEBUG, "Timeouts: created blank response packet for streamid %lu", vdrpr->streamID); + edUnlock(); + if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() ) + { + // If edFindAndCall returns true, edr was called and vresp was handed off. + // else, delete vresp here. + logger->log("VDR", Log::ERR, "Timeouts: no waiting stream receiver found for streamid %lu !!!", vdrpr->streamID); + delete vresp; + } + edLock(); + } + } + edUnlock(); + // Ok, all event receviers should be dealt with. just in case there weren't any, inform command + + Command::getInstance()->connectionLost(); +} + bool VDR::ed_cb_find(EDReceiver* edr, void* userTag) { // edr is a VDR_PacketReceiver object made in VDR::RequestResponse @@ -408,6 +460,15 @@ VDR_ResponsePacket* VDR::RequestResponse(VDR_RequestPacket* vrp) return vdrpr.save_vresp; } +bool VDR::sendKA(ULONG timeStamp) +{ + char buffer[8]; + *(ULONG*)&buffer[0] = htonl(CHANNEL_KEEPALIVE); + *(ULONG*)&buffer[4] = htonl(timeStamp); + if ((ULONG)tcp->sendData(buffer, 8) != 8) return false; + return true; +} + ///////////////////////////////////////////////////////////////////////////// // Here VDR takes a break for the VDR_PacketReceiver helper class diff --git a/vdr.h b/vdr.h index 26c5b60..6c75dc4 100644 --- a/vdr.h +++ b/vdr.h @@ -18,6 +18,11 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ + +// FIXME - This and the protocol are overly complicated now. Sorry. +// I'll clean it up in a couple of releases time... + + #ifndef VDR_H #define VDR_H @@ -111,7 +116,8 @@ class VDR : public Thread_TYPE, public EventDispatcher const static ULONG CHANNEL_REQUEST_RESPONSE = 1; const static ULONG CHANNEL_STREAM = 2; - + const static ULONG CHANNEL_KEEPALIVE = 3; + VDR(); ~VDR(); static VDR* getInstance(); @@ -200,6 +206,9 @@ class VDR : public Thread_TYPE, public EventDispatcher VDR_ResponsePacket* RequestResponse(VDR_RequestPacket* request); UCHAR* getBlock(ULLONG position, UINT maxAmount, UINT* amountReceived, ULONG cmd); + void connectionDied(); + bool sendKA(ULONG timeStamp); + Log* logger; int initted; int findingServer; -- 2.39.2