void VDR::threadMethod()
{
+ logger->log("VDR", Log::DEBUG, "VDR RUN");
+
threadSetKillable(); // FIXME - change this to deal with the EDRs
ULONG channelID;
VDR_ResponsePacket* vresp;
int timeoutCount = 0;
+ ULONG lastKAsent = 0;
+ ULONG timeNow = 0;
while(1)
{
// Error or timeout.
- // Thsi is the simple version of timeout system until I work out how to make it better
- // e.g. different timeout lengths for different requests, a decent keepalive system
- // running in parallel etc etc.
+ /* This is the simple version of timeout system until I work out how to make it better
+ e.g. different timeout lengths for different requests, a decent keepalive system
+ running in parallel etc etc.
+
+ Ignore the stream channel
+ Start with lastPacketReceived = 0
+ When a packet comes in, set lastPacketReceived = time(NULL)
+ loop
+ {
+ if last packet is over 20s ago
+ {
+ if lastKAsent is less than 20s ago, continue
+
+ if lastKAsent = 0, send a KA, continue
+
+ if lastKAsent is over 20s ago, connection is dead, kill it
+ }
+ }
+ */
logger->log("VDR", Log::DEBUG, "Net read timeout %i", timeoutCount);
- if (!tcp->isConnected())// FIXME disabled for now until it works ... !! || (timeoutCount > 10)) // 20s
+ if (!tcp->isConnected()) { connectionDied(); return; } // return to stop this thread
+
+ timeoutCount = 0; // disable it for now
+
+ if (timeoutCount >= 10) //20s
{
- // timeout, kill it
-
- connected = false; // though actually it could still be connected until someone calls vdr->disconnect
-
- // Need to wake up any waiting channel 1 request-response threads
- // Normally this is done by a packet coming in with channelid and requestid
- // Instead, go through the list and for each channel 1 edr, make an empty vresp
- // An empty vresp will have userData == NULL, which means vresp->noResponse() == true
-
- // If it's a stream receiver, generate a stream packet with flag == connection_lost
-
- edLock();
- VDR_PacketReceiver* vdrpr;
- while(receivers.size())
+ timeNow = (ULONG)time(NULL);
+
+ if (lastKAsent > (timeNow - 20)) continue;
+
+ if (lastKAsent == 0)
{
- vdrpr = (VDR_PacketReceiver*) *(receivers.begin());
- if (vdrpr->receiverChannel == CHANNEL_REQUEST_RESPONSE)
- {
- vresp = new VDR_ResponsePacket();
- vresp->setResponse(vdrpr->requestSerialNumber, NULL, 0);
- logger->log("VDR", Log::DEBUG, "Timeouts: created blank response packet for request serial %lu", vdrpr->requestSerialNumber);
- edUnlock();
- if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() )
- {
- // If edFindAndCall returns true, edr was called and vresp was handed off.
- // else, delete vresp here.
- logger->log("VDR", Log::ERR, "Timeouts: no waiting thread found for request serial %lu !!!", vdrpr->requestSerialNumber);
- delete vresp;
- }
- edLock();
- }
- else if (vdrpr->receiverChannel == CHANNEL_STREAM)
- {
- vresp->setStream(vdrpr->streamID, 2 /* connection-lost flag */ , NULL, 0);
- logger->log("VDR", Log::DEBUG, "Timeouts: created blank response packet for streamid %lu", vdrpr->streamID);
- edUnlock();
- if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() )
- {
- // If edFindAndCall returns true, edr was called and vresp was handed off.
- // else, delete vresp here.
- logger->log("VDR", Log::ERR, "Timeouts: no waiting stream receiver found for streamid %lu !!!", vdrpr->streamID);
- delete vresp;
- }
- edLock();
- }
+ if (!sendKA(timeNow)) { connectionDied(); return; }
+ lastKAsent = timeNow;
+ continue;
}
- edUnlock();
- // Ok, all event receviers should be dealt with. just in case there weren't any, inform command
-
- Command::getInstance()->connectionLost();
- // return and stop this thread
- return;
+ if (lastKAsent <= (timeNow - 20)) { connectionDied(); return; }
}
continue;
timeoutCount = 0;
channelID = ntohl(channelID);
-
- vresp = new VDR_ResponsePacket();
-
+
if (channelID == CHANNEL_REQUEST_RESPONSE)
{
if (!tcp->readData((UCHAR*)&requestID, sizeof(ULONG))) break;
if (!tcp->readData((UCHAR*)&userDataLength, sizeof(ULONG))) break;
userDataLength = ntohl(userDataLength);
if (userDataLength > 5000000) break; // how big can these packets get?
+ userData = NULL;
if (userDataLength > 0)
{
userData = (UCHAR*)malloc(userDataLength);
if (!userData) break;
if (!tcp->readData(userData, userDataLength)) break;
}
- else
- {
- userData = NULL;
- }
+
+ vresp = new VDR_ResponsePacket();
vresp->setResponse(requestID, userData, userDataLength);
logger->log("VDR", Log::DEBUG, "Rxd a response packet, requestID=%lu, len=%lu", requestID, userDataLength);
+
+ if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() )
+ {
+ // If edFindAndCall returns true, edr was called and vresp was handed off.
+ // else, delete vresp here.
+ delete vresp;
+ }
}
else if (channelID == CHANNEL_STREAM)
{
if (!tcp->readData((UCHAR*)&userDataLength, sizeof(ULONG))) break;
userDataLength = ntohl(userDataLength);
+ userData = NULL;
if (userDataLength > 0)
{
userData = (UCHAR*)malloc(userDataLength);
if (!userData) break;
if (!tcp->readData(userData, userDataLength)) break;
}
- else
- {
- userData = NULL;
- }
+
+ vresp = new VDR_ResponsePacket();
vresp->setStream(streamID, flag, userData, userDataLength);
// logger->log("VDR", Log::DEBUG, "Rxd a stream packet, streamID=%lu, flag=%lu, len=%lu", streamID, flag, userDataLength);
+
+ if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() )
+ {
+ // If edFindAndCall returns true, edr was called and vresp was handed off.
+ // else, delete vresp here.
+ delete vresp;
+ }
+ }
+ else if (channelID == CHANNEL_KEEPALIVE)
+ {
+ ULONG KAreply = 0;
+ if (!tcp->readData((UCHAR*)&KAreply, sizeof(ULONG))) break;
+ KAreply = (ULONG)ntohl(KAreply);
+ if (KAreply == lastKAsent) // successful KA response
+ {
+ lastKAsent = 0;
+ timeoutCount = 0;
+ }
}
else
{
logger->log("VDR", Log::ERR, "Rxd a response packet on channel %lu !!", channelID);
- delete vresp;
break;
}
- if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() )
- {
- // If edFindAndCall returns true, edr was called and vresp was handed off.
- // else, delete vresp here.
- delete vresp;
- }
-
// Who deletes vresp?
// If RR, the individual protocol functions must delete vresp.
// If stream, the data and length is taken out in ed_cb_call and vresp is deleted there.
}
}
+void VDR::connectionDied()
+{
+ // Called from within threadMethod to do cleanup if it decides the connection has died
+
+ connected = false; // though actually it could still be connected until someone calls vdr->disconnect
+
+ // Need to wake up any waiting channel 1 request-response threads
+ // Normally this is done by a packet coming in with channelid and requestid
+ // Instead, go through the list and for each channel 1 edr, make an empty vresp
+ // An empty vresp will have userData == NULL, which means vresp->noResponse() == true
+
+ // If it's a stream receiver, generate a stream packet with flag == connection_lost
+
+ edLock();
+ VDR_PacketReceiver* vdrpr;
+ VDR_ResponsePacket* vresp;
+ while(receivers.size())
+ {
+ vdrpr = (VDR_PacketReceiver*) *(receivers.begin());
+ if (vdrpr->receiverChannel == CHANNEL_REQUEST_RESPONSE)
+ {
+ vresp = new VDR_ResponsePacket();
+ vresp->setResponse(vdrpr->requestSerialNumber, NULL, 0);
+ logger->log("VDR", Log::DEBUG, "Timeouts: created blank response packet for request serial %lu", vdrpr->requestSerialNumber);
+ edUnlock();
+ if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() )
+ {
+ // If edFindAndCall returns true, edr was called and vresp was handed off.
+ // else, delete vresp here.
+ logger->log("VDR", Log::ERR, "Timeouts: no waiting thread found for request serial %lu !!!", vdrpr->requestSerialNumber);
+ delete vresp;
+ }
+ edLock();
+ }
+ else if (vdrpr->receiverChannel == CHANNEL_STREAM)
+ {
+ vresp = new VDR_ResponsePacket();
+ vresp->setStream(vdrpr->streamID, 2 /* connection-lost flag */ , NULL, 0);
+ logger->log("VDR", Log::DEBUG, "Timeouts: created blank response packet for streamid %lu", vdrpr->streamID);
+ edUnlock();
+ if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() )
+ {
+ // If edFindAndCall returns true, edr was called and vresp was handed off.
+ // else, delete vresp here.
+ logger->log("VDR", Log::ERR, "Timeouts: no waiting stream receiver found for streamid %lu !!!", vdrpr->streamID);
+ delete vresp;
+ }
+ edLock();
+ }
+ }
+ edUnlock();
+ // Ok, all event receviers should be dealt with. just in case there weren't any, inform command
+
+ Command::getInstance()->connectionLost();
+}
+
bool VDR::ed_cb_find(EDReceiver* edr, void* userTag)
{
// edr is a VDR_PacketReceiver object made in VDR::RequestResponse
return vdrpr.save_vresp;
}
+bool VDR::sendKA(ULONG timeStamp)
+{
+ char buffer[8];
+ *(ULONG*)&buffer[0] = htonl(CHANNEL_KEEPALIVE);
+ *(ULONG*)&buffer[4] = htonl(timeStamp);
+ if ((ULONG)tcp->sendData(buffer, 8) != 8) return false;
+ return true;
+}
+
/////////////////////////////////////////////////////////////////////////////
// Here VDR takes a break for the VDR_PacketReceiver helper class