Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 34 additions & 65 deletions lib/resty/rediscluster.lua
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ local _M = {}
local mt = { __index = _M }

local slot_cache = {}
local master_nodes = {}

local cmds_for_all_master = {
["flushall"] = true,
Expand Down Expand Up @@ -147,21 +146,25 @@ local function try_hosts_slots(self, serv_list)
local servers = { serv_list = {} }
for n = 1, #slots_info do
local sub_info = slots_info[n]
--slot info item 1 and 2 are the subrange start end slots
-- slot info item 1 and 2 are the subrange start end slots
local start_slot, end_slot = sub_info[1], sub_info[2]

-- generate new list of servers
local list = { serv_list = {} }
--from 3, here lists the host/port/nodeid of in charge nodes
for j = 3, #sub_info do
servers.serv_list[#servers.serv_list + 1 ] = { ip = sub_info[j][1], port = sub_info[j][2] }
table.insert(list.serv_list,{
ip = sub_info[j][1],
port = sub_info[j][2],
slave = (j > 3) -- first node in the list is the master
})
end

for slot = start_slot, end_slot do
local list = { serv_list = {} }
--from 3, here lists the host/port/nodeid of in charge nodes
for j = 3, #sub_info do
list.serv_list[#list.serv_list + 1] = { ip = sub_info[j][1], port = sub_info[j][2] }
slots[slot] = list
end
slots[slot] = list
end

-- append to the list of all servers
for _, serv in ipairs(list.serv_list) do
table.insert(servers.serv_list,serv)
end
end
--ngx.log(ngx.NOTICE, "finished initializing slotcache...")
Expand All @@ -170,31 +173,11 @@ local function try_hosts_slots(self, serv_list)
else
table_insert(errors, err)
end
-- cache master nodes
local nodes_res, nerr = redis_client:cluster("nodes")
if nodes_res then
local nodes_info = split(nodes_res, char(10))
for _, node in ipairs(nodes_info) do
local node_info = split(node, " ")
if #node_info > 2 then
local is_master = match(node_info[3], "master") ~= nil
if is_master then
local ip_port = split(split(node_info[2], "@")[1], ":")
table_insert(master_nodes, {
ip = ip_port[1],
port = tonumber(ip_port[2])
})
end
end
end
else
table_insert(errors, nerr)
end
release_connection(redis_client, config)

-- refresh of slots and master nodes successful
-- refresh of slots successfully
-- not required to connect/iterate over additional hosts
if nodes_res and slots_info then
if slots_info then
return true, nil
end
elseif max_connection_timeout_err then
Expand Down Expand Up @@ -321,11 +304,7 @@ function _M.new(_, config)
return inst
end


local function pick_node(self, serv_list, slot, magic_radom_seed)
local host
local port
local slave
local index
if #serv_list < 1 then
return nil, nil, nil, "serv_list for slot " .. slot .. " is empty"
Expand All @@ -336,28 +315,15 @@ local function pick_node(self, serv_list, slot, magic_radom_seed)
else
index = math.random(#serv_list)
end
host = serv_list[index].ip
port = serv_list[index].port
--cluster slots will always put the master node as first
if index > 1 then
slave = true
else
slave = false
end
--ngx.log(ngx.NOTICE, "pickup node: ", c(serv_list[index]))
else
host = serv_list[1].ip
port = serv_list[1].port
slave = false
--ngx.log(ngx.NOTICE, "pickup node: ", cjson.encode(serv_list[1]))
index = 1
end
return host, port, slave
--ngx.log(ngx.NOTICE, "pickup node: ", cjson.encode(serv_list[index]))
return serv_list[index].ip, serv_list[index].port, serv_list[index].slave
end


local ask_host_and_port = {}


local function parse_ask_signal(res)
--ask signal sample:ASK 12191 127.0.0.1:7008, so we need to parse and get 127.0.0.1, 7008
if res ~= ngx.null then
Expand Down Expand Up @@ -537,19 +503,22 @@ end

local function _do_cmd_master(self, cmd, key, ...)
local errors = {}
for _, master in ipairs(master_nodes) do
local redis_client = redis:new()
redis_client:set_timeouts(self.config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT,
self.config.send_timeout or DEFAULT_SEND_TIMEOUT,
self.config.read_timeout or DEFAULT_READ_TIMEOUT)
local ok, err = redis_client:connect(master.ip, master.port, self.config.connect_opts)
if ok then
_, err = redis_client[cmd](redis_client, key, ...)
end
if err then
table_insert(errors, err)
local serv_list = slot_cache[self.config.name .. "serv_list"].serv_list
for _, server in ipairs(serv_list) do
if not server.slave then
local redis_client = redis:new()
redis_client:set_timeouts(self.config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT,
self.config.send_timeout or DEFAULT_SEND_TIMEOUT,
self.config.read_timeout or DEFAULT_READ_TIMEOUT)
local ok, err = redis_client:connect(server.ip, server.port, self.config.connect_opts)
if ok then
_, err = redis_client[cmd](redis_client, key, ...)
end
if err then
table_insert(errors, err)
end
release_connection(redis_client, self.config)
end
release_connection(redis_client, self.config)
end
return #errors == 0, table.concat(errors, ";")
end
Expand Down