]> git.vomp.tv Git - vompserver.git/blob - mvpreceiver.c
Server side timeouts
[vompserver.git] / mvpreceiver.c
1 #include "mvpreceiver.h"
2
// Running count of MVPReceiver objects: incremented in create() after a
// receiver has been constructed and decremented in ~MVPReceiver().
int MVPReceiver::numMVPReceivers = 0;
4
5 MVPReceiver* MVPReceiver::create(cChannel* channel, int priority)
6 {
7 #if VDRVERSNUM < 10500
8   bool NeedsDetachReceivers;
9   cDevice* device = cDevice::GetDevice(channel, priority, &NeedsDetachReceivers);
10 #else
11   cDevice* device = cDevice::GetDevice(channel, priority, true); // last param is live-view
12 #endif
13
14   if (!device)
15   {
16     Log::getInstance()->log("MVPReceiver", Log::INFO, "No device found to receive this channel at this priority");
17     return NULL;
18   }
19
20 #if VDRVERSNUM < 10500
21   if (NeedsDetachReceivers)
22   {
23     Log::getInstance()->log("MVPReceiver", Log::WARN, "Needs detach receivers");
24
25     // Need to detach other receivers or VDR will shut down??
26   }
27 #endif
28
29   MVPReceiver* m = new MVPReceiver(channel, device);
30
31   numMVPReceivers++;
32   Log::getInstance()->log("MVPReceiver", Log::DEBUG, "num mvp receivers now up to %i", numMVPReceivers);
33   
34   return m;
35 }
36
37 MVPReceiver::MVPReceiver(cChannel* channel, cDevice* device)
38 #if VDRVERSNUM < 10300
39 : cReceiver(channel->Ca(), 0, 7, channel->Vpid(), channel->Ppid(), channel->Apid1(), channel->Apid2(), channel->Dpid1(), channel->Dpid2(), channel->Tpid())
40 #elif VDRVERSNUM < 10500
41 : cReceiver(channel->Ca(), 0, channel->Vpid(), channel->Apids(), channel->Dpids(), channel->Spids())
42 #else
43 : cReceiver(channel->GetChannelID(), 0, channel->Vpid(), channel->Apids(), channel->Dpids(), channel->Spids())
44 #endif
45 {
46   logger = Log::getInstance();
47   vdrActivated = false;
48   inittedOK = 0;
49   streamID = 0;
50   tcp = NULL;
51
52 //  logger->log("MVPReceiver", Log::DEBUG, "Channel has VPID %i APID %i", channel->Vpid(), channel->Apid(0));
53
54   if (!processed.init(1000000)) return;
55   pthread_mutex_init(&processedRingLock, NULL);
56
57   // OK
58   
59   // Detect whether this is video or radio and set an appropriate stream chunk size
60   // 50k for video, 5k for radio
61   // Perhaps move this client side?
62   if (channel->Vpid()) streamChunkSize = 50000;
63   else streamChunkSize = 5000;
64   
65   inittedOK = 1;
66   device->SwitchChannel(channel, false);
67   device->AttachReceiver(this);
68 }
69
70 int MVPReceiver::init(TCP* ttcp, ULONG tstreamID)
71 {
72   tcp = ttcp;
73   streamID = tstreamID;
74   return inittedOK;
75 }
76
77 MVPReceiver::~MVPReceiver()
78 {
79   Detach();
80   threadStop();
81
82   numMVPReceivers--;
83   Log::getInstance()->log("MVPReceiver", Log::DEBUG, "num mvp receivers now down to %i", numMVPReceivers);
84 }
85
86 void MVPReceiver::Activate(bool on)
87 {
88   vdrActivated = on;
89   if (on) 
90   {
91     logger->log("MVPReceiver", Log::DEBUG, "VDR active");
92     threadStart();
93   }
94   else
95   {
96     logger->log("MVPReceiver", Log::DEBUG, "VDR inactive, sending stream end message");
97     threadStop();
98     sendStreamEnd();
99   }
100 }
101
102 bool MVPReceiver::isVdrActivated()
103 {
104   return vdrActivated;
105 }
106
107 void MVPReceiver::Receive(UCHAR* data, int length)
108 {
109   pthread_mutex_lock(&processedRingLock);
110   processed.put(data, length);
111   if (processed.getContent() > streamChunkSize) threadSignal();
112   pthread_mutex_unlock(&processedRingLock);
113 }
114
115 void MVPReceiver::threadMethod()
116 {
117   ULONG headerLength = sizeof(ULONG) * 4;
118   UCHAR buffer[streamChunkSize + headerLength];
119   int amountReceived;
120
121 //   threadSetKillable(); ??
122
123   while(1)
124   {
125     threadLock();
126     threadWaitForSignal();
127     threadUnlock();
128     threadCheckExit();
129     
130     do
131     {
132       pthread_mutex_lock(&processedRingLock);
133       amountReceived = processed.get(buffer+headerLength, streamChunkSize);
134       pthread_mutex_unlock(&processedRingLock);
135     
136       *(ULONG*)&buffer[0] = htonl(2); // stream channel
137       *(ULONG*)&buffer[4] = htonl(streamID);
138       *(ULONG*)&buffer[8] = htonl(0); // here insert flag: 0 = ok, data follows
139       *(ULONG*)&buffer[12] = htonl(amountReceived);
140
141       tcp->sendPacket(buffer, amountReceived + headerLength);
142     } while(processed.getContent() >= streamChunkSize);
143   }  
144 }
145
146 void MVPReceiver::sendStreamEnd()
147 {
148   ULONG bufferLength = sizeof(ULONG) * 4;
149   UCHAR buffer[bufferLength];
150   *(ULONG*)&buffer[0] = htonl(2); // stream channel
151   *(ULONG*)&buffer[4] = htonl(streamID);
152   *(ULONG*)&buffer[8] = htonl(1); // stream end
153   *(ULONG*)&buffer[12] = htonl(0); // zero length, no more data
154   tcp->sendPacket(buffer, bufferLength);
155 }
156
157 ULONG MVPReceiver::getBlock(unsigned char* buffer, unsigned long amount)
158 {
159 /*
160   pthread_mutex_lock(&processedRingLock);
161
162   int numTries = 0;
163
164   while ((unsigned long)processed.getContent() < amount)
165   {
166     pthread_mutex_unlock(&processedRingLock);
167     if (++numTries == 30) // 15s
168     {
169       logger->log("MVPReceiver", Log::DEBUG, "getBlock timeout");
170       return 0;
171     }
172     usleep(500000);
173     pthread_mutex_lock(&processedRingLock);
174   }
175
176   unsigned long amountReceived = processed.get(buffer, amount);
177   pthread_mutex_unlock(&processedRingLock);
178   return amountReceived;
179   */
180   sleep(10);
181   return 0;
182 }