]> git.vomp.tv Git - vompserver.git/blob - mvpreceiver.c
15 years that line of code has been waiting to crash
[vompserver.git] / mvpreceiver.c
1 #include "mvpreceiver.h"
2
3 int MVPReceiver::numMVPReceivers = 0;
4
5 MVPReceiver* MVPReceiver::create(const cChannel* channel, int priority)
6 {
7 #if VDRVERSNUM < 10500
8   bool NeedsDetachReceivers;
9   cDevice* device = cDevice::GetDevice(channel, priority, &NeedsDetachReceivers);
10 #else
11   cDevice* device = cDevice::GetDevice(channel, priority, true); // last param is live-view
12 #endif
13
14   if (!device)
15   {
16     Log::getInstance()->log("MVPReceiver", Log::INFO, "No device found to receive this channel at this priority");
17     return NULL;
18   }
19
20 #if VDRVERSNUM < 10500
21   if (NeedsDetachReceivers)
22   {
23     Log::getInstance()->log("MVPReceiver", Log::WARN, "Needs detach receivers");
24
25     // Need to detach other receivers or VDR will shut down??
26   }
27 #endif
28
29   MVPReceiver* m = new MVPReceiver(channel, device);
30
31   numMVPReceivers++;
32   Log::getInstance()->log("MVPReceiver", Log::DEBUG, "num mvp receivers now up to %i", numMVPReceivers);
33   
34   return m;
35 }
36
37 MVPReceiver::MVPReceiver(const cChannel* channel, cDevice* device)
38 #if VDRVERSNUM < 10300
39 : cReceiver(channel->Ca(), 0, 7, channel->Vpid(), channel->Ppid(), channel->Apid1(), channel->Apid2(), channel->Dpid1(), channel->Dpid2(), channel->Tpid())
40 #elif VDRVERSNUM < 10500
41 : cReceiver(channel->Ca(), 0, channel->Vpid(), channel->Apids(), channel->Dpids(), mergeSpidsTpid(channel->Spids(),channel->Tpid()))
42 #elif VDRVERSNUM < 10712
43 : cReceiver(channel->GetChannelID(), 0, channel->Vpid(), channel->Apids(), channel->Dpids(), mergeSpidsTpid(channel->Spids(),channel->Tpid()))
44 #else
45 : cReceiver(channel, 0)
46 #endif
47 {
48   logger = Log::getInstance();
49   vdrActivated = false;
50   inittedOK = 0;
51   streamID = 0;
52   tcp = NULL;
53
54 #if VDRVERSNUM >= 10712
55   AddPid(channel->Tpid()); 
56 #endif
57
58 //  logger->log("MVPReceiver", Log::DEBUG, "Channel has VPID %i APID %i", channel->Vpid(), channel->Apid(0));
59
60   if (!processed.init(6000000)) return; // Ringbuffer increased for better performance 
61   pthread_mutex_init(&processedRingLock, NULL);
62
63   // OK
64   
65   // Detect whether this is video or radio and set an appropriate stream chunk size
66   // 50k for video, 5k for radio
67   // Perhaps move this client side?
68   if (channel->Vpid()) streamChunkSize = 50000;
69   else streamChunkSize = 5000;
70   
71   inittedOK = 1;
72   device->SwitchChannel(channel, false);
73   device->AttachReceiver(this);
74 }
75
76 int MVPReceiver::init(TCP* ttcp, ULONG tstreamID)
77 {
78   tcp = ttcp;
79   streamID = tstreamID;
80   return inittedOK;
81 }
82
83 MVPReceiver::~MVPReceiver()
84 {
85   numMVPReceivers--;
86   Log::getInstance()->log("MVPReceiver", Log::DEBUG, "num mvp receivers now down to %i", numMVPReceivers);
87 }
88
89 void MVPReceiver::Activate(bool on)
90 {
91   vdrActivated = on;
92   if (on) 
93   {
94     logger->log("MVPReceiver", Log::DEBUG, "VDR active");
95     threadStart();
96   }
97   else
98   {
99     logger->log("MVPReceiver", Log::DEBUG, "VDR inactive, sending stream end message");
100     threadStop();
101     sendStreamEnd();
102   }
103 }
104
105 bool MVPReceiver::isVdrActivated()
106 {
107   return vdrActivated;
108 }
109
110 void MVPReceiver::detachMVPReceiver()
111 {
112   threadStop();
113   Detach();
114 }
115
116
117 void MVPReceiver::Receive(UCHAR* data, int length)
118 {
119   pthread_mutex_lock(&processedRingLock);
120   processed.put(data, length);
121   if (processed.getContent() > streamChunkSize) threadSignal();
122   pthread_mutex_unlock(&processedRingLock);
123 }
124 void MVPReceiver::Receive(const UCHAR* data, int length)
125 {
126   pthread_mutex_lock(&processedRingLock);
127   processed.put(data, length);
128   if (processed.getContent() > streamChunkSize) threadSignal();
129   pthread_mutex_unlock(&processedRingLock);
130 }
131
132
133 void MVPReceiver::threadMethod()
134 {
135   ULONG *p;
136   ULONG headerLength = sizeof(ULONG) * 4;
137   UCHAR buffer[streamChunkSize + headerLength];
138   int amountReceived;
139
140 //   threadSetKillable(); ??
141
142   while(1)
143   {
144     threadLock();
145     threadWaitForSignal();
146     threadUnlock();
147     threadCheckExit();
148     
149     do
150     {
151       pthread_mutex_lock(&processedRingLock);
152       amountReceived = processed.get(buffer+headerLength, streamChunkSize);
153       pthread_mutex_unlock(&processedRingLock);
154     
155       p = (ULONG*)&buffer[0]; *p = htonl(2); // stream channel
156       p = (ULONG*)&buffer[4]; *p = htonl(streamID);
157       p = (ULONG*)&buffer[8]; *p = htonl(0); // here insert flag: 0 = ok, data follows
158       p = (ULONG*)&buffer[12]; *p = htonl(amountReceived);
159
160       tcp->sendPacket(buffer, amountReceived + headerLength);
161     } while(processed.getContent() >= streamChunkSize);
162   }  
163 }
164
165 void MVPReceiver::sendStreamEnd()
166 {
167   ULONG *p;
168   ULONG bufferLength = sizeof(ULONG) * 4;
169   UCHAR buffer[bufferLength];
170   p = (ULONG*)&buffer[0]; *p = htonl(2); // stream channel
171   p = (ULONG*)&buffer[4]; *p = htonl(streamID);
172   p = (ULONG*)&buffer[8]; *p = htonl(1); // stream end
173   p = (ULONG*)&buffer[12]; *p = htonl(0); // zero length, no more data
174   tcp->sendPacket(buffer, bufferLength);
175 }
176
177
178 int *MVPReceiver::mergeSpidsTpid(const int *spids,int tpid)
179 {
180   int *destpids;
181   const int *runspid=spids;
182   for (runspid=spids,destpids=mergedSpidsTpid;*runspid;runspid++,destpids++) {
183        *destpids=*runspid;
184   }
185   *destpids=tpid;
186   destpids++;
187   *destpids=0;
188   return mergedSpidsTpid;
189 }
190