]> git.vomp.tv Git - vompserver.git/commitdiff
*** empty log message ***
authorChris Tallon <chris@vomp.tv>
Wed, 21 Nov 2007 23:50:31 +0000 (23:50 +0000)
committerChris Tallon <chris@vomp.tv>
Wed, 21 Nov 2007 23:50:31 +0000 (23:50 +0000)
defines.h
mvpclient.h
mvpreceiver.c
mvpreceiver.h
tcp.c

index 81a9a3b1d7b429f39e375e6f8eb186f298b2b254..0d13ce7af7b505c6ad5628d63792b5a884543a55 100644 (file)
--- a/defines.h
+++ b/defines.h
@@ -26,6 +26,7 @@
 typedef uint8_t  UCHAR;
 typedef uint16_t USHORT;
 typedef uint32_t UINT;
+typedef int32_t  LONG;
 typedef uint32_t ULONG;
 typedef uint64_t ULLONG;
 
index 6f8acc9e86d2f6709e1577f9a77354fa3f8f2b58..dfa767ab592d0f3007aadcc67b0ab0498350483e 100644 (file)
@@ -44,6 +44,8 @@
 #include "config.h"
 #include "media.h"
 
+class ResponsePacket;
+
 class MVPClient
 {
   public:
@@ -59,6 +61,7 @@ class MVPClient
     static int nr_clients;
     pthread_t runThread;
     int initted;
+    Log* log;
     TCP tcp;
     Config config;
     Config* baseConfig;
@@ -69,41 +72,41 @@ class MVPClient
 #ifndef VOMPSTANDALONE
     MVPReceiver* lp;
     cRecordings* recordingManager;
-    RecPlayer* rp;
+    RecPlayer* recplayer;
 #endif
-    Log* log;
 
-    int processLogin(UCHAR* buffer, int length);
-#ifndef VOMPSTANDALONE 
-    int processGetRecordingsList(UCHAR* data, int length);
-    int processDeleteRecording(UCHAR* data, int length);
-    int processMoveRecording(UCHAR* data, int length);
-    int processGetChannelsList(UCHAR* data, int length);
-    int processStartStreamingChannel(UCHAR* data, int length);
-    int processGetBlock(UCHAR* data, int length);
-    int processStopStreaming(UCHAR* data, int length);
-    int processStartStreamingRecording(UCHAR* data, int length);
-    int processGetChannelSchedule(UCHAR* data, int length);
-    int processGetTimers(UCHAR* data, int length);
-    int processSetTimer(UCHAR* data, int length);
-    int processPositionFromFrameNumber(UCHAR* data, int length);
-    int processFrameNumberFromPosition(UCHAR* data, int length);
-    int processGetIFrame(UCHAR* data, int length);
-    int processGetRecInfo(UCHAR* data, int length);
-    int processGetMarks(UCHAR* data, int length);
-    int processGetChannelPids(UCHAR* data, int length);
-    int processDeleteTimer(UCHAR* buffer, int length);
-
-    int processReScanRecording(UCHAR* data, int length);           // FIXME obselete
+    int processLogin(UCHAR* buffer, int length, ResponsePacket* rp);
+#ifndef VOMPSTANDALONE
+    int processGetRecordingsList(UCHAR* data, int length, ResponsePacket* rp);
+    int processDeleteRecording(UCHAR* data, int length, ResponsePacket* rp);
+    int processMoveRecording(UCHAR* data, int length, ResponsePacket* rp);
+    int processGetChannelsList(UCHAR* data, int length, ResponsePacket* rp);
+    int processStartStreamingChannel(UCHAR* data, int length, ULONG streamID, ResponsePacket* rp);
+    int processGetBlock(UCHAR* data, int length, ResponsePacket* rp);
+    int processStopStreaming(UCHAR* data, int length, ResponsePacket* rp);
+    int processStartStreamingRecording(UCHAR* data, int length, ResponsePacket* rp);
+    int processGetChannelSchedule(UCHAR* data, int length, ResponsePacket* rp);
+    int processGetTimers(UCHAR* data, int length, ResponsePacket* rp);
+    int processSetTimer(UCHAR* data, int length, ResponsePacket* rp);
+    int processPositionFromFrameNumber(UCHAR* data, int length, ResponsePacket* rp);
+    int processFrameNumberFromPosition(UCHAR* data, int length, ResponsePacket* rp);
+    int processGetIFrame(UCHAR* data, int length, ResponsePacket* rp);
+    int processGetRecInfo(UCHAR* data, int length, ResponsePacket* rp);
+    int processGetMarks(UCHAR* data, int length, ResponsePacket* rp);
+    int processGetChannelPids(UCHAR* data, int length, ResponsePacket* rp);
+    int processDeleteTimer(UCHAR* buffer, int length, ResponsePacket* rp);
+
+    int processReScanRecording(UCHAR* data, int length, ResponsePacket* rp);           // FIXME obselete
 #endif
-    int processConfigSave(UCHAR* data, int length);
-    int processConfigLoad(UCHAR* data, int length);
-    int processGetMediaList(UCHAR* data, int length);
-    int processGetPicture(UCHAR* data, int length);
-    int processGetImageBlock(UCHAR* data, int length);
+    int processConfigSave(UCHAR* data, int length, ResponsePacket* rp);
+    int processConfigLoad(UCHAR* data, int length, ResponsePacket* rp);
+    int processGetMediaList(UCHAR* data, int length, ResponsePacket* rp);
+    int processGetPicture(UCHAR* data, int length, ResponsePacket* rp);
+    int processGetImageBlock(UCHAR* data, int length, ResponsePacket* rp);
 
     void incClients();
     void decClients();
+
 #ifndef VOMPSTANDALONE
     cChannel* channelFromNumber(ULONG channelNumber);
     void writeResumeData();
index a7dccea3b022e84d2c80cb6dfbd0be7a32c2d505..fcc9caf0c4c3964e2167a9d4e3291ad017152da2 100755 (executable)
@@ -34,6 +34,8 @@ MVPReceiver::MVPReceiver(cChannel* channel, cDevice* device)
   logger = Log::getInstance();
   vdrActivated = false;
   inittedOK = 0;
+  streamID = 0;
+  tcp = NULL;
 
 //  logger->log("MVPReceiver", Log::DEBUG, "Channel has VPID %i APID %i", channel->Vpid(), channel->Apid(0));
 
@@ -47,21 +49,32 @@ MVPReceiver::MVPReceiver(cChannel* channel, cDevice* device)
   device->AttachReceiver(this);
 }
 
-int MVPReceiver::init()
+int MVPReceiver::init(TCP* ttcp, ULONG tstreamID)
 {
+  tcp = ttcp;
+  streamID = tstreamID;
   return inittedOK;
 }
 
 MVPReceiver::~MVPReceiver()
 {
   Detach();
+  threadStop();
 }
 
 void MVPReceiver::Activate(bool on)
 {
   vdrActivated = on;
-  if (on) logger->log("MVPReceiver", Log::DEBUG, "VDR active");
-  else logger->log("MVPReceiver", Log::DEBUG, "VDR inactive");
+  if (on) 
+  {
+    logger->log("MVPReceiver", Log::DEBUG, "VDR active");
+    threadStart();
+  }
+  else
+  {
+    logger->log("MVPReceiver", Log::DEBUG, "VDR inactive");
+    threadStop();
+  }
 }
 
 bool MVPReceiver::isVdrActivated()
@@ -73,11 +86,36 @@ void MVPReceiver::Receive(UCHAR* data, int length)
 {
   pthread_mutex_lock(&processedRingLock);
   processed.put(data, length);
+  if (processed.getContent() > streamChunkSize) threadSignal();
   pthread_mutex_unlock(&processedRingLock);
 }
 
-unsigned long MVPReceiver::getBlock(unsigned char* buffer, unsigned long amount)
+void MVPReceiver::threadMethod()
+{
+  UCHAR buffer[streamChunkSize + 12];
+  int amountReceived;
+
+//   threadSetKillable(); ??
+
+  while(1)
+  {
+    threadWaitForSignal();
+    threadCheckExit();
+    
+    pthread_mutex_lock(&processedRingLock);
+    amountReceived = processed.get(buffer+12, streamChunkSize);
+    pthread_mutex_unlock(&processedRingLock);
+    
+    *(ULONG*)&buffer[0] = htonl(2); // stream channel
+    *(ULONG*)&buffer[4] = htonl(streamID);
+    *(ULONG*)&buffer[8] = htonl(amountReceived);
+    tcp->sendPacket(buffer, amountReceived + 12);
+  }  
+}
+
+ULONG MVPReceiver::getBlock(unsigned char* buffer, unsigned long amount)
 {
+/*
   pthread_mutex_lock(&processedRingLock);
 
   int numTries = 0;
@@ -97,4 +135,7 @@ unsigned long MVPReceiver::getBlock(unsigned char* buffer, unsigned long amount)
   unsigned long amountReceived = processed.get(buffer, amount);
   pthread_mutex_unlock(&processedRingLock);
   return amountReceived;
+  */
+  sleep(10);
+  return 0;
 }
index f6cb63eeefae8d02f074133b906408b33d135cba..523ba9a004e20da8c46e3052709980c95db900b1 100755 (executable)
 #include "log.h"
 #include "thread.h"
 #include "ringbuffer.h"
+#include "tcp.h"
+#include "thread.h"
 
-class MVPReceiver : public cReceiver
+class MVPReceiver : public cReceiver, public Thread
 {
   public:
     static MVPReceiver* create(cChannel*, int priority);
     virtual ~MVPReceiver();
-    int init();
-    unsigned long getBlock(unsigned char* buffer, unsigned long amount);
+    int init(TCP* tcp, ULONG streamID);
+    ULONG getBlock(unsigned char* buffer, unsigned long amount);
     bool isVdrActivated();
 
   private:
@@ -47,9 +49,17 @@ class MVPReceiver : public cReceiver
     Ringbuffer processed;    // A simpler deleting ringbuffer for processed data
     pthread_mutex_t processedRingLock; // needs outside locking
 
+    TCP* tcp;
+    ULONG streamID;
+    ULONG streamDataCollected;
+    const static int streamChunkSize = 50000;
+
     // cReciever stuff
     void Activate(bool On);
     void Receive(UCHAR *Data, int Length);
+    
+  protected:
+    void threadMethod();
 };
 
 #endif
diff --git a/tcp.c b/tcp.c
index 14ed0d4a3b398fdf88cd29c63f25166d6d1f3abf..7053be50bcd42c0b323548fb19816cf14cc50059 100644 (file)
--- a/tcp.c
+++ b/tcp.c
@@ -253,6 +253,7 @@ int TCP::sendPacket(UCHAR* buf, size_t count)
     }
 
     thisWrite = write(sock, &buf[bytesWritten], count - bytesWritten);
+          log->log("TCP", Log::DEBUG, "written %i", thisWrite);
     if (!thisWrite)
     {
       // if write returns 0 then connection is closed ?