From 9619ae3842eeba0f07d4c0e2aeafd226c3d18c56 Mon Sep 17 00:00:00 2001 From: Chris Tallon Date: Tue, 9 Aug 2005 00:48:25 +0000 Subject: [PATCH] Replacement transceiver and use of new thread class (from client!) --- Makefile | 4 +- mvpclient.c | 64 ++++++------- mvpclient.h | 7 +- mvpreceiver.c | 180 +++++++++++++++++++++++++++++++++++++ mvpreceiver.h | 95 ++++++++++++++++++++ mvpserver.c | 54 ++++++----- mvpserver.h | 11 +-- ringbuffer.c | 4 +- thread.c | 92 +++++++++++++++++++ thread.h | 57 ++++++++++++ transceiver.c | 245 -------------------------------------------------- transceiver.h | 77 ---------------- udpreplier.c | 34 ++----- udpreplier.h | 12 +-- 14 files changed, 501 insertions(+), 435 deletions(-) create mode 100755 mvpreceiver.c create mode 100755 mvpreceiver.h create mode 100755 thread.c create mode 100755 thread.h delete mode 100644 transceiver.c delete mode 100644 transceiver.h diff --git a/Makefile b/Makefile index 88a268b..af13c1e 100644 --- a/Makefile +++ b/Makefile @@ -47,8 +47,8 @@ DEFINES += -DPLUGIN_NAME_I18N='"$(PLUGIN)"' ### The object files (add further files here): OBJS = $(PLUGIN).o dsock.o mvpserver.o udpreplier.o mvpclient.o tcp.o \ - transceiver.o remux/ts2ps.o remux/ts2es.o remux/tsremux.o ringbuffer.o \ - recplayer.o config.o log.o + remux/ts2ps.o remux/ts2es.o remux/tsremux.o ringbuffer.o \ + recplayer.o config.o log.o thread.o mvpreceiver.o libdvbmpeg/libdvbmpegtools.a: libdvbmpeg/*.c libdvbmpeg/*.cc libdvbmpeg/*.h libdvbmpeg/*.hh diff --git a/mvpclient.c b/mvpclient.c index ef19bbf..1fe4cd2 100644 --- a/mvpclient.c +++ b/mvpclient.c @@ -23,7 +23,7 @@ MVPClient::MVPClient(int tsocket) : tcp(tsocket) { - cm = NULL; + lp = NULL; rp = NULL; recordingManager = NULL; log = Log::getInstance(); @@ -65,11 +65,10 @@ MVPClient::MVPClient(int tsocket) MVPClient::~MVPClient() { log->log("Client", Log::DEBUG, "MVP client destructor"); - if (cm) + if (lp) { - cm->Stop(); - delete cm; - cm = NULL; + delete lp; + lp = NULL; } else if (rp) { @@ -417,49 +416,32 @@ void MVPClient::processStartStreamingChannel(unsigned char* data, int length) return; } + lp = MVPReceiver::create(channel); -// static cDevice *GetDevice(const cChannel *Channel, int Priority = -1, bool *NeedsDetachReceivers = NULL); - ///< Returns a device that is able to receive the given Channel at the - ///< given Priority. - ///< See ProvidesChannel() for more information on how - ///< priorities are handled, and the meaning of NeedsDetachReceivers. - - bool NeedsDetachReceivers; - cDevice* device = cDevice::GetDevice(channel, 0, &NeedsDetachReceivers); - if (!device) + if (!lp) { - log->log("Client", Log::DEBUG, "No device found to receive this channel at this priority"); sendULONG(0); + return; } - else if (NeedsDetachReceivers) + + if (!lp->init()) { - // can't really happen since we stream with priority zero. if a rec has pri zero maybe - log->log("Client", Log::DEBUG, "Needs detach receivers"); + delete lp; + lp = NULL; sendULONG(0); + return; } - else - { - cm = new cMediamvpTransceiver(channel, 0, 0, device); - device->AttachReceiver(cm); - sendULONG(1); - } - -////// MVPReceiver* m = new MVPReceiver(channel->Vpid(), channel->Apid1()); -// cm = new cMediamvpTransceiver(channel, 0, 0, cDevice::ActualDevice()); -// cDevice::ActualDevice()->AttachReceiver(cm); -//// //cDevice::ActualDevice()->SwitchChannel(channel, false); - -// sendULONG(1); + sendULONG(1); } void MVPClient::processStopStreaming(unsigned char* data, int length) { log->log("Client", Log::DEBUG, "STOP STREAMING RECEIVED"); - if (cm) + if (lp) { - delete cm; - cm = NULL; + delete lp; + lp = NULL; } else if (rp) { @@ -476,7 +458,7 @@ void MVPClient::processStopStreaming(unsigned char* data, int length) void MVPClient::processGetBlock(unsigned char* data, int length) { - if (!cm && !rp) + if (!lp && !rp) { log->log("Client", Log::DEBUG, "Get block called when no streaming happening!"); return; @@ -490,10 +472,18 @@ void MVPClient::processGetBlock(unsigned char* data, int length) unsigned char sendBuffer[amount + 4]; unsigned long amountReceived = 0; // compiler moan. - if (cm) + if (lp) { log->log("Client", Log::DEBUG, "getting from live"); - amountReceived = cm->getBlock(&sendBuffer[4], amount); + amountReceived = lp->getBlock(&sendBuffer[4], amount); + + if (!amountReceived) + { + // vdr has possibly disconnected the receiver + log->log("Client", Log::DEBUG, "VDR has disconnected the live receiver"); + delete lp; + lp = NULL; + } } else if (rp) { diff --git a/mvpclient.h b/mvpclient.h index c5cbcc1..3658d00 100644 --- a/mvpclient.h +++ b/mvpclient.h @@ -34,7 +34,7 @@ #include #include "tcp.h" -#include "transceiver.h" +#include "mvpreceiver.h" #include "recplayer.h" #include "config.h" @@ -53,7 +53,9 @@ class MVPClient int initted; TCP tcp; Config config; - cMediamvpTransceiver* cm; + MVPReceiver* lp; + + cRecordings* recordingManager; RecPlayer* rp; Log* log; @@ -74,7 +76,6 @@ class MVPClient cChannel* channelFromNumber(unsigned long channelNumber); void writeResumeData(); void cleanConfig(); - void sendULONG(ULONG ul); void testChannelSchedule(unsigned char* data, int length); diff --git a/mvpreceiver.c b/mvpreceiver.c new file mode 100755 index 0000000..56337c0 --- /dev/null +++ b/mvpreceiver.c @@ -0,0 +1,180 @@ +#include "mvpreceiver.h" + +MVPReceiver* MVPReceiver::create(cChannel* channel) +{ + bool NeedsDetachReceivers; + cDevice* device = cDevice::GetDevice(channel, 0, &NeedsDetachReceivers); + + if (!device) + { + Log::getInstance()->log("MVPReceiver", Log::DEBUG, "No device found to receive this channel at this priority"); + return NULL; + } + + if (NeedsDetachReceivers) + { + // can't really happen since we stream with priority zero. if a rec has pri zero maybe + Log::getInstance()->log("MVPReceiver", Log::DEBUG, "Needs detach receivers"); + return NULL; + } + + MVPReceiver* m = new MVPReceiver(channel, device); + return m; +} + +MVPReceiver::MVPReceiver(cChannel* channel, cDevice* device) +#if VDRVERSNUM < 10300 +: cReceiver(channel->Ca(), 0, 7, channel->Vpid(), channel->Ppid(), channel->Apid1(), channel->Apid2(), channel->Dpid1(), channel->Dpid2(), channel->Tpid()) +#else +: cReceiver(channel->Ca(), 0, channel->Vpid(), channel->Apids(), channel->Dpids(), channel->Spids()) +#endif +{ + logger = Log::getInstance(); + vdrActivated = false; + inittedOK = 0; + remuxer = NULL; + unprocessed = NULL; + + // Init + + // Get the remuxer for audio or video + +#if VDRVERSNUM < 10300 + if ((channel->Vpid() == 0) || (channel->Vpid() == 1) || (channel->Vpid() == 0x1FFF)) + { + remuxer = new cTS2ESRemux(channel->Apid1()); + logger->log("MVPReceiver", Log::DEBUG, "Created new < 1.3 TS->ES"); + } + else + { + remuxer = new cTS2PSRemux(channel->Vpid(), channel->Apid1(), 0, 0, 0, 0); + logger->log("MVPReceiver", Log::DEBUG, "Created new < 1.3 TS->PS"); + } +#else + if ((channel->Vpid() == 0) || (channel->Vpid() == 1) || (channel->Vpid() == 0x1FFF)) + { + remuxer = new cTS2ESRemux(channel->Apid(0)); + logger->log("MVPReceiver", Log::DEBUG, "Created new > 1.3 TS->ES"); + } + else + { + remuxer = new cTS2PSRemux(channel->Vpid(), channel->Apid(0), 0, 0, 0, 0); + logger->log("MVPReceiver", Log::DEBUG, "Created new > 1.3 TS->PS"); + } +#endif + + unprocessed = new cRingBufferLinear(1000000, TS_SIZE * 2, false); + + if (!processed.init(1000000)) return; + pthread_mutex_init(&processedRingLock, NULL); + + if (!threadStart()) return; + + // OK + + inittedOK = 1; + device->SwitchChannel(channel, false); + device->AttachReceiver(this); +} + +int MVPReceiver::init() +{ + return inittedOK; +} + +MVPReceiver::~MVPReceiver() +{ + Detach(); + if (threadIsActive()) threadCancel(); + if (unprocessed) delete unprocessed; + if (remuxer) delete remuxer; +} + +void MVPReceiver::Activate(bool on) +{ + vdrActivated = on; + if (on) logger->log("MVPReceiver", Log::DEBUG, "VDR active"); + else logger->log("MVPReceiver", Log::DEBUG, "VDR inactive"); +} + +bool MVPReceiver::isVdrActivated() +{ + return vdrActivated; +} + +void MVPReceiver::Receive(UCHAR* data, int length) +{ + static int receiveCount = 0; + +// int p = unprocessed->Put(data, length); +// if (p != length) printf("Buffer overrun\n"); + + unprocessed->Put(data, length); + + if (++receiveCount == 15) + { + threadSignal(); + receiveCount = 0; + } +} + +void MVPReceiver::threadMethod() +{ + int amountGot; + UCHAR* dataGot; + + int remuxTook; + UCHAR* remuxedData; + int outputSize; + + while(1) + { + threadWaitForSignal(); + + while(1) + { + dataGot = unprocessed->Get(amountGot); + if (dataGot && (amountGot > 0)) + { + outputSize = 0; + remuxTook = amountGot; + remuxedData = remuxer->Process(dataGot, remuxTook, outputSize); + unprocessed->Del(remuxTook); + + pthread_mutex_lock(&processedRingLock); + processed.put(remuxedData, outputSize); + pthread_mutex_unlock(&processedRingLock); + +// printf("Got from unprocessed: %i, Got from remux: %p %i, consumed: %i\n", +// amountGot, remuxedData, outputSize, remuxTook); + } + else + { + break; + } + } + } +} + +unsigned long MVPReceiver::getBlock(unsigned char* buffer, unsigned long amount) +{ + pthread_mutex_lock(&processedRingLock); + + int numTries = 0; + + while ((unsigned long)processed.getContent() < amount) + { + pthread_mutex_unlock(&processedRingLock); + if (++numTries == 10) // 5s + { + logger->log("MVPReceiver", Log::DEBUG, "getBlock timeout"); + return 0; + } + usleep(500000); + pthread_mutex_lock(&processedRingLock); + } + + unsigned long amountReceived = processed.get(buffer, amount); + pthread_mutex_unlock(&processedRingLock); + return amountReceived; +} diff --git a/mvpreceiver.h b/mvpreceiver.h new file mode 100755 index 0000000..2d1440d --- /dev/null +++ b/mvpreceiver.h @@ -0,0 +1,95 @@ +/* + Copyright 2004-2005 Chris Tallon + + This file is part of VOMP. + + VOMP is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + VOMP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with VOMP; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#ifndef MVPRECEIVER_H +#define MVPRECEIVER_H + +#include +#include +#include +#include + +#include "tsremux.h" +#include "ts2es.h" +#include "ts2ps.h" + +#include "log.h" +#include "thread.h" +#include "ringbuffer.h" + +class MVPReceiver : public cReceiver, public Thread +{ + public: + static MVPReceiver* create(cChannel*); + virtual ~MVPReceiver(); + int init(); + unsigned long getBlock(unsigned char* buffer, unsigned long amount); + bool isVdrActivated(); + + private: + MVPReceiver(cChannel* channel, cDevice* device); + void threadMethod(); + + Log* logger; + bool vdrActivated; + int inittedOK; + cTSRemux* remuxer; + cRingBufferLinear* unprocessed; // A VDR ring buffer used for the unprocessed data + // it doesn't delete until told and does its own locking + Ringbuffer processed; // A simpler deleting ringbuffer for processed data + pthread_mutex_t processedRingLock; // needs outside locking + + // cReciever stuff + void Activate(bool On); + void Receive(UCHAR *Data, int Length); +}; + +#endif + + +/* + cReceiver docs from the header file + + void Activate(bool On); + // This function is called just before the cReceiver gets attached to + // (On == true) or detached from (On == false) a cDevice. It can be used + // to do things like starting/stopping a thread. + // It is guaranteed that Receive() will not be called before Activate(true). + void Receive(uchar *Data, int Length); + // This function is called from the cDevice we are attached to, and + // delivers one TS packet from the set of PIDs the cReceiver has requested. + // The data packet must be accepted immediately, and the call must return + // as soon as possible, without any unnecessary delay. Each TS packet + // will be delivered only ONCE, so the cReceiver must make sure that + // it will be able to buffer the data if necessary. + +*/ + +/* + + cDevice docs + + static cDevice *GetDevice(const cChannel *Channel, int Priority = -1, bool *NeedsDetachReceivers = NULL); + ///< Returns a device that is able to receive the given Channel at the + ///< given Priority. + ///< See ProvidesChannel() for more information on how + ///< priorities are handled, and the meaning of NeedsDetachReceivers. + +*/ diff --git a/mvpserver.c b/mvpserver.c index 46fb62f..82d9afb 100644 --- a/mvpserver.c +++ b/mvpserver.c @@ -20,34 +20,24 @@ #include "mvpserver.h" -// undeclared function -void MVPServerStartThread(void *arg) -{ - MVPServer *m = (MVPServer *)arg; - m->run2(); -} - - MVPServer::MVPServer() { - runThread = 0; - running = 0; } MVPServer::~MVPServer() { - if (running) stop(); + if (threadIsActive()) stop(); } int MVPServer::stop() { - if (!running) return 0; + if (!threadIsActive()) return 0; - log.shutdown(); - udpr.stop(); + threadCancel(); + log.log("MVPServer", Log::INFO, "Stopped MVPServer thread"); - pthread_cancel(runThread); - pthread_join(runThread, NULL); + udpr.stop(); + log.shutdown(); close(listeningSocket); @@ -56,24 +46,32 @@ int MVPServer::stop() int MVPServer::run() { - if (running) return 1; + if (threadIsActive()) return 1; - log.init(Log::DEBUG, "/tmp/vompserver.log", 0); + log.init(Log::DEBUG, "/tmp/vompserver.log", 1); + if (!udpr.run()) + { + log.log("MVPServer", Log::CRIT, "Could not start UDP replier"); + log.shutdown(); + return 0; + } - if (udpr.run() == 0) return 0; + // start thread here + if (!threadStart()) + { + log.log("MVPServer", Log::CRIT, "Could not start MVPServer thread"); + udpr.stop(); + log.shutdown(); + return 0; + } - if (pthread_create(&runThread, NULL, (void*(*)(void*))MVPServerStartThread, (void *)this) == -1) return 0; log.log("MVPServer", Log::DEBUG, "MVPServer run success"); return 1; } -void MVPServer::run2() +void MVPServer::threadMethod() { - // Thread stuff - // I don't want signals and I want to die as soon as I am cancelled because I'll be in accept() - sigset_t sigset; - sigfillset(&sigset); - pthread_sigmask(SIG_BLOCK, &sigset, NULL); + // I want to die as soon as I am cancelled because I'll be in accept() pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); @@ -86,7 +84,7 @@ void MVPServer::run2() listeningSocket = socket(AF_INET, SOCK_STREAM, 0); if (listeningSocket < 0) { - log.log("MVPServer", Log::DEBUG, "Could not get TCP socket in vompserver"); + log.log("MVPServer", Log::CRIT, "Could not get TCP socket in vompserver"); return; } @@ -95,7 +93,7 @@ void MVPServer::run2() if (bind(listeningSocket,(struct sockaddr *)&address,sizeof(address)) < 0) { - log.log("MVPServer", Log::DEBUG, "Could not bind to socket in vompserver"); + log.log("MVPServer", Log::CRIT, "Could not bind to socket in vompserver"); close(listeningSocket); return; } diff --git a/mvpserver.h b/mvpserver.h index 0dbcac3..a14b1f3 100644 --- a/mvpserver.h +++ b/mvpserver.h @@ -29,22 +29,19 @@ #include "defines.h" #include "udpreplier.h" #include "mvpclient.h" +#include "thread.h" -class MVPServer +class MVPServer : public Thread { public: MVPServer(); - ~MVPServer(); + virtual ~MVPServer(); int run(); int stop(); - // not for external use - void run2(); - private: - pthread_t runThread; - int running; + void threadMethod(); Log log; UDPReplier udpr; diff --git a/ringbuffer.c b/ringbuffer.c index abe7d7d..410d1af 100644 --- a/ringbuffer.c +++ b/ringbuffer.c @@ -31,10 +31,10 @@ Ringbuffer::Ringbuffer() Ringbuffer::~Ringbuffer() { - free(buffer); + if (buffer) free(buffer); + buffer = NULL; capacity = 0; content = 0; - buffer = NULL; start = NULL; end = NULL; } diff --git a/thread.c b/thread.c new file mode 100755 index 0000000..75b4281 --- /dev/null +++ b/thread.c @@ -0,0 +1,92 @@ +/* + Copyright 2004-2005 Chris Tallon + + This file is part of VOMP. + + VOMP is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + VOMP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with VOMP; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "thread.h" + +// Undeclared functions, only for use in this file to start the thread +void threadInternalStart(void *arg) +{ + // I don't want signals + sigset_t sigset; + sigfillset(&sigset); + pthread_sigmask(SIG_BLOCK, &sigset, NULL); + + Thread *t = (Thread *)arg; + t->threadInternalStart2(); +} + +void Thread::threadInternalStart2() +{ + threadMethod(); +} + +int Thread::threadStart() +{ + pthread_cond_init(&threadCond, NULL); + pthread_mutex_init(&threadCondMutex, NULL); + + threadActive = 1; + if (pthread_create(&pthread, NULL, (void*(*)(void*))threadInternalStart, (void *)this) == -1) return 0; + return 1; +} + +void Thread::threadStop() +{ + threadActive = 0; + // Signal thread here in case it's waiting + threadSignal(); + pthread_join(pthread, NULL); +} + +void Thread::threadCancel() +{ + threadActive = 0; + pthread_cancel(pthread); + pthread_join(pthread, NULL); +} + +void Thread::threadCheckExit() +{ + if (!threadActive) pthread_exit(NULL); +} + +char Thread::threadIsActive() +{ + return threadActive; +} + +void Thread::threadSignal() +{ + pthread_mutex_lock(&threadCondMutex); + pthread_cond_signal(&threadCond); + pthread_mutex_unlock(&threadCondMutex); +} + +void Thread::threadSignalNoLock() +{ + pthread_cond_signal(&threadCond); +} + +void Thread::threadWaitForSignal() +{ + pthread_mutex_lock(&threadCondMutex); + pthread_cond_wait(&threadCond, &threadCondMutex); + pthread_mutex_unlock(&threadCondMutex); +} diff --git a/thread.h b/thread.h new file mode 100755 index 0000000..d57375a --- /dev/null +++ b/thread.h @@ -0,0 +1,57 @@ +/* + Copyright 2004-2005 Chris Tallon + + This file is part of VOMP. + + VOMP is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + VOMP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with VOMP; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#ifndef THREAD_H +#define THREAD_H + +#include +#include + +class Thread +{ + protected: + // Override this method in derived classes + virtual void threadMethod()=0; + + // Methods to use from outside the thread + int threadStart(); // start the thread. threadMethod() will be called in derived class + void threadStop(); // stop the thread nicely. thread will be stopped when it next calls threadCheckExit() + void threadCancel(); // stop thread immediately. thread will be stopped at the next cancellation point + void threadSignal(); // releases a thread that has called threadWaitForSignal + void threadSignalNoLock(); // same as above but without locking guarantees. probably not a good idea. + char threadIsActive(); // returns 1 if thread has been started but not stop() or cancel() 'd + + // Methods to use from inside the thread + void threadCheckExit(); // terminates thread if threadStop() has been called + void threadWaitForSignal(); // pauses thread until threadSignal() is called + + // Internal bits and pieces + + private: + char threadActive; + pthread_t pthread; + pthread_cond_t threadCond; + pthread_mutex_t threadCondMutex; + + public: + void threadInternalStart2(); +}; + +#endif diff --git a/transceiver.c b/transceiver.c deleted file mode 100644 index 4ff34ab..0000000 --- a/transceiver.c +++ /dev/null @@ -1,245 +0,0 @@ -/* - Edited for VOMP by Chris Tallon - Edits Copyright 2004-2005 Chris Tallon - - This class will be replaced soon. -*/ - -/* - * MediaMVP Server - * - * (C) 2003 Dominic Morris - * - * $Id$ - * $Date$ - * - * Transceiver stuff - blatantly stolen from streamdev then changed - * a bit.. - */ - - - - - -#include "transceiver.h" -#include "ts2ps.h" -#include "ts2es.h" -//#include "setup.h" - -#include - -#include -#include - - -#define VIDEOBUFSIZE MEGABYTE(1) - -/* Disable logging if BUFCOUNT buffer overflows occur within BUFOVERTIME - milliseconds. Enable logging again if there is no error within BUFOVERTIME - milliseconds. */ -#define BUFOVERTIME 5000 -#define BUFOVERCOUNT 100 - -#if VDRVERSNUM < 10300 -cMediamvpTransceiver::cMediamvpTransceiver(const cChannel *Channel, int Priority, int Socket, cDevice *Device) : - cReceiver(Channel->Ca(), Priority, 7, Channel->Vpid(), Channel->Ppid(), - Channel->Apid1(), Channel->Apid2(), Channel->Dpid1(), Channel->Dpid2(), - Channel->Tpid()) { -#else -cMediamvpTransceiver::cMediamvpTransceiver(const cChannel *Channel, int Priority, int Socket, cDevice *Device) : - cReceiver(Channel->Ca(), Priority, Channel->Vpid(), - Channel->Apids(), Channel->Dpids(), Channel->Spids()) { -#endif - m_Active = false; - m_Socket = Socket; - m_Remux = NULL; - m_Device = Device; - -//cjt - log = Log::getInstance(); - - m_RingBuffer = new cRingBufferLinear(VIDEOBUFSIZE, TS_SIZE * 2, true); -// m_RingBuffer = new cRingBufferLinear(VIDEOBUFSIZE, TS_SIZE * 20, true); - - /* Select the correct Muxing depending on whether it's video or not */ -#if VDRVERSNUM < 10300 - if ( Channel->Vpid() == 0 || Channel->Vpid() == 1 || Channel->Vpid() == 0x1FFF ) { - m_Remux = new cTS2ESRemux(Channel->Apid1()); - } else { - m_Remux = new cTS2PSRemux(Channel->Vpid(), Channel->Apid1(), 0, 0, 0, 0); - } -#else - if ( Channel->Vpid() == 0 || Channel->Vpid() == 1 || Channel->Vpid() == 0x1FFF ) { - m_Remux = new cTS2ESRemux(Channel->Apid(0)); - } else { - m_Remux = new cTS2PSRemux(Channel->Vpid(), Channel->Apid(0), 0, 0, 0, 0); - } -#endif - log->log("Transciever", Log::DEBUG, "Created transceiver at %p, remux @%p ringbuffer %p",this,m_Remux,m_RingBuffer); - - /* Suggested by Peter Wagner to assist single DVB card systems */ -#ifdef SINGLE_DEVICE - m_Device->SwitchChannel(Channel, true); -#else - m_Device->SwitchChannel(Channel, false); -#endif - Attach(); - - - // CJT - rb.init(1000000); - pthread_mutex_init(&ringLock, NULL); - -} - -cMediamvpTransceiver::~cMediamvpTransceiver(void) -{ - log->log("Transciever", Log::DEBUG, "Deleting transceiver at %p, remux @%p ringbuffer %p",this,m_Remux,m_RingBuffer); - - Detach(); - if (m_Remux) - delete m_Remux; - m_Remux = NULL; - if ( m_RingBuffer) - delete m_RingBuffer; - m_RingBuffer = NULL; -} - -void cMediamvpTransceiver::Activate(bool On) -{ - if (On) - Start(); - else if (m_Active) - Stop(); -} - -void cMediamvpTransceiver::Stop(void) -{ - if (m_Active) { - m_Active = false; - usleep(50000); - Cancel(0); - } -} - -void cMediamvpTransceiver::Receive(uchar *Data, int Length) -{ - static time_t firsterr = 0; - static int errcnt = 0; - static bool showerr = true; - - if (m_Active) { - int p = m_RingBuffer->Put(Data, Length); - if (p != Length) { - ++errcnt; -#if VDRVERSNUM < 10300 - if (showerr) { - if (firsterr == 0) - firsterr = time_ms(); - else if (firsterr + BUFOVERTIME > time_ms() && errcnt > BUFOVERCOUNT) { - esyslog("ERROR: too many buffer overflows, logging stopped"); - showerr = false; - firsterr = time_ms(); - } - } else if (firsterr + BUFOVERTIME < time_ms()) { - showerr = true; - firsterr = 0; - errcnt = 0; - } - - if (showerr) - esyslog("ERROR: ring buffer overflow (%d bytes dropped)", Length - p); - else - firsterr = time_ms(); -#else - if (showerr) { - if (firsterr == 0) { - firsterr = 1; - lastTime.Set(); - } - else if (lastTime.Elapsed() > BUFOVERTIME && errcnt > BUFOVERCOUNT) { - esyslog("ERROR: too many buffer overflows, logging stopped"); - showerr = false; - } - } else if (lastTime.Elapsed() < BUFOVERTIME) { - showerr = true; - firsterr = 0; - errcnt = 0; - } - - if (showerr) - esyslog("ERROR: ring buffer overflow (%d bytes dropped)", Length - p); - else - firsterr = 1; -#endif - } - } -} - -void cMediamvpTransceiver::Action(void) -{ - int max = 0; - - - log->log("Transciever", Log::DEBUG, "Mediamvp: Transceiver thread started (pid=%d)", getpid()); - - - m_Active = true; - - while (m_Active) { - int recvd; - const uchar *block = m_RingBuffer->Get(recvd); - - if (block && recvd > 0) { - const uchar *sendBlock; - int bytes = 0; - int taken = recvd; - - sendBlock = m_Remux->Process(block, taken, bytes); - - m_RingBuffer->Del(taken); - - if (bytes > max) - max = bytes; - // CJT - - // write(m_Socket,sendBlock,bytes); - // printf("Written %i bytes\n", bytes); - - - pthread_mutex_lock(&ringLock); - rb.put((unsigned char*)sendBlock, bytes); - pthread_mutex_unlock(&ringLock); -//printf("Put %i into buffer\n", bytes); - - - } else - usleep(1); - } - - - log->log("Transciever", Log::DEBUG, "Mediamvp: Transceiver thread ended"); -} - -unsigned long cMediamvpTransceiver::getBlock(unsigned char* buffer, unsigned long amount) -{ - pthread_mutex_lock(&ringLock); - - int numTries = 0; - - while ((unsigned long)rb.getContent() < amount) - { - pthread_mutex_unlock(&ringLock); - if (++numTries == 10) // 5s - { - log->log("Transciever", Log::DEBUG, "getBlock timeout"); - return 0; - } - usleep(500000); - pthread_mutex_lock(&ringLock); - } - - unsigned long amountReceived = rb.get(buffer, amount); - pthread_mutex_unlock(&ringLock); - return amountReceived; -} diff --git a/transceiver.h b/transceiver.h deleted file mode 100644 index 08e721e..0000000 --- a/transceiver.h +++ /dev/null @@ -1,77 +0,0 @@ -/* - Edited for VOMP by Chris Tallon - Edits Copyright 2004-2005 Chris Tallon - - This class will be replaced soon. -*/ - -/* - * MediaMVP Plugin - * - * (C) 2003 Dominic Morris - * - * $Id$ - * $Date$ - * - * - * Transceiver stuff, stolen from streamdev again... - */ - -#ifndef VDR_MEDIAMVP_TRANSCEIVER_H -#define VDR_MEDIAMVP_TRANSCEIVER_H - -#include - -#include -#include - -class cRingBufferLinear; -class cRemux; -class cTSRemux; -class cServerConnection; -class cChannel; - - -#include -#include "ringbuffer.h" -#include "log.h" - -class cMediamvpTransceiver: public cReceiver, public cThread { -// friend class cMediamvpVdrURL; -private: - cDevice *m_Device; - cRingBufferLinear *m_RingBuffer; - cTSRemux *m_Remux; - int m_Socket; - - bool m_Active; -#if VDRVERSNUM >= 10300 - cTimeMs lastTime; -#endif - // CJT - Ringbuffer rb; - pthread_mutex_t ringLock; - Log* log; - - -protected: - virtual void Receive(uchar *Data, int Length); - virtual void Action(void); - -public: - cMediamvpTransceiver(const cChannel *Channel, int Priority, int Socket, cDevice *Device); - virtual ~cMediamvpTransceiver(void); - - bool Attach(void) { return m_Device->AttachReceiver(this); } - void Detach(void) { cReceiver::Detach(); } - - void Stop(void); - - - // CJT - unsigned long getBlock(unsigned char* buffer, unsigned long amount); - virtual void Activate(bool On); - -}; - -#endif // VDR_MEDIAMVP_TRANSCEIVER_H diff --git a/udpreplier.c b/udpreplier.c index e61612d..8b6674c 100644 --- a/udpreplier.c +++ b/udpreplier.c @@ -20,53 +20,35 @@ #include "udpreplier.h" -// undeclared function -void UDPReplierStartThread(void *arg) -{ - UDPReplier *m = (UDPReplier *)arg; - m->run2(); -} - - UDPReplier::UDPReplier() : ds(3024) { - runThread = 0; - running = 0; } UDPReplier::~UDPReplier() { - if (running) stop(); + if (threadIsActive()) stop(); } int UDPReplier::stop() { - if (!running) return 0; - - running = 0; - pthread_cancel(runThread); - pthread_join(runThread, NULL); - + if (!threadIsActive()) return 0; + threadCancel(); return 1; } int UDPReplier::run() { - if (running) return 1; - running = 1; - if (pthread_create(&runThread, NULL, (void*(*)(void*))UDPReplierStartThread, (void *)this) == -1) return 0; + if (threadIsActive()) return 1; + + if (!threadStart()) return 0; + Log::getInstance()->log("UDP", Log::DEBUG, "UDP replier started"); return 1; } -void UDPReplier::run2() +void UDPReplier::threadMethod() { - // I don't want signals - sigset_t sigset; - sigfillset(&sigset); - pthread_sigmask(SIG_BLOCK, &sigset, NULL); - int retval; while(1) { diff --git a/udpreplier.h b/udpreplier.h index 3ef169a..bdcc810 100644 --- a/udpreplier.h +++ b/udpreplier.h @@ -22,27 +22,23 @@ #define UDPREPLIER_H #include -#include #include #include "log.h" #include "dsock.h" +#include "thread.h" -class UDPReplier +class UDPReplier : public Thread { public: UDPReplier(); - ~UDPReplier(); + virtual ~UDPReplier(); int run(); int stop(); - // not for external use - void run2(); - private: - pthread_t runThread; - int running; + void threadMethod(); DatagramSocket ds; }; -- 2.39.2