git.vomp.tv Git - vompserver.git/blob - mvpreceiver.c
Live TV priority overriding
[vompserver.git] / mvpreceiver.c
1 #include "mvpreceiver.h"
2
3 MVPReceiver* MVPReceiver::create(cChannel* channel, int priority)
4 {
5   bool NeedsDetachReceivers;
6   cDevice* device = cDevice::GetDevice(channel, priority, &NeedsDetachReceivers);
7
8   if (!device)
9   {
10     Log::getInstance()->log("MVPReceiver", Log::DEBUG, "No device found to receive this channel at this priority");
11     return NULL;
12   }
13
14 /*
15   if (NeedsDetachReceivers)
16   {
17     // can't really happen since we stream with priority zero. if a rec has pri zero maybe
18     Log::getInstance()->log("MVPReceiver", Log::DEBUG, "Needs detach receivers");
19     return NULL;
20   }
21 */
22
23   // What should we do if NeedsDetachReceivers is true?
24
25   MVPReceiver* m = new MVPReceiver(channel, device);
26   return m;
27 }
28
29 MVPReceiver::MVPReceiver(cChannel* channel, cDevice* device)
30 #if VDRVERSNUM < 10300
31 : cReceiver(channel->Ca(), 0, 7, channel->Vpid(), channel->Ppid(), channel->Apid1(), channel->Apid2(), channel->Dpid1(), channel->Dpid2(), channel->Tpid())
32 #else
33 : cReceiver(channel->Ca(), 0, channel->Vpid(), channel->Apids(), channel->Dpids(), channel->Spids())
34 #endif
35 {
36   logger = Log::getInstance();
37   vdrActivated = false;
38   inittedOK = 0;
39   remuxer = NULL;
40   unprocessed = NULL;
41
42   // Init
43
44   // Get the remuxer for audio or video
45
46 #if VDRVERSNUM < 10300
47 //  if ((channel->Vpid() == 0) || (channel->Vpid() == 1) || (channel->Vpid() == 0x1FFF))
48 //  {
49 //    remuxer = new cTS2ESRemux(channel->Apid1());
50 //    logger->log("MVPReceiver", Log::DEBUG, "Created new < 1.3 TS->ES");
51 //  }
52 //  else
53 //  {
54     remuxer = new cTS2PSRemux(channel->Vpid(), channel->Apid1(), 0, 0, 0, 0);
55     logger->log("MVPReceiver", Log::DEBUG, "Created new < 1.3 TS->PS");
56 //  }
57 #else
58 //  if ((channel->Vpid() == 0) || (channel->Vpid() == 1) || (channel->Vpid() == 0x1FFF))
59 //  {
60 //    remuxer = new cTS2ESRemux(channel->Apid(0));
61 //    logger->log("MVPReceiver", Log::DEBUG, "Created new > 1.3 TS->ES");
62 //  }
63 //  else
64 //  {
65     remuxer = new cTS2PSRemux(channel->Vpid(), channel->Apid(0), 0, 0, 0, 0);
66     logger->log("MVPReceiver", Log::DEBUG, "Created new > 1.3 TS->PS");
67 //  }
68 #endif
69
70   unprocessed = new cRingBufferLinear(1000000, TS_SIZE * 2, false);
71
72   if (!processed.init(1000000)) return;
73   pthread_mutex_init(&processedRingLock, NULL);
74
75   if (!threadStart()) return;
76
77   // OK
78
79   inittedOK = 1;
80   device->SwitchChannel(channel, false);
81   device->AttachReceiver(this);
82 }
83
84 int MVPReceiver::init()
85 {
86   return inittedOK;
87 }
88
89 MVPReceiver::~MVPReceiver()
90 {
91   Detach();
92   if (threadIsActive()) threadCancel();
93   if (unprocessed) delete unprocessed;
94   if (remuxer) delete remuxer;
95 }
96
97 void MVPReceiver::Activate(bool on)
98 {
99   vdrActivated = on;
100   if (on) logger->log("MVPReceiver", Log::DEBUG, "VDR active");
101   else logger->log("MVPReceiver", Log::DEBUG, "VDR inactive");
102 }
103
104 bool MVPReceiver::isVdrActivated()
105 {
106   return vdrActivated;
107 }
108
109 void MVPReceiver::Receive(UCHAR* data, int length)
110 {
111   static int receiveCount = 0;
112
113 //  int p = unprocessed->Put(data, length);
114 //  if (p != length) printf("Buffer overrun\n");
115
116   unprocessed->Put(data, length);
117
118   if (++receiveCount == 15)
119   {
120     threadSignal();
121     receiveCount = 0;
122   }
123 }
124
125 void MVPReceiver::threadMethod()
126 {
127   int amountGot;
128   UCHAR* dataGot;
129
130   int remuxTook;
131   UCHAR* remuxedData;
132   int outputSize;
133
134   while(1)
135   {
136     threadWaitForSignal();
137
138     while(1)
139     {
140       dataGot = unprocessed->Get(amountGot);
141       if (dataGot && (amountGot > 0))
142       {
143         outputSize = 0;
144         remuxTook = amountGot;
145         remuxedData = remuxer->Process(dataGot, remuxTook, outputSize);
146         unprocessed->Del(remuxTook);
147
148         pthread_mutex_lock(&processedRingLock);
149         processed.put(remuxedData, outputSize);
150         pthread_mutex_unlock(&processedRingLock);
151
152 //        logger->log("MVPReceiver", Log::DEBUG, "Got from unprocessed: %i, Got from remux: %p %i, consumed: %i",
153 //               amountGot, remuxedData, outputSize, remuxTook);
154       }
155       else
156       {
157         break;
158       }
159     }
160   }
161 }
162
163 unsigned long MVPReceiver::getBlock(unsigned char* buffer, unsigned long amount)
164 {
165   pthread_mutex_lock(&processedRingLock);
166
167   int numTries = 0;
168
169   while ((unsigned long)processed.getContent() < amount)
170   {
171     pthread_mutex_unlock(&processedRingLock);
172     if (++numTries == 10) // 5s
173     {
174       logger->log("MVPReceiver", Log::DEBUG, "getBlock timeout");
175       return 0;
176     }
177     usleep(500000);
178     pthread_mutex_lock(&processedRingLock);
179   }
180
181   unsigned long amountReceived = processed.get(buffer, amount);
182   pthread_mutex_unlock(&processedRingLock);
183   return amountReceived;
184 }