diff --git a/dask/dask.sh b/dask/dask.sh index d9007c3f1..9b9de8362 100644 --- a/dask/dask.sh +++ b/dask/dask.sh @@ -518,8 +518,8 @@ function main() { echo "Dask for ${DASK_RUNTIME} successfully initialized." } -function exit_handler() ( - set +e +function exit_handler() { + set +ex echo "Exit handler invoked" # Free conda cache @@ -528,16 +528,29 @@ function exit_handler() ( # Clear pip cache pip cache purge || echo "unable to purge pip cache" - # remove the tmpfs conda pkgs_dirs - if [[ -d /mnt/shm ]] ; then /opt/conda/miniconda3/bin/conda config --remove pkgs_dirs /mnt/shm ; fi + # If system memory was sufficient to mount memory-backed filesystems + if [[ "${tmpdir}" == "/mnt/shm" ]] ; then + # Stop hadoop services + systemctl list-units | perl -n -e 'qx(systemctl stop $1) if /^.*? ((hadoop|knox|hive|mapred|yarn|hdfs)\S*).service/' - # Clean up shared memory mounts - for shmdir in /var/cache/apt/archives /var/cache/dnf /mnt/shm ; do - if grep -q "^tmpfs ${shmdir}" /proc/mounts ; then - rm -rf ${shmdir}/* - umount -f ${shmdir} - fi - done + # remove the tmpfs conda pkgs_dirs + /opt/conda/miniconda3/bin/conda config --remove pkgs_dirs /mnt/shm || echo "unable to remove pkgs_dirs conda config" + + # remove the tmpfs pip cache-dir + pip config unset global.cache-dir || echo "unable to unset global pip cache" + + # Clean up shared memory mounts + for shmdir in /var/cache/apt/archives /var/cache/dnf /mnt/shm ; do + if grep -q "^tmpfs ${shmdir}" /proc/mounts ; then + sync + sleep 3s + execute_with_retries umount -f ${shmdir} + fi + done + + umount -f /tmp + systemctl list-units | perl -n -e 'qx(systemctl start $1) if /^.*? 
((hadoop|knox|hive|mapred|yarn|hdfs)\S*).service/' + fi # Clean up OS package cache ; re-hold systemd package if is_debuntu ; then @@ -547,34 +560,65 @@ function exit_handler() ( dnf clean all fi - # print disk usage statistics - if is_debuntu ; then - # Rocky doesn't have sort -h and fails when the argument is passed - du --max-depth 3 -hx / | sort -h | tail -10 + # print disk usage statistics for large components + if is_ubuntu ; then + du -hs \ + /usr/lib/{pig,hive,hadoop,jvm,spark,google-cloud-sdk,x86_64-linux-gnu} \ + /usr/lib \ + /opt/nvidia/* \ + /usr/local/cuda-1?.? \ + /opt/conda/miniconda3 \ + "${DASK_CONDA_ENV}" + elif is_debian ; then + du -hs \ + /usr/lib/{pig,hive,hadoop,jvm,spark,google-cloud-sdk,x86_64-linux-gnu} \ + /usr/lib \ + /usr/local/cuda-1?.? \ + /opt/conda/miniconda3 \ + "${DASK_CONDA_ENV}" + else + du -hs \ + /var/lib/docker \ + /usr/lib/{pig,hive,hadoop,firmware,jvm,spark,atlas} \ + /usr/lib64/google-cloud-sdk \ + /usr/lib \ + /opt/nvidia/* \ + /usr/local/cuda-1?.? 
\ + /opt/conda/miniconda3 \ + "${DASK_CONDA_ENV}" fi # Process disk usage logs from installation period - rm -f "${tmpdir}/keep-running-df" - sleep 6s + rm -f /run/keep-running-df + sync + sleep 5.01s # compute maximum size of disk during installation # Log file contains logs like the following (minus the preceeding #): -#Filesystem Size Used Avail Use% Mounted on -#/dev/vda2 6.8G 2.5G 4.0G 39% / - df -h / | tee -a "${tmpdir}/disk-usage.log" - perl -e '$max=( sort - map { (split)[2] =~ /^(\d+)/ } - grep { m:^/: } )[-1]; -print( "maximum-disk-used: $max", $/ );' < "${tmpdir}/disk-usage.log" +#Filesystem 1K-blocks Used Available Use% Mounted on +#/dev/vda2 7096908 2611344 4182932 39% / + df / | tee -a "/run/disk-usage.log" + + perl -e '@siz=( sort { $a <=> $b } + map { (split)[2] =~ /^(\d+)/ } + grep { m:^/: } ); +$min=$siz[0]; $max=$siz[-1]; $inc=$max-$min; +print( " samples-taken: ", scalar @siz, $/, + "maximum-disk-used: $max", $/, + "minimum-disk-used: $min", $/, + " increased-by: $inc", $/ )' < "/run/disk-usage.log" echo "exit_handler has completed" # zero free disk space if [[ -n "$(get_metadata_attribute creating-image)" ]]; then - dd if=/dev/zero of=/zero ; sync ; rm -f /zero + dd if=/dev/zero of=/zero + sync + sleep 3s + rm -f /zero fi return 0 -) +} function prepare_to_install() { readonly DEFAULT_CUDA_VERSION="12.4" @@ -600,7 +644,7 @@ function prepare_to_install() { free_mem="$(awk '/^MemFree/ {print $2}' /proc/meminfo)" # Write to a ramdisk instead of churning the persistent disk - if [[ ${free_mem} -ge 5250000 ]]; then + if [[ ${free_mem} -ge 10500000 ]]; then tmpdir=/mnt/shm mkdir -p /mnt/shm mount -t tmpfs tmpfs /mnt/shm @@ -622,19 +666,19 @@ function prepare_to_install() { else tmpdir=/tmp fi - install_log="${tmpdir}/install.log" + install_log="/run/install.log" trap exit_handler EXIT - + # Monitor disk usage in a screen session if is_debuntu ; then apt-get install -y -qq screen else dnf -y -q install screen fi - rm -f "${tmpdir}/disk-usage.log" - 
touch "${tmpdir}/keep-running-df" + df / | tee "/run/disk-usage.log" + touch "/run/keep-running-df" screen -d -m -US keep-running-df \ - bash -c "while [[ -f ${tmpdir}/keep-running-df ]] ; do df -h / | tee -a ${tmpdir}/disk-usage.log ; sleep 5s ; done" + bash -c "while [[ -f /run/keep-running-df ]] ; do df / | tee -a /run/disk-usage.log ; sleep 5s ; done" } prepare_to_install diff --git a/dask/test_dask.py b/dask/test_dask.py index 440493511..ec907df03 100644 --- a/dask/test_dask.py +++ b/dask/test_dask.py @@ -9,78 +9,104 @@ class DaskTestCase(DataprocTestCase): - COMPONENT = 'dask' - INIT_ACTIONS = ['dask/dask.sh'] - - DASK_YARN_TEST_SCRIPT = 'verify_dask_yarn.py' - DASK_STANDALONE_TEST_SCRIPT = 'verify_dask_standalone.py' - - def verify_dask_yarn(self, name): - self._run_dask_test_script(name, self.DASK_YARN_TEST_SCRIPT) - - def verify_dask_standalone(self, name, master_hostname): - script=self.DASK_STANDALONE_TEST_SCRIPT - verify_cmd = "/opt/conda/miniconda3/envs/dask/bin/python {} {}".format( - script, - master_hostname - ) - abspath=os.path.join(os.path.dirname(os.path.abspath(__file__)),script) - self.upload_test_file(abspath, name) + COMPONENT = 'dask' + INIT_ACTIONS = [ + 'dask/dask.sh' + ] + INTERPRETER = '/opt/conda/miniconda3/envs/dask/bin/python' + + DASK_YARN_TEST_SCRIPT = 'verify_dask_yarn.py' + DASK_STANDALONE_TEST_SCRIPT = 'verify_dask_standalone.py' + + def verify_dask_standalone(self, name, master_hostname): + script=self.DASK_STANDALONE_TEST_SCRIPT + verify_cmd = "{} {} {}".format( + self.INTERPRETER, + script, + master_hostname + ) + abspath=os.path.join(os.path.dirname(os.path.abspath(__file__)),script) + self.upload_test_file(abspath, name) + self.assert_instance_command(name, verify_cmd) + self.remove_test_script(script, name) + + def _run_dask_test_script(self, name, script): + test_filename=os.path.join(os.path.dirname(os.path.abspath(__file__)), + script) + self.upload_test_file(test_filename, name) + verify_cmd = "{} {}".format( + 
self.INTERPRETER, + script) + command_asserted=0 + for try_number in range(0, 3): + try: + self.assert_instance_command(name, verify_cmd) + command_asserted=1 + break + except: + time.sleep(2**try_number) + if command_asserted == 0: + raise Exception("Unable to assert instance command [{}]".format(verify_cmd)) + + self.remove_test_script(script, name) + + def verify_dask_worker_service(self, name): + verify_cmd = "[[ X$(systemctl show dask-worker -p SubState --value)X == XrunningX ]]" + # Retry the first ssh to ensure it has enough time to propagate SSH keys + command_asserted=0 + for try_number in range(0, 3): + try: self.assert_instance_command(name, verify_cmd) - self.remove_test_script(script, name) - - def _run_dask_test_script(self, name, script): - verify_cmd = "/opt/conda/miniconda3/envs/dask/bin/python {}".format( - script) - self.upload_test_file( - os.path.join(os.path.dirname(os.path.abspath(__file__)), - script), name) - command_asserted=0 - for try_number in range(0, 7): - try: - self.assert_instance_command(name, verify_cmd) - command_asserted=1 - break - except: - time.sleep(2**try_number) - if command_asserted == 0: - raise Exception("Unable to assert instance command [{}]".format(verify_cmd)) - - self.remove_test_script(script, name) - - - @parameterized.parameters( - ("STANDARD", ["m", "w-0"], "yarn"), - ("STANDARD", ["m"], "standalone"), - ("KERBEROS", ["m"], "standalone"), + command_asserted=1 + break + except: + time.sleep(2**try_number) + if command_asserted == 0: + raise Exception("Unable to assert instance command [{}]".format(verify_cmd)) + + def verify_dask_config(self, name): + self.assert_instance_command( + name, "[[ $(wc -l /etc/dask/config.yaml) == 11 ]]") + + @parameterized.parameters( + ("STANDARD", ["m", "w-0"], "yarn"), + ("STANDARD", ["m", "w-0", "w-1"], "standalone"), + ("KERBEROS", ["m"], None), + ("HA", ["m-0"], None), + ("SINGLE", ["m"], None), + ) + def test_dask(self, configuration, machine_suffixes, dask_runtime): + + if 
self.getImageVersion() < pkg_resources.parse_version("2.0"): + self.skipTest("Not supported in pre-2.0 images") + + metadata = None + if dask_runtime: + metadata = "dask-runtime={}".format(dask_runtime) + + self.createCluster( + configuration, + self.INIT_ACTIONS, + metadata=metadata, + machine_type='n1-standard-8', + timeout_in_minutes=20 ) - def test_dask(self, configuration, instances, runtime): - if self.getImageVersion() < pkg_resources.parse_version("2.0"): - self.skipTest("Not supported in pre-2.0 images") + c_name=self.getClusterName() + if configuration == 'HA': + master_hostname = c_name + '-m-0' + else: + master_hostname = c_name + '-m' - metadata = None - if runtime: - metadata = "dask-runtime={}".format(runtime) + for machine_suffix in machine_suffixes: + machine_name = "{}-{}".format(c_name, machine_suffix) - self.createCluster(configuration, - self.INIT_ACTIONS, - machine_type='n1-standard-16', - metadata=metadata, - timeout_in_minutes=20) - - if configuration == 'HA': - master_hostname = self.getClusterName() + '-m-0' - else: - master_hostname = self.getClusterName() + '-m' - - for instance in instances: - name = "{}-{}".format(self.getClusterName(), instance) - - if runtime == "standalone": - self.verify_dask_standalone(name, master_hostname) - else: - self.verify_dask_yarn(name) + if dask_runtime == 'standalone' or dask_runtime == None: + self.verify_dask_worker_service(machine_name) + self.verify_dask_standalone(machine_name, master_hostname) + elif dask_runtime == 'yarn': + self.verify_dask_config(machine_name) + self._run_dask_test_script(machine_name, self.DASK_YARN_TEST_SCRIPT) if __name__ == '__main__': - absltest.main() + absltest.main()