BCNP 3.2.1
Batched Command Network Protocol
Loading...
Searching...
No Matches
stream_parser.cpp
Go to the documentation of this file.
1
10#include "bcnp/stream_parser.h"
11
12#include <algorithm>
13#include <cstring>
14
15namespace bcnp {
16
24StreamParser::StreamParser(PacketCallback onPacket, ErrorCallback onError, std::size_t bufferSize)
25 : m_onPacket(std::move(onPacket)), m_onError(std::move(onError)) {
26 if (bufferSize < kHeaderSize + kChecksumSize) {
27 bufferSize = kHeaderSize + kChecksumSize;
28 }
29 m_buffer.resize(bufferSize);
30 m_decodeScratch.resize(bufferSize);
31 }
32
46void StreamParser::Push(const uint8_t* data, std::size_t length) {
47 if (length == 0 || !data) {
48 return;
49 }
50
51 std::size_t iterationBudget = kMaxParseIterationsPerPush;
52 std::size_t remaining = length;
53
54 while (remaining > 0) {
55 if (m_size == m_buffer.size()) {
56 ParseBuffer(iterationBudget);
57 }
58
59 if (m_size == m_buffer.size()) {
60 const auto errorOffset = m_streamOffset;
61 EmitError(PacketError::TooManyCommands, errorOffset);
62 m_streamOffset += m_size;
63 m_head = 0;
64 m_size = 0;
65 return;
66 }
67
68 if (iterationBudget == 0) {
69 return;
70 }
71
72 const std::size_t writable = std::min(remaining, m_buffer.size() - m_size);
73 if (writable == 0) {
74 break;
75 }
76
77 const std::size_t chunkOffset = length - remaining;
78 WriteToBuffer(data + chunkOffset, writable);
79 remaining -= writable;
80
81 ParseBuffer(iterationBudget);
82 if (iterationBudget == 0 && remaining > 0) {
83 return;
84 }
85 }
86
87 if (iterationBudget > 0) {
88 ParseBuffer(iterationBudget);
89 }
90}
91
100void StreamParser::Reset(bool resetErrorState) {
101 m_head = 0;
102 m_size = 0;
103 if (resetErrorState) {
104 m_consecutiveErrors = 0;
105 m_streamOffset = 0;
106 }
107}
108
118void StreamParser::WriteToBuffer(const uint8_t* data, std::size_t length) {
119 const std::size_t tailIndex = (m_head + m_size) % m_buffer.size();
120 const std::size_t firstChunk = std::min(length, m_buffer.size() - tailIndex);
121 std::memcpy(&m_buffer[tailIndex], data, firstChunk);
122
123 const std::size_t remaining = length - firstChunk;
124 if (remaining > 0) {
125 std::memcpy(&m_buffer[0], data + firstChunk, remaining);
126 }
127 m_size += length;
128}
129
140void StreamParser::CopyOut(std::size_t offset, std::size_t length, uint8_t* dest) const {
141 const std::size_t startIndex = (m_head + offset) % m_buffer.size();
142 const std::size_t firstChunk = std::min(length, m_buffer.size() - startIndex);
143 std::memcpy(dest, &m_buffer[startIndex], firstChunk);
144
145 const std::size_t remaining = length - firstChunk;
146 if (remaining > 0) {
147 std::memcpy(dest + firstChunk, &m_buffer[0], remaining);
148 }
149}
150
159void StreamParser::Discard(std::size_t count) {
160 if (count == 0 || m_size == 0) {
161 return;
162 }
163 if (count > m_size) {
164 count = m_size;
165 }
166 m_head = (m_head + count) % m_buffer.size();
167 m_size -= count;
168 m_streamOffset += count;
169}
170
181void StreamParser::ParseBuffer(std::size_t& iterationBudget) {
182 while (iterationBudget > 0 && m_size >= kHeaderSizeV3) {
183 --iterationBudget;
184
185 CopyOut(0, kHeaderSizeV3, m_decodeScratch.data());
186
187 if (m_decodeScratch[kHeaderMajorIndex] != kProtocolMajorV3 ||
188 m_decodeScratch[kHeaderMinorIndex] != kProtocolMinorV3) {
189 const auto offset = m_streamOffset;
190 EmitError(PacketError::UnsupportedVersion, offset);
191 const std::size_t skip = FindNextHeaderCandidate();
192 Discard(skip > 0 ? skip : 1);
193 continue;
194 }
195
196 const uint16_t msgTypeId = detail::LoadU16(&m_decodeScratch[kHeaderMsgTypeIndex]);
197 const uint16_t messageCount = detail::LoadU16(&m_decodeScratch[kHeaderMsgCountIndex]);
198
199 // Lookup message type to get wire size
200 const std::size_t wireSize = LookupWireSize(static_cast<MessageTypeId>(msgTypeId));
201 if (wireSize == 0) {
202 const auto offset = m_streamOffset;
203 EmitError(PacketError::UnknownMessageType, offset);
204 Discard(1);
205 continue;
206 }
207
208 if (messageCount > kMaxMessagesPerPacket) {
209 const auto offset = m_streamOffset;
210 EmitError(PacketError::TooManyCommands, offset);
211 Discard(1);
212 continue;
213 }
214
215 const std::size_t expected = kHeaderSizeV3 + (messageCount * wireSize) + kChecksumSize;
216
217 const std::size_t available = std::min(expected, m_size);
218 CopyOut(0, available, m_decodeScratch.data());
219 auto result = DecodePacketViewWithSize(m_decodeScratch.data(), available, wireSize);
220
221 if (result.error == PacketError::Truncated) {
222 break;
223 }
224
225 if (!result.view) {
226 const auto offset = m_streamOffset;
227 EmitError(result.error, offset);
228 // Poison packet: only discard 1 byte to resync, not the entire calculated frame
229 if (result.error == PacketError::ChecksumMismatch || result.error == PacketError::InvalidFloat) {
230 Discard(1);
231 } else {
232 const std::size_t consumed = result.bytesConsumed > 0 ? result.bytesConsumed : 1;
233 Discard(consumed);
234 }
235 continue;
236 }
237
238 EmitPacket(*result.view);
239 m_consecutiveErrors = 0;
240 Discard(result.bytesConsumed);
241 }
242}
243
252std::size_t StreamParser::FindNextHeaderCandidate() const {
253 if (m_size <= 1) {
254 return m_size == 0 ? 0 : 1;
255 }
256
257 for (std::size_t offset = 1; offset < m_size; ++offset) {
258 const std::size_t firstIndex = (m_head + offset) % m_buffer.size();
259 if (m_buffer[firstIndex] != kProtocolMajorV3) {
260 continue;
261 }
262
263 if (offset + 1 >= m_size) {
264 return offset;
265 }
266
267 const std::size_t secondIndex = (m_head + offset + 1) % m_buffer.size();
268 if (m_buffer[secondIndex] == kProtocolMinorV3) {
269 return offset;
270 }
271 }
272 return 1;
273}
274
284std::size_t StreamParser::LookupWireSize(MessageTypeId typeId) const {
285 // Use custom lookup if provided
286 if (m_wireSizeLookup) {
287 return m_wireSizeLookup(typeId);
288 }
289 // Fall back to global registry
290 auto info = GetMessageInfo(typeId);
291 return info ? info->wireSize : 0;
292}
293
298void StreamParser::EmitPacket(const PacketView& packet) {
299 if (m_onPacket) {
300 m_onPacket(packet);
301 }
302}
303
312void StreamParser::EmitError(PacketError error, std::size_t offset) {
313 if (m_onError) {
314 ErrorInfo info{error, offset, ++m_consecutiveErrors};
315 m_onError(info);
316 }
317}
318
319} // namespace bcnp
static constexpr std::size_t kMaxParseIterationsPerPush
std::function< void(const PacketView &)> PacketCallback
StreamParser(PacketCallback onPacket, ErrorCallback onError={}, std::size_t bufferSize=4096)
Construct a stream parser with callbacks and buffer size.
void Reset(bool resetErrorState=true)
Reset the parser to its initial state.
std::function< void(const ErrorInfo &)> ErrorCallback
void Push(const uint8_t *data, std::size_t length)
Push raw bytes into the parser for processing.
constexpr std::size_t kHeaderMajorIndex
Byte offset of major version in header.
Definition packet.h:57
constexpr std::size_t kHeaderMinorIndex
Byte offset of minor version in header.
Definition packet.h:60
constexpr std::size_t kMaxMessagesPerPacket
Maximum number of messages allowed in a single packet.
Definition packet.h:34
constexpr std::size_t kChecksumSize
Size of CRC32 checksum in bytes.
Definition packet.h:31
constexpr std::size_t kHeaderSize
Size of packet header in bytes.
Definition packet.h:46
uint16_t LoadU16(const uint8_t *p)
constexpr std::size_t kHeaderMsgCountIndex
std::optional< MessageInfo > GetMessageInfo(MessageTypeId typeId)
constexpr std::size_t kHeaderMsgTypeIndex
constexpr uint8_t kProtocolMinorV3
PacketError
Error codes returned by packet decoding operations.
Definition packet.h:268
@ UnsupportedVersion
Protocol version mismatch.
@ ChecksumMismatch
CRC32 validation failed.
@ Truncated
Buffer ends before expected packet length.
@ InvalidFloat
NaN or Inf detected in float field.
@ UnknownMessageType
Message type ID not in registry.
DecodeViewResult DecodePacketViewWithSize(const uint8_t *data, std::size_t length, std::size_t wireSize)
Decode a packet with explicitly provided message wire size.
Definition packet.cpp:76
constexpr std::size_t kHeaderSizeV3
constexpr uint8_t kProtocolMajorV3