]> git.vomp.tv Git - vompserver.git/commitdiff
Radio stream chunk size, stream end detection
authorChris Tallon <chris@vomp.tv>
Tue, 6 May 2008 21:00:22 +0000 (21:00 +0000)
committerChris Tallon <chris@vomp.tv>
Tue, 6 May 2008 21:00:22 +0000 (21:00 +0000)
mvpreceiver.c
mvpreceiver.h

index 292cc175fb334aee1bcf2d180c3184f09d0ddae3..f901a702d439a1c8bd70d55e6f14efef02e5aff3 100755 (executable)
@@ -1,30 +1,36 @@
 #include "mvpreceiver.h"
 
+int MVPReceiver::numMVPReceivers = 0;
+
 MVPReceiver* MVPReceiver::create(cChannel* channel, int priority)
 {
 #if VDRVERSNUM < 10500
-   bool NeedsDetachReceivers;
-   cDevice* device = cDevice::GetDevice(channel, priority, &NeedsDetachReceivers);
+  bool NeedsDetachReceivers;
+  cDevice* device = cDevice::GetDevice(channel, priority, &NeedsDetachReceivers);
 #else
   cDevice* device = cDevice::GetDevice(channel, priority, true); // last param is live-view
 #endif
 
   if (!device)
   {
-    Log::getInstance()->log("MVPReceiver", Log::DEBUG, "No device found to receive this channel at this priority");
+    Log::getInstance()->log("MVPReceiver", Log::INFO, "No device found to receive this channel at this priority");
     return NULL;
   }
 
 #if VDRVERSNUM < 10500
   if (NeedsDetachReceivers)
   {
-    Log::getInstance()->log("MVPReceiver", Log::DEBUG, "Needs detach receivers");
+    Log::getInstance()->log("MVPReceiver", Log::WARN, "Needs detach receivers");
 
     // Need to detach other receivers or VDR will shut down??
   }
 #endif
 
   MVPReceiver* m = new MVPReceiver(channel, device);
+
+  numMVPReceivers++;
+  Log::getInstance()->log("MVPReceiver", Log::DEBUG, "num mvp receivers now up to %i", numMVPReceivers);
+  
   return m;
 }
 
@@ -49,7 +55,13 @@ MVPReceiver::MVPReceiver(cChannel* channel, cDevice* device)
   pthread_mutex_init(&processedRingLock, NULL);
 
   // OK
-
+  
+  // Detect whether this is video or radio and set an appropriate stream chunk size
+  // 50k for video, 5k for radio
+  // Perhaps move this client side?
+  if (channel->Vpid()) streamChunkSize = 50000;
+  else streamChunkSize = 5000;
+  
   inittedOK = 1;
   device->SwitchChannel(channel, false);
   device->AttachReceiver(this);
@@ -66,6 +78,9 @@ MVPReceiver::~MVPReceiver()
 {
   Detach();
   threadStop();
+
+  numMVPReceivers--;
+  Log::getInstance()->log("MVPReceiver", Log::DEBUG, "num mvp receivers now down to %i", numMVPReceivers);
 }
 
 void MVPReceiver::Activate(bool on)
@@ -78,8 +93,9 @@ void MVPReceiver::Activate(bool on)
   }
   else
   {
-    logger->log("MVPReceiver", Log::DEBUG, "VDR inactive");
+    logger->log("MVPReceiver", Log::DEBUG, "VDR inactive, sending stream end message");
     threadStop();
+    sendStreamEnd();
   }
 }
 
@@ -98,7 +114,8 @@ void MVPReceiver::Receive(UCHAR* data, int length)
 
 void MVPReceiver::threadMethod()
 {
-  UCHAR buffer[streamChunkSize + 12];
+  ULONG headerLength = sizeof(ULONG) * 4;
+  UCHAR buffer[streamChunkSize + headerLength];
   int amountReceived;
 
 //   threadSetKillable(); ??
@@ -111,17 +128,30 @@ void MVPReceiver::threadMethod()
     do
     {
       pthread_mutex_lock(&processedRingLock);
-      amountReceived = processed.get(buffer+12, streamChunkSize);
+      amountReceived = processed.get(buffer+headerLength, streamChunkSize);
       pthread_mutex_unlock(&processedRingLock);
     
       *(ULONG*)&buffer[0] = htonl(2); // stream channel
       *(ULONG*)&buffer[4] = htonl(streamID);
-      *(ULONG*)&buffer[8] = htonl(amountReceived);
-      tcp->sendPacket(buffer, amountReceived + 12);
+      *(ULONG*)&buffer[8] = htonl(0); // here insert flag: 0 = ok, data follows
+      *(ULONG*)&buffer[12] = htonl(amountReceived);
+
+      tcp->sendPacket(buffer, amountReceived + headerLength);
     } while(processed.getContent() >= streamChunkSize);
   }  
 }
 
+void MVPReceiver::sendStreamEnd()
+{
+  ULONG bufferLength = sizeof(ULONG) * 4;
+  UCHAR buffer[bufferLength];
+  *(ULONG*)&buffer[0] = htonl(2); // stream channel
+  *(ULONG*)&buffer[4] = htonl(streamID);
+  *(ULONG*)&buffer[8] = htonl(1); // stream end
+  *(ULONG*)&buffer[12] = htonl(0); // zero length, no more data
+  tcp->sendPacket(buffer, bufferLength);
+}
+
 ULONG MVPReceiver::getBlock(unsigned char* buffer, unsigned long amount)
 {
 /*
index cee6611663ad668371e17d71531e56a6989917ab..7fec4411416e9f27d4dcb2708cf2a73a8b92b8cc 100755 (executable)
@@ -52,11 +52,14 @@ class MVPReceiver : public cReceiver, public Thread
     TCP* tcp;
     ULONG streamID;
     ULONG streamDataCollected;
-    const static int streamChunkSize = 50000;
+    int streamChunkSize;
 
     // cReciever stuff
     void Activate(bool On);
     void Receive(UCHAR *Data, int Length);
+    void sendStreamEnd();
+
+    static int numMVPReceivers;
     
   protected:
     void threadMethod();