Skip to content

Commit 0d39bcc

Browse files
authored
Merge pull request #5 from zorroblue/eventual-consistency
Eventual consistency
2 parents dc7265e + 09db4f8 commit 0d39bcc

File tree

10 files changed

+439
-160
lines changed

10 files changed

+439
-160
lines changed

clean.sh

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,2 @@
1+
rm *.pyc
2+
rm *.log

crawler.py

Lines changed: 69 additions & 116 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
from concurrent import futures
22
import time
33
import math
4+
import thread
45

56
from argparse import ArgumentParser
67
import argparse
@@ -20,6 +21,7 @@
2021
from bson import BSON
2122

2223
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
24+
MAX_RETRIES = 3
2325

2426

2527
def build_parser():
@@ -33,6 +35,11 @@ def build_parser():
3335
default='localhost:50052',
3436
help='backup IP address',
3537
required=False)
38+
parser.add_argument('--port',
39+
dest='port',
40+
default='50060',
41+
help='Port',
42+
required=False)
3643
choices = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
3744
parser.add_argument('--logging',
3845
dest='logging_level', help='Logging level',
@@ -42,136 +49,82 @@ def build_parser():
4249
return parser
4350

4451

45-
def write_to_master_backup(master, backup, logging_level, lower=51, higher=70):
46-
# initialize logger
47-
logger = init_logger('crawler', logging_level)
48-
# generate data
49-
indices_list = generate_indices('pending', lower, higher)
50-
51-
# initiate 2 phase commit protocol
52-
53-
# PHASE 1
54-
logger.debug("Initiate 2 phase commit protocol")
55-
print "Phase 1: Prepare"
56-
logger.debug("Starting Phase 1: Prepare")
57-
58-
master_vote = None
59-
backup_vote = None
60-
61-
master_channel = grpc.insecure_channel(master)
62-
master_stub = search_pb2_grpc.DatabaseWriteStub(master_channel)
63-
64-
65-
66-
backup_channel = grpc.insecure_channel(backup)
67-
backup_stub = search_pb2_grpc.DatabaseWriteStub(backup_channel)
68-
69-
try:
70-
indices = json.dumps(indices_list)
71-
request = search_pb2.CommitRequest(data=indices)
72-
logger.info("Send COMMIT_REQUEST to master")
73-
master_vote = master_stub.QueryToCommit(request)
74-
print "Master Status: ", master_vote.status
75-
if master_vote.status == 1:
76-
logger.info("Received AGREED from master")
77-
else:
78-
logger.info("Received ABORT from master")
79-
except Exception as e:
80-
print str(e)
81-
print e.code()
82-
logger.error("Master not reachable due to "+ str(e.code()))
52+
class Crawler(object):
53+
def __init__(self, master, backup, logging_level, data=None):
54+
# initialize logger
55+
self.logger = init_logger('crawler', logging_level)
56+
self.master = master
57+
self.backup = backup
58+
self.data = data
59+
# TODO: add sync between backup and crawler
60+
61+
def MasterChange(self, request, context):
62+
self.master = self.backup
63+
self.logger.info("Changed master ip to "+ self.master)
64+
return search_pb2.Acknowledgement(status=1)
65+
66+
def write_to_master(self):
67+
if self.data is None:
68+
self.data = generate_indices('pending', 71, 75)
69+
70+
logger = self.logger
71+
# send to master
72+
print "Master is ", self.master
73+
master_channel = grpc.insecure_channel(self.master)
74+
master_stub = search_pb2_grpc.DatabaseWriteStub(master_channel)
75+
logger.info("Sending data to master")
76+
#try:
77+
request = search_pb2.CommitRequest(data=json.dumps(self.data))
78+
response = master_stub.WriteIndicesToTable(request)
79+
logger.info("Operation success")
80+
print "Done"
81+
8382

84-
try:
85-
indices = json.dumps(indices_list)
86-
request = search_pb2.CommitRequest(data=indices)
87-
logger.info("Send COMMIT_REQUEST to backup")
88-
backup_vote = backup_stub.QueryToCommit(request)
89-
print "Backup Status: ", backup_vote.status
90-
if backup_vote.status == 1:
91-
logger.info("Received AGREED from backup")
92-
else:
93-
logger.info("Received ABORT from backup")
94-
except Exception as e:
95-
print str(e)
96-
print e.code()
97-
logger.error("Backup not reachable due to "+ str(e.code()))
98-
99-
# PHASE 2
100-
print "Phase 2: Prepare"
101-
logger.debug("Starting Phase 2: Commit")
102-
103-
if master_vote == None or backup_vote == None or backup_vote.status == 0 or master_vote.status == 0:
104-
try:
105-
request = search_pb2.CommitStatusUpdate(code=search_pb2.ROLL_BACK)
83+
def pushWrite(crawler):
84+
while True:
85+
query = raw_input("Do you want to push the write(Y/N): ")
86+
query = query.strip()
87+
if query == 'N' or query == 'No':
88+
break
89+
elif query == 'Y' or query == 'Yes':
10690
try:
107-
logger.info("Sending ROLL_BACK to master")
108-
master_ack = master_stub.CommitPhase(request)
91+
crawler.write_to_master()
10992
except Exception as e:
11093
print str(e)
111-
logger.info("Master not able to receive ROLL_BACK due to "+ str(e.code()))
112-
113-
try:
114-
logger.info("Sending ROLL_BACK to backup")
115-
backup_ack = backup_stub.CommitPhase(request)
116-
except Exception as e:
117-
print str(e)
118-
logger.info("backup not able to receive ROLL_BACK due to "+ str(e.code()))
119-
120-
except Exception as e:
121-
print e.code()
122-
logger.info("Rolled back transaction")
123-
return False
124-
125-
# Commit Phase
126-
request = search_pb2.CommitStatusUpdate(code=search_pb2.COMMIT)
127-
master_ack_received = False
128-
backup_ack_received = False
129-
retries = 0
130-
131-
while (not master_ack_received or not backup_ack_received) and retries < 3:
132-
if retries > 0:
133-
logger.info("Retrying")
134-
if not master_ack_received:
135-
try:
136-
logger.info("Sending COMMIT to master")
137-
master_ack = master_stub.CommitPhase(request)
138-
master_ack_received = True
139-
except Exception as e:
140-
logger.info("Master failed to receive due to "+ str(e.code()))
141-
if not backup_ack_received:
142-
try:
143-
logger.info("Sending COMMIT to backup")
144-
backup_ack = backup_stub.CommitPhase(request)
145-
backup_ack_received = True
146-
except Exception as e:
147-
logger.info("Backup failed to receive due to "+ str(e.code()))
148-
retries+=1
149-
print master_ack_received, backup_ack_received
150-
15194

152-
if retries < 3:
153-
logger.info("COMMIT")
154-
else:
155-
logger.info("FAILED")
156-
# TODO rollback
157-
# not doing it now to progress further in the work
158-
# PS if one of them fails, they will resync with the crawler. The crawler is aware of this as COMMIT isn't written till then.
15995

160-
print "Phase 2:"
161-
print "Master status", master_ack.status
162-
print "backup status", backup_ack.status
163-
return master_ack.status == 1 and backup_ack.status == 1
96+
def run(master, backup, logging_level, port, data=None):
97+
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
98+
# add write service to backup server to handle database updates from crawler
99+
crawler = Crawler(master, backup, logging_level, data)
100+
search_pb2_grpc.add_LeaderNoticeServicer_to_server(crawler, server)
101+
server.add_insecure_port('[::]:'+ port)
102+
print "Started crawler"
103+
crawler.logger.info("Starting server")
104+
# set up query for writes
105+
try:
106+
thread.start_new_thread(pushWrite, (crawler,))
107+
except Exception as e:
108+
print str(e)
109+
crawler.logger.error("Cannot start new thread due to " + str(e))
164110

111+
server.start()
112+
try:
113+
while True:
114+
time.sleep(_ONE_DAY_IN_SECONDS)
115+
except KeyboardInterrupt:
116+
crawler.logger.info("Shutting down server")
117+
logging.shutdown()
118+
server.stop(0)
165119

166120
def main():
167121
parser = build_parser()
168122
options = parser.parse_args()
169123
master = options.master
170124
backup = options.backup
125+
port = options.port
171126
logging_level = parse_level(options.logging_level)
172-
status = write_to_master_backup(master=master, backup=backup, logging_level=logging_level)
173-
if status:
174-
print "Write successful"
127+
run(master, backup, logging_level, port, data=None)
175128

176129
if __name__ == '__main__':
177130
main()

environment.sh

File mode changed: 100644 → 100755 (made executable).

0 commit comments

Comments (0)