]> git.vomp.tv Git - vompserver.git/blob - mvpreceiver.c
Radio stream chunk size, stream end detection
[vompserver.git] / mvpreceiver.c
1 #include "mvpreceiver.h"
2
3 int MVPReceiver::numMVPReceivers = 0;
4
5 MVPReceiver* MVPReceiver::create(cChannel* channel, int priority)
6 {
7 #if VDRVERSNUM < 10500
8   bool NeedsDetachReceivers;
9   cDevice* device = cDevice::GetDevice(channel, priority, &NeedsDetachReceivers);
10 #else
11   cDevice* device = cDevice::GetDevice(channel, priority, true); // last param is live-view
12 #endif
13
14   if (!device)
15   {
16     Log::getInstance()->log("MVPReceiver", Log::INFO, "No device found to receive this channel at this priority");
17     return NULL;
18   }
19
20 #if VDRVERSNUM < 10500
21   if (NeedsDetachReceivers)
22   {
23     Log::getInstance()->log("MVPReceiver", Log::WARN, "Needs detach receivers");
24
25     // Need to detach other receivers or VDR will shut down??
26   }
27 #endif
28
29   MVPReceiver* m = new MVPReceiver(channel, device);
30
31   numMVPReceivers++;
32   Log::getInstance()->log("MVPReceiver", Log::DEBUG, "num mvp receivers now up to %i", numMVPReceivers);
33   
34   return m;
35 }
36
37 MVPReceiver::MVPReceiver(cChannel* channel, cDevice* device)
38 #if VDRVERSNUM < 10300
39 : cReceiver(channel->Ca(), 0, 7, channel->Vpid(), channel->Ppid(), channel->Apid1(), channel->Apid2(), channel->Dpid1(), channel->Dpid2(), channel->Tpid())
40 #elif VDRVERSNUM < 10500
41 : cReceiver(channel->Ca(), 0, channel->Vpid(), channel->Apids(), channel->Dpids(), channel->Spids())
42 #else
43 : cReceiver(channel->GetChannelID(), 0, channel->Vpid(), channel->Apids(), channel->Dpids(), channel->Spids())
44 #endif
45 {
46   logger = Log::getInstance();
47   vdrActivated = false;
48   inittedOK = 0;
49   streamID = 0;
50   tcp = NULL;
51
52 //  logger->log("MVPReceiver", Log::DEBUG, "Channel has VPID %i APID %i", channel->Vpid(), channel->Apid(0));
53
54   if (!processed.init(1000000)) return;
55   pthread_mutex_init(&processedRingLock, NULL);
56
57   // OK
58   
59   // Detect whether this is video or radio and set an appropriate stream chunk size
60   // 50k for video, 5k for radio
61   // Perhaps move this client side?
62   if (channel->Vpid()) streamChunkSize = 50000;
63   else streamChunkSize = 5000;
64   
65   inittedOK = 1;
66   device->SwitchChannel(channel, false);
67   device->AttachReceiver(this);
68 }
69
70 int MVPReceiver::init(TCP* ttcp, ULONG tstreamID)
71 {
72   tcp = ttcp;
73   streamID = tstreamID;
74   return inittedOK;
75 }
76
77 MVPReceiver::~MVPReceiver()
78 {
79   Detach();
80   threadStop();
81
82   numMVPReceivers--;
83   Log::getInstance()->log("MVPReceiver", Log::DEBUG, "num mvp receivers now down to %i", numMVPReceivers);
84 }
85
86 void MVPReceiver::Activate(bool on)
87 {
88   vdrActivated = on;
89   if (on) 
90   {
91     logger->log("MVPReceiver", Log::DEBUG, "VDR active");
92     threadStart();
93   }
94   else
95   {
96     logger->log("MVPReceiver", Log::DEBUG, "VDR inactive, sending stream end message");
97     threadStop();
98     sendStreamEnd();
99   }
100 }
101
102 bool MVPReceiver::isVdrActivated()
103 {
104   return vdrActivated;
105 }
106
107 void MVPReceiver::Receive(UCHAR* data, int length)
108 {
109   pthread_mutex_lock(&processedRingLock);
110   processed.put(data, length);
111   if (processed.getContent() > streamChunkSize) threadSignal();
112   pthread_mutex_unlock(&processedRingLock);
113 }
114
115 void MVPReceiver::threadMethod()
116 {
117   ULONG headerLength = sizeof(ULONG) * 4;
118   UCHAR buffer[streamChunkSize + headerLength];
119   int amountReceived;
120
121 //   threadSetKillable(); ??
122
123   while(1)
124   {
125     threadWaitForSignal();
126     threadCheckExit();
127     
128     do
129     {
130       pthread_mutex_lock(&processedRingLock);
131       amountReceived = processed.get(buffer+headerLength, streamChunkSize);
132       pthread_mutex_unlock(&processedRingLock);
133     
134       *(ULONG*)&buffer[0] = htonl(2); // stream channel
135       *(ULONG*)&buffer[4] = htonl(streamID);
136       *(ULONG*)&buffer[8] = htonl(0); // here insert flag: 0 = ok, data follows
137       *(ULONG*)&buffer[12] = htonl(amountReceived);
138
139       tcp->sendPacket(buffer, amountReceived + headerLength);
140     } while(processed.getContent() >= streamChunkSize);
141   }  
142 }
143
144 void MVPReceiver::sendStreamEnd()
145 {
146   ULONG bufferLength = sizeof(ULONG) * 4;
147   UCHAR buffer[bufferLength];
148   *(ULONG*)&buffer[0] = htonl(2); // stream channel
149   *(ULONG*)&buffer[4] = htonl(streamID);
150   *(ULONG*)&buffer[8] = htonl(1); // stream end
151   *(ULONG*)&buffer[12] = htonl(0); // zero length, no more data
152   tcp->sendPacket(buffer, bufferLength);
153 }
154
155 ULONG MVPReceiver::getBlock(unsigned char* buffer, unsigned long amount)
156 {
157 /*
158   pthread_mutex_lock(&processedRingLock);
159
160   int numTries = 0;
161
162   while ((unsigned long)processed.getContent() < amount)
163   {
164     pthread_mutex_unlock(&processedRingLock);
165     if (++numTries == 30) // 15s
166     {
167       logger->log("MVPReceiver", Log::DEBUG, "getBlock timeout");
168       return 0;
169     }
170     usleep(500000);
171     pthread_mutex_lock(&processedRingLock);
172   }
173
174   unsigned long amountReceived = processed.get(buffer, amount);
175   pthread_mutex_unlock(&processedRingLock);
176   return amountReceived;
177   */
178   sleep(10);
179   return 0;
180 }