Skip to content

Commit 979dfeb

Browse files
Add logic to support ranged GTID updates from proxysql_mysqlbinlog.
1 parent 831d5fd commit 979dfeb

File tree

4 files changed

+176
-45
lines changed

4 files changed

+176
-45
lines changed

include/proxysql_gtid.h

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,35 @@
22
#define PROXYSQL_GTID
33
// highly inspired by libslave
44
// https://github.com/vozbu/libslave/
5-
#include <unordered_map>
65
#include <list>
6+
#include <string>
7+
#include <unordered_map>
78
#include <utility>
89

910
typedef std::pair<std::string, int64_t> gtid_t;
10-
typedef std::pair<int64_t, int64_t> gtid_interval_t;
11+
12+
class Gtid_Interval {
13+
public:
14+
int64_t start;
15+
int64_t end;
16+
17+
public:
18+
explicit Gtid_Interval(const int64_t _start, const int64_t _end);
19+
explicit Gtid_Interval(const char* s);
20+
explicit Gtid_Interval(const std::string& s);
21+
22+
const std::string to_string(void);
23+
const bool contains(int64_t gtid);
24+
const bool append(const Gtid_Interval& other);
25+
const bool merge(const Gtid_Interval& other);
26+
27+
const int cmp(const Gtid_Interval& other);
28+
const bool operator<(const Gtid_Interval& other);
29+
const bool operator==(const Gtid_Interval& other);
30+
};
31+
typedef Gtid_Interval gtid_interval_t;
32+
33+
// TODO: make me a proper class.
1134
typedef std::unordered_map<std::string, std::list<gtid_interval_t>> gtid_set_t;
1235

1336
/*
@@ -30,4 +53,4 @@ class Gtid_Server_Info {
3053
};
3154
*/
3255

33-
#endif /* PROXYSQL_GTID */
56+
#endif /* PROXYSQL_GTID */

lib/GTID_Server_Data.cpp

Lines changed: 37 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ bool GTID_Server_Data::gtid_exists(char *gtid_uuid, uint64_t gtid_trxid) {
220220
return false;
221221
}
222222
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) {
223-
if ((int64_t)gtid_trxid >= itr->first && (int64_t)gtid_trxid <= itr->second) {
223+
if (itr->contains((int64_t)gtid_trxid)) {
224224
// fprintf(stderr,"YES\n");
225225
return true;
226226
}
@@ -375,12 +375,7 @@ std::string gtid_executed_to_string(gtid_set_t& gtid_executed) {
375375
s.insert(23,"-");
376376
s = s + ":";
377377
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) {
378-
std::string s2 = s;
379-
s2 = s2 + std::to_string(itr->first);
380-
s2 = s2 + "-";
381-
s2 = s2 + std::to_string(itr->second);
382-
s2 = s2 + ",";
383-
gtid_set = gtid_set + s2;
378+
gtid_set += s + itr->to_string() + ",";
384379
}
385380
}
386381
// Extract latest comma only in case 'gtid_executed' isn't empty
@@ -391,54 +386,54 @@ std::string gtid_executed_to_string(gtid_set_t& gtid_executed) {
391386
}
392387

393388

394-
395-
void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) {
396-
auto it = gtid_executed.find(gtid.first);
397-
if (it == gtid_executed.end())
398-
{
399-
gtid_executed[gtid.first].emplace_back(gtid.second, gtid.second);
389+
// Merges a GTID interval into a gitd_executed instance.
390+
void addGtid(const std::string& uuid, const gtid_interval_t &iv, gtid_set_t& gtid_executed) {
391+
auto it = gtid_executed.find(uuid);
392+
if (it == gtid_executed.end()) {
393+
// new UUID entry
394+
gtid_executed[uuid].emplace_back(iv);
400395
return;
401396
}
402397

403-
bool flag = true;
404-
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr)
405-
{
406-
if (gtid.second >= itr->first && gtid.second <= itr->second)
407-
return;
408-
if (gtid.second + 1 == itr->first)
409-
{
410-
--itr->first;
411-
flag = false;
412-
break;
413-
}
414-
else if (gtid.second == itr->second + 1)
415-
{
416-
++itr->second;
417-
flag = false;
418-
break;
419-
}
420-
else if (gtid.second < itr->first)
421-
{
422-
it->second.emplace(itr, gtid.second, gtid.second);
398+
if (!it->second.empty()) {
399+
if (it->second.back().append(iv)) {
400+
// if appending to the last GTID range succeded, the interval list remains optimized - nothing else to do
423401
return;
424402
}
425403
}
426404

427-
if (flag)
428-
it->second.emplace_back(gtid.second, gtid.second);
405+
// insert/merge GTID interval...
406+
auto pos = it->second.begin();
407+
for (; pos != it->second.end(); ++pos) {
408+
if (pos->merge(iv))
409+
break;
410+
}
411+
if (pos == it->second.end()) {
412+
it->second.emplace_back(iv);
413+
}
429414

430-
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr)
431-
{
432-
auto next_itr = std::next(itr);
433-
if (next_itr != it->second.end() && itr->second + 1 == next_itr->first)
434-
{
435-
itr->second = next_itr->second;
436-
it->second.erase(next_itr);
415+
// ...and merge overlapping GTID ranges, if any
416+
it->second.sort();
417+
auto a = it->second.begin();
418+
while (a != it->second.end()) {
419+
auto b = std::next(a);
420+
if (b == it->second.end()) {
437421
break;
438422
}
423+
if (a->merge(*b)) {
424+
it->second.erase(b);
425+
continue;
426+
}
427+
a++;
439428
}
440429
}
441430

431+
// Merges a single GTID into a gitd_executed instance.
432+
inline void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) {
433+
gtid_interval_t iv = Gtid_Interval(gtid.second, gtid.second);
434+
addGtid(gtid.first, iv, gtid_executed);
435+
}
436+
442437
void * GTID_syncer_run() {
443438
//struct ev_loop * gtid_ev_loop;
444439
//gtid_ev_loop = NULL;

lib/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ _OBJ_CXX := ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo
7171
ProxySQL_Admin_Tests.oo ProxySQL_Admin_Tests2.oo ProxySQL_Admin_Scheduler.oo ProxySQL_Admin_Disk_Upgrade.oo ProxySQL_Admin_Stats.oo \
7272
Admin_Handler.oo Admin_FlushVariables.oo Admin_Bootstrap.oo \
7373
Base_Session.oo Base_Thread.oo \
74+
proxysql_gtid.oo \
7475
proxy_protocol_info.oo \
7576
proxysql_find_charset.oo ProxySQL_Poll.oo \
7677
PgSQL_Protocol.oo PgSQL_Thread.oo PgSQL_Data_Stream.oo PgSQL_Session.oo PgSQL_Variables.oo PgSQL_HostGroups_Manager.oo PgSQL_Connection.oo PgSQL_Backend.oo PgSQL_Logger.oo PgSQL_Authentication.oo PgSQL_Error_Helper.oo \

lib/proxysql_gtid.cpp

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
#include <cstdint>
2+
#include <cstdio>
3+
#include <cstdlib>
4+
#include <string>
5+
6+
#include "proxysql_gtid.h"
7+
8+
9+
// Initializes a GTID interval from a range.
10+
Gtid_Interval::Gtid_Interval(const int64_t _start, const int64_t _end) {
11+
start = _start;
12+
end = _end;
13+
14+
if (start > end) {
15+
std::swap(start, end);
16+
}
17+
}
18+
19+
// Initializes a GTID interval from a string buffer, in [gtid]{-[gtid]} format.
20+
Gtid_Interval::Gtid_Interval(const char *s) {
21+
uint64_t _start = 0, _end = 0;
22+
23+
if (sscanf(s, "%lu-%lu", &_start, &_end) == 2) {
24+
start = _start;
25+
end = _end;
26+
} else if (sscanf(s, "%lu", &_start) == 1) {
27+
start = _start;
28+
end = _start;
29+
}
30+
31+
if (start > end) {
32+
std::swap(start, end);
33+
}
34+
}
35+
36+
Gtid_Interval::Gtid_Interval(const std::string& s) : Gtid_Interval(s.c_str()) {
37+
}
38+
39+
// Checks if a given GTID is contained in this interval.
40+
const bool Gtid_Interval::contains(int64_t gtid) {
41+
return (gtid >= start && gtid <= end);
42+
}
43+
44+
// Yields a string representation for a GTID interval.
45+
const std::string Gtid_Interval::to_string(void) {
46+
if (start == end) {
47+
return std::to_string(start);
48+
}
49+
return std::to_string(start) + "-" + std::to_string(end);
50+
}
51+
52+
// Attempts to append a new interval to this interval's end. Returns true if the append succeded, false otherwise.
53+
const bool Gtid_Interval::append(const Gtid_Interval& other) {
54+
if (other.end >= end && other.start <= (end+1)) {
55+
// other overlaps interval at end
56+
end = other.end;
57+
return true;
58+
}
59+
60+
return false;
61+
}
62+
63+
// Attempts to merge two GTID intervals. Returns true if the intervals were merged (and potentially modified), false otherwise.
64+
const bool Gtid_Interval::merge(const Gtid_Interval& other) {
65+
if (other.start >= start && other.end <= end) {
66+
// other is contained by interval
67+
return true;
68+
}
69+
if (other.start <= start && other.end >= end) {
70+
// other contains whole of existing interval
71+
start = other.start;
72+
end = other.end;
73+
return true;
74+
}
75+
if (other.start <= start && other.end >= (start-1)) {
76+
// other overlaps interval at start
77+
start = other.start;
78+
return true;
79+
}
80+
if (other.end >= end && other.start <= (end+1)) {
81+
// other overlaps interval at end
82+
end = other.end;
83+
return true;
84+
}
85+
86+
return false;
87+
}
88+
89+
// Compares two GTID intervals, by strict weak ordering.
90+
const int Gtid_Interval::cmp(const Gtid_Interval& other) {
91+
if (start < other.start) {
92+
return -1;
93+
}
94+
if (start > other.start) {
95+
return 1;
96+
}
97+
if (end < other.end) {
98+
return -1;
99+
}
100+
if (end > other.end) {
101+
return 1;
102+
}
103+
return 0;
104+
}
105+
106+
const bool Gtid_Interval::operator<(const Gtid_Interval& other) {
107+
return cmp(other) == -1;
108+
}
109+
110+
const bool Gtid_Interval::operator==(const Gtid_Interval& other) {
111+
return cmp(other) == 0;
112+
}

0 commit comments

Comments
 (0)