BCNP 3.2.1
Batched Command Network Protocol
Loading...
Searching...
No Matches
tcp_posix.cpp
Go to the documentation of this file.
1
16
17#include <algorithm>
18#include <arpa/inet.h>
19#include <cerrno>
20#include <chrono>
21#include <cstring>
22#include <fcntl.h>
23#include <iostream>
24#include <netinet/tcp.h>
25#include <sys/socket.h>
26#include <unistd.h>
27
28namespace bcnp {
29
namespace {

/// Delay added to "now" to compute the earliest time of the next client
/// reconnect attempt (see BeginClientConnect).
constexpr std::chrono::milliseconds kReconnectInterval{500};

/// Minimum spacing between two messages printed by LogError.
constexpr std::chrono::seconds kLogThrottle{1};

} // namespace
37
54TcpPosixAdapter::TcpPosixAdapter(uint16_t listenPort, const char* targetIp, uint16_t targetPort) {
55 m_txBuffer = std::make_unique<uint8_t[]>(kTxBufferCapacity);
56
57 if (listenPort > 0) {
58 m_isServer = true;
59 if (!CreateBaseSocket()) {
60 return;
61 }
62
63 sockaddr_in bindAddr{};
64 bindAddr.sin_family = AF_INET;
65 bindAddr.sin_port = htons(listenPort);
66 bindAddr.sin_addr.s_addr = INADDR_ANY;
67
68 if (bind(m_socket, reinterpret_cast<sockaddr*>(&bindAddr), sizeof(bindAddr)) < 0) {
69 LogError("bind");
70 ::close(m_socket);
71 m_socket = -1;
72 return;
73 }
74
75 if (listen(m_socket, 1) < 0) {
76 LogError("listen");
77 ::close(m_socket);
78 m_socket = -1;
79 return;
80 }
81 } else if (targetIp && targetPort > 0) {
82 m_isServer = false;
83 sockaddr_in targetAddr{};
84 targetAddr.sin_family = AF_INET;
85 targetAddr.sin_port = htons(targetPort);
86 if (inet_pton(AF_INET, targetIp, &targetAddr.sin_addr) <= 0) {
87 LogError("inet_pton (invalid target IP)");
88 return;
89 }
90
91 m_peerAddr = targetAddr;
92 m_peerAddrValid = true;
93 BeginClientConnect(true);
94 }
95}
96
101 if (m_clientSocket >= 0) {
102 ::close(m_clientSocket);
103 }
104 if (m_socket >= 0) {
105 ::close(m_socket);
106 }
107}
108
117bool TcpPosixAdapter::CreateBaseSocket() {
118 if (m_socket >= 0) {
119 ::close(m_socket);
120 m_socket = -1;
121 }
122
123 m_socket = ::socket(AF_INET, SOCK_STREAM, 0);
124 if (m_socket < 0) {
125 LogError("socket");
126 return false;
127 }
128
129 int yes = 1;
130 if (setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0) {
131 LogError("setsockopt(SO_REUSEADDR)");
132 }
133
134 if (!ConfigureSocket(m_socket)) {
135 ::close(m_socket);
136 m_socket = -1;
137 return false;
138 }
139 m_isConnected = false;
140 m_connectInProgress = false;
141 return true;
142}
143
152void TcpPosixAdapter::BeginClientConnect(bool forceImmediate) {
153 if (m_isServer || !m_peerAddrValid) {
154 return;
155 }
156
157 const auto now = std::chrono::steady_clock::now();
158 if (!forceImmediate && now < m_nextReconnectAttempt) {
159 return;
160 }
161 m_nextReconnectAttempt = now + kReconnectInterval;
162
163 if (!CreateBaseSocket()) {
164 return;
165 }
166
167 if (connect(m_socket, reinterpret_cast<sockaddr*>(&m_peerAddr), sizeof(m_peerAddr)) < 0) {
168 if (errno == EINPROGRESS || errno == EALREADY) {
169 m_connectInProgress = true;
170 return;
171 }
172 LogError("connect");
173 ::close(m_socket);
174 m_socket = -1;
175 m_connectInProgress = false;
176 m_isConnected = false;
177 return;
178 }
179
180 // Synchronous connect succeeded (rare for non-blocking socket)
181 m_isConnected = true;
182 m_connectInProgress = false;
183}
184
196void TcpPosixAdapter::PollConnection() {
197 if (m_isServer) {
198 if (m_socket < 0) {
199 return;
200 }
201
202 if (m_clientSocket >= 0) {
203 // Check for zombie client timeout
204 const auto now = std::chrono::steady_clock::now();
205 if (m_lastServerRx != std::chrono::steady_clock::time_point{} &&
206 now - m_lastServerRx > m_serverClientTimeout) {
207 ::close(m_clientSocket);
208 m_clientSocket = -1;
209 m_isConnected = false;
210 m_lastServerRx = {};
211 return;
212 }
213 m_isConnected = true;
214 return;
215 }
216
217 sockaddr_in clientAddr{};
218 socklen_t len = sizeof(clientAddr);
219 int clientSock = ::accept(m_socket, reinterpret_cast<sockaddr*>(&clientAddr), &len);
220 if (clientSock >= 0) {
221 if (!ConfigureSocket(clientSock)) {
222 ::close(clientSock);
223 return;
224 }
225 m_clientSocket = clientSock;
226 m_isConnected = true;
227 m_lastServerRx = std::chrono::steady_clock::now();
228 TryFlushTxBuffer(m_clientSocket);
229 } else if (errno != EAGAIN && errno != EWOULDBLOCK) {
230 LogError("accept");
231 }
232 return;
233 }
234
235 if (m_socket < 0) {
236 BeginClientConnect(false);
237 return;
238 }
239
240 if (!m_isConnected && m_connectInProgress) {
241 int err = 0;
242 socklen_t errLen = sizeof(err);
243 if (getsockopt(m_socket, SOL_SOCKET, SO_ERROR, &err, &errLen) < 0) {
244 LogError("getsockopt(SO_ERROR)");
245 return;
246 }
247
248 if (err == 0) {
249 m_isConnected = true;
250 m_connectInProgress = false;
251 TryFlushTxBuffer(m_socket);
252 return;
253 }
254
255 if (err == EINPROGRESS || err == EALREADY) {
256 return;
257 }
258
259 errno = err;
260 LogError("connect (async)");
261 ::close(m_socket);
262 m_socket = -1;
263 m_connectInProgress = false;
264 BeginClientConnect(false);
265 return;
266 }
267
268 if (!m_isConnected) {
269 BeginClientConnect(false);
270 }
271}
272
279void TcpPosixAdapter::HandleConnectionLoss() {
280 m_isConnected = false;
281 m_handshakeComplete = false;
282 m_handshakeSent = false;
283 m_schemaValidated = false;
284 m_handshakeReceived = 0;
285 m_remoteSchemaHash = 0;
286 DropPendingTx();
287
288 if (m_isServer) {
289 if (m_clientSocket >= 0) {
290 ::close(m_clientSocket);
291 m_clientSocket = -1;
292 }
293 m_lastServerRx = {};
294 return;
295 }
296
297 // Client mode - close broken socket and trigger reconnection
298 if (m_socket >= 0) {
299 ::close(m_socket);
300 m_socket = -1;
301 }
302 m_connectInProgress = false;
303 BeginClientConnect(true);
304}
305
316bool TcpPosixAdapter::SendBytes(const uint8_t* data, std::size_t length) {
317 if (!data || length == 0) {
318 return true;
319 }
320
321 PollConnection();
322
323 int targetSock = m_isServer ? m_clientSocket : m_socket;
324 if (targetSock < 0 || !m_isConnected) {
325 return false;
326 }
327
328 if (length > kTxBufferCapacity) {
329 LogError("send payload exceeds tx buffer capacity");
330 return false;
331 }
332
333 if (!EnqueueTx(data, length)) {
334 return false;
335 }
336
337 TryFlushTxBuffer(targetSock);
338 return true;
339}
340
351std::size_t TcpPosixAdapter::ReceiveChunk(uint8_t* buffer, std::size_t maxLength) {
352 if (m_socket < 0 || !buffer || maxLength == 0) {
353 return 0;
354 }
355
356 PollConnection();
357
358 int targetSock = m_isServer ? m_clientSocket : m_socket;
359 if (targetSock < 0 || !m_isConnected) {
360 return 0;
361 }
362
363 TryFlushTxBuffer(targetSock);
364
365 // Send handshake if connected but not sent yet
366 if (!m_handshakeSent) {
368 }
369
370 ssize_t received;
371 do {
372 received = ::recv(targetSock, buffer, maxLength, 0);
373 } while (received < 0 && errno == EINTR);
374
375 if (received > 0) {
376 if (m_isServer) {
377 m_lastServerRx = std::chrono::steady_clock::now();
378 }
379
380 // Process handshake if not complete
381 if (!m_handshakeComplete) {
382 std::size_t consumed = std::min(static_cast<std::size_t>(received),
383 kHandshakeSize - m_handshakeReceived);
384 ProcessHandshake(buffer, consumed);
385
386 // If handshake consumed all data, return 0
387 if (consumed >= static_cast<std::size_t>(received)) {
388 return 0;
389 }
390
391 // Move remaining data to start of buffer
392 std::size_t remaining = static_cast<std::size_t>(received) - consumed;
393 std::memmove(buffer, buffer + consumed, remaining);
394 return remaining;
395 }
396
397 return static_cast<std::size_t>(received);
398 } else if (received == 0) {
399 HandleConnectionLoss();
400 return 0;
401 } else {
402 if (errno == EAGAIN || errno == EWOULDBLOCK) {
403 return 0;
404 }
405 if (errno == ENOTCONN || errno == ECONNRESET) {
406 HandleConnectionLoss();
407 } else {
408 LogError("recv");
409 }
410 return 0;
411 }
412}
413
422void TcpPosixAdapter::TryFlushTxBuffer(int targetSock) {
423 uint8_t* buffer = m_txBuffer.get();
424 if (!buffer) {
425 return;
426 }
427
428 while (m_txSize > 0 && targetSock >= 0 && m_isConnected) {
429 const std::size_t contiguous = std::min(m_txSize, kTxBufferCapacity - m_txHead);
430 ssize_t sent = ::send(targetSock, buffer + m_txHead, contiguous, MSG_NOSIGNAL);
431 if (sent > 0) {
432 const std::size_t consumed = static_cast<std::size_t>(sent);
433 m_txHead = (m_txHead + consumed) % kTxBufferCapacity;
434 m_txSize -= consumed;
435 continue;
436 }
437
438 if (sent == 0) {
439 HandleConnectionLoss();
440 DropPendingTx();
441 return;
442 }
443
444 if (errno == EINTR) {
445 continue;
446 }
447
448 if (errno == EAGAIN || errno == EWOULDBLOCK) {
449 return;
450 }
451
452 if (errno == EPIPE || errno == ECONNRESET || errno == ENOTCONN) {
453 HandleConnectionLoss();
454 } else {
455 LogError("send");
456 }
457 DropPendingTx();
458 return;
459 }
460}
461
472bool TcpPosixAdapter::EnqueueTx(const uint8_t* data, std::size_t length) {
473 if (!data || length == 0) {
474 return true;
475 }
476
477 // Real-time control: reject new packets when buffer > 50% to prevent runaway buffering
478 // This avoids mid-packet corruption that would occur if we dropped the buffer during flush
479 if (m_txSize > kTxBufferCapacity / 2) {
480 LogError("tx buffer congested - rejecting new packet");
481 return false;
482 }
483
484 if (length > kTxBufferCapacity - m_txSize) {
485 LogError("tx buffer full - dropping packet");
486 return false;
487 }
488
489 const auto* src = data;
490 const std::size_t firstChunk = std::min(length, kTxBufferCapacity - m_txTail);
491 std::memcpy(m_txBuffer.get() + m_txTail, src, firstChunk);
492
493 const std::size_t remaining = length - firstChunk;
494 if (remaining > 0) {
495 std::memcpy(m_txBuffer.get(), src + firstChunk, remaining);
496 }
497
498 m_txTail = (m_txTail + length) % kTxBufferCapacity;
499 m_txSize += length;
500 return true;
501}
511bool TcpPosixAdapter::ConfigureSocket(int sock) {
512 int yes = 1;
513 if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char*)&yes, sizeof(int)) < 0) {
514 LogError("setsockopt(TCP_NODELAY)");
515 return false;
516 }
517
518 if (fcntl(sock, F_SETFL, O_NONBLOCK) < 0) {
519 LogError("fcntl(O_NONBLOCK)");
520 return false;
521 }
522 return true;
523}
524
532void TcpPosixAdapter::LogError(const char* message) {
533 const auto now = std::chrono::steady_clock::now();
534 if (now - m_lastErrorLog < kLogThrottle) {
535 return;
536 }
537 m_lastErrorLog = now;
538 std::cerr << "TCP adapter error: " << message << " errno=" << errno << std::endl;
539}
540
547void TcpPosixAdapter::DropPendingTx() {
548 m_txHead = 0;
549 m_txTail = 0;
550 m_txSize = 0;
551}
552
562 int targetSock = m_isServer ? m_clientSocket : m_socket;
563 if (targetSock < 0 || !m_isConnected) {
564 return false;
565 }
566
567 uint8_t handshake[kHandshakeSize];
568 // Use custom hash if set, otherwise use default
569 if (m_expectedSchemaHash != 0) {
570 if (!EncodeHandshakeWithHash(handshake, sizeof(handshake), m_expectedSchemaHash)) {
571 return false;
572 }
573 } else {
574 if (!EncodeHandshake(handshake, sizeof(handshake))) {
575 return false;
576 }
577 }
578
579 // Queue handshake for sending
580 if (!EnqueueTx(handshake, sizeof(handshake))) {
581 return false;
582 }
583
584 TryFlushTxBuffer(targetSock);
585 m_handshakeSent = true;
586 return true;
587}
588
594uint32_t TcpPosixAdapter::GetExpectedSchemaHash() const {
595 return m_expectedSchemaHash != 0 ? m_expectedSchemaHash : kSchemaHash;
596}
597
609bool TcpPosixAdapter::ProcessHandshake(const uint8_t* data, std::size_t length) {
610 // Accumulate handshake bytes
611 std::size_t toRead = std::min(length, kHandshakeSize - m_handshakeReceived);
612 std::memcpy(m_handshakeBuffer + m_handshakeReceived, data, toRead);
613 m_handshakeReceived += toRead;
614
615 if (m_handshakeReceived < kHandshakeSize) {
616 return false; // Not complete yet
617 }
618
619 // Extract and validate against expected hash
620 m_remoteSchemaHash = ExtractSchemaHash(m_handshakeBuffer, kHandshakeSize);
621 const uint32_t expected = GetExpectedSchemaHash();
622
623 if (m_remoteSchemaHash != expected) {
624 std::cerr << "TCP adapter: Schema mismatch! Local=0x" << std::hex << expected
625 << " Remote=0x" << m_remoteSchemaHash << std::dec << std::endl;
626 m_schemaValidated = false;
627 m_handshakeComplete = true;
628 return true;
629 }
630
631 m_schemaValidated = true;
632 m_handshakeComplete = true;
633
634 if (!m_handshakeSent) {
636 }
637
638 return true;
639}
640
641} // namespace bcnp
bool SendBytes(const uint8_t *data, std::size_t length) override
Sends bytes through the TCP connection.
bool SendHandshake()
Sends the V3 protocol handshake to the peer.
~TcpPosixAdapter() override
Destructor. Closes all open sockets.
std::size_t ReceiveChunk(uint8_t *buffer, std::size_t maxLength) override
Receives bytes from the TCP connection.
TcpPosixAdapter(uint16_t listenPort, const char *targetIp=nullptr, uint16_t targetPort=0)
Construct TCP adapter.
Definition tcp_posix.cpp:54
bool EncodeHandshake(uint8_t *out, std::size_t capacity)
Encode handshake with default schema hash.
bool EncodeHandshakeWithHash(uint8_t *out, std::size_t capacity, uint32_t schemaHash)
Encode handshake with custom schema hash (for testing)
constexpr std::size_t kHandshakeSize
uint32_t ExtractSchemaHash(const uint8_t *data, std::size_t length)
constexpr uint32_t kSchemaHash