BCNP 3.2.1
Batched Command Network Protocol
Loading...
Searching...
No Matches
message_queue.h
Go to the documentation of this file.
1#pragma once
2
15#include <algorithm>
16#include <chrono>
17#include <cstddef>
18#include <cstdint>
19#include <mutex>
20#include <optional>
21#include <vector>
22
23namespace bcnp {
24
// Configuration parameters for a message queue.
// NOTE(review): the line that opens this struct is absent from this listing.
29 std::size_t capacity{200};  // Maximum messages in queue (default 200).
30 std::chrono::milliseconds connectionTimeout{200};  // Time before declaring disconnect (default 200 ms).
31 std::chrono::milliseconds maxCommandLag{100};  // Max lag before clamping virtual time (default 100 ms).
32};
33
// Runtime metrics for queue diagnostics; every counter starts at zero.
// NOTE(review): the line that opens this struct is absent from this listing.
38 uint64_t messagesReceived{0};  // Incremented for each push that was accepted.
39 uint64_t queueOverflows{0};  // Incremented for each push rejected because the queue was full.
40 uint64_t messagesSkipped{0};  // Incremented for each message dropped by lag compensation.
41};
42
// Type trait to detect messages with a durationMs field.
// `value` is true when the expression `std::declval<U>().durationMs` is well-formed
// for T; only the member's presence is tested, not its type.
// NOTE(review): the struct's declaration line is absent from this listing.
// NOTE(review): std::declval and std::true_type/std::false_type live in <utility> and
// <type_traits>, which are not among the includes visible here — presumably pulled in
// transitively; confirm and include them explicitly.
51template<typename T>
53 template<typename U>
// Preferred overload: viable only when U has an accessible `durationMs` member.
54 static auto test(int) -> decltype(std::declval<U>().durationMs, std::true_type{});
55 template<typename>
// Fallback overload chosen when the overload above is removed by SFINAE.
56 static std::false_type test(...);
57 static constexpr bool value = decltype(test<T>(0))::value;
58};
59
83template<typename MsgType>
// Compile-time guard: MsgType must satisfy HasDurationMs.
// NOTE(review): the message promises a uint16_t field, but HasDurationMs only checks
// that a member named durationMs exists — the type is not verified. Either tighten the
// trait or relax the wording.
// NOTE(review): the class declaration line is absent from this listing.
85 static_assert(HasDurationMs<MsgType>::value,
86 "MsgType must have a uint16_t durationMs field");
87
88public:
// Clock used for every time_point in this class (std::chrono::steady_clock is monotonic).
89 using Clock = std::chrono::steady_clock;
90
91 explicit MessageQueue(MessageQueueConfig config = {})
92 : m_config(config) {
93 ClampConfig();
94 m_storage.resize(m_config.capacity);
95 }
96
103 void Clear() {
104 std::lock_guard<std::mutex> lock(m_mutex);
105 ClearUnlocked();
106 }
107
116 bool Push(const MsgType& message) {
117 std::lock_guard<std::mutex> lock(m_mutex);
118 if (!PushUnlocked(message)) {
119 ++m_metrics.queueOverflows;
120 return false;
121 }
122 ++m_metrics.messagesReceived;
123 return true;
124 }
125
130 std::size_t Size() const {
131 std::lock_guard<std::mutex> lock(m_mutex);
132 return m_count;
133 }
134
144 void NotifyReceived(Clock::time_point now) {
145 std::lock_guard<std::mutex> lock(m_mutex);
146 m_lastRx = now;
147 }
148
160 void Update(Clock::time_point now) {
161 std::lock_guard<std::mutex> lock(m_mutex);
162 if (!IsConnectedUnlocked(now)) {
163 ClearUnlocked();
164 m_hasVirtualCursor = false;
165 return;
166 }
167
168 while (true) {
169 if (m_active) {
170 const auto elapsed = now - m_active->start;
171 const auto duration = std::chrono::milliseconds(m_active->message.durationMs);
172
173 if (elapsed < duration) {
174 break;
175 }
176
177 const auto endTime = m_active->start + duration;
178 m_active.reset();
179 m_virtualCursor = endTime;
180 m_hasVirtualCursor = true;
181 }
182
183 if (!m_active) {
184 PromoteNext(now);
185 if (!m_active) {
186 break;
187 }
188 }
189 }
190 }
191
200 std::optional<MsgType> ActiveMessage() const {
201 std::lock_guard<std::mutex> lock(m_mutex);
202 if (m_active) {
203 return m_active->message;
204 }
205 return std::nullopt;
206 }
207
214 bool IsConnected(Clock::time_point now) const {
215 std::lock_guard<std::mutex> lock(m_mutex);
216 return IsConnectedUnlocked(now);
217 }
218
// Returns the current metrics, read while holding the queue mutex.
// NOTE(review): the signature line of this member is absent from this listing.
224 std::lock_guard<std::mutex> lock(m_mutex);
225 return m_metrics;
226 }
227
// Replaces the metrics with a value-initialised instance while holding the queue mutex.
// NOTE(review): the signature line of this member is absent from this listing.
232 std::lock_guard<std::mutex> lock(m_mutex);
233 m_metrics = {};
234 }
235
243 void SetConfig(const MessageQueueConfig& config) {
244 std::lock_guard<std::mutex> lock(m_mutex);
245 m_config = config;
246 ClampConfig();
247 if (m_storage.size() != m_config.capacity) {
248 ClearUnlocked();
249 m_storage.resize(m_config.capacity);
250 }
251 }
252
// Returns the stored (already clamped) configuration, read while holding the queue mutex.
// NOTE(review): the signature line of this member is absent from this listing.
254 std::lock_guard<std::mutex> lock(m_mutex);
255 return m_config;
256 }
257
275 public:
// Binds the transaction to `queue` and locks queue.m_mutex; m_lock is a
// std::lock_guard member, so the mutex stays locked until this object is destroyed.
// NOTE(review): the queue's own locking members (Push, Size, Update, ...) lock the
// same std::mutex, so calling them from the owning thread while a Transaction is
// alive would re-lock a non-recursive mutex — use the Transaction's methods instead.
276 explicit Transaction(MessageQueue& queue)
277 : m_queue(queue), m_lock(queue.m_mutex) {}
278
279 bool Push(const MsgType& message) {
280 if (!m_queue.PushUnlocked(message)) {
281 ++m_queue.m_metrics.queueOverflows;
282 return false;
283 }
284 ++m_queue.m_metrics.messagesReceived;
285 return true;
286 }
287
// Clears the owning queue via ClearUnlocked(); no extra locking is done here
// because the Transaction's m_lock member already holds the queue mutex.
288 void Clear() {
289 m_queue.ClearUnlocked();
290 }
291
292 private:
293 MessageQueue& m_queue;  // Queue this transaction operates on (not owned).
294 std::lock_guard<std::mutex> m_lock;  // Holds the queue's mutex for the transaction's lifetime.
295 };
296
298
299private:
// The message currently being executed together with its scheduled start time.
// Built by aggregate initialisation in PromoteNext(), so the member order matters.
300 struct ActiveSlot {
301 MsgType message;  // Copy of the message taken from the ring buffer.
302 Clock::time_point start;  // Scheduled start; Update() measures elapsed time from here.
303 };
304
// Selects the next queued message to become active, scheduling it against the
// virtual cursor (the time at which the previous message was due to end) instead
// of against `now`. Messages that would already have finished more than
// maxCommandLag ago are dropped and counted in messagesSkipped. Leaves m_active
// empty if the queue runs dry.
// Takes no lock itself; Update() calls it with m_mutex held.
305 void PromoteNext(Clock::time_point now) {
// No usable cursor yet (flag unset or sentinel value): start the timeline at `now`.
306 if (!m_hasVirtualCursor || m_virtualCursor == Clock::time_point::min()) {
307 m_virtualCursor = now;
308 m_hasVirtualCursor = true;
309 }
310
// Nothing queued: keep the cursor from falling behind `now`.
311 if (m_count == 0) {
312 m_virtualCursor = std::max(m_virtualCursor, now);
313 return;
314 }
315
// Earliest point in time a message is still allowed to be scheduled at.
316 const auto lagFloor = now - m_config.maxCommandLag;
317
318 while (m_count > 0) {
// `next` refers to the slot inside m_storage; it is copied below before being popped.
319 const MsgType& next = FrontUnlocked();
320 const auto duration = std::chrono::milliseconds(next.durationMs);
321 auto projectedStart = m_virtualCursor;
322 auto projectedEnd = projectedStart + duration;
323
// The whole message lies at or before the lag floor: drop it, but still advance
// the cursor by its duration so later messages keep their place on the timeline.
324 if (projectedEnd <= lagFloor) {
325 PopUnlocked();
326 m_virtualCursor = projectedEnd;
327 ++m_metrics.messagesSkipped;
328 continue;
329 }
330
// The message would have started before the lag floor: clamp its start to the floor.
331 if (projectedStart < lagFloor) {
332 projectedStart = lagFloor;
333 }
334
// Activate the message (copying it out of the buffer) and move the cursor to its end.
335 m_active = ActiveSlot{next, projectedStart};
336 PopUnlocked();
337 m_virtualCursor = projectedStart + duration;
338 return;
339 }
340 }
341
342 void ClearUnlocked() {
343 m_head = 0;
344 m_tail = 0;
345 m_count = 0;
346 m_active.reset();
347 m_virtualCursor = Clock::time_point::min();
348 m_hasVirtualCursor = false;
349 }
350
351 bool PushUnlocked(const MsgType& message) {
352 if (m_count >= EffectiveDepth()) {
353 return false;
354 }
355 m_storage[m_tail] = message;
356 m_tail = (m_tail + 1) % Capacity();
357 ++m_count;
358 return true;
359 }
360
361 void PopUnlocked() {
362 if (m_count == 0) return;
363 m_head = (m_head + 1) % Capacity();
364 --m_count;
365 }
366
367 const MsgType& FrontUnlocked() const {
368 return m_storage[m_head];
369 }
370
371 std::size_t Capacity() const { return m_storage.size(); }
372 std::size_t EffectiveDepth() const { return std::min(m_config.capacity, Capacity()); }
373
374 void ClampConfig() {
375 if (m_config.capacity == 0) {
376 m_config.capacity = 200;
377 }
378 if (m_config.maxCommandLag <= std::chrono::milliseconds::zero()) {
379 m_config.maxCommandLag = std::chrono::milliseconds(1);
380 }
381 }
382
383 bool IsConnectedUnlocked(Clock::time_point now) const {
384 if (m_lastRx == Clock::time_point::min()) {
385 return false;
386 }
387 return (now - m_lastRx) <= m_config.connectionTimeout;
388 }
389
390 MessageQueueConfig m_config{};  // Active configuration, as sanitised by ClampConfig().
391 MessageQueueMetrics m_metrics{};  // Diagnostic counters.
392 std::vector<MsgType> m_storage;  // Ring buffer backing store, sized to m_config.capacity.
393 std::size_t m_head{0};  // Index of the oldest queued message.
394 std::size_t m_tail{0};  // Index the next pushed message is written to.
395 std::size_t m_count{0};  // Number of queued messages (the active one is not included).
396 std::optional<ActiveSlot> m_active;  // Message currently executing, if any.
397 Clock::time_point m_virtualCursor{Clock::time_point::min()};  // Scheduled end of the last message; min() means "unset".
398 bool m_hasVirtualCursor{false};  // Whether m_virtualCursor holds a usable value.
399 Clock::time_point m_lastRx{Clock::time_point::min()};  // Last time passed to NotifyReceived(); min() means "never".
400 mutable std::mutex m_mutex;  // Guards every member above; mutable so const accessors can lock it.
401};
401};
402
407template<typename MsgType>
409
410} // namespace bcnp
RAII transaction for atomic batch operations.
Transaction(MessageQueue &queue)
bool Push(const MsgType &message)
Generic timed message queue for any message type with a durationMs field.
MessageQueueMetrics GetMetrics() const
Get current queue metrics.
std::size_t Size() const
Get the current number of queued messages.
MessageQueue(MessageQueueConfig config={})
std::chrono::steady_clock Clock
void Update(Clock::time_point now)
Update queue state - call once per control loop iteration.
void ResetMetrics()
Reset all metrics to zero.
Transaction BeginTransaction()
void SetConfig(const MessageQueueConfig &config)
Update queue configuration.
bool IsConnected(Clock::time_point now) const
Check if the connection is still active.
MessageQueueConfig GetConfig() const
std::optional< MsgType > ActiveMessage() const
Get the currently executing message.
bool Push(const MsgType &message)
Add a message to the back of the queue.
void NotifyReceived(Clock::time_point now)
Notify that messages were received from the network.
void Clear()
Remove all messages from the queue.
Type trait to detect messages with a durationMs field.
static constexpr bool value
static std::false_type test(...)
static auto test(int) -> decltype(std::declval< U >().durationMs, std::true_type{})
Configuration parameters for a message queue.
std::chrono::milliseconds connectionTimeout
Time before declaring disconnect.
std::size_t capacity
Maximum messages in queue.
std::chrono::milliseconds maxCommandLag
Max lag before clamping virtual time.
Runtime metrics for queue diagnostics.
uint64_t messagesSkipped
Messages skipped due to lag compensation.
uint64_t messagesReceived
Total messages pushed to queue.
uint64_t queueOverflows
Push attempts when queue was full.