24#include <netinet/tcp.h>
25#include <sys/socket.h>
32constexpr auto kReconnectInterval = std::chrono::milliseconds(500);
35constexpr auto kLogThrottle = std::chrono::seconds(1);
55 m_txBuffer = std::make_unique<uint8_t[]>(kTxBufferCapacity);
59 if (!CreateBaseSocket()) {
63 sockaddr_in bindAddr{};
64 bindAddr.sin_family = AF_INET;
65 bindAddr.sin_port = htons(listenPort);
66 bindAddr.sin_addr.s_addr = INADDR_ANY;
68 if (bind(m_socket,
reinterpret_cast<sockaddr*
>(&bindAddr),
sizeof(bindAddr)) < 0) {
75 if (listen(m_socket, 1) < 0) {
81 }
else if (targetIp && targetPort > 0) {
83 sockaddr_in targetAddr{};
84 targetAddr.sin_family = AF_INET;
85 targetAddr.sin_port = htons(targetPort);
86 if (inet_pton(AF_INET, targetIp, &targetAddr.sin_addr) <= 0) {
87 LogError(
"inet_pton (invalid target IP)");
91 m_peerAddr = targetAddr;
92 m_peerAddrValid =
true;
93 BeginClientConnect(
true);
101 if (m_clientSocket >= 0) {
102 ::close(m_clientSocket);
117bool TcpPosixAdapter::CreateBaseSocket() {
123 m_socket = ::socket(AF_INET, SOCK_STREAM, 0);
130 if (setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, &yes,
sizeof(yes)) < 0) {
131 LogError(
"setsockopt(SO_REUSEADDR)");
134 if (!ConfigureSocket(m_socket)) {
139 m_isConnected =
false;
140 m_connectInProgress =
false;
152void TcpPosixAdapter::BeginClientConnect(
bool forceImmediate) {
153 if (m_isServer || !m_peerAddrValid) {
157 const auto now = std::chrono::steady_clock::now();
158 if (!forceImmediate && now < m_nextReconnectAttempt) {
161 m_nextReconnectAttempt = now + kReconnectInterval;
163 if (!CreateBaseSocket()) {
167 if (connect(m_socket,
reinterpret_cast<sockaddr*
>(&m_peerAddr),
sizeof(m_peerAddr)) < 0) {
168 if (errno == EINPROGRESS || errno == EALREADY) {
169 m_connectInProgress =
true;
175 m_connectInProgress =
false;
176 m_isConnected =
false;
181 m_isConnected =
true;
182 m_connectInProgress =
false;
196void TcpPosixAdapter::PollConnection() {
202 if (m_clientSocket >= 0) {
204 const auto now = std::chrono::steady_clock::now();
205 if (m_lastServerRx != std::chrono::steady_clock::time_point{} &&
206 now - m_lastServerRx > m_serverClientTimeout) {
207 ::close(m_clientSocket);
209 m_isConnected =
false;
213 m_isConnected =
true;
217 sockaddr_in clientAddr{};
218 socklen_t len =
sizeof(clientAddr);
219 int clientSock = ::accept(m_socket,
reinterpret_cast<sockaddr*
>(&clientAddr), &len);
220 if (clientSock >= 0) {
221 if (!ConfigureSocket(clientSock)) {
225 m_clientSocket = clientSock;
226 m_isConnected =
true;
227 m_lastServerRx = std::chrono::steady_clock::now();
228 TryFlushTxBuffer(m_clientSocket);
229 }
else if (errno != EAGAIN && errno != EWOULDBLOCK) {
236 BeginClientConnect(
false);
240 if (!m_isConnected && m_connectInProgress) {
242 socklen_t errLen =
sizeof(err);
243 if (getsockopt(m_socket, SOL_SOCKET, SO_ERROR, &err, &errLen) < 0) {
244 LogError(
"getsockopt(SO_ERROR)");
249 m_isConnected =
true;
250 m_connectInProgress =
false;
251 TryFlushTxBuffer(m_socket);
255 if (err == EINPROGRESS || err == EALREADY) {
260 LogError(
"connect (async)");
263 m_connectInProgress =
false;
264 BeginClientConnect(
false);
268 if (!m_isConnected) {
269 BeginClientConnect(
false);
279void TcpPosixAdapter::HandleConnectionLoss() {
280 m_isConnected =
false;
281 m_handshakeComplete =
false;
282 m_handshakeSent =
false;
283 m_schemaValidated =
false;
284 m_handshakeReceived = 0;
285 m_remoteSchemaHash = 0;
289 if (m_clientSocket >= 0) {
290 ::close(m_clientSocket);
302 m_connectInProgress =
false;
303 BeginClientConnect(
true);
317 if (!data || length == 0) {
323 int targetSock = m_isServer ? m_clientSocket : m_socket;
324 if (targetSock < 0 || !m_isConnected) {
328 if (length > kTxBufferCapacity) {
329 LogError(
"send payload exceeds tx buffer capacity");
333 if (!EnqueueTx(data, length)) {
337 TryFlushTxBuffer(targetSock);
352 if (m_socket < 0 || !buffer || maxLength == 0) {
358 int targetSock = m_isServer ? m_clientSocket : m_socket;
359 if (targetSock < 0 || !m_isConnected) {
363 TryFlushTxBuffer(targetSock);
366 if (!m_handshakeSent) {
372 received = ::recv(targetSock, buffer, maxLength, 0);
373 }
while (received < 0 && errno == EINTR);
377 m_lastServerRx = std::chrono::steady_clock::now();
381 if (!m_handshakeComplete) {
382 std::size_t consumed = std::min(
static_cast<std::size_t
>(received),
384 ProcessHandshake(buffer, consumed);
387 if (consumed >=
static_cast<std::size_t
>(received)) {
392 std::size_t remaining =
static_cast<std::size_t
>(received) - consumed;
393 std::memmove(buffer, buffer + consumed, remaining);
397 return static_cast<std::size_t
>(received);
398 }
else if (received == 0) {
399 HandleConnectionLoss();
402 if (errno == EAGAIN || errno == EWOULDBLOCK) {
405 if (errno == ENOTCONN || errno == ECONNRESET) {
406 HandleConnectionLoss();
422void TcpPosixAdapter::TryFlushTxBuffer(
int targetSock) {
423 uint8_t* buffer = m_txBuffer.get();
428 while (m_txSize > 0 && targetSock >= 0 && m_isConnected) {
429 const std::size_t contiguous = std::min(m_txSize, kTxBufferCapacity - m_txHead);
430 ssize_t sent = ::send(targetSock, buffer + m_txHead, contiguous, MSG_NOSIGNAL);
432 const std::size_t consumed =
static_cast<std::size_t
>(sent);
433 m_txHead = (m_txHead + consumed) % kTxBufferCapacity;
434 m_txSize -= consumed;
439 HandleConnectionLoss();
444 if (errno == EINTR) {
448 if (errno == EAGAIN || errno == EWOULDBLOCK) {
452 if (errno == EPIPE || errno == ECONNRESET || errno == ENOTCONN) {
453 HandleConnectionLoss();
472bool TcpPosixAdapter::EnqueueTx(
const uint8_t* data, std::size_t length) {
473 if (!data || length == 0) {
479 if (m_txSize > kTxBufferCapacity / 2) {
480 LogError(
"tx buffer congested - rejecting new packet");
484 if (length > kTxBufferCapacity - m_txSize) {
485 LogError(
"tx buffer full - dropping packet");
489 const auto* src = data;
490 const std::size_t firstChunk = std::min(length, kTxBufferCapacity - m_txTail);
491 std::memcpy(m_txBuffer.get() + m_txTail, src, firstChunk);
493 const std::size_t remaining = length - firstChunk;
495 std::memcpy(m_txBuffer.get(), src + firstChunk, remaining);
498 m_txTail = (m_txTail + length) % kTxBufferCapacity;
511bool TcpPosixAdapter::ConfigureSocket(
int sock) {
513 if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (
char*)&yes,
sizeof(
int)) < 0) {
514 LogError(
"setsockopt(TCP_NODELAY)");
518 if (fcntl(sock, F_SETFL, O_NONBLOCK) < 0) {
519 LogError(
"fcntl(O_NONBLOCK)");
532void TcpPosixAdapter::LogError(
const char* message) {
533 const auto now = std::chrono::steady_clock::now();
534 if (now - m_lastErrorLog < kLogThrottle) {
537 m_lastErrorLog = now;
538 std::cerr <<
"TCP adapter error: " << message <<
" errno=" << errno << std::endl;
547void TcpPosixAdapter::DropPendingTx() {
562 int targetSock = m_isServer ? m_clientSocket : m_socket;
563 if (targetSock < 0 || !m_isConnected) {
569 if (m_expectedSchemaHash != 0) {
580 if (!EnqueueTx(handshake,
sizeof(handshake))) {
584 TryFlushTxBuffer(targetSock);
585 m_handshakeSent =
true;
594uint32_t TcpPosixAdapter::GetExpectedSchemaHash()
const {
595 return m_expectedSchemaHash != 0 ? m_expectedSchemaHash :
kSchemaHash;
609bool TcpPosixAdapter::ProcessHandshake(
const uint8_t* data, std::size_t length) {
611 std::size_t toRead = std::min(length,
kHandshakeSize - m_handshakeReceived);
612 std::memcpy(m_handshakeBuffer + m_handshakeReceived, data, toRead);
613 m_handshakeReceived += toRead;
621 const uint32_t expected = GetExpectedSchemaHash();
623 if (m_remoteSchemaHash != expected) {
624 std::cerr <<
"TCP adapter: Schema mismatch! Local=0x" << std::hex << expected
625 <<
" Remote=0x" << m_remoteSchemaHash << std::dec << std::endl;
626 m_schemaValidated =
false;
627 m_handshakeComplete =
true;
631 m_schemaValidated =
true;
632 m_handshakeComplete =
true;
634 if (!m_handshakeSent) {
bool SendBytes(const uint8_t *data, std::size_t length) override
Sends bytes through the TCP connection.
bool SendHandshake()
Sends the V3 protocol handshake to the peer.
~TcpPosixAdapter() override
Destructor. Closes all open sockets.
std::size_t ReceiveChunk(uint8_t *buffer, std::size_t maxLength) override
Receives bytes from the TCP connection.
TcpPosixAdapter(uint16_t listenPort, const char *targetIp=nullptr, uint16_t targetPort=0)
Construct TCP adapter.
bool EncodeHandshake(uint8_t *out, std::size_t capacity)
Encode handshake with default schema hash.
bool EncodeHandshakeWithHash(uint8_t *out, std::size_t capacity, uint32_t schemaHash)
Encode handshake with custom schema hash (for testing)
constexpr std::size_t kHandshakeSize
uint32_t ExtractSchemaHash(const uint8_t *data, std::size_t length)
constexpr uint32_t kSchemaHash