]> git.vomp.tv Git - vompclient.git/blob - src/playerradiolive.cc
Type change: UCHAR -> u1
[vompclient.git] / src / playerradiolive.cc
1 /*
2     Copyright 2008-2020 Chris Tallon
3
4     This file is part of VOMP.
5
6     VOMP is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     VOMP is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with VOMP.  If not, see <https://www.gnu.org/licenses/>.
18 */
19
20 #include <stdlib.h>
21 #ifndef WIN32
22 #include <sys/time.h>
23 #endif
24 #include <time.h>
25
26 #include "defines.h"
27 #include "log.h"
28 #include "audio.h"
29 #include "demuxerts.h"
30 #include "vdr.h"
31 #include "messagequeue.h"
32 #include "input.h"
33 #include "message.h"
34 #include "channel.h"
35 #include "video.h"
36
37 #include "playerradiolive.h"
38
39 static const char* TAG = "PlayerRadioLive";
40
41 // ----------------------------------- Called from outside, one offs or info funcs
42
43 PlayerRadioLive::PlayerRadioLive(MessageQueue* tmessageQueue, MessageReceiver* tmessageReceiver, std::shared_ptr<ChannelList> tchanList)
44 : messageQueue(tmessageQueue), messageReceiver(tmessageReceiver), afeed(this), chanList(tchanList)
45 {
46   audio = Audio::getInstance();
47   logger = LogNT::getInstance();
48   vdr = VDR::getInstance();
49
50   Video::getInstance()->turnVideoOff();
51 }
52
53 PlayerRadioLive::~PlayerRadioLive()
54 {
55   if (initted) shutdown();
56 }
57
58 int PlayerRadioLive::init()
59 {
60   if (initted) return 0;
61
62   demuxer = new DemuxerTS();
63   if (!demuxer) return 0;
64  
65   if (!demuxer->init(this, audio, NULL, NULL, 0, 200000, 0))
66   {
67     logger->error(TAG, "Demuxer failed to init");
68     shutdown();
69     return 0;
70   }
71
72   audio->stop();
73
74   initted = true;
75   return 1;
76 }
77
78 int PlayerRadioLive::shutdown()
79 {
80   if (!initted) return 0;
81   if (state != S_STOP) // FIXME check when this is called and how. This is not thread-sync bullet proof as-is.
82   {
83     logger->debug(TAG, "state is not stop, calling");
84     stop();
85   }
86   initted = false;
87   delete demuxer;
88   return 1;
89 }
90
91 bool* PlayerRadioLive::getDemuxerMpegAudioChannels()
92 {
93   return demuxer->getmpAudioChannels();
94 }
95
96 bool* PlayerRadioLive::getDemuxerAc3AudioChannels()
97 {
98   return demuxer->getac3AudioChannels();
99 }
100
101 int PlayerRadioLive::getCurrentAudioChannel()
102 {
103   return demuxer->getAID();
104 }
105
106 int* PlayerRadioLive::getTeletxtSubtitlePages()
107 {
108   return NULL;
109 }
110
111 int PlayerRadioLive::getCurrentSubtitleChannel()
112 {
113   return demuxer->getSubID();
114 }
115
116 void PlayerRadioLive::setAudioChannel(int newChannel, int type, int streamtype)
117 {
118   demuxer->setAID(newChannel, type, streamtype, true);
119 }
120
121 void PlayerRadioLive::setSubtitleChannel(int newChannel)
122 {
123   demuxer->setSubID(newChannel);
124 }
125
126 // ----------------------------------- Externally called events
127
128 void PlayerRadioLive::go(u4 index)
129 {
130   playerThreadMutex.lock();
131
132   struct PLInstruction i;
133   i.instruction = I_SETCHANNEL;
134   i.channelIndex = index;
135   instructions.push(i);
136
137   playerThread = std::thread([this]
138   {
139     playerThreadMutex.lock();
140     playerThreadMutex.unlock();
141     threadMethod();
142   });
143   playerThreadMutex.unlock();
144 }
145
146 void PlayerRadioLive::setChannel(u4 index)
147 {
148   logger->debug(TAG, "setChannel");
149   struct PLInstruction i;
150   i.instruction = I_SETCHANNEL;
151   i.channelIndex = index;
152   instructions.push(i);  
153   logger->debug(TAG, "posted setChannel instruction, now {} in queue", instructions.size());
154   playerThreadCond.notify_one();
155 }
156
157 void PlayerRadioLive::stop()
158 {
159   logger->debug(TAG, "stop");
160   playerThreadMutex.lock();
161   struct PLInstruction i;
162   i.instruction = I_STOP;
163   instructions.push(i);
164   playerThreadCond.notify_one();
165   playerThreadMutex.unlock();
166   playerThread.join();
167   logger->debug(TAG, "stop successful");
168 }
169
170 // ----------------------------------- Callback
171
172 void PlayerRadioLive::call(void* /*caller*/)
173 {
174 }
175
176 // -----------------------------------
177
178 void PlayerRadioLive::streamReceive(u4 flag, void* data, u4 len)
179 {
180   // Flag:
181   // 0 = normal stream packet
182   // 1 = stream end
183   // 2 = connection lost
184
185   if (flag == 1)
186   {
187     if (data) abort();
188     
189     Message* m = new Message();
190     m->from = this;
191     m->to = messageReceiver;
192     m->message = Message::PLAYER_EVENT;
193     m->parameter = PlayerRadioLive::STREAM_END;
194     messageQueue->postMessage(m);
195   }
196   
197   if (streamChunks.size() < 11)
198   {
199     StreamChunk s;
200     s.data = data;
201     s.len = len;
202     streamChunks.push(s);
203     playerThreadCond.notify_one();
204   }
205   else
206   {
207     // Too many chunks in streamChunks, drop this chunk
208     free(data);
209     logger->warn(TAG, "Dropped chunk");
210   }
211 }
212
213 void PlayerRadioLive::clearStreamChunks()
214 {
215   while(streamChunks.size())
216   {
217     logger->debug(TAG, "Dropping chunk from old stream");
218     struct StreamChunk s = streamChunks.front();
219     streamChunks.pop();
220     free(s.data);
221   }
222 }
223
224 void PlayerRadioLive::chunkToDemuxer()
225 {
226   StreamChunk s = streamChunks.front();
227   streamChunks.pop();
228   //logger->debug(TAG, "About to call demuxer with {} {}", (void*)s.data, s.len);
229   /*int a =*/ demuxer->put(static_cast<u1*>(s.data), s.len);
230   //logger->debug(TAG, "put {} to demuxer", a);
231   free(s.data);  
232 }
233
234 void PlayerRadioLive::switchState(u1 newState)
235 {
236   logger->debug(TAG, "Switch from state {} to state {}", state, newState);
237
238   switch(state)
239   {
240     case S_STOP:   // FROM S_STOP
241     {
242       switch(newState)
243       {
244         case S_PREBUFFERING:
245         {
246           audio->stop();
247           audio->unPause();
248           audio->reset();
249           audio->setStreamType(Audio::MPEG2_PES);
250           audio->systemMuteOff();      
251           audio->doMuting();              
252           audio->play();
253           audio->pause();
254           demuxer->reset();
255           afeed.start();
256           
257           state = newState;
258           preBufferCount = 0;
259           return;
260         }
261         default:
262         {
263           logger->crit(TAG, "Thread called state {} to state {} which is not supported", state, newState);
264           abort();
265           break;
266         }
267       }
268     }
269
270     case S_PREBUFFERING:    // FROM S_PREBUFFERING
271     {
272       switch(newState)
273       {
274         case S_PLAY:
275         {
276           audio->unPause();
277           state = newState;
278           return;
279         }
280         case S_STOP:
281         {
282           vdr->stopStreaming();
283           clearStreamChunks();
284           afeed.stop();
285           audio->stop();
286           audio->reset();
287           state = newState;
288           return;        
289         }
290         case S_PREBUFFERING:
291         {
292           vdr->stopStreaming();
293           clearStreamChunks();
294           afeed.stop();
295           audio->stop();
296           audio->reset();
297           audio->play();
298           audio->pause();
299           demuxer->reset();
300           afeed.start();
301
302           state = newState;
303           preBufferCount = 0;
304           return;        
305         }
306         default:
307         {
308           logger->crit(TAG, "Thread called state {} to state {} which is not supported", state, newState);
309           abort();
310           break;
311         }        
312       }
313     }
314     
315     case S_PLAY:     // FROM S_PLAY
316     {
317       switch(newState)
318       {
319         case S_STOP:
320         { 
321           vdr->stopStreaming();
322           clearStreamChunks();
323           afeed.stop();
324           audio->stop();
325           audio->reset();
326           state = newState;
327           return;
328         }
329         case S_PREBUFFERING: // IS THIS HOW IT WORKS?
330         {
331           vdr->stopStreaming();
332           clearStreamChunks();
333           afeed.stop();
334           audio->stop();
335           audio->reset();
336           audio->play();
337           audio->pause();
338           demuxer->reset();
339           afeed.start();
340
341           state = newState;
342           preBufferCount = 0;
343           return;
344         }
345         default:
346         {
347           logger->crit(TAG, "Thread called state {} to state {} which is not supported", state, newState);
348           abort();
349           break;
350         }        
351       }
352     }    
353   }  
354 }
355
356 bool PlayerRadioLive::checkError()
357 {
358   if (!vdr->isConnected())
359   {
360     switchState(S_STOP);
361     
362     Message* m = new Message();
363     m->from = this;
364     m->to = messageReceiver;
365     m->message = Message::PLAYER_EVENT;
366     m->parameter = PlayerRadioLive::CONNECTION_LOST;
367     messageQueue->postMessage(m);
368     
369     return true;
370   }   
371   return false;
372 }
373
374 void PlayerRadioLive::optimizeInstructionQueue()
375 {
376   // Walk the list
377   
378   // Currently there are only 2 instruction types, so this is a bit overkill...
379
380   struct PLInstruction i;
381   while(instructions.size() > 1)
382   {
383     i = instructions.front();
384     if (i.instruction == I_SETCHANNEL)
385     {
386       instructions.pop();  // if this is the first of more than 1 command, currently it cannot possibly be relevant
387     }
388     else if (i.instruction == I_STOP)
389     {
390       return; // return here and ensure the next instruction will be stop
391     }
392   }
393 }
394
395 void PlayerRadioLive::threadMethod()
396 {
397   std::unique_lock<std::mutex> ul(playerThreadMutex, std::defer_lock);
398
399   while(1)
400   {
401     while(!instructions.empty())
402     {
403       if (instructions.size() > 1)
404       {
405         logger->debug(TAG, "Should optimise");
406         optimizeInstructionQueue();
407       }
408
409       struct PLInstruction i = instructions.front();
410       instructions.pop();
411     
412       if (i.instruction == I_SETCHANNEL)
413       {
414         logger->debug(TAG, "start new stream");
415
416         switchState(S_PREBUFFERING);
417
418         if (!checkError())
419         {
420           Channel* chan = (*chanList)[i.channelIndex];
421           chan->loadPids();
422
423           bool found=false;
424
425           if (chan->numAPids > 0) 
426           {
427             u4 j = 0;
428             while (j < chan->numAPids && !found)
429             {
430               if (Audio::getInstance()->streamTypeSupported(chan->apids[j].type))
431               {
432                 demuxer->setAID(chan->apids[j].pid, 0, chan->apids[j].type, true);
433                 audio->setStreamType(Audio::MPEG2_PES);
434                 logger->debug(TAG, "Demuxer pids: {} {} {}", chan->vpid, chan->apids[j].pid, chan->apids[j].type);
435                 found = true;
436               }
437               j++;
438             }
439           }
440
441           if (!found)
442           {
443             if (chan->numDPids > 0 && audio->maysupportAc3())
444             {
445               u4 j = 0;
446               while (j < chan->numDPids && !found)
447               {
448                 if (Audio::getInstance()->streamTypeSupported(chan->dpids[j].type))
449                 {
450                   demuxer->setAID(chan->dpids[j].pid, 1, chan->dpids[j].type, true);
451                   audio->setStreamType(Audio::MPEG2_PES);
452                   logger->debug(TAG, "Demuxer pids: {} {} (ac3) {}", chan->vpid, chan->dpids[j].pid, chan->dpids[j].type);
453                   found=true;
454                 }
455                 j++;
456               }
457             }
458             else
459             {
460               logger->warn(TAG, "Demuxer no pids!");
461             }
462           }
463
464           int streamSuccess = vdr->streamChannel(chan->number, this);
465           if (!checkError() && !streamSuccess)
466           {      
467             Message* m = new Message();
468             m->from = this;
469             m->to = messageReceiver;
470             m->message = Message::PLAYER_EVENT;
471             m->parameter = PlayerRadioLive::STREAM_END;
472             messageQueue->postMessage(m);
473           }
474         }
475       }
476       else if (i.instruction == I_STOP)
477       {
478         logger->debug(TAG, "Stopping");
479         switchState(S_STOP);
480         checkError();
481         return;
482       }
483     }
484
485     while(streamChunks.size())
486     {
487       chunkToDemuxer();
488
489       if (state == S_PREBUFFERING)
490       {
491         ++preBufferCount;
492         u4 percentDone = (preBufferCount * 100) / preBufferAmount;
493
494         logger->debug(TAG, "Prebuffering {}%", percentDone);
495         
496         Message* m = new Message();
497         m->from = this;
498         m->to = messageReceiver;
499         m->message = Message::PLAYER_EVENT;
500         m->parameter = PlayerRadioLive::PREBUFFERING;
501         m->tag = percentDone;
502         messageQueue->postMessage(m);
503
504         if (preBufferCount == preBufferAmount)
505         {
506           switchState(S_PLAY);
507           checkError();
508         }
509       }
510     }
511     
512     ul.lock();
513     if (!instructions.empty()) { ul.unlock(); continue; }
514     playerThreadCond.wait(ul);
515     ul.unlock();
516   }
517
518   logger->debug(TAG, "End of thread");
519 }