]> git.vomp.tv Git - vompserver.git/commitdiff
Replacement transceiver and use of new thread class (from client!)
authorChris Tallon <chris@vomp.tv>
Tue, 9 Aug 2005 00:48:25 +0000 (00:48 +0000)
committerChris Tallon <chris@vomp.tv>
Tue, 9 Aug 2005 00:48:25 +0000 (00:48 +0000)
14 files changed:
Makefile
mvpclient.c
mvpclient.h
mvpreceiver.c [new file with mode: 0755]
mvpreceiver.h [new file with mode: 0755]
mvpserver.c
mvpserver.h
ringbuffer.c
thread.c [new file with mode: 0755]
thread.h [new file with mode: 0755]
transceiver.c [deleted file]
transceiver.h [deleted file]
udpreplier.c
udpreplier.h

index 88a268b07148f1af7d8f667db7f435378534e705..af13c1e00375e6b07b4a559fee93b9aab367a8dc 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -47,8 +47,8 @@ DEFINES += -DPLUGIN_NAME_I18N='"$(PLUGIN)"'
 ### The object files (add further files here):
 
 OBJS = $(PLUGIN).o dsock.o mvpserver.o udpreplier.o mvpclient.o tcp.o \
-                   transceiver.o remux/ts2ps.o remux/ts2es.o remux/tsremux.o ringbuffer.o \
-                   recplayer.o config.o log.o
+                   remux/ts2ps.o remux/ts2es.o remux/tsremux.o ringbuffer.o \
+                   recplayer.o config.o log.o thread.o mvpreceiver.o
 
 
 libdvbmpeg/libdvbmpegtools.a: libdvbmpeg/*.c libdvbmpeg/*.cc libdvbmpeg/*.h libdvbmpeg/*.hh
index ef19bbfd6b2a134c92f46d000e13016851b56ae8..1fe4cd2d270c21d4b637e33eec4efaf546e5392e 100644 (file)
@@ -23,7 +23,7 @@
 MVPClient::MVPClient(int tsocket)
  : tcp(tsocket)
 {
-  cm = NULL;
+  lp = NULL;
   rp = NULL;
   recordingManager = NULL;
   log = Log::getInstance();
@@ -65,11 +65,10 @@ MVPClient::MVPClient(int tsocket)
 MVPClient::~MVPClient()
 {
   log->log("Client", Log::DEBUG, "MVP client destructor");
-  if (cm)
+  if (lp)
   {
-    cm->Stop();
-    delete cm;
-    cm = NULL;
+    delete lp;
+    lp = NULL;
   }
   else if (rp)
   {
@@ -417,49 +416,32 @@ void MVPClient::processStartStreamingChannel(unsigned char* data, int length)
     return;
   }
 
+  lp = MVPReceiver::create(channel);
 
-//  static cDevice *GetDevice(const cChannel *Channel, int Priority = -1, bool *NeedsDetachReceivers = NULL);
-         ///< Returns a device that is able to receive the given Channel at the
-         ///< given Priority.
-         ///< See ProvidesChannel() for more information on how
-         ///< priorities are handled, and the meaning of NeedsDetachReceivers.
-
-  bool NeedsDetachReceivers;
-  cDevice* device = cDevice::GetDevice(channel, 0, &NeedsDetachReceivers);
-  if (!device)
+  if (!lp)
   {
-    log->log("Client", Log::DEBUG, "No device found to receive this channel at this priority");
     sendULONG(0);
+    return;
   }
-  else if (NeedsDetachReceivers)
+
+  if (!lp->init())
   {
-    // can't really happen since we stream with priority zero. if a rec has pri zero maybe
-    log->log("Client", Log::DEBUG, "Needs detach receivers");
+    delete lp;
+    lp = NULL;
     sendULONG(0);
+    return;
   }
-  else
-  {
-    cm = new cMediamvpTransceiver(channel, 0, 0, device);
-    device->AttachReceiver(cm);
-    sendULONG(1);
-  }
-
 
-//////  MVPReceiver* m = new MVPReceiver(channel->Vpid(), channel->Apid1());
-//  cm = new cMediamvpTransceiver(channel, 0, 0, cDevice::ActualDevice());
-//  cDevice::ActualDevice()->AttachReceiver(cm);
-////  //cDevice::ActualDevice()->SwitchChannel(channel, false);
-
-//  sendULONG(1);
+  sendULONG(1);
 }
 
 void MVPClient::processStopStreaming(unsigned char* data, int length)
 {
   log->log("Client", Log::DEBUG, "STOP STREAMING RECEIVED");
-  if (cm)
+  if (lp)
   {
-    delete cm;
-    cm = NULL;
+    delete lp;
+    lp = NULL;
   }
   else if (rp)
   {
@@ -476,7 +458,7 @@ void MVPClient::processStopStreaming(unsigned char* data, int length)
 
 void MVPClient::processGetBlock(unsigned char* data, int length)
 {
-  if (!cm && !rp)
+  if (!lp && !rp)
   {
     log->log("Client", Log::DEBUG, "Get block called when no streaming happening!");
     return;
@@ -490,10 +472,18 @@ void MVPClient::processGetBlock(unsigned char* data, int length)
 
   unsigned char sendBuffer[amount + 4];
   unsigned long amountReceived = 0; // compiler moan.
-  if (cm)
+  if (lp)
   {
     log->log("Client", Log::DEBUG, "getting from live");
-    amountReceived = cm->getBlock(&sendBuffer[4], amount);
+    amountReceived = lp->getBlock(&sendBuffer[4], amount);
+
+    if (!amountReceived)
+    {
+      // vdr has possibly disconnected the receiver
+      log->log("Client", Log::DEBUG, "VDR has disconnected the live receiver");
+      delete lp;
+      lp = NULL;
+    }
   }
   else if (rp)
   {
index c5cbcc1ecac4da511d76db8158644bdb0ae91240..3658d0089225b9dda1da2c522b346551c26833d8 100644 (file)
@@ -34,7 +34,7 @@
 #include <vdr/plugin.h>
 
 #include "tcp.h"
-#include "transceiver.h"
+#include "mvpreceiver.h"
 #include "recplayer.h"
 #include "config.h"
 
@@ -53,7 +53,9 @@ class MVPClient
     int initted;
     TCP tcp;
     Config config;
-    cMediamvpTransceiver* cm;
+    MVPReceiver* lp;
+
+
     cRecordings* recordingManager;
     RecPlayer* rp;
     Log* log;
@@ -74,7 +76,6 @@ class MVPClient
     cChannel* channelFromNumber(unsigned long channelNumber);
     void writeResumeData();
     void cleanConfig();
-
     void sendULONG(ULONG ul);
 
     void testChannelSchedule(unsigned char* data, int length);
diff --git a/mvpreceiver.c b/mvpreceiver.c
new file mode 100755 (executable)
index 0000000..56337c0
--- /dev/null
@@ -0,0 +1,180 @@
+#include "mvpreceiver.h"
+
+MVPReceiver* MVPReceiver::create(cChannel* channel)
+{
+  bool NeedsDetachReceivers;
+  cDevice* device = cDevice::GetDevice(channel, 0, &NeedsDetachReceivers);
+
+  if (!device)
+  {
+    Log::getInstance()->log("MVPReceiver", Log::DEBUG, "No device found to receive this channel at this priority");
+    return NULL;
+  }
+
+  if (NeedsDetachReceivers)
+  {
+    // can't really happen since we stream with priority zero. if a rec has pri zero maybe
+    Log::getInstance()->log("MVPReceiver", Log::DEBUG, "Needs detach receivers");
+    return NULL;
+  }
+
+  MVPReceiver* m = new MVPReceiver(channel, device);
+  return m;
+}
+
+MVPReceiver::MVPReceiver(cChannel* channel, cDevice* device)
+#if VDRVERSNUM < 10300
+: cReceiver(channel->Ca(), 0, 7, channel->Vpid(), channel->Ppid(), channel->Apid1(), channel->Apid2(), channel->Dpid1(), channel->Dpid2(), channel->Tpid())
+#else
+: cReceiver(channel->Ca(), 0, channel->Vpid(), channel->Apids(), channel->Dpids(), channel->Spids())
+#endif
+{
+  logger = Log::getInstance();
+  vdrActivated = false;
+  inittedOK = 0;
+  remuxer = NULL;
+  unprocessed = NULL;
+
+  // Init
+
+  // Get the remuxer for audio or video
+
+#if VDRVERSNUM < 10300
+  if ((channel->Vpid() == 0) || (channel->Vpid() == 1) || (channel->Vpid() == 0x1FFF))
+  {
+    remuxer = new cTS2ESRemux(channel->Apid1());
+    logger->log("MVPReceiver", Log::DEBUG, "Created new < 1.3 TS->ES");
+  }
+  else
+  {
+    remuxer = new cTS2PSRemux(channel->Vpid(), channel->Apid1(), 0, 0, 0, 0);
+    logger->log("MVPReceiver", Log::DEBUG, "Created new < 1.3 TS->PS");
+  }
+#else
+  if ((channel->Vpid() == 0) || (channel->Vpid() == 1) || (channel->Vpid() == 0x1FFF))
+  {
+    remuxer = new cTS2ESRemux(channel->Apid(0));
+    logger->log("MVPReceiver", Log::DEBUG, "Created new > 1.3 TS->ES");
+  }
+  else
+  {
+    remuxer = new cTS2PSRemux(channel->Vpid(), channel->Apid(0), 0, 0, 0, 0);
+    logger->log("MVPReceiver", Log::DEBUG, "Created new > 1.3 TS->PS");
+  }
+#endif
+
+  unprocessed = new cRingBufferLinear(1000000, TS_SIZE * 2, false);
+
+  if (!processed.init(1000000)) return;
+  pthread_mutex_init(&processedRingLock, NULL);
+
+  if (!threadStart()) return;
+
+  // OK
+
+  inittedOK = 1;
+  device->SwitchChannel(channel, false);
+  device->AttachReceiver(this);
+}
+
+int MVPReceiver::init()
+{
+  return inittedOK;
+}
+
+MVPReceiver::~MVPReceiver()
+{
+  Detach();
+  if (threadIsActive()) threadCancel();
+  if (unprocessed) delete unprocessed;
+  if (remuxer) delete remuxer;
+}
+
+void MVPReceiver::Activate(bool on)
+{
+  vdrActivated = on;
+  if (on) logger->log("MVPReceiver", Log::DEBUG, "VDR active");
+  else logger->log("MVPReceiver", Log::DEBUG, "VDR inactive");
+}
+
+bool MVPReceiver::isVdrActivated()
+{
+  return vdrActivated;
+}
+
+void MVPReceiver::Receive(UCHAR* data, int length)
+{
+  static int receiveCount = 0;
+
+//  int p = unprocessed->Put(data, length);
+//  if (p != length) printf("Buffer overrun\n");
+
+  unprocessed->Put(data, length);
+
+  if (++receiveCount == 15)
+  {
+    threadSignal();
+    receiveCount = 0;
+  }
+}
+
+void MVPReceiver::threadMethod()
+{
+  int amountGot;
+  UCHAR* dataGot;
+
+  int remuxTook;
+  UCHAR* remuxedData;
+  int outputSize;
+
+  while(1)
+  {
+    threadWaitForSignal();
+
+    while(1)
+    {
+      dataGot = unprocessed->Get(amountGot);
+      if (dataGot && (amountGot > 0))
+      {
+        outputSize = 0;
+        remuxTook = amountGot;
+        remuxedData = remuxer->Process(dataGot, remuxTook, outputSize);
+        unprocessed->Del(remuxTook);
+
+        pthread_mutex_lock(&processedRingLock);
+        processed.put(remuxedData, outputSize);
+        pthread_mutex_unlock(&processedRingLock);
+
+//        printf("Got from unprocessed: %i, Got from remux: %p %i, consumed: %i\n",
+//               amountGot, remuxedData, outputSize, remuxTook);
+      }
+      else
+      {
+        break;
+      }
+    }
+  }
+}
+
+unsigned long MVPReceiver::getBlock(unsigned char* buffer, unsigned long amount)
+{
+  pthread_mutex_lock(&processedRingLock);
+
+  int numTries = 0;
+
+  while ((unsigned long)processed.getContent() < amount)
+  {
+    pthread_mutex_unlock(&processedRingLock);
+    if (++numTries == 10) // 5s
+    {
+      logger->log("MVPReceiver", Log::DEBUG, "getBlock timeout");
+      return 0;
+    }
+    usleep(500000);
+    pthread_mutex_lock(&processedRingLock);
+  }
+
+  unsigned long amountReceived = processed.get(buffer, amount);
+  pthread_mutex_unlock(&processedRingLock);
+  return amountReceived;
+}
diff --git a/mvpreceiver.h b/mvpreceiver.h
new file mode 100755 (executable)
index 0000000..2d1440d
--- /dev/null
@@ -0,0 +1,95 @@
+/*
+    Copyright 2004-2005 Chris Tallon
+
+    This file is part of VOMP.
+
+    VOMP is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    VOMP is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with VOMP; if not, write to the Free Software
+    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+#ifndef MVPRECEIVER_H
+#define MVPRECEIVER_H
+
+#include <vdr/channels.h>
+#include <vdr/device.h>
+#include <vdr/receiver.h>
+#include <vdr/ringbuffer.h>
+
+#include "tsremux.h"
+#include "ts2es.h"
+#include "ts2ps.h"
+
+#include "log.h"
+#include "thread.h"
+#include "ringbuffer.h"
+
+class MVPReceiver : public cReceiver, public Thread
+{
+  public:
+    static MVPReceiver* create(cChannel*);
+    virtual ~MVPReceiver();
+    int init();
+    unsigned long getBlock(unsigned char* buffer, unsigned long amount);
+    bool isVdrActivated();
+
+  private:
+    MVPReceiver(cChannel* channel, cDevice* device);
+    void threadMethod();
+
+    Log* logger;
+    bool vdrActivated;
+    int inittedOK;
+    cTSRemux* remuxer;
+    cRingBufferLinear* unprocessed; // A VDR ring buffer used for the unprocessed data
+                                    // it doesn't delete until told and does its own locking
+    Ringbuffer processed;    // A simpler deleting ringbuffer for processed data
+    pthread_mutex_t processedRingLock; // needs outside locking
+
+    // cReciever stuff
+    void Activate(bool On);
+    void Receive(UCHAR *Data, int Length);
+};
+
+#endif
+
+
+/*
+    cReceiver docs from the header file
+
+    void Activate(bool On);
+      // This function is called just before the cReceiver gets attached to
+      // (On == true) or detached from (On == false) a cDevice. It can be used
+      // to do things like starting/stopping a thread.
+      // It is guaranteed that Receive() will not be called before Activate(true).
+    void Receive(uchar *Data, int Length);
+      // This function is called from the cDevice we are attached to, and
+      // delivers one TS packet from the set of PIDs the cReceiver has requested.
+      // The data packet must be accepted immediately, and the call must return
+      // as soon as possible, without any unnecessary delay. Each TS packet
+      // will be delivered only ONCE, so the cReceiver must make sure that
+      // it will be able to buffer the data if necessary.
+
+*/
+
+/*
+
+  cDevice docs
+
+  static cDevice *GetDevice(const cChannel *Channel, int Priority = -1, bool *NeedsDetachReceivers = NULL);
+     ///< Returns a device that is able to receive the given Channel at the
+     ///< given Priority.
+     ///< See ProvidesChannel() for more information on how
+     ///< priorities are handled, and the meaning of NeedsDetachReceivers.
+
+*/
index 46fb62f09bdf83fc1df4d5704fab8d2c9bfab123..82d9afb56406672df7d5e50f210445b9848e7a99 100644 (file)
 
 #include "mvpserver.h"
 
-// undeclared function
-void MVPServerStartThread(void *arg)
-{
-  MVPServer *m = (MVPServer *)arg;
-  m->run2();
-}
-
-
 MVPServer::MVPServer()
 {
-  runThread = 0;
-  running = 0;
 }
 
 MVPServer::~MVPServer()
 {
-  if (running) stop();
+  if (threadIsActive()) stop();
 }
 
 int MVPServer::stop()
 {
-  if (!running) return 0;
+  if (!threadIsActive()) return 0;
 
-  log.shutdown();
-  udpr.stop();
+  threadCancel();
+  log.log("MVPServer", Log::INFO, "Stopped MVPServer thread");
 
-  pthread_cancel(runThread);
-  pthread_join(runThread, NULL);
+  udpr.stop();
+  log.shutdown();
 
   close(listeningSocket);
 
@@ -56,24 +46,32 @@ int MVPServer::stop()
 
 int MVPServer::run()
 {
-  if (running) return 1;
+  if (threadIsActive()) return 1;
 
-  log.init(Log::DEBUG, "/tmp/vompserver.log", 0);
+  log.init(Log::DEBUG, "/tmp/vompserver.log", 1);
+  if (!udpr.run())
+  {
+    log.log("MVPServer", Log::CRIT, "Could not start UDP replier");
+    log.shutdown();
+    return 0;
+  }
 
-  if (udpr.run() == 0) return 0;
+  // start thread here
+  if (!threadStart())
+  {
+    log.log("MVPServer", Log::CRIT, "Could not start MVPServer thread");
+    udpr.stop();
+    log.shutdown();
+    return 0;
+  }
 
-  if (pthread_create(&runThread, NULL, (void*(*)(void*))MVPServerStartThread, (void *)this) == -1) return 0;
   log.log("MVPServer", Log::DEBUG, "MVPServer run success");
   return 1;
 }
 
-void MVPServer::run2()
+void MVPServer::threadMethod()
 {
-  // Thread stuff
-  // I don't want signals and I want to die as soon as I am cancelled because I'll be in accept()
-  sigset_t sigset;
-  sigfillset(&sigset);
-  pthread_sigmask(SIG_BLOCK, &sigset, NULL);
+  // I want to die as soon as I am cancelled because I'll be in accept()
   pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
   pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
 
@@ -86,7 +84,7 @@ void MVPServer::run2()
   listeningSocket = socket(AF_INET, SOCK_STREAM, 0);
   if (listeningSocket < 0)
   {
-    log.log("MVPServer", Log::DEBUG, "Could not get TCP socket in vompserver");
+    log.log("MVPServer", Log::CRIT, "Could not get TCP socket in vompserver");
     return;
   }
 
@@ -95,7 +93,7 @@ void MVPServer::run2()
 
   if (bind(listeningSocket,(struct sockaddr *)&address,sizeof(address)) < 0)
   {
-    log.log("MVPServer", Log::DEBUG, "Could not bind to socket in vompserver");
+    log.log("MVPServer", Log::CRIT, "Could not bind to socket in vompserver");
     close(listeningSocket);
     return;
   }
index 0dbcac329c878b925fe771855eaa3762d9bae6e3..a14b1f38a073a81e3b6616262eb957fed0e6cd3a 100644 (file)
 #include "defines.h"
 #include "udpreplier.h"
 #include "mvpclient.h"
+#include "thread.h"
 
-class MVPServer
+class MVPServer : public Thread
 {
   public:
     MVPServer();
-    ~MVPServer();
+    virtual ~MVPServer();
 
     int run();
     int stop();
 
-    // not for external use
-    void run2();
-
   private:
-    pthread_t runThread;
-    int running;
+    void threadMethod();
 
     Log log;
     UDPReplier udpr;
index abe7d7d6a566a5ed78c8278ad558caa4a23de1cf..410d1afd8afe11185b390a97b8a02f4c6389699c 100644 (file)
@@ -31,10 +31,10 @@ Ringbuffer::Ringbuffer()
 
 Ringbuffer::~Ringbuffer()
 {
-  free(buffer);
+  if (buffer) free(buffer);
+  buffer = NULL;
   capacity = 0;
   content = 0;
-  buffer = NULL;
   start = NULL;
   end = NULL;
 }
diff --git a/thread.c b/thread.c
new file mode 100755 (executable)
index 0000000..75b4281
--- /dev/null
+++ b/thread.c
@@ -0,0 +1,92 @@
+/*
+    Copyright 2004-2005 Chris Tallon
+
+    This file is part of VOMP.
+
+    VOMP is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    VOMP is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with VOMP; if not, write to the Free Software
+    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+#include "thread.h"
+
+// Undeclared functions, only for use in this file to start the thread
+void threadInternalStart(void *arg)
+{
+  // I don't want signals
+  sigset_t sigset;
+  sigfillset(&sigset);
+  pthread_sigmask(SIG_BLOCK, &sigset, NULL);
+
+  Thread *t = (Thread *)arg;
+  t->threadInternalStart2();
+}
+
+void Thread::threadInternalStart2()
+{
+  threadMethod();
+}
+
+int Thread::threadStart()
+{
+  pthread_cond_init(&threadCond, NULL);
+  pthread_mutex_init(&threadCondMutex, NULL);
+
+  threadActive = 1;
+  if (pthread_create(&pthread, NULL, (void*(*)(void*))threadInternalStart, (void *)this) == -1) return 0;
+  return 1;
+}
+
+void Thread::threadStop()
+{
+  threadActive = 0;
+  // Signal thread here in case it's waiting
+  threadSignal();
+  pthread_join(pthread, NULL);
+}
+
+void Thread::threadCancel()
+{
+  threadActive = 0;
+  pthread_cancel(pthread);
+  pthread_join(pthread, NULL);
+}
+
+void Thread::threadCheckExit()
+{
+  if (!threadActive) pthread_exit(NULL);
+}
+
+char Thread::threadIsActive()
+{
+  return threadActive;
+}
+
+void Thread::threadSignal()
+{
+  pthread_mutex_lock(&threadCondMutex);
+  pthread_cond_signal(&threadCond);
+  pthread_mutex_unlock(&threadCondMutex);
+}
+
+void Thread::threadSignalNoLock()
+{
+  pthread_cond_signal(&threadCond);
+}
+
+void Thread::threadWaitForSignal()
+{
+  pthread_mutex_lock(&threadCondMutex);
+  pthread_cond_wait(&threadCond, &threadCondMutex);
+  pthread_mutex_unlock(&threadCondMutex);
+}
diff --git a/thread.h b/thread.h
new file mode 100755 (executable)
index 0000000..d57375a
--- /dev/null
+++ b/thread.h
@@ -0,0 +1,57 @@
+/*
+    Copyright 2004-2005 Chris Tallon
+
+    This file is part of VOMP.
+
+    VOMP is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    VOMP is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with VOMP; if not, write to the Free Software
+    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+#ifndef THREAD_H
+#define THREAD_H
+
+#include <pthread.h>
+#include <signal.h>
+
+class Thread
+{
+  protected:
+    // Override this method in derived classes
+    virtual void threadMethod()=0;
+
+    // Methods to use from outside the thread
+    int threadStart();    // start the thread. threadMethod() will be called in derived class
+    void threadStop();    // stop the thread nicely. thread will be stopped when it next calls threadCheckExit()
+    void threadCancel();  // stop thread immediately. thread will be stopped at the next cancellation point
+    void threadSignal();  // releases a thread that has called threadWaitForSignal
+    void threadSignalNoLock();  // same as above but without locking guarantees. probably not a good idea.
+    char threadIsActive(); // returns 1 if thread has been started but not stop() or cancel() 'd
+
+    // Methods to use from inside the thread
+    void threadCheckExit();      // terminates thread if threadStop() has been called
+    void threadWaitForSignal();  // pauses thread until threadSignal() is called
+
+    // Internal bits and pieces
+
+  private:
+    char threadActive;
+    pthread_t pthread;
+    pthread_cond_t threadCond;
+    pthread_mutex_t threadCondMutex;
+
+  public:
+    void threadInternalStart2();
+};
+
+#endif
diff --git a/transceiver.c b/transceiver.c
deleted file mode 100644 (file)
index 4ff34ab..0000000
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
-     Edited for VOMP by Chris Tallon
-     Edits Copyright 2004-2005 Chris Tallon
-
-     This class will be replaced soon.
-*/
-
-/*
- *   MediaMVP Server
- *
- *   (C) 2003 Dominic Morris
- *
- *   $Id$
- *   $Date$
- *
- *   Transceiver stuff - blatantly stolen from streamdev then changed
- *   a bit..
- */
-
-
-
-
-
-#include "transceiver.h"
-#include "ts2ps.h"
-#include "ts2es.h"
-//#include "setup.h"
-
-#include <vdr/ringbuffer.h>
-
-#include <sys/types.h>
-#include <unistd.h>
-
-
-#define VIDEOBUFSIZE MEGABYTE(1)
-
-/* Disable logging if BUFCOUNT buffer overflows occur within BUFOVERTIME
-   milliseconds. Enable logging again if there is no error within BUFOVERTIME
-   milliseconds. */
-#define BUFOVERTIME  5000
-#define BUFOVERCOUNT 100
-
-#if VDRVERSNUM < 10300
-cMediamvpTransceiver::cMediamvpTransceiver(const cChannel *Channel, int Priority, int Socket, cDevice *Device) :
-                cReceiver(Channel->Ca(), Priority, 7, Channel->Vpid(), Channel->Ppid(),
-                                Channel->Apid1(), Channel->Apid2(), Channel->Dpid1(), Channel->Dpid2(),
-                                Channel->Tpid()) {
-#else
-cMediamvpTransceiver::cMediamvpTransceiver(const cChannel *Channel, int Priority, int Socket, cDevice *Device) :
-                cReceiver(Channel->Ca(), Priority, Channel->Vpid(),
-                                Channel->Apids(), Channel->Dpids(), Channel->Spids()) {
-#endif
-  m_Active = false;
-        m_Socket = Socket;
-        m_Remux = NULL;
-        m_Device = Device;
-
-//cjt
-  log = Log::getInstance();
-
-        m_RingBuffer = new cRingBufferLinear(VIDEOBUFSIZE, TS_SIZE * 2, true);
-//        m_RingBuffer = new cRingBufferLinear(VIDEOBUFSIZE, TS_SIZE * 20, true);
-
-    /* Select the correct Muxing depending on whether it's video or not */
-#if VDRVERSNUM < 10300
-    if ( Channel->Vpid() == 0 || Channel->Vpid() == 1 || Channel->Vpid() == 0x1FFF ) {
-        m_Remux = new cTS2ESRemux(Channel->Apid1());
-    } else {
-                m_Remux = new cTS2PSRemux(Channel->Vpid(), Channel->Apid1(), 0, 0, 0, 0);
-    }
-#else
-    if ( Channel->Vpid() == 0 || Channel->Vpid() == 1 || Channel->Vpid() == 0x1FFF ) {
-        m_Remux = new cTS2ESRemux(Channel->Apid(0));
-    } else {
-                m_Remux = new cTS2PSRemux(Channel->Vpid(), Channel->Apid(0), 0, 0, 0, 0);
-    }
-#endif
-    log->log("Transciever", Log::DEBUG, "Created transceiver at %p, remux @%p ringbuffer %p",this,m_Remux,m_RingBuffer);
-
-    /* Suggested by Peter Wagner to assist single DVB card systems */
-#ifdef SINGLE_DEVICE
-        m_Device->SwitchChannel(Channel, true);
-#else
-        m_Device->SwitchChannel(Channel, false);
-#endif
-        Attach();
-
-
-        // CJT
-        rb.init(1000000);
-        pthread_mutex_init(&ringLock, NULL);
-
-}
-
-cMediamvpTransceiver::~cMediamvpTransceiver(void)
-{
-    log->log("Transciever", Log::DEBUG, "Deleting transceiver at %p, remux @%p ringbuffer %p",this,m_Remux,m_RingBuffer);
-
-        Detach();
-        if (m_Remux)
-        delete m_Remux;
-    m_Remux = NULL;
-    if ( m_RingBuffer)
-        delete m_RingBuffer;
-    m_RingBuffer = NULL;
-}
-
-void cMediamvpTransceiver::Activate(bool On)
-{
-        if (On)
-                Start();
-        else if (m_Active)
-                Stop();
-}
-
-void cMediamvpTransceiver::Stop(void)
-{
-        if (m_Active) {
-                m_Active = false;
-                usleep(50000);
-                Cancel(0);
-        }
-}
-
-void cMediamvpTransceiver::Receive(uchar *Data, int Length)
-{
-        static time_t firsterr = 0;
-        static int errcnt = 0;
-        static bool showerr = true;
-
-        if (m_Active) {
-                int p = m_RingBuffer->Put(Data, Length);
-                if (p != Length) {
-                        ++errcnt;
-#if VDRVERSNUM < 10300
-      if (showerr) {
-                                if (firsterr == 0)
-                                        firsterr = time_ms();
-                                else if (firsterr + BUFOVERTIME > time_ms() && errcnt > BUFOVERCOUNT) {
-                                        esyslog("ERROR: too many buffer overflows, logging stopped");
-                                        showerr = false;
-                                        firsterr = time_ms();
-                                }
-                        } else if (firsterr + BUFOVERTIME < time_ms()) {
-                                showerr = true;
-                                firsterr = 0;
-                                errcnt = 0;
-                        }
-
-                        if (showerr)
-                                esyslog("ERROR: ring buffer overflow (%d bytes dropped)", Length - p);
-                        else
-                                firsterr = time_ms();
-#else
-      if (showerr) {
-                                if (firsterr == 0) {
-                                        firsterr = 1;
-          lastTime.Set();
-        }
-                                else if (lastTime.Elapsed() > BUFOVERTIME && errcnt > BUFOVERCOUNT) {
-                                        esyslog("ERROR: too many buffer overflows, logging stopped");
-                                        showerr = false;
-                                }
-                        } else if (lastTime.Elapsed() < BUFOVERTIME) {
-                                showerr = true;
-                                firsterr = 0;
-                                errcnt = 0;
-                        }
-
-                        if (showerr)
-                                esyslog("ERROR: ring buffer overflow (%d bytes dropped)", Length - p);
-                        else
-                                firsterr = 1;
-#endif
-                }
-        }
-}
-
-void cMediamvpTransceiver::Action(void)
-{
-        int max = 0;
-
-
-    log->log("Transciever", Log::DEBUG, "Mediamvp: Transceiver thread started (pid=%d)", getpid());
-
-
-        m_Active = true;
-
-        while (m_Active) {
-                int recvd;
-                const uchar *block = m_RingBuffer->Get(recvd);
-
-                if (block && recvd > 0) {
-                        const uchar *sendBlock;
-                        int bytes = 0;
-                        int taken = recvd;
-
-            sendBlock = m_Remux->Process(block, taken, bytes);
-
-                        m_RingBuffer->Del(taken);
-
-                        if (bytes > max)
-                                max = bytes;
-      // CJT
-
-       //     write(m_Socket,sendBlock,bytes);
-      //      printf("Written %i bytes\n", bytes);
-
-
-      pthread_mutex_lock(&ringLock);
-      rb.put((unsigned char*)sendBlock, bytes);
-      pthread_mutex_unlock(&ringLock);
-//printf("Put %i into buffer\n", bytes);
-
-
-                } else
-                        usleep(1);
-        }
-
-
-    log->log("Transciever", Log::DEBUG, "Mediamvp: Transceiver thread ended");
-}
-
-unsigned long cMediamvpTransceiver::getBlock(unsigned char* buffer, unsigned long amount)
-{
-  pthread_mutex_lock(&ringLock);
-
-  int numTries = 0;
-
-  while ((unsigned long)rb.getContent() < amount)
-  {
-    pthread_mutex_unlock(&ringLock);
-    if (++numTries == 10) // 5s
-    {
-      log->log("Transciever", Log::DEBUG, "getBlock timeout");
-      return 0;
-    }
-    usleep(500000);
-    pthread_mutex_lock(&ringLock);
-  }
-
-  unsigned long amountReceived = rb.get(buffer, amount);
-  pthread_mutex_unlock(&ringLock);
-  return amountReceived;
-}
diff --git a/transceiver.h b/transceiver.h
deleted file mode 100644 (file)
index 08e721e..0000000
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
-     Edited for VOMP by Chris Tallon
-     Edits Copyright 2004-2005 Chris Tallon
-
-     This class will be replaced soon.
-*/
-
-/*
- *   MediaMVP Plugin
- *
- *   (C) 2003 Dominic Morris
- *
- *   $Id$
- *   $Date$
- *
- *
- *   Transceiver stuff, stolen from streamdev again...
- */
-
-#ifndef VDR_MEDIAMVP_TRANSCEIVER_H
-#define VDR_MEDIAMVP_TRANSCEIVER_H
-
-#include <vdr/receiver.h>
-
-#include <vdr/thread.h>
-#include <vdr/status.h>
-
-class cRingBufferLinear;
-class cRemux;
-class cTSRemux;
-class cServerConnection;
-class cChannel;
-
-
-#include <pthread.h>
-#include "ringbuffer.h"
-#include "log.h"
-
-class cMediamvpTransceiver: public cReceiver, public cThread {
-//    friend class cMediamvpVdrURL;
-private:
-        cDevice *m_Device;
-        cRingBufferLinear *m_RingBuffer;
-        cTSRemux *m_Remux;
-    int       m_Socket;
-
-        bool m_Active;
-#if VDRVERSNUM >= 10300
-  cTimeMs lastTime;
-#endif
-        // CJT
-        Ringbuffer rb;
-        pthread_mutex_t ringLock;
-        Log* log;
-
-
-protected:
-        virtual void Receive(uchar *Data, int Length);
-        virtual void Action(void);
-
-public:
-        cMediamvpTransceiver(const cChannel *Channel, int Priority, int Socket, cDevice *Device);
-        virtual ~cMediamvpTransceiver(void);
-
-        bool Attach(void) { return m_Device->AttachReceiver(this); }
-        void Detach(void) { cReceiver::Detach(); }
-
-        void Stop(void);
-
-
-        // CJT
-        unsigned long getBlock(unsigned char* buffer, unsigned long amount);
-        virtual void Activate(bool On);
-
-};
-
-#endif // VDR_MEDIAMVP_TRANSCEIVER_H
index e61612d69c18acf62e9f3c0a59f5e5a477910859..8b6674ca8955e25e724a91f099073872a7ce19ea 100644 (file)
 
 #include "udpreplier.h"
 
-// undeclared function
-void UDPReplierStartThread(void *arg)
-{
-  UDPReplier *m = (UDPReplier *)arg;
-  m->run2();
-}
-
-
 UDPReplier::UDPReplier()
  : ds(3024)
 {
-  runThread = 0;
-  running = 0;
 }
 
 UDPReplier::~UDPReplier()
 {
-  if (running) stop();
+  if (threadIsActive()) stop();
 }
 
 int UDPReplier::stop()
 {
-  if (!running) return 0;
-
-  running = 0;
-  pthread_cancel(runThread);
-  pthread_join(runThread, NULL);
-
+  if (!threadIsActive()) return 0;
+  threadCancel();
   return 1;
 }
 
 int UDPReplier::run()
 {
-  if (running) return 1;
-  running = 1;
-  if (pthread_create(&runThread, NULL, (void*(*)(void*))UDPReplierStartThread, (void *)this) == -1) return 0;
+  if (threadIsActive()) return 1;
+
+  if (!threadStart()) return 0;
+
   Log::getInstance()->log("UDP", Log::DEBUG, "UDP replier started");
   return 1;
 }
 
-void UDPReplier::run2()
+void UDPReplier::threadMethod()
 {
-  // I don't want signals
-  sigset_t sigset;
-  sigfillset(&sigset);
-  pthread_sigmask(SIG_BLOCK, &sigset, NULL);
-
   int retval;
   while(1)
   {
index 3ef169ac4204c1a0f0da19d007e22bb6e1140bbc..bdcc81086ab6e572365f94678340f3c9de129f62 100644 (file)
 #define UDPREPLIER_H
 
 #include <stdio.h>
-#include <pthread.h>
 #include <signal.h>
 
 #include "log.h"
 #include "dsock.h"
+#include "thread.h"
 
-class UDPReplier
+class UDPReplier : public Thread
 {
   public:
     UDPReplier();
-    ~UDPReplier();
+    virtual ~UDPReplier();
 
     int run();
     int stop();
 
-    // not for external use
-    void run2();
-
   private:
-    pthread_t runThread;
-    int running;
+    void threadMethod();
 
     DatagramSocket ds;
 };