54 static auto test(
int) ->
decltype(std::declval<U>().durationMs, std::true_type{});
56 static std::false_type
test(...);
83template<
typename MsgType>
86 "MsgType must have a uint16_t durationMs field");
89 using Clock = std::chrono::steady_clock;
104 std::lock_guard<std::mutex> lock(m_mutex);
116 bool Push(
const MsgType& message) {
117 std::lock_guard<std::mutex> lock(m_mutex);
118 if (!PushUnlocked(message)) {
131 std::lock_guard<std::mutex> lock(m_mutex);
145 std::lock_guard<std::mutex> lock(m_mutex);
161 std::lock_guard<std::mutex> lock(m_mutex);
162 if (!IsConnectedUnlocked(now)) {
164 m_hasVirtualCursor =
false;
170 const auto elapsed = now - m_active->start;
171 const auto duration = std::chrono::milliseconds(m_active->message.durationMs);
173 if (elapsed < duration) {
177 const auto endTime = m_active->start + duration;
179 m_virtualCursor = endTime;
180 m_hasVirtualCursor =
true;
201 std::lock_guard<std::mutex> lock(m_mutex);
203 return m_active->message;
215 std::lock_guard<std::mutex> lock(m_mutex);
216 return IsConnectedUnlocked(now);
224 std::lock_guard<std::mutex> lock(m_mutex);
232 std::lock_guard<std::mutex> lock(m_mutex);
244 std::lock_guard<std::mutex> lock(m_mutex);
247 if (m_storage.size() != m_config.
capacity) {
249 m_storage.resize(m_config.
capacity);
254 std::lock_guard<std::mutex> lock(m_mutex);
277 : m_queue(queue), m_lock(queue.m_mutex) {}
279 bool Push(
const MsgType& message) {
280 if (!m_queue.PushUnlocked(message)) {
289 m_queue.ClearUnlocked();
294 std::lock_guard<std::mutex> m_lock;
302 Clock::time_point start;
305 void PromoteNext(Clock::time_point now) {
306 if (!m_hasVirtualCursor || m_virtualCursor == Clock::time_point::min()) {
307 m_virtualCursor = now;
308 m_hasVirtualCursor =
true;
312 m_virtualCursor = std::max(m_virtualCursor, now);
318 while (m_count > 0) {
319 const MsgType& next = FrontUnlocked();
320 const auto duration = std::chrono::milliseconds(next.durationMs);
321 auto projectedStart = m_virtualCursor;
322 auto projectedEnd = projectedStart + duration;
324 if (projectedEnd <= lagFloor) {
326 m_virtualCursor = projectedEnd;
331 if (projectedStart < lagFloor) {
332 projectedStart = lagFloor;
335 m_active = ActiveSlot{next, projectedStart};
337 m_virtualCursor = projectedStart + duration;
342 void ClearUnlocked() {
347 m_virtualCursor = Clock::time_point::min();
348 m_hasVirtualCursor =
false;
351 bool PushUnlocked(
const MsgType& message) {
352 if (m_count >= EffectiveDepth()) {
355 m_storage[m_tail] = message;
356 m_tail = (m_tail + 1) % Capacity();
362 if (m_count == 0)
return;
363 m_head = (m_head + 1) % Capacity();
367 const MsgType& FrontUnlocked()
const {
368 return m_storage[m_head];
// Returns the number of slots currently allocated in m_storage
// (the modulus used when advancing m_head and m_tail).
371 std::size_t Capacity()
const {
return m_storage.size(); }
// Returns the usable queue depth: the smaller of the configured
// m_config.capacity and the allocated storage size from Capacity().
// PushUnlocked() treats the queue as full once m_count reaches this value.
372 std::size_t EffectiveDepth()
const {
return std::min(m_config.
capacity, Capacity()); }
378 if (m_config.
maxCommandLag <= std::chrono::milliseconds::zero()) {
383 bool IsConnectedUnlocked(Clock::time_point now)
const {
384 if (m_lastRx == Clock::time_point::min()) {
390 MessageQueueConfig m_config{};
391 MessageQueueMetrics m_metrics{};
392 std::vector<MsgType> m_storage;
393 std::size_t m_head{0};
394 std::size_t m_tail{0};
395 std::size_t m_count{0};
396 std::optional<ActiveSlot> m_active;
397 Clock::time_point m_virtualCursor{Clock::time_point::min()};
398 bool m_hasVirtualCursor{
false};
399 Clock::time_point m_lastRx{Clock::time_point::min()};
400 mutable std::mutex m_mutex;
407template<
typename MsgType>
RAII transaction for atomic batch operations.
Transaction(MessageQueue &queue)
bool Push(const MsgType &message)
Generic timed message queue for any message type with a durationMs field.
MessageQueueMetrics GetMetrics() const
Get current queue metrics.
std::size_t Size() const
Get the current number of queued messages.
MessageQueue(MessageQueueConfig config={})
std::chrono::steady_clock Clock
void Update(Clock::time_point now)
Update queue state - call once per control loop iteration.
void ResetMetrics()
Reset all metrics to zero.
Transaction BeginTransaction()
void SetConfig(const MessageQueueConfig &config)
Update queue configuration.
bool IsConnected(Clock::time_point now) const
Check if the connection is still active.
MessageQueueConfig GetConfig() const
std::optional< MsgType > ActiveMessage() const
Get the currently executing message.
bool Push(const MsgType &message)
Add a message to the back of the queue.
void NotifyReceived(Clock::time_point now)
Notify that messages were received from the network.
void Clear()
Remove all messages from the queue.
Type trait to detect messages with a durationMs field.
static constexpr bool value
static std::false_type test(...)
static auto test(int) -> decltype(std::declval< U >().durationMs, std::true_type{})
Configuration parameters for a message queue.
std::chrono::milliseconds connectionTimeout
Time before declaring disconnect.
std::size_t capacity
Maximum messages in queue.
std::chrono::milliseconds maxCommandLag
Max lag before clamping virtual time.
Runtime metrics for queue diagnostics.
uint64_t messagesSkipped
Messages skipped due to lag compensation.
uint64_t messagesReceived
Total messages pushed to queue.
uint64_t queueOverflows
Push attempts when queue was full.