// Constructor fragment: takes ownership of the two callbacks by moving them
// into m_onPacket / m_onError, then sizes both the receive buffer and the
// decode scratch area to `bufferSize` bytes.
// NOTE(review): other methods compute `% m_buffer.size()`, so a bufferSize of
// 0 would divide by zero unless a line not shown in this listing rejects or
// clamps it — confirm.
25 : m_onPacket(std::move(onPacket)), m_onError(std::move(onError)) {
29 m_buffer.resize(bufferSize);
30 m_decodeScratch.resize(bufferSize);
// Body fragment of the byte-ingest routine (the signature is not part of this
// listing; presumably StreamParser::Push(data, length) — confirm).
// Several original lines are missing here, including the declaration of
// `iterationBudget` and the bodies of most branches.

// Reject empty input or a null data pointer (the action taken is not shown).
47 if (length == 0 || !data) {
// Number of caller bytes that have not yet been copied into m_buffer.
52 std::size_t remaining = length;
54 while (remaining > 0) {
// Buffer is full: try to free space by parsing what is already buffered.
55 if (m_size == m_buffer.size()) {
56 ParseBuffer(iterationBudget);
// Still full after parsing: remember the current stream offset and advance
// the stream offset past everything buffered.
// NOTE(review): presumably the buffered bytes are dropped and an error is
// reported at errorOffset on the lines omitted here — confirm.
59 if (m_size == m_buffer.size()) {
60 const auto errorOffset = m_streamOffset;
62 m_streamOffset += m_size;
// Parse budget exhausted (the action taken is not shown).
68 if (iterationBudget == 0) {
// Copy no more than the free space currently available in m_buffer.
72 const std::size_t writable = std::min(remaining, m_buffer.size() - m_size);
// Bytes of `data` already consumed so far; the next chunk starts there.
77 const std::size_t chunkOffset = length - remaining;
78 WriteToBuffer(data + chunkOffset, writable);
79 remaining -= writable;
// Parse after every chunk so the buffer drains as input arrives.
81 ParseBuffer(iterationBudget);
// Budget ran out while caller input is still pending (handling not shown).
82 if (iterationBudget == 0 && remaining > 0) {
// After all input has been copied, parse once more if budget is left.
87 if (iterationBudget > 0) {
88 ParseBuffer(iterationBudget);
// Fragment of the reset routine (signature and the rest of the body are not
// part of this listing). When the caller asks for it, the consecutive-error
// counter is cleared as well.
103 if (resetErrorState) {
104 m_consecutiveErrors = 0;
// Appends `length` bytes from `data` after the currently buffered bytes,
// treating m_buffer as a circular buffer: the write starts at
// (m_head + m_size) modulo the capacity and wraps to index 0 if it reaches
// the physical end of the storage.
// No capacity check is visible here; the visible call site passes at most
// the free space (m_buffer.size() - m_size).
// NOTE(review): lines are missing from this listing — presumably a guard
// around the second copy and the update of m_size — confirm.
118void StreamParser::WriteToBuffer(
const uint8_t* data, std::size_t length) {
// Physical index of the first free slot.
119 const std::size_t tailIndex = (m_head + m_size) % m_buffer.size();
// Bytes that fit between tailIndex and the physical end of the storage.
120 const std::size_t firstChunk = std::min(length, m_buffer.size() - tailIndex);
121 std::memcpy(&m_buffer[tailIndex], data, firstChunk);
// Whatever did not fit wraps around to the start of the storage.
123 const std::size_t remaining = length - firstChunk;
125 std::memcpy(&m_buffer[0], data + firstChunk, remaining);
// Copies `length` bytes into `dest`, starting `offset` bytes past the logical
// front of the buffer (m_head), without consuming them. The read wraps from
// the physical end of m_buffer back to index 0 when necessary.
// No check against m_size is visible in this listing; the caller is expected
// to request only buffered bytes.
// NOTE(review): a line between the two copies is missing from this listing
// (presumably a guard on `remaining`) — confirm.
140void StreamParser::CopyOut(std::size_t offset, std::size_t length, uint8_t* dest)
const {
// Physical index corresponding to the requested logical offset.
141 const std::size_t startIndex = (m_head + offset) % m_buffer.size();
// Bytes available before the physical end of the storage.
142 const std::size_t firstChunk = std::min(length, m_buffer.size() - startIndex);
143 std::memcpy(dest, &m_buffer[startIndex], firstChunk);
// The rest is read from the start of the storage.
145 const std::size_t remaining = length - firstChunk;
147 std::memcpy(dest + firstChunk, &m_buffer[0], remaining);
// Drops `count` bytes from the front of the buffer by advancing m_head
// (modulo the capacity) and moves the absolute stream offset forward by the
// same amount.
// NOTE(review): branch bodies and the m_size update are missing from this
// listing; presumably the first check returns early and the second clamps
// `count` to m_size — confirm.
159void StreamParser::Discard(std::size_t count) {
// Nothing to drop, or nothing buffered.
160 if (count == 0 || m_size == 0) {
// Request exceeds what is buffered.
163 if (count > m_size) {
166 m_head = (m_head + count) % m_buffer.size();
168 m_streamOffset += count;
// Decodes packets from the buffered bytes. `iterationBudget` is taken by
// reference, and the ingest loop inspects it after each call to detect
// exhaustion.
// Many original lines (loop header, conditions, declarations of `msgTypeId`,
// `expected` and `result`) are missing from this listing, so the notes below
// describe only the visible statements.
181void StreamParser::ParseBuffer(std::size_t& iterationBudget) {
// Stream offset captured at this point (its use is on lines not shown).
189 const auto offset = m_streamOffset;
// Resynchronisation: skip to the next header candidate, always dropping at
// least one byte so the parser makes progress.
191 const std::size_t skip = FindNextHeaderCandidate();
192 Discard(skip > 0 ? skip : 1);
// Resolve the wire size for the message type id read on a line not shown.
200 const std::size_t wireSize = LookupWireSize(
static_cast<MessageTypeId>(msgTypeId));
202 const auto offset = m_streamOffset;
209 const auto offset = m_streamOffset;
// Copy the candidate packet from the front of the buffer into the scratch
// area, limited to what is actually buffered.
217 const std::size_t available = std::min(expected, m_size);
218 CopyOut(0, available, m_decodeScratch.data());
// Decode failure: report the error at the current stream offset.
226 const auto offset = m_streamOffset;
227 EmitError(result.error, offset);
// Treat a reported consumption of zero as one byte.
// NOTE(review): presumably passed to Discard on a line not shown — confirm.
232 const std::size_t consumed = result.bytesConsumed > 0 ? result.bytesConsumed : 1;
// Decode success: deliver the packet, clear the consecutive-error counter
// and drop the consumed bytes.
// NOTE(review): Discard(0) has a zero-count check; if a successful decode
// could report bytesConsumed == 0 the same bytes would be parsed again —
// confirm the decoder never does that.
238 EmitPacket(*result.view);
239 m_consecutiveErrors = 0;
240 Discard(result.bytesConsumed);
// Scans the buffered bytes, starting one byte past the front, for the next
// position that could begin a packet header. The caller uses the result as
// the number of bytes to discard.
// NOTE(review): the byte comparisons and most return statements are missing
// from this listing; which values mark a header cannot be determined here.
252std::size_t StreamParser::FindNextHeaderCandidate()
const {
// Short-circuit result: 0 when nothing is buffered, otherwise 1 (the
// enclosing condition is not shown).
254 return m_size == 0 ? 0 : 1;
// Offset 0 is skipped, so the scan never returns the current front.
257 for (std::size_t offset = 1; offset < m_size; ++offset) {
// Physical index of the byte at this logical offset.
258 const std::size_t firstIndex = (m_head + offset) % m_buffer.size();
// No second byte is buffered after this offset.
263 if (offset + 1 >= m_size) {
// Physical index of the following byte.
267 const std::size_t secondIndex = (m_head + offset + 1) % m_buffer.size();
// Returns the wire size for `typeId`. A custom lookup callable, when one is
// set in m_wireSizeLookup, takes precedence; otherwise the size comes from
// `info`, and 0 is returned when `info` holds no value.
// NOTE(review): the line that initialises `info` is missing from this
// listing; presumably it comes from GetMessageInfo(typeId) — confirm.
284std::size_t StreamParser::LookupWireSize(
MessageTypeId typeId)
const {
286 if (m_wireSizeLookup) {
287 return m_wireSizeLookup(typeId);
291 return info ? info->wireSize : 0;
// Called by the parse loop for each successfully decoded packet.
// NOTE(review): the body is not part of this listing; presumably it forwards
// `packet` to the m_onPacket callback — confirm.
298void StreamParser::EmitPacket(
const PacketView& packet) {
// Builds the error report for a failure at stream position `offset`.
// The consecutive-error counter is incremented first, so the report carries
// the count including this error.
// NOTE(review): delivery of `info` is not part of this listing; presumably it
// is passed to m_onError when that callback is set — confirm.
312void StreamParser::EmitError(
PacketError error, std::size_t offset) {
314 ErrorInfo info{error, offset, ++m_consecutiveErrors};
static constexpr std::size_t kMaxParseIterationsPerPush
std::function< void(const PacketView &)> PacketCallback
StreamParser(PacketCallback onPacket, ErrorCallback onError={}, std::size_t bufferSize=4096)
Construct a stream parser with callbacks and buffer size.
void Reset(bool resetErrorState=true)
Reset the parser to its initial state.
std::function< void(const ErrorInfo &)> ErrorCallback
void Push(const uint8_t *data, std::size_t length)
Push raw bytes into the parser for processing.
constexpr std::size_t kMaxMessagesPerPacket
Maximum number of messages allowed in a single packet.
constexpr std::size_t kChecksumSize
Size of CRC32 checksum in bytes.
constexpr std::size_t kHeaderSize
Size of packet header in bytes.
uint16_t LoadU16(const uint8_t *p)
constexpr std::size_t kHeaderMsgCountIndex
std::optional< MessageInfo > GetMessageInfo(MessageTypeId typeId)
constexpr std::size_t kHeaderMsgTypeIndex
constexpr uint8_t kProtocolMinorV3
PacketError
Error codes returned by packet decoding operations.
@ UnsupportedVersion
Protocol version mismatch.
@ ChecksumMismatch
CRC32 validation failed.
@ Truncated
Buffer ends before expected packet length.
@ InvalidFloat
NaN or Inf detected in float field.
@ UnknownMessageType
Message type ID not in registry.
DecodeViewResult DecodePacketViewWithSize(const uint8_t *data, std::size_t length, std::size_t wireSize)
Decode a packet with explicitly provided message wire size.
constexpr std::size_t kHeaderSizeV3
constexpr uint8_t kProtocolMajorV3