mavtables  0.2.1
MAVLink router and firewall.
PacketQueue.cpp
Go to the documentation of this file.
1 // MAVLink router and firewall.
2 // Copyright (C) 2018 Michael R. Shannon <mrshannon.aerospace@gmail.com>
3 //
4 // This program is free software; you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation; either version 2 of the License, or
7 // (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program. If not, see <http://www.gnu.org/licenses/>.
16 
17 
18 #include <condition_variable>
19 #include <memory>
20 #include <mutex>
21 #include <optional>
22 #include <queue>
23 #include <stdexcept>
24 #include <utility>
25 
26 #include "Packet.hpp"
27 #include "PacketQueue.hpp"
28 #include "QueuedPacket.hpp"
29 
30 
31 /** Get packet from queue.
32  *
33  * \note This is an internal method and thus the internal mutex must be locked
34  * before calling.
35  *
36  * \returns The next packet or nullptr if the queue has been closed or is
37  * empty.
38  */
39 std::shared_ptr<const Packet> PacketQueue::get_packet_()
40 {
41  if (running_ && !queue_.empty())
42  {
43  std::shared_ptr<const Packet> packet = queue_.top().packet();
44  queue_.pop();
45  return packet;
46  }
47 
48  return nullptr;
49 }
50 
51 
52 /** Construct a packet queue.
53  *
54  * \param callback A function to call whenever a new packet is added to the
55  * queue. This allows the queue to signal when it has become non empty.
56  * The default is no callback {}.
57  */
58 PacketQueue::PacketQueue(std::optional<std::function<void(void)>> callback)
59  : callback_(std::move(callback)), ticket_(0), running_(true)
60 {
61 }
62 
63 
64 /** Close the queue.
65  *
66  * This will release any blocking calls to \ref pop.
67  * \remarks
68  * Threadsafe (locking).
69  */
71 {
72  {
73  std::lock_guard<std::mutex> lock(mutex_);
74  running_ = false;
75  }
76  cv_.notify_all();
77 }
78 
79 
80 /** Determine if the packet queue is empty or not.
81  *
82  * retval true There are no packets in the queue.
83  * retval false There is at least one packet in the queue.
84  * \remarks
85  * Threadsafe (locking).
86  */
88 {
89  std::lock_guard<std::mutex> lock(mutex_);
90  return queue_.empty();
91 }
92 
93 
94 /** Remove and return the packet at the front of the queue.
95  *
96  * This version will block on an empty queue and will not return until the
97  * queue becomes non empty or is closed with \ref close.
98  *
99  * \returns The packet that was at the front of the queue, or nullptr if the
100  * queue was closed.
101  * \remarks
102  * Threadsafe (locking).
103  * \sa pop(const std::chrono::nanoseconds &)
104  */
105 std::shared_ptr<const Packet> PacketQueue::pop()
106 {
107  std::unique_lock<std::mutex> lock(mutex_);
108  // Wait for available packet.
109  cv_.wait(lock, [this]()
110  {
111  return !running_ || !queue_.empty();
112  });
113  // Return the packet if the queue is running and is not empty.
114  return get_packet_();
115 }
116 
117 
118 /** Remove and return the packet at the front of the queue.
119  *
120  * This version will block on an empty queue and will not return until the
121  * queue becomes non empty, is closed with \ref close, or the \p timeout has
122  * expired.
123  *
124  * \param timeout How long to block waiting for an empty queue. Set to 0s for
125  * non blocking.
126  * \returns The packet that was at the front of the queue, or nullptr if the
127  * queue was closed or the timeout expired.
128  * \remarks
129  * Threadsafe (locking).
130  * \sa pop()
131  */
132 std::shared_ptr<const Packet> PacketQueue::pop(
133  const std::chrono::nanoseconds &timeout)
134 {
135  std::unique_lock<std::mutex> lock(mutex_);
136 
137  if (timeout > std::chrono::nanoseconds::zero())
138  {
139  // Wait for available packet (or the queue to be closed).
140  cv_.wait_for(lock, timeout, [this]()
141  {
142  return !running_ || !queue_.empty();
143  });
144  }
145 
146  // Return the packet if the queue is running and is not empty.
147  return get_packet_();
148 }
149 
150 
151 /** Add a new packet to the queue, with a priority.
152  *
153  * A higher \p priority will result in the \p packet being pushed to the front
154  * of the queue. When priorities are equal the order in which the packets were
155  * added to the queue is maintained.
156  *
157  * \param packet The packet to add to the queue. It must not be nullptr.
158  * \param priority The priority to use when adding it to the queue. The
159  * default is 0.
160  * \throws std::invalid_argument if the packet pointer is null.
161  * \remarks
162  * Threadsafe (locking).
163  */
164 void PacketQueue::push(std::shared_ptr<const Packet> packet, int priority)
165 {
166  if (packet == nullptr)
167  {
168  throw std::invalid_argument("Given packet pointer is null.");
169  }
170 
171  // Add the packet to the queue.
172  {
173  std::lock_guard<std::mutex> lock(mutex_);
174  queue_.emplace(std::move(packet), priority, ticket_++);
175  }
176  // Notify a waiting pop.
177  cv_.notify_one();
178 
179  // Trigger the callback.
180  if (callback_)
181  {
182  (*callback_)();
183  }
184 }
TEST_VIRTUAL void close()
Definition: PacketQueue.cpp:70
TEST_VIRTUAL std::shared_ptr< const Packet > pop()
STL namespace.
PacketQueue(std::optional< std::function< void(void)>> callback={})
Definition: PacketQueue.cpp:58
TEST_VIRTUAL void push(std::shared_ptr< const Packet > packet, int priority=0)
TEST_VIRTUAL bool empty()
Definition: PacketQueue.cpp:87