From d76b002fe36794db0a97e7bb2957220eba11833a Mon Sep 17 00:00:00 2001 From: whoarethebritons Date: Wed, 6 Nov 2019 16:21:29 -0800 Subject: [PATCH 1/3] fix a race condition that can end up with appservers selecting the same port --- .../instance_manager/instance_manager.py | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/AdminServer/appscale/admin/instance_manager/instance_manager.py b/AdminServer/appscale/admin/instance_manager/instance_manager.py index 772018b89e..d58065ab5f 100644 --- a/AdminServer/appscale/admin/instance_manager/instance_manager.py +++ b/AdminServer/appscale/admin/instance_manager/instance_manager.py @@ -4,6 +4,7 @@ import monotonic import json import os +import socket import urllib2 from tornado import gen @@ -442,7 +443,7 @@ def _stop_app_instance(self, instance): yield self._clean_old_sources() - def _get_lowest_port(self): + def _get_lowest_port(self, excluded_ports): """ Determines the lowest usuable port for a new instance. Returns: @@ -451,12 +452,23 @@ def _get_lowest_port(self): existing_ports = {instance.port for instance in self._running_instances} port = STARTING_INSTANCE_PORT while True: - if port in existing_ports: + if port in existing_ports or port in excluded_ports or not self._try_port(port): port += 1 continue return port + def _try_port(self, port): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + result = False + try: + sock.bind(("0.0.0.0", port)) + result = True + except: + logger.info("Port {} is in use".format(port)) + sock.close() + return result + @gen.coroutine def _restart_unrouted_instances(self): """ Restarts instances that the router considers offline. """ @@ -535,7 +547,9 @@ def _fulfill_assignments(self): for instance in to_stop: yield self._stop_app_instance(instance) - + excluded_ports = set() + for assigned_ports in self._assignments: + excluded_ports.update(assigned_ports) for version_key, assigned_ports in self._assignments.items(): try: version = self._projects_manager.version_from_key(version_key) @@ -571,7 +585,9 @@ def _fulfill_assignments(self): and instance.port not in assigned_ports] to_start = max(new_assignment_count - len(candidates), 0) for _ in range(to_start): - yield self._start_instance(version, self._get_lowest_port()) + port = self._get_lowest_port(excluded_ports) + excluded_ports.add(port) + yield self._start_instance(version, port) @gen.coroutine def _enforce_instance_details(self): From 22d251abce066e08c2c3d6a82257763a626ad621 Mon Sep 17 00:00:00 2001 From: whoarethebritons Date: Thu, 7 Nov 2019 14:37:49 -0800 Subject: [PATCH 2/3] fix/add docstrings --- .../admin/instance_manager/instance_manager.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/AdminServer/appscale/admin/instance_manager/instance_manager.py b/AdminServer/appscale/admin/instance_manager/instance_manager.py index d58065ab5f..7cb0898bad 100644 --- a/AdminServer/appscale/admin/instance_manager/instance_manager.py +++ b/AdminServer/appscale/admin/instance_manager/instance_manager.py @@ -445,7 +445,10 @@ def _stop_app_instance(self, instance): def _get_lowest_port(self, excluded_ports): """ Determines the lowest usuable port for a new instance. - + Args: + excluded_ports: A set of ports that the caller is planning on using. + This is used to make sure AppServers do not pick the same port since + it's possible for the port to not be in self._running_instances yet. Returns: An integer specifying a free port. """ @@ -459,6 +462,11 @@ def _get_lowest_port(self, excluded_ports): return port def _try_port(self, port): + """ Helper method to check if the port is actually free or not. + + Returns: + True if port is free, False otherwise. + """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) result = False try: From 27e8efb9f50c884eac80d7726c16e58c38790c76 Mon Sep 17 00:00:00 2001 From: whoarethebritons Date: Fri, 15 Nov 2019 12:11:05 -0800 Subject: [PATCH 3/3] add unit test to get distinct ports and fix excluded port initialization --- .../instance_manager/instance_manager.py | 8 +- AdminServer/tests/test_instance_manager.py | 74 +++++++++++++++++++ 2 files changed, 79 insertions(+), 3 deletions(-) diff --git a/AdminServer/appscale/admin/instance_manager/instance_manager.py b/AdminServer/appscale/admin/instance_manager/instance_manager.py index 7cb0898bad..6e182f7c74 100644 --- a/AdminServer/appscale/admin/instance_manager/instance_manager.py +++ b/AdminServer/appscale/admin/instance_manager/instance_manager.py @@ -556,8 +556,11 @@ def _fulfill_assignments(self): for instance in to_stop: yield self._stop_app_instance(instance) excluded_ports = set() - for assigned_ports in self._assignments: - excluded_ports.update(assigned_ports) + for _, assigned_ports in self._assignments.items(): + for port in assigned_ports: + if port != -1: + excluded_ports.add(port) + for version_key, assigned_ports in self._assignments.items(): try: version = self._projects_manager.version_from_key(version_key) @@ -568,7 +571,6 @@ def _fulfill_assignments(self): # The number of required instances that don't have an assigned port. new_assignment_count = sum(port == -1 for port in assigned_ports) - # Stop instances that aren't assigned. If the assignment list includes # any -1s, match them to running instances that aren't in the assigned # ports list. diff --git a/AdminServer/tests/test_instance_manager.py b/AdminServer/tests/test_instance_manager.py index c489fe1369..29b62b7e10 100644 --- a/AdminServer/tests/test_instance_manager.py +++ b/AdminServer/tests/test_instance_manager.py @@ -28,6 +28,7 @@ service_helper, testing ) +from appscale.admin.instance_manager.instance import Instance from appscale.common.service_helper import ServiceOperator options.define('login_ip', '127.0.0.1') @@ -303,5 +304,78 @@ def test_wait_for_app(self): instance_started = yield instance_manager._wait_for_app(port) self.assertEqual(False, instance_started) + @gen_test + def test_parallel_appservers(self): + testing.disable_logging() + version_details = {'runtime': 'python27', + 'revision': 1, + 'deployment': { + 'zip': {'sourceUrl': 'source.tar.gz'}}, + 'appscaleExtensions': {'httpPort': '8080'} + } + version_manager = flexmock(version_details=version_details, + project_id = 'test', + revision_key = 'test_default_v1_1', + version_key = 'test_default_v1') + deployment_config = flexmock( + get_config = lambda x: {'default_max_appserver_memory': 400}) + + + source_manager = flexmock() + response = Future() + response.set_result(None) + source_manager.should_receive('ensure_source'). \ + with_args('test_default_v1_1', 'source.tar.gz', 'python27'). \ + and_return(response) + + instance_manager = InstanceManager( + None, None, None, None, deployment_config, + source_manager, None, None, None) + + # Start instance mocks + response = Future() + response.set_result((19999, [])) + flexmock(instance_manager).should_receive('_ensure_api_server').\ + and_return(response) + + # write env + flexmock(file_io).should_receive('write').and_return() + + response = Future() + response.set_result(None) + flexmock(ServiceOperator).should_receive('start_async').\ + and_return(response) + + instance_manager._zk_client = flexmock() + instance_manager._zk_client.should_receive('ensure_path') + + # Within add_routing + response = Future() + response.set_result(True) + flexmock(instance_manager).should_receive('_wait_for_app').\ + and_return(response) + instance_manager._routing_client = flexmock() + instance_manager._routing_client.should_receive('register_instance').and_return() + + flexmock(utils).should_receive("setup_logrotate").and_return() + + + # Fulfill assignments mocks + instance_manager._service_operator = flexmock( + start_async=lambda service, wants, properties: response) + + instance_manager._login_server = '192.168.33.10' + instance_manager._running_instances = {Instance('test_default_v1_1', 20000)} + instance_manager._assignments = {'test_default_v1': [20000, -1, -1, -1, -1]} + instance_manager._projects_manager = flexmock() + instance_manager._projects_manager.should_receive('version_from_key')\ + .and_return(version_manager) + + yield instance_manager._fulfill_assignments() + should_be_running = {Instance('test_default_v1_1', x) for x in + range(20000, 20005)} + assert instance_manager._running_instances == should_be_running + + if __name__ == "__main__": unittest.main()