--- /dev/null
+/*
+ Copyright 2007 Chris Tallon
+
+ This file is part of VOMP.
+
+ VOMP is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ VOMP is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with VOMP; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#include "eventdispatcher.h"
+
+EventDispatcher::EventDispatcher()
+{
+#ifndef WIN32
+ pthread_mutex_init(&mutex, NULL);
+#else
+ mutex = CreateMutex(NULL, FALSE, NULL);
+#endif
+}
+
+void EventDispatcher::edRegister(EDReceiver* edr)
+{
+ edLock();
+ receivers.push_back(edr);
+ edUnlock();
+}
+
+bool EventDispatcher::edFindAndCall(void* userTag)
+{
+ edLock();
+
+ EDReceiver* edr;
+ EDRL::iterator i;
+ for(i = receivers.begin(); i != receivers.end(); i++)
+ {
+ if (ed_cb_find(*i, userTag))
+ {
+ edr = *i;
+ break; // found (by asking the EventDispatcher implementor to check if userTag is for *i
+ }
+ }
+
+ if ((i == receivers.end()) || edr->callinprogress || edr->nomorecalls)
+ {
+ edUnlock();
+ return false;
+ }
+
+ edr->callinprogress = true;
+ edUnlock();
+ bool edrType = edr->call(userTag);
+ edLock();
+ edr->callinprogress = false;
+
+ if (edrType == false) // it's a multicall
+ {
+ if (edr->nomorecalls) // External has called unRegister - probably the receiver
+ {
+ // wake up the thread waiting in unregister
+ #ifndef WIN32
+ pthread_cond_signal(&edr->cond);
+ #else
+ // FIXME
+ #endif
+ }
+ }
+ else // It's a single call. The receiver should be removed from the list. There will be a thread to wake up
+ {
+ for(i = receivers.begin(); i != receivers.end(); i++)
+ {
+ if (*i == edr)
+ {
+ receivers.erase(i);
+ break;
+ }
+ }
+ if (i == receivers.end()) abort(); // should never happen
+
+ #ifndef WIN32
+ pthread_cond_signal(&edr->cond);
+ #else
+ // FIXME
+ #endif
+ }
+
+ edUnlock();
+ return true;
+}
+
+void EventDispatcher::edUnregister(EDReceiver* edr)
+{
+ edLock();
+
+ EDRL::iterator i;
+ for(i = receivers.begin(); i != receivers.end(); i++)
+ {
+ if (*i == edr) break; // found
+ }
+
+ if (i == receivers.end()) abort(); // should never happen
+
+ if (!edr->callinprogress)
+ {
+ receivers.erase(i);
+ edUnlock();
+ return;
+ }
+
+ edr->nomorecalls = true;
+
+ // edUnlock, wait for callinprogres=false (cond to be signalled), lock
+#ifndef WIN32
+ pthread_cond_wait(&edr->cond, &mutex);
+#else
+ // FIXME
+#endif
+
+ for(i = receivers.begin(); i != receivers.end(); i++)
+ {
+ if (*i == edr) break; // found
+ }
+
+ if (i == receivers.end()) abort(); // should never happen
+
+ receivers.erase(i);
+ edUnlock();
+}
+
+// ---------------------------------------
+
+void EventDispatcher::edLock()
+{
+#ifndef WIN32
+ pthread_mutex_lock(&mutex);
+#else
+ WaitForSingleObject(mutex, INFINITE);
+#endif
+}
+
+void EventDispatcher::edUnlock()
+{
+#ifndef WIN32
+ pthread_mutex_unlock(&mutex);
+#else
+ ReleaseMutex(mutex);
+#endif
+}
+
+// ---------------------------------------
+
+void EventDispatcher::edSleepThisReceiver(EDReceiver* edr)
+{
+ // For blocking version, not callback version. Call with edLock locked
+
+#ifndef WIN32
+ pthread_cond_init(&edr->cond, NULL);
+ pthread_cond_wait(&edr->cond, &mutex);
+#else
+ // FIXME
+#endif
+}
+
+// -------------- EDReceiver implementation
+
+EDReceiver::EDReceiver()
+{
+ nomorecalls = false;
+ callinprogress = false;
+}
+
+
if (tcp->connectTo(serverIP, 3024))
{
connected = true;
+ threadStart();
return 1;
}
else
void VDR::disconnect()
{
+ threadCancel();
if (tcp) delete tcp;
tcp = NULL;
connected = false;
void VDR::threadMethod()
{
+ threadSetKillable();
+
+ UCHAR* packet;
+ ULONG packetLength;
+ VDR_ResponsePacket* vresp;
+
+ while(1)
+ {
+ packet = (UCHAR*)tcp->receivePacket(); // cancellation point
+
+ vresp = new VDR_ResponsePacket();
+ if (packet)
+ {
+ packetLength = (ULONG)tcp->getDataLength();
+ vresp->set(packet, packetLength);
+ }
+
+ if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() )
+ {
+ // If edFindAndCall returns true, edr was called and vresp was handed off.
+ // else, delete vresp here.
+ delete vresp;
+ }
+
+ // Who deletes vresp?
+ // If RR, the individual protocol functions must delete vresp.
+ // If stream, the data and length is taken out in ed_cb_call and vresp is deleted there.
+ }
+}
+
+bool VDR::ed_cb_find(EDReceiver* edr, void* userTag)
+{
+ // edr is a VDR_PacketReceiver object made in VDR::RequestResponse
+ // userTag is a VDR_ResponsePacket made in threadMethod
+
+ VDR_PacketReceiver* vdrpr = (VDR_PacketReceiver*)edr;
+ VDR_ResponsePacket* vresp = (VDR_ResponsePacket*)userTag;
+
+ // Is vresp for vdrpr ?
+
+ // Not written yet. will be true
+ if (vdrpr);
+ if (vresp);
+
+ return true;
}
VDR_ResponsePacket* VDR::RequestResponse(VDR_RequestPacket* vrp)
{
logger->log("VDR", Log::DEBUG, "RR");
- MUTEX_LOCK(&mutex);
-
+
if (!connected)
{
- MUTEX_UNLOCK(&mutex);
- return NULL;
+ VDR_ResponsePacket* vresp = new VDR_ResponsePacket();
+ return vresp; // "no-response" return
}
- waitingRequestThread = Thread_TYPE::thisThreadID();
+ // ED make new VDR and register
+ // make a VDR_PacketReceiver
+ // - init with serial number of request packet
+
+ VDR_PacketReceiver* vdrpr = new VDR_PacketReceiver();
+ vdrpr->receiverChannel = VDR::CHANNEL_REQUEST_RESPONSE;
+ vdrpr->requestSerialNumber = vrp->getSerial();
+ edRegister(vdrpr);
+
+ edLock();
if ((ULONG)tcp->sendPacket(vrp->getPtr(), vrp->getLen()) != vrp->getLen())
{
- disconnect();
- MUTEX_UNLOCK(&mutex);
- return NULL;
+ edUnlock();
+ VDR_ResponsePacket* vresp = new VDR_ResponsePacket();
+ return vresp; // "no-response" return
}
- UCHAR* packet = (UCHAR*)tcp->receivePacket();
- if (!packet)
- {
- disconnect();
- MUTEX_UNLOCK(&mutex);
- return NULL;
- }
- ULONG packetLength = (ULONG)tcp->getDataLength();
+ // Sleep and block this thread. The sleep unlocks the mutex
+ logger->log("VDR", Log::DEBUG, "RR sleep");
+ edSleepThisReceiver(vdrpr);
+ logger->log("VDR", Log::DEBUG, "RR unsleep");
+
+ // Woken because a response packet has arrived, mutex will be locked
- MUTEX_UNLOCK(&mutex);
+ edUnlock();
- VDR_ResponsePacket* vresp = new VDR_ResponsePacket();
- vresp->set(packet, packetLength);
+ VDR_ResponsePacket* toReturn = vdrpr->save_vresp;
+ delete vdrpr;
+ return toReturn;
+}
+
+/////////////////////////////////////////////////////////////////////////////
+
+// Here VDR takes a break for the VDR_PacketReceiver helper class
+
+bool VDR_PacketReceiver::call(void* userTag)
+{
+ if (receiverChannel == VDR::CHANNEL_REQUEST_RESPONSE)
+ {
+ // It's a RR. Save vresp and, signal the waiting thread and return.
+ // VDR::RequestResponse will be blocking waiting for this to happen.
+ // That function has a pointer to this object and can read save_vresp.
+ save_vresp = (VDR_ResponsePacket*)userTag;
+ return true; // Signals ED to remove edr from receivers and wake up edr thread
+ }
- return vresp;
+ if (receiverChannel == VDR::CHANNEL_STREAM)
+ {
+ // It's a stream packet.
+ streamReceiver->receiveData(NULL, 0);
+ delete (VDR_ResponsePacket*)userTag;
+ return false;
+ }
+
+ abort(); // unknown receiverChannel, should not happen
}
/////////////////////////////////////////////////////////////////////////////
if (!vrp.copyin((UCHAR*)mactemp, 6)) return 0;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return 0;
+ if (vresp->noResponse()) { delete vresp; return 0; }
ULONG vdrTime = vresp->extractULONG();
logger->log("VDR", Log::DEBUG, "vdrtime = %lu", vdrTime);
if (!vrp.init(VDR_GETRECORDINGLIST, true, 0)) return false;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return false;
+ if (vresp->noResponse()) { delete vresp; return false; }
ULONG totalSpace = vresp->extractULONG();
ULONG freeSpace = vresp->extractULONG();
if (!vrp.addString(fileName)) return 0;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return 0;
+ if (vresp->noResponse()) { delete vresp; return 0; }
int toReturn = (int)vresp->extractULONG();
delete vresp;
if (!vrp.addString(newPath)) return NULL;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return NULL;
+ if (vresp->noResponse()) { delete vresp; return NULL; }
char* toReturn = NULL;
int success = (int)vresp->extractULONG();
if (!vrp.init(VDR_GETCHANNELLIST, true, 0)) return NULL;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return NULL;
+ if (vresp->noResponse()) { delete vresp; return NULL; }
ChannelList* chanList = new ChannelList();
if (!vrp.addULONG(number)) return 0;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return 0;
+ if (vresp->noResponse()) { delete vresp; return 0; }
int toReturn = (int)vresp->extractULONG();
delete vresp;
if (!vrp.init(VDR_STOPSTREAMING, true, 0)) return 0;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return 0;
+ if (vresp->noResponse()) { delete vresp; return 0; }
int toReturn = (int)vresp->extractULONG();
delete vresp;
if (!vrp.addULONG(maxAmount)) return NULL;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return NULL;
+ if (vresp->noResponse()) { delete vresp; return NULL; }
if (vresp->serverError())
{
if (!vrp.addString(fileName)) return 0;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return 0;
+ if (vresp->noResponse()) { delete vresp; return 0; }
ULLONG lengthBytes = vresp->extractULLONG();
ULONG lengthFrames = vresp->extractULONG();
if (!vrp.addULONG(frameNumber)) return 0;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return 0;
+ if (vresp->noResponse()) { delete vresp; return 0; }
ULLONG position = vresp->extractULLONG();
delete vresp;
if (!vrp.addULLONG(position)) return 0;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return 0;
+ if (vresp->noResponse()) { delete vresp; return 0; }
ULONG framenumber = vresp->extractULONG();
delete vresp;
if (!vrp.addULONG(direction)) return false;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return false;
+ if (vresp->noResponse()) { delete vresp; return false; }
if (vresp->serverError())
{
if (!vrp.addULONG(duration)) return NULL;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return NULL;
+ if (vresp->noResponse()) { delete vresp; return NULL; }
// received a ulong(0) - schedules error in the plugin
if (vresp->serverError())
if (!vrp.addString(value)) return 0;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return 0;
+ if (vresp->noResponse()) { delete vresp; return 0; }
int toReturn = (int)vresp->extractULONG();
delete vresp;
if (!vrp.addString(key)) return NULL;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return NULL;
+ if (vresp->noResponse()) { delete vresp; return NULL; }
char* toReturn = vresp->extractString();
delete vresp;
if (!vrp.init(VDR_GETTIMERS, true, 0)) return NULL;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return NULL;
+ if (vresp->noResponse()) { delete vresp; return NULL; }
RecTimerList* recTimerList = new RecTimerList();
if (!vrp.addString(timerString)) return 0;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return 0;
+ if (vresp->noResponse()) { delete vresp; return 0; }
ULONG toReturn = vresp->extractULONG();
delete vresp;
if (!vrp.addString(fileName)) return NULL;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return NULL;
+ if (vresp->noResponse()) { delete vresp; return NULL; }
if (vresp->serverError())
{
if (!vrp.init(VDR_RESCANRECORDING, true, 0)) return 0;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return 0;
+ if (vresp->noResponse()) { delete vresp; return 0; }
ULLONG lengthBytes = vresp->extractULLONG();
ULONG lengthFrames = vresp->extractULONG();
if (!vrp.addString(fileName)) return NULL;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return NULL;
+ if (vresp->noResponse()) { delete vresp; return NULL; }
if (vresp->serverError())
{
if (!vrp.addULONG(channel->number)) return ;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return ;
+ if (vresp->noResponse()) { delete vresp; return ; }
// Format of response
// vpid
}
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return NULL;
+ if (vresp->noResponse()) { delete vresp; return NULL; }
if (vresp->serverError())
{
if (!vrp.addString(fileName)) return 0;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return 0;
+ if (vresp->noResponse()) { delete vresp; return 0; }
ULONG cmd = vresp->extractULONG();
ULONG lengthBytes = vresp->extractULONG();
if (!vrp.addULONG(delTimer->stopTime)) return 0;
VDR_ResponsePacket* vresp = RequestResponse(&vrp);
- if (!vresp) return 0;
+ if (vresp->noResponse()) { delete vresp; return 0; }
int toReturn = (int)vresp->extractULONG();
delete vresp;