BCNP 3.2.1
Batched Command Network Protocol
Loading...
Searching...
No Matches
telemetry_accumulator.h
Go to the documentation of this file.
1#pragma once
2
3#include "bcnp/packet.h"
6
7#include <chrono>
8#include <cstddef>
9#include <cstdint>
10#include <functional>
11#include <mutex>
12#include <vector>
13
14namespace bcnp {
15
21 std::size_t flushIntervalTicks{2};
22
24 std::size_t maxBufferedMessages{64};
25};
26
54template<typename MsgType, typename Storage = StaticVector<MsgType, 64>>
56public:
57 using Clock = std::chrono::steady_clock;
58
60 : m_config(config) {}
61
71 bool Record(const MsgType& msg) {
72 std::lock_guard<std::mutex> lock(m_mutex);
73
74 // Ring-buffer behavior: drop oldest when full
75 if (m_buffer.size() >= m_config.maxBufferedMessages) {
76 DropOldestUnlocked();
77 ++m_metrics.bufferOverflows;
78 }
79
80 m_buffer.push_back(msg);
81 ++m_metrics.messagesRecorded;
82 return true;
83 }
84
90 template<typename InputIt>
91 void RecordBatch(InputIt first, InputIt last) {
92 std::lock_guard<std::mutex> lock(m_mutex);
93 for (auto it = first; it != last; ++it) {
94 if (m_buffer.size() >= m_config.maxBufferedMessages) {
95 DropOldestUnlocked();
96 ++m_metrics.bufferOverflows;
97 }
98 m_buffer.push_back(*it);
99 ++m_metrics.messagesRecorded;
100 }
101 }
102
111 template<typename Adapter>
112 bool MaybeFlush(Adapter& adapter) {
113 Storage localBuffer;
114 std::size_t messageCount = 0;
115
116 {
117 std::lock_guard<std::mutex> lock(m_mutex);
118
119 ++m_tickCount;
120 if (m_tickCount < m_config.flushIntervalTicks) {
121 return false;
122 }
123
124 m_tickCount = 0;
125
126 if (m_buffer.empty()) {
127 return false;
128 }
129
130 // Swap buffer out while holding lock
131 localBuffer = std::move(m_buffer);
132 m_buffer = Storage{};
133 messageCount = localBuffer.size();
134 }
135 // Lock released - now safe to do blocking I/O
136
137 return SendBuffer(adapter, std::move(localBuffer), messageCount);
138 }
139
146 template<typename Adapter>
147 bool ForceFlush(Adapter& adapter) {
148 Storage localBuffer;
149 std::size_t messageCount = 0;
150
151 {
152 std::lock_guard<std::mutex> lock(m_mutex);
153 m_tickCount = 0;
154
155 if (m_buffer.empty()) {
156 return false;
157 }
158
159 // Swap buffer out while holding lock
160 localBuffer = std::move(m_buffer);
161 m_buffer = Storage{};
162 messageCount = localBuffer.size();
163 }
164 // Lock released - now safe to do blocking I/O
165
166 return SendBuffer(adapter, std::move(localBuffer), messageCount);
167 }
168
172 std::size_t BufferedCount() const {
173 std::lock_guard<std::mutex> lock(m_mutex);
174 return m_buffer.size();
175 }
176
180 void Clear() {
181 std::lock_guard<std::mutex> lock(m_mutex);
182 m_buffer.clear();
183 m_tickCount = 0;
184 }
185
/// Counters maintained by the accumulator; every field starts at zero.
struct Metrics {
    /// Incremented once per message appended by Record()/RecordBatch().
    std::uint64_t messagesRecorded = 0;
    /// Sum of the message counts of successfully sent packets.
    std::uint64_t messagesSent = 0;
    /// Incremented once per successfully sent packet.
    std::uint64_t packetsSent = 0;
    /// Incremented when a record finds the buffer at its configured limit.
    std::uint64_t bufferOverflows = 0;
    /// Incremented when encoding or sending a packet fails.
    std::uint64_t sendFailures = 0;
};
196
198 std::lock_guard<std::mutex> lock(m_mutex);
199 return m_metrics;
200 }
201
203 std::lock_guard<std::mutex> lock(m_mutex);
204 m_metrics = {};
205 }
206
211 std::lock_guard<std::mutex> lock(m_mutex);
212 m_config = config;
213 }
214
215private:
222 template<typename Adapter>
223 bool SendBuffer(Adapter& adapter, Storage&& buffer, std::size_t messageCount) {
224 // Build packet from extracted buffer
226 packet.messages = std::move(buffer);
227
228 // Encode to wire format
229 m_wireBuffer.clear();
230 if (!EncodeTypedPacket(packet, m_wireBuffer)) {
231 std::lock_guard<std::mutex> lock(m_mutex);
232 ++m_metrics.sendFailures;
233 return false;
234 }
235
236 // Send (potentially blocking syscall - but we don't hold the lock!)
237 if (!adapter.SendBytes(m_wireBuffer.data(), m_wireBuffer.size())) {
238 std::lock_guard<std::mutex> lock(m_mutex);
239 ++m_metrics.sendFailures;
240 return false;
241 }
242
243 // Update metrics
244 {
245 std::lock_guard<std::mutex> lock(m_mutex);
246 m_metrics.messagesSent += messageCount;
247 ++m_metrics.packetsSent;
248 }
249 return true;
250 }
251
252 TelemetryAccumulatorConfig m_config;
253 Storage m_buffer{};
254 std::vector<uint8_t> m_wireBuffer; // Persistent buffer, capacity retained across flushes
255 std::size_t m_tickCount{0};
256 Metrics m_metrics{};
257 mutable std::mutex m_mutex;
258
259 void DropOldestUnlocked() {
260 if (m_buffer.empty()) {
261 return;
262 }
263
264 // Shift everything left by one; O(n) but small (default 64)
265 for (std::size_t i = 1; i < m_buffer.size(); ++i) {
266 m_buffer[i - 1] = std::move(m_buffer[i]);
267 }
268 m_buffer.pop_back();
269 }
270};
271
275template<typename MsgType>
277
281template<typename MsgType, std::size_t Capacity = 64>
283
284} // namespace bcnp
Accumulates high-frequency telemetry data and batches into packets.
bool Record(const MsgType &msg)
Record a telemetry reading.
std::chrono::steady_clock Clock
void RecordBatch(InputIt first, InputIt last)
Record multiple telemetry readings at once.
bool MaybeFlush(Adapter &adapter)
Flush if interval has elapsed.
void SetConfig(const TelemetryAccumulatorConfig &config)
Update configuration.
void Clear()
Clear all buffered messages without sending.
std::size_t BufferedCount() const
Get the number of buffered messages waiting to be sent.
TelemetryAccumulator(TelemetryAccumulatorConfig config={})
bool ForceFlush(Adapter &adapter)
Force an immediate flush regardless of interval.
bool EncodeTypedPacket(const TypedPacket< MsgType, Storage > &packet, uint8_t *output, std::size_t capacity, std::size_t &bytesWritten)
Encode a typed packet to a pre-allocated buffer.
Definition packet.h:322
BCNP packet structures, encoding, and decoding utilities.
Fixed-capacity vector with stack allocation (no heap).
Configuration for telemetry accumulator.
std::size_t maxBufferedMessages
Maximum messages to buffer; once this limit is reached, recording a new message drops the oldest buffered one (no flush is forced).
std::size_t flushIntervalTicks
Flush interval: send telemetry every N control loop ticks.
Generic packet containing messages of a specific type.
Definition packet.h:242
Storage messages
Definition packet.h:247