[Core] Optimize kubernetes cmd executions with kubernetes command runner #3157

Merged
merged 88 commits into from
Jun 7, 2024
Commits
1571e4e
remove job_owner
Michaelvll Jan 27, 2024
afd5660
remove some clouds.Local related code
Michaelvll Jan 27, 2024
8314263
Remove Local cloud entirely
Michaelvll Jan 27, 2024
1a025c5
remove local cloud
Michaelvll Jan 27, 2024
c9f79b0
fix
Michaelvll Jan 27, 2024
308ca0f
slurm runner
Feb 3, 2024
f417c5f
kubernetes runner
Michaelvll Feb 14, 2024
520d457
Use command runner for kubernetes
Michaelvll Feb 14, 2024
d252582
rename back to ssh
Michaelvll Feb 14, 2024
e8bbd18
refactor runners in backend
Michaelvll Feb 14, 2024
645f067
Merge branch 'remove-local-cloud' of github.com:skypilot-org/skypilot…
Michaelvll Feb 14, 2024
0744608
Merge branch 'master' of github.com:skypilot-org/skypilot into kubern…
Michaelvll Feb 14, 2024
99aea26
fix
Michaelvll Feb 14, 2024
87ab4a8
fix
Michaelvll Feb 14, 2024
19791b1
fix rsync
Michaelvll Feb 14, 2024
84eff39
Fix runner
Michaelvll Feb 14, 2024
4c5f0e2
Fix run()
Michaelvll Feb 14, 2024
ed50603
errors and fix head runner
Michaelvll Feb 14, 2024
2355521
Merge branch 'master' of github.com:skypilot-org/skypilot into remove…
Michaelvll Feb 14, 2024
7486c0f
Merge branch 'remove-local-cloud' of github.com:skypilot-org/skypilot…
Michaelvll Feb 14, 2024
eae92cb
support different mode
Michaelvll Feb 14, 2024
07b0234
Merge branch 'master' of github.com:skypilot-org/skypilot into kubern…
Michaelvll Feb 21, 2024
8c0ffe1
Merge branch 'master' of github.com:skypilot-org/skypilot into kubern…
Michaelvll Mar 29, 2024
5a6b4c7
format
Michaelvll Mar 29, 2024
cbb7ac8
use whoami instead of $USER
Michaelvll Mar 29, 2024
872cfe5
timeline for run and rsync
Michaelvll Mar 30, 2024
69816a7
lazy imports for pandas and lazy data frame
Michaelvll Mar 30, 2024
065b80d
fix fetch_aws
Michaelvll Mar 30, 2024
3a27f61
fix fetchers
Michaelvll Mar 30, 2024
769840f
avoid sync script for task
Michaelvll Mar 30, 2024
356d851
Merge branch 'lazy-imports' of github.com:skypilot-org/skypilot into …
Michaelvll Mar 30, 2024
9cfaf09
add timeline
Michaelvll Mar 30, 2024
bf1ea40
cache cluster_info
Michaelvll Mar 30, 2024
68ebfdb
format
Michaelvll Mar 30, 2024
05ae471
cache cluster info
Michaelvll Mar 30, 2024
d6a7ef8
do not stream
Michaelvll Mar 30, 2024
9dca1a3
fix skip lines
Michaelvll Mar 30, 2024
b2ec63e
format
Michaelvll Mar 30, 2024
d2e358e
avoid source bashrc or -i for internal exec
Michaelvll Mar 30, 2024
34080c5
format
Michaelvll Mar 30, 2024
326bcb1
use -i
Michaelvll Mar 31, 2024
421e624
Add None arg
Michaelvll Mar 31, 2024
95b8976
Merge branch 'master' of github.com:skypilot-org/skypilot into kubern…
Michaelvll May 10, 2024
b739d5d
fix merge conflicts
Michaelvll May 10, 2024
ffb9463
Fix source bashrc
Michaelvll May 10, 2024
478ca11
Merge branch 'master' of github.com:skypilot-org/skypilot into kubern…
Michaelvll May 21, 2024
b14ead8
add connect_timeout
Michaelvll May 21, 2024
623e475
format
Michaelvll May 21, 2024
05e4b16
Correctly quote the script without source bashrc
Michaelvll May 21, 2024
c77746f
Merge branch 'master' of github.com:skypilot-org/skypilot into kubern…
Michaelvll May 23, 2024
e20d0a4
fix output
Michaelvll May 23, 2024
1f9eb6d
Fix connection output
Michaelvll May 23, 2024
0b296ee
Fix
Michaelvll May 23, 2024
1b86928
check twice
Michaelvll May 23, 2024
556b370
add Job ID
Michaelvll May 23, 2024
55d3ce2
fix
Michaelvll May 23, 2024
f0fe059
Merge branch 'fix-connection-output' of github.com:skypilot-org/skypi…
Michaelvll May 23, 2024
7a76456
format
Michaelvll May 23, 2024
7e919e8
fix ip
Michaelvll May 23, 2024
b12be89
Merge branch 'master' of github.com:skypilot-org/skypilot into kubern…
Michaelvll May 24, 2024
1b14e27
fix rsync for kubectl command runner
Michaelvll May 24, 2024
f3f8167
format
Michaelvll May 24, 2024
8ceec9e
Enable output check for kubernetes
Michaelvll May 24, 2024
281b4ee
Fix *
Michaelvll May 24, 2024
b890eb3
Fix comments
Michaelvll May 24, 2024
557234c
longer wait
Michaelvll May 24, 2024
4cd2ea9
longer wait
Michaelvll May 24, 2024
41f286e
Merge branch 'master' of github.com:skypilot-org/skypilot into kubern…
Michaelvll May 24, 2024
baed886
Update sky/backends/cloud_vm_ray_backend.py
Michaelvll Jun 3, 2024
6b22124
Update sky/provision/kubernetes/instance.py
Michaelvll Jun 3, 2024
8a672d6
address comments
Michaelvll Jun 3, 2024
a032f4e
refactor rsync
Michaelvll Jun 3, 2024
9816e8a
add comment
Michaelvll Jun 3, 2024
27cb9a2
fix interface
Michaelvll Jun 3, 2024
19e681d
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
Michaelvll Jun 3, 2024
8354e5d
Update sky/utils/command_runner.py
Michaelvll Jun 3, 2024
b869868
fix quote
Michaelvll Jun 3, 2024
8751688
Merge branch 'kubernetes-runner' of https://github.com/skypilot-org/s…
Michaelvll Jun 3, 2024
0acfc0a
Fix skip lines
Michaelvll Jun 3, 2024
07bead2
fix smoke
Michaelvll Jun 3, 2024
5dbc9b4
format
Michaelvll Jun 3, 2024
9c46584
fix
Michaelvll Jun 3, 2024
da3fa06
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
Michaelvll Jun 4, 2024
1ebce4e
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
Michaelvll Jun 5, 2024
9011b1d
Merge branch 'master' of github.com:skypilot-org/skypilot into kubern…
Michaelvll Jun 6, 2024
a8d09b5
fix serve failures
Michaelvll Jun 7, 2024
30397ef
Fix condition
Michaelvll Jun 7, 2024
f5018f1
trigger test
Michaelvll Jun 7, 2024
21 changes: 0 additions & 21 deletions sky/backends/backend_utils.py
@@ -2654,27 +2654,6 @@ def stop_handler(signum, frame):
         raise KeyboardInterrupt(exceptions.SIGTSTP_CODE)


-def run_command_and_handle_ssh_failure(runner: command_runner.SSHCommandRunner,
-                                       command: str,
-                                       failure_message: str) -> str:
-    """Runs command remotely and returns output with proper error handling."""
-    rc, stdout, stderr = runner.run(command,
-                                    require_outputs=True,
-                                    stream_logs=False)
-    if rc == 255:
-        # SSH failed
-        raise RuntimeError(
-            f'SSH with user {runner.ssh_user} and key {runner.ssh_private_key} '
-            f'to {runner.ip} failed. This is most likely due to incorrect '
-            'credentials or incorrect permissions for the key file. Check '
-            'your credentials and try again.')
-    subprocess_utils.handle_returncode(rc,
-                                       command,
-                                       failure_message,
-                                       stderr=stderr)
-    return stdout
-
-
 def check_rsync_installed() -> None:
     """Checks if rsync is installed.

7 changes: 5 additions & 2 deletions sky/backends/cloud_vm_ray_backend.py
@@ -3644,7 +3644,10 @@ def _rsync_down(args) -> None:
             try:
                 os.makedirs(local_log_dir, exist_ok=True)
                 runner.rsync(
-                    source=f'{remote_log_dir}/*',
+                    # Require a `/` at the end to make sure the parent dir
+                    # are not created locally. We do not add additional '*' as
+                    # kubernetes's rsync does not work with an ending '*'.
+                    source=f'{remote_log_dir}/',
Comment on lines +3647 to +3650

Collaborator

Curious - does the removal of * impact how hidden files are handled? On my Mac it does not make any difference, so this change should be OK, but this article seems to suggest it does. Any thoughts?

Collaborator Author

There seems to be no significant difference between /* and /, and the latter is more robust. The trailing * means the shell expands the pattern to include all files and directories in src before rsync runs, while the trailing / relies on rsync's own logic to sync the contents of src.

                     target=local_log_dir,
                     up=False,
                     stream_logs=False,
@@ -3653,7 +3656,7 @@ def _rsync_down(args) -> None:
                 if e.returncode == exceptions.RSYNC_FILE_NOT_FOUND_CODE:
                     # Raised by rsync_down. Remote log dir may not exist, since
                     # the job can be run on some part of the nodes.
-                    logger.debug(f'{runner.ip} does not have the tasks/*.')
+                    logger.debug(f'{runner.node_id} does not have the tasks/*.')
                 else:
                     raise

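On the hidden-files question from the review thread above, here is a minimal sketch of the one behavioral difference that could show up between the two source forms. It assumes the `/*` pattern is expanded by a POSIX-style shell glob, which skips dotfiles by default, while a trailing `/` leaves the directory contents to rsync. Python's `glob` has the same default, so it stands in for the shell here. The file names are made up for illustration and nothing below calls rsync or SkyPilot.

```python
import glob
import os
import tempfile


def compare_glob_and_listing():
    """Return (names matched by '*', all names in the directory)."""
    with tempfile.TemporaryDirectory() as log_dir:
        # Hypothetical log files: one regular, one hidden.
        for name in ('run.log', '.hidden.log'):
            with open(os.path.join(log_dir, name), 'w') as f:
                f.write('x')
        # What a shell-style '*' pattern would expand to.
        globbed = sorted(
            os.path.basename(p)
            for p in glob.glob(os.path.join(log_dir, '*')))
        # What "the contents of the directory" actually are.
        listed = sorted(os.listdir(log_dir))
    return globbed, listed


globbed_names, listed_names = compare_glob_and_listing()
print('matched by *:', globbed_names)
print('in directory:', listed_names)
```

If the remote log directory never contains dotfiles, the two forms should behave the same, which would match what both reviewers observed.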
1 change: 1 addition & 0 deletions sky/provision/kubernetes/__init__.py
@@ -2,6 +2,7 @@

 from sky.provision.kubernetes.config import bootstrap_instances
 from sky.provision.kubernetes.instance import get_cluster_info
+from sky.provision.kubernetes.instance import get_command_runners
 from sky.provision.kubernetes.instance import query_instances
 from sky.provision.kubernetes.instance import run_instances
 from sky.provision.kubernetes.instance import stop_instances
196 changes: 98 additions & 98 deletions sky/provision/kubernetes/instance.py
@@ -13,6 +13,7 @@
 from sky.provision import docker_utils
 from sky.provision.kubernetes import config as config_lib
 from sky.provision.kubernetes import utils as kubernetes_utils
+from sky.utils import command_runner
 from sky.utils import common_utils
 from sky.utils import kubernetes_enums
 from sky.utils import ux_utils
@@ -158,6 +159,15 @@ def _raise_pod_scheduling_errors(namespace, new_nodes):
     raise config_lib.KubernetesError(f'{timeout_err_msg}')


+def _raise_command_running_error(message: str, command: str, pod_name: str,
+                                 rc: int, stdout: str) -> None:
+    if rc == 0:
+        return
+    raise config_lib.KubernetesError(
+        f'Failed to {message} for pod {pod_name} with return '
+        f'code {rc}: {command!r}\nOutput: {stdout}.')
+
+
 def _wait_for_pods_to_schedule(namespace, new_nodes, timeout: int):
     """Wait for all pods to be scheduled.

@@ -250,39 +260,6 @@ def _wait_for_pods_to_run(namespace, new_nodes):
         time.sleep(1)


-def _run_command_on_pods(node_name: str,
-                         node_namespace: str,
-                         command: List[str],
-                         stream_logs: bool = False):
-    """Run command on Kubernetes pods.
-
-    If `stream_logs` is True, we poll for output and error messages while the
-    command is executing, and the stdout and stderr is written to logger.info.
-    When called from the provisioner, this logger.info is written to the
-    provision.log file (see setup_provision_logging()).
-    """
-    cmd_output = kubernetes.stream()(
-        kubernetes.core_api().connect_get_namespaced_pod_exec,
-        node_name,
-        node_namespace,
-        command=command,
-        stderr=True,
-        stdin=False,
-        stdout=True,
-        tty=False,
-        _preload_content=(not stream_logs),
-        _request_timeout=kubernetes.API_TIMEOUT)
-    if stream_logs:
-        while cmd_output.is_open():
-            cmd_output.update(timeout=1)
-            if cmd_output.peek_stdout():
-                logger.info(f'{cmd_output.read_stdout().strip()}')
-            if cmd_output.peek_stderr():
-                logger.info(f'{cmd_output.read_stderr().strip()}')
-        cmd_output.close()
-    return cmd_output
-
-
 def _set_env_vars_in_pods(namespace: str, new_pods: List):
     """Setting environment variables in pods.

@@ -299,42 +276,44 @@ def _set_env_vars_in_pods(namespace: str, new_pods: List):
     /etc/profile.d/, making them available for all users in future
     shell sessions.
     """
-    set_k8s_env_var_cmd = [
-        '/bin/sh',
-        '-c',
-        docker_utils.SETUP_ENV_VARS_CMD,
-    ]
+    set_k8s_env_var_cmd = docker_utils.SETUP_ENV_VARS_CMD

     for new_pod in new_pods:
-        _run_command_on_pods(new_pod.metadata.name, namespace,
-                             set_k8s_env_var_cmd)
+        runner = command_runner.KubernetesCommandRunner(
+            (namespace, new_pod.metadata.name))
+        rc, stdout, _ = runner.run(set_k8s_env_var_cmd,
+                                   require_outputs=True,
+                                   stream_logs=False)
+        _raise_command_running_error('set env vars', set_k8s_env_var_cmd,
+                                     new_pod.metadata.name, rc, stdout)


 def _check_user_privilege(namespace: str, new_nodes: List) -> None:
     # Checks if the default user has sufficient privilege to set up
     # the kubernetes instance pod.
-    check_k8s_user_sudo_cmd = [
-        '/bin/sh',
-        '-c',
-        (
-            'if [ $(id -u) -eq 0 ]; then'
-            # If user is root, create an alias for sudo used in skypilot setup
-            ' echo \'alias sudo=""\' >> ~/.bashrc; '
-            'else '
-            ' if command -v sudo >/dev/null 2>&1; then '
-            ' timeout 2 sudo -l >/dev/null 2>&1 || '
-            f' ( echo {exceptions.INSUFFICIENT_PRIVILEGES_CODE!r}; ); '
-            ' else '
-            f' ( echo {exceptions.INSUFFICIENT_PRIVILEGES_CODE!r}; ); '
-            ' fi; '
-            'fi')
-    ]
+    check_k8s_user_sudo_cmd = (
+        'if [ $(id -u) -eq 0 ]; then'
+        # If user is root, create an alias for sudo used in skypilot setup
+        ' echo \'alias sudo=""\' >> ~/.bashrc; echo succeed;'
+        'else '
+        ' if command -v sudo >/dev/null 2>&1; then '
+        ' timeout 2 sudo -l >/dev/null 2>&1 && echo succeed || '
+        f' ( echo {exceptions.INSUFFICIENT_PRIVILEGES_CODE!r}; ); '
+        ' else '
+        f' ( echo {exceptions.INSUFFICIENT_PRIVILEGES_CODE!r}; ); '
+        ' fi; '
+        'fi')

     for new_node in new_nodes:
-        privilege_check = _run_command_on_pods(new_node.metadata.name,
-                                               namespace,
-                                               check_k8s_user_sudo_cmd)
-        if privilege_check == str(exceptions.INSUFFICIENT_PRIVILEGES_CODE):
+        runner = command_runner.KubernetesCommandRunner(
+            (namespace, new_node.metadata.name))
+        rc, stdout, _ = runner.run(check_k8s_user_sudo_cmd,
+                                   require_outputs=True,
+                                   stream_logs=False)
+        _raise_command_running_error('check user privilege',
+                                     check_k8s_user_sudo_cmd,
+                                     new_node.metadata.name, rc, stdout)
+        if stdout == str(exceptions.INSUFFICIENT_PRIVILEGES_CODE):
             raise config_lib.KubernetesError(
                 'Insufficient system privileges detected. '
                 'Ensure the default user has root access or '
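A detail that may be worth double-checking in `_check_user_privilege`: `echo` ends its output with a newline, so an exact comparison of `stdout` against `str(code)` only matches if the runner strips the captured output. Whether `KubernetesCommandRunner.run` does so is not visible in this diff. The sketch below uses a local `/bin/sh` in place of a pod, and the value 52 is a made-up stand-in, since the real value of `INSUFFICIENT_PRIVILEGES_CODE` is not shown here.

```python
import subprocess

# Hypothetical stand-in for exceptions.INSUFFICIENT_PRIVILEGES_CODE.
PRIVILEGE_ERROR_CODE = 52

# Run the same kind of `echo <code>` the check script uses, locally.
captured = subprocess.run(['/bin/sh', '-c', f'echo {PRIVILEGE_ERROR_CODE!r}'],
                          capture_output=True,
                          text=True,
                          check=False).stdout

# Raw output carries the trailing newline added by echo.
exact_match = captured == str(PRIVILEGE_ERROR_CODE)
# Stripping first makes the comparison independent of that newline.
stripped_match = captured.strip() == str(PRIVILEGE_ERROR_CODE)

print(repr(captured), exact_match, stripped_match)
```

Comparing against `stdout.strip()` would be the defensive choice if the runner returns raw output.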
@@ -345,44 +324,43 @@ def _check_user_privilege(namespace: str, new_nodes: List) -> None:
 def _setup_ssh_in_pods(namespace: str, new_nodes: List) -> None:
     # Setting up ssh for the pod instance. This is already setup for
     # the jump pod so it does not need to be run for it.
-    set_k8s_ssh_cmd = [
-        '/bin/sh',
-        '-c',
-        (
-            'set -x; '
-            'prefix_cmd() '
-            '{ if [ $(id -u) -ne 0 ]; then echo "sudo"; else echo ""; fi; }; '
-            'export DEBIAN_FRONTEND=noninteractive;'
-            '$(prefix_cmd) apt-get update;'
-            '$(prefix_cmd) apt install openssh-server rsync -y; '
-            '$(prefix_cmd) mkdir -p /var/run/sshd; '
-            '$(prefix_cmd) '
-            'sed -i "s/PermitRootLogin prohibit-password/PermitRootLogin yes/" '
-            '/etc/ssh/sshd_config; '
-            '$(prefix_cmd) sed '
-            '"s@session\\s*required\\s*pam_loginuid.so@session optional '
-            'pam_loginuid.so@g" -i /etc/pam.d/sshd; '
-            'cd /etc/ssh/ && $(prefix_cmd) ssh-keygen -A; '
-            '$(prefix_cmd) mkdir -p ~/.ssh; '
-            '$(prefix_cmd) chown -R $(whoami) ~/.ssh;'
-            '$(prefix_cmd) chmod 700 ~/.ssh; '
-            '$(prefix_cmd) chmod 644 ~/.ssh/authorized_keys; '
-            '$(prefix_cmd) cat /etc/secret-volume/ssh-publickey* > '
-            '~/.ssh/authorized_keys; '
-            '$(prefix_cmd) service ssh restart; '
-            # Eliminate the error
-            # `mesg: ttyname failed: inappropriate ioctl for device`.
-            # See https://www.educative.io/answers/error-mesg-ttyname-failed-inappropriate-ioctl-for-device # pylint: disable=line-too-long
-            '$(prefix_cmd) sed -i "s/mesg n/tty -s \\&\\& mesg n/" ~/.profile;')
-    ]
+    set_k8s_ssh_cmd = (
+        'set -ex; '
+        'prefix_cmd() '
+        '{ if [ $(id -u) -ne 0 ]; then echo "sudo"; else echo ""; fi; }; '
+        'export DEBIAN_FRONTEND=noninteractive;'
+        '$(prefix_cmd) apt-get update;'
+        '$(prefix_cmd) apt install openssh-server rsync -y; '
+        '$(prefix_cmd) mkdir -p /var/run/sshd; '
+        '$(prefix_cmd) '
+        'sed -i "s/PermitRootLogin prohibit-password/PermitRootLogin yes/" '
+        '/etc/ssh/sshd_config; '
+        '$(prefix_cmd) sed '
+        '"s@session\\s*required\\s*pam_loginuid.so@session optional '
+        'pam_loginuid.so@g" -i /etc/pam.d/sshd; '
+        'cd /etc/ssh/ && $(prefix_cmd) ssh-keygen -A; '
+        '$(prefix_cmd) mkdir -p ~/.ssh; '
+        '$(prefix_cmd) chown -R $(whoami) ~/.ssh;'
+        '$(prefix_cmd) chmod 700 ~/.ssh; '
+        '$(prefix_cmd) cat /etc/secret-volume/ssh-publickey* > '
+        '~/.ssh/authorized_keys; '
+        '$(prefix_cmd) chmod 644 ~/.ssh/authorized_keys; '
+        '$(prefix_cmd) service ssh restart; '
+        # Eliminate the error
+        # `mesg: ttyname failed: inappropriate ioctl for device`.
+        # See https://www.educative.io/answers/error-mesg-ttyname-failed-inappropriate-ioctl-for-device # pylint: disable=line-too-long
+        '$(prefix_cmd) sed -i "s/mesg n/tty -s \\&\\& mesg n/" ~/.profile;')

     # TODO(romilb): Parallelize the setup of SSH in pods for multi-node clusters
     for new_node in new_nodes:
+        pod_name = new_node.metadata.name
+        runner = command_runner.KubernetesCommandRunner((namespace, pod_name))
         logger.info(f'{"-"*20}Start: Set up SSH in pod {pod_name!r} {"-"*20}')
-        _run_command_on_pods(new_node.metadata.name,
-                             namespace,
-                             set_k8s_ssh_cmd,
-                             stream_logs=True)
+        rc, stdout, _ = runner.run(set_k8s_ssh_cmd,
+                                   require_outputs=True,
+                                   stream_logs=False)
+        _raise_command_running_error('setup ssh', set_k8s_ssh_cmd, pod_name, rc,
+                                     stdout)
         logger.info(f'{"-"*20}End: Set up SSH in pod {pod_name!r} {"-"*20}')
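The SSH setup script in this hunk moves from `set -x` to `set -ex`, and its return code is now checked. A minimal sketch of why the two go together, using a local `/bin/sh` in place of a pod: without `-e`, a shell script's exit status is that of its last command, so an early failure can be masked. With `-e`, the shell stops at the first failing command and returns its status. The helper name `run_script` is made up for this example.

```python
import subprocess


def run_script(script: str) -> int:
    """Run a shell script locally and return its exit status."""
    return subprocess.run(['/bin/sh', '-c', script],
                          capture_output=True,
                          text=True,
                          check=False).returncode


# `false` fails, but the final `echo` succeeds, so the failure is hidden.
rc_without_e = run_script('false; echo done')
# With -e the shell exits as soon as `false` fails.
rc_with_e = run_script('set -e; false; echo done')

print(rc_without_e, rc_with_e)
```

This is presumably also why `chmod 644 ~/.ssh/authorized_keys` now runs after the file is written: under `-e`, a `chmod` on a file that does not exist yet would abort the whole script.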


@@ -709,11 +687,15 @@ def get_cluster_info(
     assert cpu_request is not None, 'cpu_request should not be None'

     ssh_user = 'sky'
-    get_k8s_ssh_user_cmd = ['/bin/sh', '-c', ('echo $(whoami)')]
+    get_k8s_ssh_user_cmd = 'echo $(whoami)'
     assert head_pod_name is not None
-    ssh_user = _run_command_on_pods(head_pod_name, namespace,
-                                    get_k8s_ssh_user_cmd)
-    ssh_user = ssh_user.strip()
+    runner = command_runner.KubernetesCommandRunner((namespace, head_pod_name))
+    rc, stdout, _ = runner.run(get_k8s_ssh_user_cmd,
+                               require_outputs=True,
+                               stream_logs=False)
+    _raise_command_running_error('get ssh user', get_k8s_ssh_user_cmd,
+                                 head_pod_name, rc, stdout)
+    ssh_user = stdout.strip()
     logger.debug(
         f'Using ssh user {ssh_user} for cluster {cluster_name_on_cloud}')
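The hunks in this file repeat one pattern: run a shell string, check the return code, then use the stripped stdout. A minimal local sketch of that pattern follows. It runs the command with a local `/bin/sh` instead of inside a pod, and `run_shell`, `CommandFailedError` and `raise_on_failure` are hypothetical names for this example, not SkyPilot's API. Only the error message format is copied from `_raise_command_running_error` in the diff.

```python
import subprocess
from typing import Tuple


class CommandFailedError(RuntimeError):
    """Hypothetical stand-in for config_lib.KubernetesError."""


def run_shell(command: str) -> Tuple[int, str, str]:
    """Run a shell string locally and return (rc, stdout, stderr)."""
    proc = subprocess.run(['/bin/sh', '-c', command],
                          capture_output=True,
                          text=True,
                          check=False)
    return proc.returncode, proc.stdout, proc.stderr


def raise_on_failure(message: str, command: str, pod_name: str, rc: int,
                     stdout: str) -> None:
    """Raise with the failing command and its output if rc is nonzero."""
    if rc == 0:
        return
    raise CommandFailedError(
        f'Failed to {message} for pod {pod_name} with return '
        f'code {rc}: {command!r}\nOutput: {stdout}.')


# Usage: same shape as the "get ssh user" step, with a made-up pod name.
cmd = 'echo $(whoami)'
rc, stdout, _ = run_shell(cmd)
raise_on_failure('get ssh user', cmd, 'demo-pod', rc, stdout)
user = stdout.strip()
print(user)
```

Passing the command as a plain string and letting the runner add the shell wrapper is what removes the repeated `['/bin/sh', '-c', ...]` lists from the old code.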

@@ -776,3 +758,21 @@ def query_instances(
             continue
         cluster_status[pod.metadata.name] = pod_status
     return cluster_status
+
+
+def get_command_runners(
+    cluster_info: common.ClusterInfo,
+    **credentials: Dict[str, Any],
+) -> List[command_runner.CommandRunner]:
+    """Get a command runner for the given cluster."""
+    assert cluster_info.provider_config is not None, cluster_info
+    instances = cluster_info.instances
+    namespace = _get_namespace(cluster_info.provider_config)
+    node_list = []
+    if cluster_info.head_instance_id is not None:
+        node_list = [(namespace, cluster_info.head_instance_id)]
+    node_list.extend((namespace, pod_name)
+                     for pod_name in instances.keys()
+                     if pod_name != cluster_info.head_instance_id)
+    return command_runner.KubernetesCommandRunner.make_runner_list(
+        node_list=node_list, **credentials)
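`get_command_runners` builds its node list with the head pod first and the remaining pods after it. A standalone sketch of just that ordering logic, with plain dicts and strings in place of `ClusterInfo`. The function name `order_nodes_head_first` and the pod names are made up for illustration.

```python
from typing import Dict, List, Optional, Tuple


def order_nodes_head_first(
        namespace: str, instances: Dict[str, object],
        head_instance_id: Optional[str]) -> List[Tuple[str, str]]:
    """Return (namespace, pod_name) pairs with the head pod first."""
    node_list: List[Tuple[str, str]] = []
    if head_instance_id is not None:
        node_list = [(namespace, head_instance_id)]
    # Remaining pods keep the dict's insertion order.
    node_list.extend((namespace, pod_name)
                     for pod_name in instances
                     if pod_name != head_instance_id)
    return node_list


pods = {'worker1': None, 'head': None, 'worker2': None}
ordered = order_nodes_head_first('default', pods, 'head')
print(ordered)
```

Putting the head first means callers can treat the first runner in the returned list as the head runner without looking it up again.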