Skip to content

Commit 467efeb

Browse files
CuriousGeorgiydrewdzzz
authored andcommitted
client: encapsulate free functions on ConnectionImpl and Connection
Currently, we have a bunch of free functions on ConnectionImpl and Connection, such as `hasSentBytes`, `hasNotRecvBytes`, `hasDataToSend`, `hasDataToDecode`, etc. All of them have a `ConnectionImpl` or `Connection` as an argument. This is an anti-pattern, and these functions need to be encapsulated in the corresponding classes. Closes #152
1 parent 2547fe0 commit 467efeb

File tree

4 files changed

+146
-135
lines changed

4 files changed

+146
-135
lines changed

src/Client/Connection.hpp

Lines changed: 131 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,16 @@ struct ConnectionImpl
9494
void prepare_auth(std::string_view user, std::string_view passwd);
9595
void commit_auth(std::string_view user, std::string_view passwd);
9696

97+
void hasSentBytes(size_t bytes);
98+
void hasNotRecvBytes(size_t bytes);
99+
bool hasDataToSend();
100+
bool hasDataToDecode();
101+
102+
DecodeStatus processResponse(int req_sync, Response<BUFFER> *result);
103+
int decodeGreeting();
104+
105+
void inputBufGC();
106+
97107
Connector<BUFFER, NetProvider> &connector;
98108
BUFFER inBuf;
99109
static constexpr size_t GC_STEP_CNT = 100;
@@ -200,6 +210,123 @@ ConnectionImpl<BUFFER, NetProvider>::commit_auth(std::string_view user, std::str
200210
connector.readyToSend(this);
201211
}
202212

213+
template <class BUFFER, class NetProvider>
214+
void
215+
ConnectionImpl<BUFFER, NetProvider>::hasSentBytes(size_t bytes)
216+
{
217+
// dropBack()/dropFront() interfaces require number of bytes be greater
218+
// than zero so let's check it first.
219+
if (bytes > 0)
220+
getOutBuf().dropFront(bytes);
221+
}
222+
223+
template <class BUFFER, class NetProvider>
224+
void
225+
ConnectionImpl<BUFFER, NetProvider>::hasNotRecvBytes(size_t bytes)
226+
{
227+
if (bytes > 0)
228+
getInBuf().dropBack(bytes);
229+
}
230+
231+
template <class BUFFER, class NetProvider>
232+
bool
233+
ConnectionImpl<BUFFER, NetProvider>::hasDataToSend()
234+
{
235+
// We drop content of input buffer once it has been sent. So to detect
236+
// if there's any data to send it's enough to check buffer's emptiness.
237+
return !getOutBuf().empty();
238+
}
239+
240+
template <class BUFFER, class NetProvider>
241+
bool
242+
ConnectionImpl<BUFFER, NetProvider>::hasDataToDecode()
243+
{
244+
assert(endDecoded < getInBuf().end() || endDecoded == getInBuf().end());
245+
return endDecoded != getInBuf().end();
246+
}
247+
248+
template <class BUFFER, class NetProvider>
249+
void
250+
ConnectionImpl<BUFFER, NetProvider>::inputBufGC()
251+
{
252+
if (gc_step++ % ConnectionImpl<BUFFER, NetProvider>::GC_STEP_CNT == 0) {
253+
TNT_LOG_DEBUG("Flushed input buffer of the connection %p", this);
254+
getInBuf().flush();
255+
}
256+
}
257+
258+
template <class BUFFER, class NetProvider>
259+
DecodeStatus
260+
ConnectionImpl<BUFFER, NetProvider>::processResponse(int req_sync, Response<BUFFER> *result)
261+
{
262+
// Decode response. In case of success - fill in feature map
263+
// and adjust end-of-decoded data pointer. Call GC if needed.
264+
if (!getInBuf().has(endDecoded, MP_RESPONSE_SIZE))
265+
return DECODE_NEEDMORE;
266+
267+
Response<BUFFER> response;
268+
response.size = dec.decodeResponseSize();
269+
if (response.size < 0) {
270+
TNT_LOG_ERROR("Failed to decode response size");
271+
// In case of corrupted response size all other data in the buffer
272+
// is likely to be decoded in the wrong way (since we don't
273+
// know how much bytes should be skipped). So let's simply
274+
// terminate here.
275+
std::abort();
276+
}
277+
response.size += MP_RESPONSE_SIZE;
278+
if (!getInBuf().has(endDecoded, response.size)) {
279+
// Response was received only partially. Reset decoder position
280+
// to the start of response to make this function re-entered.
281+
dec.reset(endDecoded);
282+
return DECODE_NEEDMORE;
283+
}
284+
if (dec.decodeResponse(response) != 0) {
285+
setError("Failed to decode response, skipping bytes..");
286+
endDecoded += response.size;
287+
return DECODE_ERR;
288+
}
289+
TNT_LOG_DEBUG("Header: sync=", response.header.sync, ", code=", response.header.code,
290+
", schema=", response.header.schema_id);
291+
if (result != nullptr && response.header.sync == req_sync) {
292+
*result = std::move(response);
293+
} else {
294+
futures.insert({response.header.sync, std::move(response)});
295+
}
296+
endDecoded += response.size;
297+
inputBufGC();
298+
return DECODE_SUCC;
299+
}
300+
301+
template <class BUFFER, class NetProvider>
302+
int
303+
ConnectionImpl<BUFFER, NetProvider>::decodeGreeting()
304+
{
305+
// TODO: that's not zero-copy, should be rewritten in that pattern.
306+
assert(getInBuf().has(endDecoded, Iproto::GREETING_SIZE));
307+
char greeting_buf[Iproto::GREETING_SIZE];
308+
endDecoded.read({greeting_buf, sizeof(greeting_buf)});
309+
dec.reset(endDecoded);
310+
if (parseGreeting(std::string_view {greeting_buf, Iproto::GREETING_SIZE}, greeting) != 0)
311+
return -1;
312+
is_greeting_received = true;
313+
TNT_LOG_DEBUG("Version: ", greeting.version_id);
314+
315+
#ifndef NDEBUG
316+
// print salt in hex format.
317+
char hex_salt[Iproto::MAX_SALT_SIZE * 2 + 1];
318+
const char *hex = "0123456789abcdef";
319+
for (size_t i = 0; i < greeting.salt_size; i++) {
320+
uint8_t u = greeting.salt[i];
321+
hex_salt[i * 2] = hex[u / 16];
322+
hex_salt[i * 2 + 1] = hex[u % 16];
323+
}
324+
hex_salt[greeting.salt_size * 2] = 0;
325+
TNT_LOG_DEBUG("Salt: ", hex_salt);
326+
#endif
327+
return 0;
328+
}
329+
203330
/** Each connection is supposed to be bound to a single socket. */
204331
template<class BUFFER, class NetProvider>
205332
class Connection
@@ -241,6 +368,8 @@ class Connection
241368
void flush();
242369
size_t getFutureCount() const;
243370

371+
bool hasDataToDecode();
372+
244373
template <class T>
245374
rid_t call(std::string_view func, const T &args);
246375
rid_t ping();
@@ -515,129 +644,11 @@ Connection<BUFFER, NetProvider>::getOutBuf()
515644
return impl->getOutBuf();
516645
}
517646

518-
template <class BUFFER, class NetProvider>
519-
void
520-
hasSentBytes(ConnectionImpl<BUFFER, NetProvider> *conn, size_t bytes)
521-
{
522-
//dropBack()/dropFront() interfaces require number of bytes be greater
523-
//than zero so let's check it first.
524-
if (bytes > 0)
525-
conn->getOutBuf().dropFront(bytes);
526-
}
527-
528-
template <class BUFFER, class NetProvider>
529-
void
530-
hasNotRecvBytes(ConnectionImpl<BUFFER, NetProvider> *conn, size_t bytes)
531-
{
532-
if (bytes > 0)
533-
conn->getInBuf().dropBack(bytes);
534-
}
535-
536-
template <class BUFFER, class NetProvider>
537-
bool
538-
hasDataToSend(ConnectionImpl<BUFFER, NetProvider> *conn)
539-
{
540-
//We drop content of input buffer once it has been sent. So to detect
541-
//if there's any data to send it's enough to check buffer's emptiness.
542-
return !conn->getOutBuf().empty();
543-
}
544-
545647
template <class BUFFER, class NetProvider>
546648
bool
547-
hasDataToDecode(Connection<BUFFER, NetProvider> &conn)
548-
{
549-
return hasDataToDecode(conn.getImpl());
550-
}
551-
552-
template <class BUFFER, class NetProvider>
553-
bool
554-
hasDataToDecode(ConnectionImpl<BUFFER, NetProvider> *conn)
555-
{
556-
assert(conn->endDecoded < conn->getInBuf().end() || conn->endDecoded == conn->getInBuf().end());
557-
return conn->endDecoded != conn->getInBuf().end();
558-
}
559-
560-
template <class BUFFER, class NetProvider>
561-
static void
562-
inputBufGC(ConnectionImpl<BUFFER, NetProvider> *conn)
649+
Connection<BUFFER, NetProvider>::hasDataToDecode()
563650
{
564-
if (conn->gc_step++ % ConnectionImpl<BUFFER, NetProvider>::GC_STEP_CNT == 0) {
565-
TNT_LOG_DEBUG("Flushed input buffer of the connection %p", conn);
566-
conn->getInBuf().flush();
567-
}
568-
}
569-
570-
template <class BUFFER, class NetProvider>
571-
DecodeStatus
572-
processResponse(ConnectionImpl<BUFFER, NetProvider> *conn, int req_sync, Response<BUFFER> *result)
573-
{
574-
//Decode response. In case of success - fill in feature map
575-
//and adjust end-of-decoded data pointer. Call GC if needed.
576-
if (!conn->getInBuf().has(conn->endDecoded, MP_RESPONSE_SIZE))
577-
return DECODE_NEEDMORE;
578-
579-
Response<BUFFER> response;
580-
response.size = conn->dec.decodeResponseSize();
581-
if (response.size < 0) {
582-
TNT_LOG_ERROR("Failed to decode response size");
583-
//In case of corrupted response size all other data in the buffer
584-
//is likely to be decoded in the wrong way (since we don't
585-
// know how much bytes should be skipped). So let's simply
586-
//terminate here.
587-
std::abort();
588-
589-
}
590-
response.size += MP_RESPONSE_SIZE;
591-
if (!conn->getInBuf().has(conn->endDecoded, response.size)) {
592-
//Response was received only partially. Reset decoder position
593-
//to the start of response to make this function re-entered.
594-
conn->dec.reset(conn->endDecoded);
595-
return DECODE_NEEDMORE;
596-
}
597-
if (conn->dec.decodeResponse(response) != 0) {
598-
conn->setError("Failed to decode response, skipping bytes..");
599-
conn->endDecoded += response.size;
600-
return DECODE_ERR;
601-
}
602-
TNT_LOG_DEBUG("Header: sync=", response.header.sync, ", code=", response.header.code,
603-
", schema=", response.header.schema_id);
604-
if (result != nullptr && response.header.sync == req_sync) {
605-
*result = std::move(response);
606-
} else {
607-
conn->futures.insert({response.header.sync, std::move(response)});
608-
}
609-
conn->endDecoded += response.size;
610-
inputBufGC(conn);
611-
return DECODE_SUCC;
612-
}
613-
614-
template <class BUFFER, class NetProvider>
615-
int
616-
decodeGreeting(ConnectionImpl<BUFFER, NetProvider> *conn)
617-
{
618-
//TODO: that's not zero-copy, should be rewritten in that pattern.
619-
assert(conn->getInBuf().has(conn->endDecoded, Iproto::GREETING_SIZE));
620-
char greeting_buf[Iproto::GREETING_SIZE];
621-
conn->endDecoded.read({greeting_buf, sizeof(greeting_buf)});
622-
conn->dec.reset(conn->endDecoded);
623-
if (parseGreeting(std::string_view {greeting_buf, Iproto::GREETING_SIZE}, conn->greeting) != 0)
624-
return -1;
625-
conn->is_greeting_received = true;
626-
TNT_LOG_DEBUG("Version: ", conn->greeting.version_id);
627-
628-
#ifndef NDEBUG
629-
//print salt in hex format.
630-
char hex_salt[Iproto::MAX_SALT_SIZE * 2 + 1];
631-
const char *hex = "0123456789abcdef";
632-
for (size_t i = 0; i < conn->greeting.salt_size; i++) {
633-
uint8_t u = conn->greeting.salt[i];
634-
hex_salt[i * 2] = hex[u / 16];
635-
hex_salt[i * 2 + 1] = hex[u % 16];
636-
}
637-
hex_salt[conn->greeting.salt_size * 2] = 0;
638-
TNT_LOG_DEBUG("Salt: ", hex_salt);
639-
#endif
640-
return 0;
651+
return hasDataToDecode(getImpl());
641652
}
642653

643654
////////////////////////////BOX-like interface functions////////////////////////

src/Client/Connector.hpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -204,15 +204,15 @@ int
204204
Connector<BUFFER, NetProvider>::connectionDecodeResponses(ConnectionImpl<BUFFER, NetProvider> *conn, int req_sync,
205205
Response<BUFFER> *result)
206206
{
207-
if (!hasDataToDecode(conn))
207+
if (!conn->hasDataToDecode())
208208
return 0;
209209

210210
/* Ready to decode connection must be in the corresponding set. */
211211
assert(m_ReadyToDecode.find(conn) != m_ReadyToDecode.end());
212212

213213
int rc = 0;
214-
while (hasDataToDecode(conn)) {
215-
DecodeStatus status = processResponse(conn, req_sync, result);
214+
while (conn->hasDataToDecode()) {
215+
DecodeStatus status = conn->processResponse(req_sync, result);
216216
if (status == DECODE_ERR) {
217217
rc = -1;
218218
break;
@@ -227,7 +227,7 @@ Connector<BUFFER, NetProvider>::connectionDecodeResponses(ConnectionImpl<BUFFER,
227227
assert(status == DECODE_SUCC);
228228
}
229229
/* A connection that has no data to decode must not be left in the set. */
230-
if (!hasDataToDecode(conn))
230+
if (!conn->hasDataToDecode())
231231
m_ReadyToDecode.erase(conn);
232232
return rc;
233233
}
@@ -393,7 +393,7 @@ Connector<BUFFER, NetProvider>::waitAny(int timeout)
393393
return std::nullopt;
394394
}
395395
auto *conn = *m_ReadyToDecode.begin();
396-
assert(hasDataToDecode(conn));
396+
assert(conn->hasDataToDecode());
397397
if (connectionDecodeResponses(conn) != 0)
398398
return std::nullopt;
399399
return conn;

src/Client/EpollNetProvider.hpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ EpollNetProvider<BUFFER, Stream>::recv(ConnImpl_t *conn)
184184
size_t iov_cnt = buf.getIOV(itr, iov, IOVEC_MAX_SIZE);
185185

186186
ssize_t rcvd = conn->get_strm().recv(iov, iov_cnt);
187-
hasNotRecvBytes(conn, CONN_READAHEAD - (rcvd < 0 ? 0 : rcvd));
187+
conn->hasNotRecvBytes(CONN_READAHEAD - (rcvd < 0 ? 0 : rcvd));
188188
if (rcvd < 0) {
189189
conn->setError(std::string("Failed to receive response: ") + strerror(errno), errno);
190190
return -1;
@@ -201,7 +201,7 @@ EpollNetProvider<BUFFER, Stream>::recv(ConnImpl_t *conn)
201201
return 0;
202202
/* Receive and decode greetings. */
203203
TNT_LOG_DEBUG("Greetings are received, read bytes ", rcvd);
204-
if (decodeGreeting(conn) != 0) {
204+
if (conn->decodeGreeting() != 0) {
205205
conn->setError("Failed to decode greetings");
206206
return -1;
207207
}
@@ -220,7 +220,7 @@ template <class BUFFER, class Stream>
220220
int
221221
EpollNetProvider<BUFFER, Stream>::send(ConnImpl_t *conn)
222222
{
223-
while (hasDataToSend(conn)) {
223+
while (conn->hasDataToSend()) {
224224
struct iovec iov[IOVEC_MAX_SIZE];
225225
auto &buf = conn->getOutBuf();
226226
size_t iov_cnt = buf.getIOV(buf.template begin<true>(),
@@ -236,7 +236,7 @@ EpollNetProvider<BUFFER, Stream>::send(ConnImpl_t *conn)
236236
setPollSetting(conn, EPOLLIN | EPOLLOUT);
237237
return 1;
238238
} else {
239-
hasSentBytes(conn, sent);
239+
conn->hasSentBytes(sent);
240240
}
241241
}
242242
/* All data from connection has been successfully written. */
@@ -284,7 +284,7 @@ EpollNetProvider<BUFFER, Stream>::wait(int timeout)
284284
int rc = recv(conn);
285285
if (rc < 0)
286286
return -1;
287-
if (hasDataToDecode(conn))
287+
if (conn->hasDataToDecode())
288288
m_Connector.readyToDecode(conn);
289289
}
290290

0 commit comments

Comments
 (0)