]> git.vomp.tv Git - vompserver.git/blob - mvpreceiver.c
Bug fixes to new protocol
[vompserver.git] / mvpreceiver.c
1 #include "mvpreceiver.h"
2
3 MVPReceiver* MVPReceiver::create(cChannel* channel, int priority)
4 {
5   bool NeedsDetachReceivers;
6   cDevice* device = cDevice::GetDevice(channel, priority, &NeedsDetachReceivers);
7
8   if (!device)
9   {
10     Log::getInstance()->log("MVPReceiver", Log::DEBUG, "No device found to receive this channel at this priority");
11     return NULL;
12   }
13
14   if (NeedsDetachReceivers)
15   {
16     Log::getInstance()->log("MVPReceiver", Log::DEBUG, "Needs detach receivers");
17
18     // Need to detach other receivers or VDR will shut down??
19   }
20
21   MVPReceiver* m = new MVPReceiver(channel, device);
22   return m;
23 }
24
25 MVPReceiver::MVPReceiver(cChannel* channel, cDevice* device)
26 #if VDRVERSNUM < 10300
27 : cReceiver(channel->Ca(), 0, 7, channel->Vpid(), channel->Ppid(), channel->Apid1(), channel->Apid2(), channel->Dpid1(), channel->Dpid2(), channel->Tpid())
28 #elif VDRVERSNUM < 10500
29 : cReceiver(channel->Ca(), 0, channel->Vpid(), channel->Apids(), channel->Dpids(), channel->Spids())
30 #else
31 : cReceiver(channel->GetChannelID(), 0, channel->Vpid(), channel->Apids(), channel->Dpids(), channel->Spids())
32 #endif
33 {
34   logger = Log::getInstance();
35   vdrActivated = false;
36   inittedOK = 0;
37   streamID = 0;
38   tcp = NULL;
39
40 //  logger->log("MVPReceiver", Log::DEBUG, "Channel has VPID %i APID %i", channel->Vpid(), channel->Apid(0));
41
42   if (!processed.init(1000000)) return;
43   pthread_mutex_init(&processedRingLock, NULL);
44
45   // OK
46
47   inittedOK = 1;
48   device->SwitchChannel(channel, false);
49   device->AttachReceiver(this);
50 }
51
52 int MVPReceiver::init(TCP* ttcp, ULONG tstreamID)
53 {
54   tcp = ttcp;
55   streamID = tstreamID;
56   return inittedOK;
57 }
58
59 MVPReceiver::~MVPReceiver()
60 {
61   Detach();
62   threadStop();
63 }
64
65 void MVPReceiver::Activate(bool on)
66 {
67   vdrActivated = on;
68   if (on) 
69   {
70     logger->log("MVPReceiver", Log::DEBUG, "VDR active");
71     threadStart();
72   }
73   else
74   {
75     logger->log("MVPReceiver", Log::DEBUG, "VDR inactive");
76     threadStop();
77   }
78 }
79
80 bool MVPReceiver::isVdrActivated()
81 {
82   return vdrActivated;
83 }
84
85 void MVPReceiver::Receive(UCHAR* data, int length)
86 {
87   pthread_mutex_lock(&processedRingLock);
88   processed.put(data, length);
89   if (processed.getContent() > streamChunkSize) threadSignal();
90   pthread_mutex_unlock(&processedRingLock);
91 }
92
93 void MVPReceiver::threadMethod()
94 {
95   UCHAR buffer[streamChunkSize + 12];
96   int amountReceived;
97
98 //   threadSetKillable(); ??
99
100   while(1)
101   {
102     threadWaitForSignal();
103     threadCheckExit();
104     
105     pthread_mutex_lock(&processedRingLock);
106     amountReceived = processed.get(buffer+12, streamChunkSize);
107     pthread_mutex_unlock(&processedRingLock);
108     
109     *(ULONG*)&buffer[0] = htonl(2); // stream channel
110     *(ULONG*)&buffer[4] = htonl(streamID);
111     *(ULONG*)&buffer[8] = htonl(amountReceived);
112     tcp->sendPacket(buffer, amountReceived + 12);
113   }  
114 }
115
116 ULONG MVPReceiver::getBlock(unsigned char* buffer, unsigned long amount)
117 {
118 /*
119   pthread_mutex_lock(&processedRingLock);
120
121   int numTries = 0;
122
123   while ((unsigned long)processed.getContent() < amount)
124   {
125     pthread_mutex_unlock(&processedRingLock);
126     if (++numTries == 30) // 15s
127     {
128       logger->log("MVPReceiver", Log::DEBUG, "getBlock timeout");
129       return 0;
130     }
131     usleep(500000);
132     pthread_mutex_lock(&processedRingLock);
133   }
134
135   unsigned long amountReceived = processed.get(buffer, amount);
136   pthread_mutex_unlock(&processedRingLock);
137   return amountReceived;
138   */
139   sleep(10);
140   return 0;
141 }