From 10dc03d3ea553fcef41f292948a20891027579e8 Mon Sep 17 00:00:00 2001 From: Chris Tallon Date: Wed, 21 Nov 2007 23:50:31 +0000 Subject: [PATCH] *** empty log message *** --- defines.h | 1 + mvpclient.h | 61 +++++++++++++++++++++++++++------------------------ mvpreceiver.c | 49 +++++++++++++++++++++++++++++++++++++---- mvpreceiver.h | 16 +++++++++++--- tcp.c | 1 + 5 files changed, 92 insertions(+), 36 deletions(-) diff --git a/defines.h b/defines.h index 81a9a3b..0d13ce7 100644 --- a/defines.h +++ b/defines.h @@ -26,6 +26,7 @@ typedef uint8_t UCHAR; typedef uint16_t USHORT; typedef uint32_t UINT; +typedef int32_t LONG; typedef uint32_t ULONG; typedef uint64_t ULLONG; diff --git a/mvpclient.h b/mvpclient.h index 6f8acc9..dfa767a 100644 --- a/mvpclient.h +++ b/mvpclient.h @@ -44,6 +44,8 @@ #include "config.h" #include "media.h" +class ResponsePacket; + class MVPClient { public: @@ -59,6 +61,7 @@ class MVPClient static int nr_clients; pthread_t runThread; int initted; + Log* log; TCP tcp; Config config; Config* baseConfig; @@ -69,41 +72,41 @@ class MVPClient #ifndef VOMPSTANDALONE MVPReceiver* lp; cRecordings* recordingManager; - RecPlayer* rp; + RecPlayer* recplayer; #endif - Log* log; - int processLogin(UCHAR* buffer, int length); -#ifndef VOMPSTANDALONE - int processGetRecordingsList(UCHAR* data, int length); - int processDeleteRecording(UCHAR* data, int length); - int processMoveRecording(UCHAR* data, int length); - int processGetChannelsList(UCHAR* data, int length); - int processStartStreamingChannel(UCHAR* data, int length); - int processGetBlock(UCHAR* data, int length); - int processStopStreaming(UCHAR* data, int length); - int processStartStreamingRecording(UCHAR* data, int length); - int processGetChannelSchedule(UCHAR* data, int length); - int processGetTimers(UCHAR* data, int length); - int processSetTimer(UCHAR* data, int length); - int processPositionFromFrameNumber(UCHAR* data, int length); - int processFrameNumberFromPosition(UCHAR* data, int length); - int processGetIFrame(UCHAR* data, int length); - int processGetRecInfo(UCHAR* data, int length); - int processGetMarks(UCHAR* data, int length); - int processGetChannelPids(UCHAR* data, int length); - int processDeleteTimer(UCHAR* buffer, int length); - - int processReScanRecording(UCHAR* data, int length); // FIXME obselete + int processLogin(UCHAR* buffer, int length, ResponsePacket* rp); +#ifndef VOMPSTANDALONE + int processGetRecordingsList(UCHAR* data, int length, ResponsePacket* rp); + int processDeleteRecording(UCHAR* data, int length, ResponsePacket* rp); + int processMoveRecording(UCHAR* data, int length, ResponsePacket* rp); + int processGetChannelsList(UCHAR* data, int length, ResponsePacket* rp); + int processStartStreamingChannel(UCHAR* data, int length, ULONG streamID, ResponsePacket* rp); + int processGetBlock(UCHAR* data, int length, ResponsePacket* rp); + int processStopStreaming(UCHAR* data, int length, ResponsePacket* rp); + int processStartStreamingRecording(UCHAR* data, int length, ResponsePacket* rp); + int processGetChannelSchedule(UCHAR* data, int length, ResponsePacket* rp); + int processGetTimers(UCHAR* data, int length, ResponsePacket* rp); + int processSetTimer(UCHAR* data, int length, ResponsePacket* rp); + int processPositionFromFrameNumber(UCHAR* data, int length, ResponsePacket* rp); + int processFrameNumberFromPosition(UCHAR* data, int length, ResponsePacket* rp); + int processGetIFrame(UCHAR* data, int length, ResponsePacket* rp); + int processGetRecInfo(UCHAR* data, int length, ResponsePacket* rp); + int processGetMarks(UCHAR* data, int length, ResponsePacket* rp); + int processGetChannelPids(UCHAR* data, int length, ResponsePacket* rp); + int processDeleteTimer(UCHAR* buffer, int length, ResponsePacket* rp); + + int processReScanRecording(UCHAR* data, int length, ResponsePacket* rp); // FIXME obselete #endif - int processConfigSave(UCHAR* data, int length); - int processConfigLoad(UCHAR* data, int length); - int processGetMediaList(UCHAR* data, int length); - int processGetPicture(UCHAR* data, int length); - int processGetImageBlock(UCHAR* data, int length); + int processConfigSave(UCHAR* data, int length, ResponsePacket* rp); + int processConfigLoad(UCHAR* data, int length, ResponsePacket* rp); + int processGetMediaList(UCHAR* data, int length, ResponsePacket* rp); + int processGetPicture(UCHAR* data, int length, ResponsePacket* rp); + int processGetImageBlock(UCHAR* data, int length, ResponsePacket* rp); void incClients(); void decClients(); + #ifndef VOMPSTANDALONE cChannel* channelFromNumber(ULONG channelNumber); void writeResumeData(); diff --git a/mvpreceiver.c b/mvpreceiver.c index a7dccea..fcc9caf 100755 --- a/mvpreceiver.c +++ b/mvpreceiver.c @@ -34,6 +34,8 @@ MVPReceiver::MVPReceiver(cChannel* channel, cDevice* device) logger = Log::getInstance(); vdrActivated = false; inittedOK = 0; + streamID = 0; + tcp = NULL; // logger->log("MVPReceiver", Log::DEBUG, "Channel has VPID %i APID %i", channel->Vpid(), channel->Apid(0)); @@ -47,21 +49,32 @@ MVPReceiver::MVPReceiver(cChannel* channel, cDevice* device) device->AttachReceiver(this); } -int MVPReceiver::init() +int MVPReceiver::init(TCP* ttcp, ULONG tstreamID) { + tcp = ttcp; + streamID = tstreamID; return inittedOK; } MVPReceiver::~MVPReceiver() { Detach(); + threadStop(); } void MVPReceiver::Activate(bool on) { vdrActivated = on; - if (on) logger->log("MVPReceiver", Log::DEBUG, "VDR active"); - else logger->log("MVPReceiver", Log::DEBUG, "VDR inactive"); + if (on) + { + logger->log("MVPReceiver", Log::DEBUG, "VDR active"); + threadStart(); + } + else + { + logger->log("MVPReceiver", Log::DEBUG, "VDR inactive"); + threadStop(); + } } bool MVPReceiver::isVdrActivated() @@ -73,11 +86,36 @@ void MVPReceiver::Receive(UCHAR* data, int length) { pthread_mutex_lock(&processedRingLock); processed.put(data, length); + if (processed.getContent() > streamChunkSize) threadSignal(); pthread_mutex_unlock(&processedRingLock); } -unsigned long MVPReceiver::getBlock(unsigned char* buffer, unsigned long amount) +void MVPReceiver::threadMethod() +{ + UCHAR buffer[streamChunkSize + 12]; + int amountReceived; + +// threadSetKillable(); ?? + + while(1) + { + threadWaitForSignal(); + threadCheckExit(); + + pthread_mutex_lock(&processedRingLock); + amountReceived = processed.get(buffer+12, streamChunkSize); + pthread_mutex_unlock(&processedRingLock); + + *(ULONG*)&buffer[0] = htonl(2); // stream channel + *(ULONG*)&buffer[4] = htonl(streamID); + *(ULONG*)&buffer[8] = htonl(amountReceived); + tcp->sendPacket(buffer, amountReceived + 12); + } +} + +ULONG MVPReceiver::getBlock(unsigned char* buffer, unsigned long amount) { +/* pthread_mutex_lock(&processedRingLock); int numTries = 0; @@ -97,4 +135,7 @@ unsigned long MVPReceiver::getBlock(unsigned char* buffer, unsigned long amount) unsigned long amountReceived = processed.get(buffer, amount); pthread_mutex_unlock(&processedRingLock); return amountReceived; + */ + sleep(10); + return 0; } diff --git a/mvpreceiver.h b/mvpreceiver.h index f6cb63e..523ba9a 100755 --- a/mvpreceiver.h +++ b/mvpreceiver.h @@ -28,14 +28,16 @@ #include "log.h" #include "thread.h" #include "ringbuffer.h" +#include "tcp.h" +#include "thread.h" -class MVPReceiver : public cReceiver +class MVPReceiver : public cReceiver, public Thread { public: static MVPReceiver* create(cChannel*, int priority); virtual ~MVPReceiver(); - int init(); - unsigned long getBlock(unsigned char* buffer, unsigned long amount); + int init(TCP* tcp, ULONG streamID); + ULONG getBlock(unsigned char* buffer, unsigned long amount); bool isVdrActivated(); private: @@ -47,9 +49,17 @@ class MVPReceiver : public cReceiver Ringbuffer processed; // A simpler deleting ringbuffer for processed data pthread_mutex_t processedRingLock; // needs outside locking + TCP* tcp; + ULONG streamID; + ULONG streamDataCollected; + const static int streamChunkSize = 50000; + // cReciever stuff void Activate(bool On); void Receive(UCHAR *Data, int Length); + + protected: + void threadMethod(); }; #endif diff --git a/tcp.c b/tcp.c index 14ed0d4..7053be5 100644 --- a/tcp.c +++ b/tcp.c @@ -253,6 +253,7 @@ int TCP::sendPacket(UCHAR* buf, size_t count) } thisWrite = write(sock, &buf[bytesWritten], count - bytesWritten); + log->log("TCP", Log::DEBUG, "written %i", thisWrite); if (!thisWrite) { // if write returns 0 then connection is closed ? -- 2.39.2