]> git.vomp.tv Git - vompclient.git/commitdiff
Convert VDR to std::thread
authorChris Tallon <chris@vomp.tv>
Fri, 3 Apr 2020 16:42:34 +0000 (17:42 +0100)
committerChris Tallon <chris@vomp.tv>
Fri, 3 Apr 2020 16:42:34 +0000 (17:42 +0100)
osdvector.h
tcp.cc
tcp.h
vdr.cc
vdr.h
vwelcome.cc

index a4c165b259847071c775c7b07134351db95e2882..a901cda6df39c852a7db09fee56a8c4281627c56 100644 (file)
@@ -28,6 +28,8 @@
 #include <string>
 #include <mutex>
 
+#include "defines.h"
+#include "threadsystem.h"
 #include "osd.h"
 #include "colour.h"
 #include "tvmedia.h"
diff --git a/tcp.cc b/tcp.cc
index 7ab755bc0282d381ac3a01b48fcb0c056f20977c..e23037428b06f4e37732c95a41b21291f6aa114f 100644 (file)
--- a/tcp.cc
+++ b/tcp.cc
@@ -146,6 +146,11 @@ bool TCP::status()
   return connected;
 }
 
+void TCP::abortCall()
+{
+  ::write(abortPipe[1], "X", 1);
+}
+
 bool TCP::write(void* src, ULONG numBytes)
 {
   if (!connected) return false;
@@ -173,7 +178,7 @@ bool TCP::read(void* dst, ULONG numBytes)
   {
     if (abortCount == 5)
     {
-      logger->log("TCP", Log::ERR, "abortCount = 5");
+      logger->log("TCP", Log::DEBUG, "abortCount = 5");
       return false;
     }
     
@@ -188,7 +193,7 @@ bool TCP::read(void* dst, ULONG numBytes)
 
     if (selectResult == -1) { shutdown(); return false; }
     if (selectResult == 0) return false;
-    if (FD_ISSET(abortPipe[0], &readfds)) return false;
+    if (FD_ISSET(abortPipe[0], &readfds)) { logger->log("TCP", Log::DEBUG, "aborting..."); return false; }
 
     int recvResult = recv(sockfd, pointer, numBytes - totalReceived, 0);
     if (recvResult == -1) { shutdown(); return false; }
diff --git a/tcp.h b/tcp.h
index 8a26df5c22920454f686e347544d4bfb692505bd..c149e7471e1fd0e77b58f23cce66df69e7269678 100644 (file)
--- a/tcp.h
+++ b/tcp.h
@@ -37,7 +37,8 @@ class TCP
     ~TCP();
     bool init(); // Must call this first, once on any new TCP object
     void shutdown(); // Optional. Closes connection. Can call connect() next
-    
+    void abortCall(); // causes a read/connect call to immediately abort
+
     bool connect(const std::string& ip, USHORT port);
     bool read(void* dest, ULONG numBytes);
     bool write(void* src, ULONG numBytes);
diff --git a/vdr.cc b/vdr.cc
index fe9969b4b792dccb0e4f1f219230944d7a38f415..d2d1dc085ea62e29c39b79a3a63573b331b4cc75 100644 (file)
--- a/vdr.cc
+++ b/vdr.cc
@@ -17,7 +17,7 @@
     along with VOMP.  If not, see <https://www.gnu.org/licenses/>.
 */
 
-#include "vdr.h"
+#include <climits>
 
 #include "recman.h"
 #include "recinfo.h"
@@ -39,7 +39,8 @@
 #include "seriesinfo.h"
 #include "osdvector.h"
 #include "tvmedia.h"
-#include <climits>
+
+#include "vdr.h"
 
 #define VOMP_PROTOCOL_VERSION 0x00000500
 
@@ -166,21 +167,35 @@ int VDR::connect()
   channelNumberWidth = 1;
 
   tcp.shutdown();
-  if (tcp.connect(serverIP, serverPort))
-  {
-    connected = true;
-    threadStart();
-    return 1;
-  }
-  else
+  if (!tcp.connect(serverIP, serverPort)) return 0;
+
+  connected = true;
+
+  threadStartProtect.lock();
+  vdrThread = std::thread( [this]
   {
-    return 0;
-  }
+    threadStartProtect.lock();
+    threadStartProtect.unlock();
+    threadMethod();
+  });
+  threadStartProtect.unlock();
+
+  return 1;
 }
 
 void VDR::disconnect()
 {
-  threadCancel();
+  logger->log("VDR", Log::DEBUG, "Disconnect start");
+
+  if (vdrThread.joinable())
+  {
+    threadReqStop = true;
+    tcp.abortCall();
+    vdrThread.join();
+    threadReqStop = false;
+    logger->log("VDR", Log::DEBUG, "done thread stop");
+  }
+
   connected = false;
   logger->log("VDR", Log::DEBUG, "Disconnect");
 }
@@ -196,8 +211,6 @@ void VDR::threadMethod()
 {
   logger->log("VDR", Log::DEBUG, "VDR RUN");  
 
-  threadSetKillable(); // FIXME - change this to deal with the EDRs
-  
   ULONG channelID;
   
   ULONG requestID;
@@ -216,9 +229,13 @@ void VDR::threadMethod()
 
   while(1)
   {
+    if (threadReqStop) return;
+
     timeNow = time(NULL);
     
-    readSuccess = tcp.read(&channelID, sizeof(ULONG));  // 2s timeout atm
+    readSuccess = tcp.read(&channelID, sizeof(ULONG));
+
+    if (threadReqStop) return;
 
     if (!readSuccess)
     {
@@ -240,6 +257,8 @@ void VDR::threadMethod()
           return;
         }
         lastKAsent = timeNow;
+
+        if (threadReqStop) return;
       }
     }
     else
@@ -261,8 +280,10 @@ void VDR::threadMethod()
     if (channelID == CHANNEL_REQUEST_RESPONSE)
     {
       if (!tcp.read(&requestID, sizeof(ULONG))) break;
+      if (threadReqStop) return;
       requestID = ntohl(requestID);
       if (!tcp.read(&userDataLength, sizeof(ULONG))) break;
+      if (threadReqStop) return;
       userDataLength = ntohl(userDataLength);
       if (userDataLength > 5000000) break; // how big can these packets get?
       userData = NULL;
@@ -270,11 +291,22 @@ void VDR::threadMethod()
       {
         userData = malloc(userDataLength);
         if (!userData) break;
-        if (!tcp.read(userData, userDataLength)) break;
+        if (!tcp.read(userData, userDataLength))
+        {
+          free(userData);
+          break;
+        }
+
+        if (threadReqStop)
+        {
+          free(userData);
+          return;
+        }
       }
 
       vresp = new VDR_ResponsePacket();  
       vresp->setResponse(requestID, reinterpret_cast<UCHAR*>(userData), userDataLength);
+      // vresp now owns userData unless something calls vresp->getUserData()
 //      logger->log("VDR", Log::DEBUG, "Rxd a response packet, requestID=%lu, len=%lu", requestID, userDataLength);
 
       if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() )
@@ -283,23 +315,37 @@ void VDR::threadMethod()
         // else, delete vresp here.
         delete vresp;
       }
+      if (threadReqStop) return;
     }
     else if (channelID == CHANNEL_STREAM || channelID == CHANNEL_TVMEDIA)
     {
       if (!tcp.read(&streamID, sizeof(ULONG))) break;
+      if (threadReqStop) return;
       streamID = ntohl(streamID);
 
       if (!tcp.read(&flag, sizeof(ULONG))) break;
+      if (threadReqStop) return;
       flag = ntohl(flag);
 
       if (!tcp.read(&userDataLength, sizeof(ULONG))) break;
+      if (threadReqStop) return;
       userDataLength = ntohl(userDataLength);
       userData = NULL;
       if (userDataLength > 0)
       {
         userData = malloc(userDataLength);
         if (!userData) break;
-        if (!tcp.read(userData, userDataLength)) break;
+        if (!tcp.read(userData, userDataLength))
+        {
+          free(userData);
+          break;
+        }
+
+        if (threadReqStop)
+        {
+          free(userData);
+          return;
+        }
       }
 
       vresp = new VDR_ResponsePacket();    
@@ -312,11 +358,14 @@ void VDR::threadMethod()
         // else, delete vresp here.
         delete vresp;
       }
+
+      if (threadReqStop) return;
     }
     else if (channelID == CHANNEL_KEEPALIVE)
     {
       ULONG KAreply = 0;
       if (!tcp.read(&KAreply, sizeof(ULONG))) break;
+      if (threadReqStop) return;
       KAreply = ntohl(KAreply);
       if (KAreply == lastKAsent) // successful KA response
       {
@@ -330,8 +379,6 @@ void VDR::threadMethod()
       logger->log("VDR", Log::ERR, "Rxd a response packet on channel %lu !!", channelID);
       break;
     }
-    threadCheckExit();
-
 
     // Who deletes vresp?
     // If RR, the individual protocol functions must delete vresp.
diff --git a/vdr.h b/vdr.h
index c00ea11fbf36bab641804bf70b2bc9d85d93262f..168ab1ada3c3c824a2132dcbc344c43cd0ffdcf8 100644 (file)
--- a/vdr.h
+++ b/vdr.h
@@ -29,8 +29,8 @@
 #include <time.h>
 #include <vector>
 #include <algorithm>
-
-#include "threadsystem.h"
+#include <thread>
+#include <mutex>
 
 #include "defines.h"
 #include "rectimer.h"
@@ -95,8 +95,7 @@ class VDR_PacketReceiver : public EDReceiver // implementation in vdr.cc
 
 class RecMan;
 
-class VDR : public Thread_TYPE,
-public EventDispatcher,
+class VDR : public EventDispatcher,
 #ifdef VOMP_MEDIAPLAYER
 public MediaProvider,
 #endif
@@ -241,6 +240,10 @@ public ExternLogger
     ULONG channelNumberWidth{1};
     VDR_PacketReceiver* TEMP_SINGLE_VDR_PR;
 
+    std::mutex threadStartProtect;
+    std::thread vdrThread;
+    bool threadReqStop{};
+
 #ifdef VOMP_MEDIAPLAYER
     ULONG providerId;
     ULONG subRange;
index efc1d2b9961bb34590b6367e071e6383c7ebf8d6..4cfaa24e3d8212f1d3d4e5c20592b04eab0b2bb3 100644 (file)
@@ -125,6 +125,7 @@ VWelcome::VWelcome()
 
   info= new TVMediaInfo();
   info->setStaticArtwork(sa_restart);
+  // NCONFIG
 #ifndef VOMP_HAS_EXIT
   sl.addOption(tr("7. Reboot"), reinterpret_cast<void*>(7), 0,info);
 #else