2 Copyright 2004-2005 Chris Tallon
4 This file is part of VOMP.
6 VOMP is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 VOMP is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with VOMP. If not, see <https://www.gnu.org/licenses/>.
22 #include "messagequeue.h"
27 static const char* TAG = "MessageQueue";
29 MessageQueue* MessageQueue::instance{};
31 MessageQueue::MessageQueue()
34 logger = LogNT::getInstance();
37 MessageQueue::~MessageQueue() { instance = NULL; }
39 MessageQueue* MessageQueue::getInstance() { return instance; }
41 void MessageQueue::postMessage(Message* m)
43 LogNT::getInstance()->debug(TAG, "PostMessage");
44 std::lock_guard<std::mutex> lg(messageQueueMutex); // Get the lock
45 messages.push_back(m);
46 LogNT::getInstance()->debug(TAG, "Pushed message. Notify...");
47 messageQueueCond.notify_one();
51 void MessageQueue::flushMessageQueue()
53 std::lock_guard<std::mutex> lg(messageQueueMutex); // Get the lock
55 for (auto m : messages) delete m;
59 bool MessageQueue::receiverExists(MessageReceiver* mr) // call with lock..
61 ReceiversI ri = std::find(receivers.begin(), receivers.end(), mr);
62 return (ri != receivers.end());
65 void MessageQueue::addReceiver(MessageReceiver* newMR)
67 std::lock_guard<std::mutex> lg(messageQueueMutex);
69 if (std::find(receivers.begin(), receivers.end(), newMR) == receivers.end())
71 logger->debug(TAG, "addReceiver: not found, adding {}", static_cast<void*>(newMR));
72 receivers.push_back(newMR);
76 void MessageQueue::removeReceiver(MessageReceiver* toRemove)
78 std::unique_lock<std::mutex> ul(messageQueueMutex);
80 // Ignore the issue of the pre-defined targets
81 // They outlive the message queue processing loop and therefore always exist
82 // So just check message->to
86 logger->debug(TAG, "Attempt remove receiver {}", static_cast<void*>(toRemove));
88 if (messageBeingProcessed && (messageBeingProcessed->to == toRemove))
90 // The message currently being processed by Control is with the receiver we're trying to remove
91 // Release mutex, delay and retry
92 logger->info(TAG, "Remove delay! Does this ever happen? {}", static_cast<void*>(toRemove));
95 std::this_thread::sleep_for(std::chrono::milliseconds(100));
100 ReceiversI toRemoveI = std::find(receivers.begin(), receivers.end(), toRemove);
101 if (toRemoveI == receivers.end())
103 logger->error(TAG, "Remove error 1 {}", static_cast<void*>(toRemove));
107 receivers.erase(toRemoveI);
108 logger->debug(TAG, "Removed receiver {}", static_cast<void*>(toRemove));
114 void MessageQueue::messageLoop()
116 std::unique_lock<std::mutex> lockWrapper(messageQueueMutex); // locks. unlocks on out-of-scope
118 while(messageLoopRun)
120 messageQueueCond.wait(lockWrapper, [&] { return !messageLoopRun || !messages.empty(); });
122 logger->debug(TAG, "woke");
124 if (!messageLoopRun) break;
126 while(!messages.empty())
128 messageBeingProcessed = messages.front();
129 messages.pop_front();
131 if (!messageBeingProcessed->p_to && !receiverExists(messageBeingProcessed->to))
133 // Receiver for this message has been deleted already
134 logger->debug(TAG, "Dropping message {} for non-existent receiver {}", static_cast<void*>(messageBeingProcessed), static_cast<void*>(messageBeingProcessed->to));
135 delete messageBeingProcessed;
136 messageBeingProcessed = NULL;
140 logger->debug(TAG, "Dispatching message {} to {}", static_cast<void*>(messageBeingProcessed), static_cast<void*>(messageBeingProcessed->to));
142 lockWrapper.unlock();
143 dispatchMessage(messageBeingProcessed);
146 delete messageBeingProcessed;
147 messageBeingProcessed = NULL;