From d87563378771768205d33a394950d8c928e68cb7 Mon Sep 17 00:00:00 2001 From: Chris Tallon Date: Fri, 3 Apr 2020 17:42:34 +0100 Subject: [PATCH] Convert VDR to std::thread --- osdvector.h | 2 ++ tcp.cc | 9 ++++-- tcp.h | 3 +- vdr.cc | 85 +++++++++++++++++++++++++++++++++++++++++------------ vdr.h | 11 ++++--- vwelcome.cc | 1 + 6 files changed, 85 insertions(+), 26 deletions(-) diff --git a/osdvector.h b/osdvector.h index a4c165b..a901cda 100644 --- a/osdvector.h +++ b/osdvector.h @@ -28,6 +28,8 @@ #include #include +#include "defines.h" +#include "threadsystem.h" #include "osd.h" #include "colour.h" #include "tvmedia.h" diff --git a/tcp.cc b/tcp.cc index 7ab755b..e230374 100644 --- a/tcp.cc +++ b/tcp.cc @@ -146,6 +146,11 @@ bool TCP::status() return connected; } +void TCP::abortCall() +{ + ::write(abortPipe[1], "X", 1); +} + bool TCP::write(void* src, ULONG numBytes) { if (!connected) return false; @@ -173,7 +178,7 @@ bool TCP::read(void* dst, ULONG numBytes) { if (abortCount == 5) { - logger->log("TCP", Log::ERR, "abortCount = 5"); + logger->log("TCP", Log::DEBUG, "abortCount = 5"); return false; } @@ -188,7 +193,7 @@ bool TCP::read(void* dst, ULONG numBytes) if (selectResult == -1) { shutdown(); return false; } if (selectResult == 0) return false; - if (FD_ISSET(abortPipe[0], &readfds)) return false; + if (FD_ISSET(abortPipe[0], &readfds)) { logger->log("TCP", Log::DEBUG, "aborting..."); return false; } int recvResult = recv(sockfd, pointer, numBytes - totalReceived, 0); if (recvResult == -1) { shutdown(); return false; } diff --git a/tcp.h b/tcp.h index 8a26df5..c149e74 100644 --- a/tcp.h +++ b/tcp.h @@ -37,7 +37,8 @@ class TCP ~TCP(); bool init(); // Must call this first, once on any new TCP object void shutdown(); // Optional. Closes connection. Can call connect() next - + void abortCall(); // causes a read/connect call to immediately abort + bool connect(const std::string& ip, USHORT port); bool read(void* dest, ULONG numBytes); bool write(void* src, ULONG numBytes); diff --git a/vdr.cc b/vdr.cc index fe9969b..d2d1dc0 100644 --- a/vdr.cc +++ b/vdr.cc @@ -17,7 +17,7 @@ along with VOMP. If not, see . */ -#include "vdr.h" +#include #include "recman.h" #include "recinfo.h" @@ -39,7 +39,8 @@ #include "seriesinfo.h" #include "osdvector.h" #include "tvmedia.h" -#include + +#include "vdr.h" #define VOMP_PROTOCOL_VERSION 0x00000500 @@ -166,21 +167,35 @@ int VDR::connect() channelNumberWidth = 1; tcp.shutdown(); - if (tcp.connect(serverIP, serverPort)) - { - connected = true; - threadStart(); - return 1; - } - else + if (!tcp.connect(serverIP, serverPort)) return 0; + + connected = true; + + threadStartProtect.lock(); + vdrThread = std::thread( [this] { - return 0; - } + threadStartProtect.lock(); + threadStartProtect.unlock(); + threadMethod(); + }); + threadStartProtect.unlock(); + + return 1; } void VDR::disconnect() { - threadCancel(); + logger->log("VDR", Log::DEBUG, "Disconnect start"); + + if (vdrThread.joinable()) + { + threadReqStop = true; + tcp.abortCall(); + vdrThread.join(); + threadReqStop = false; + logger->log("VDR", Log::DEBUG, "done thread stop"); + } + connected = false; logger->log("VDR", Log::DEBUG, "Disconnect"); } @@ -196,8 +211,6 @@ void VDR::threadMethod() { logger->log("VDR", Log::DEBUG, "VDR RUN"); - threadSetKillable(); // FIXME - change this to deal with the EDRs - ULONG channelID; ULONG requestID; @@ -216,9 +229,13 @@ void VDR::threadMethod() while(1) { + if (threadReqStop) return; + timeNow = time(NULL); - readSuccess = tcp.read(&channelID, sizeof(ULONG)); // 2s timeout atm + readSuccess = tcp.read(&channelID, sizeof(ULONG)); + + if (threadReqStop) return; if (!readSuccess) { @@ -240,6 +257,8 @@ void VDR::threadMethod() return; } lastKAsent = timeNow; + + if (threadReqStop) return; } } else @@ -261,8 +280,10 @@ void VDR::threadMethod() if (channelID == CHANNEL_REQUEST_RESPONSE) { if (!tcp.read(&requestID, sizeof(ULONG))) break; + if (threadReqStop) return; requestID = ntohl(requestID); if (!tcp.read(&userDataLength, sizeof(ULONG))) break; + if (threadReqStop) return; userDataLength = ntohl(userDataLength); if (userDataLength > 5000000) break; // how big can these packets get? userData = NULL; @@ -270,11 +291,22 @@ void VDR::threadMethod() { userData = malloc(userDataLength); if (!userData) break; - if (!tcp.read(userData, userDataLength)) break; + if (!tcp.read(userData, userDataLength)) + { + free(userData); + break; + } + + if (threadReqStop) + { + free(userData); + return; + } } vresp = new VDR_ResponsePacket(); vresp->setResponse(requestID, reinterpret_cast(userData), userDataLength); + // vresp now owns userData unless something calls vresp->getUserData() // logger->log("VDR", Log::DEBUG, "Rxd a response packet, requestID=%lu, len=%lu", requestID, userDataLength); if (!edFindAndCall(vresp)) // makes ED lock, find receiver for vresp (using ed_cb_find() ) and then call (using ed_cb_call() ) @@ -283,23 +315,37 @@ void VDR::threadMethod() // else, delete vresp here. delete vresp; } + if (threadReqStop) return; } else if (channelID == CHANNEL_STREAM || channelID == CHANNEL_TVMEDIA) { if (!tcp.read(&streamID, sizeof(ULONG))) break; + if (threadReqStop) return; streamID = ntohl(streamID); if (!tcp.read(&flag, sizeof(ULONG))) break; + if (threadReqStop) return; flag = ntohl(flag); if (!tcp.read(&userDataLength, sizeof(ULONG))) break; + if (threadReqStop) return; userDataLength = ntohl(userDataLength); userData = NULL; if (userDataLength > 0) { userData = malloc(userDataLength); if (!userData) break; - if (!tcp.read(userData, userDataLength)) break; + if (!tcp.read(userData, userDataLength)) + { + free(userData); + break; + } + + if (threadReqStop) + { + free(userData); + return; + } } vresp = new VDR_ResponsePacket(); @@ -312,11 +358,14 @@ void VDR::threadMethod() // else, delete vresp here. delete vresp; } + + if (threadReqStop) return; } else if (channelID == CHANNEL_KEEPALIVE) { ULONG KAreply = 0; if (!tcp.read(&KAreply, sizeof(ULONG))) break; + if (threadReqStop) return; KAreply = ntohl(KAreply); if (KAreply == lastKAsent) // successful KA response { @@ -330,8 +379,6 @@ void VDR::threadMethod() logger->log("VDR", Log::ERR, "Rxd a response packet on channel %lu !!", channelID); break; } - threadCheckExit(); - // Who deletes vresp? // If RR, the individual protocol functions must delete vresp. diff --git a/vdr.h b/vdr.h index c00ea11..168ab1a 100644 --- a/vdr.h +++ b/vdr.h @@ -29,8 +29,8 @@ #include #include #include - -#include "threadsystem.h" +#include +#include #include "defines.h" #include "rectimer.h" @@ -95,8 +95,7 @@ class VDR_PacketReceiver : public EDReceiver // implementation in vdr.cc class RecMan; -class VDR : public Thread_TYPE, -public EventDispatcher, +class VDR : public EventDispatcher, #ifdef VOMP_MEDIAPLAYER public MediaProvider, #endif @@ -241,6 +240,10 @@ public ExternLogger ULONG channelNumberWidth{1}; VDR_PacketReceiver* TEMP_SINGLE_VDR_PR; + std::mutex threadStartProtect; + std::thread vdrThread; + bool threadReqStop{}; + #ifdef VOMP_MEDIAPLAYER ULONG providerId; ULONG subRange; diff --git a/vwelcome.cc b/vwelcome.cc index efc1d2b..4cfaa24 100644 --- a/vwelcome.cc +++ b/vwelcome.cc @@ -125,6 +125,7 @@ VWelcome::VWelcome() info= new TVMediaInfo(); info->setStaticArtwork(sa_restart); + // NCONFIG #ifndef VOMP_HAS_EXIT sl.addOption(tr("7. Reboot"), reinterpret_cast(7), 0,info); #else -- 2.39.2