
Commit ef81fb0

when using --fast, check if config_hash matches, and if not, provision

1 parent fab22d0

File tree: 5 files changed, +132 -57 lines changed
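In short, the commit threads a cluster-config hash from the `--fast` launch path down into the provisioner, so a re-launch whose freshly generated config matches the stored hash can skip provisioning entirely. A minimal sketch of that control flow (the function and return strings below are illustrative, not the diff's actual code):

```python
from typing import Optional

def launch_fast(new_config_hash: str,
                stored_config_hash: Optional[str]) -> str:
    """Illustrative --fast control flow; names here are hypothetical."""
    if stored_config_hash is not None and stored_config_hash == new_config_hash:
        # The cluster was provisioned with this exact config: skip
        # provisioning and go straight to the later stages (sync, exec, ...).
        return 'reused existing cluster'
    # Config changed, or no hash was recorded: provision, then persist the
    # new hash so future --fast launches can take the shortcut.
    return 'provisioned; stored new config hash'

print(launch_fast('h1', 'h1'))  # reused existing cluster
print(launch_fast('h2', 'h1'))  # provisioned; stored new config hash
```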

sky/backends/backend.py (+20 -15)

@@ -45,20 +45,23 @@ def check_resources_fit_cluster(self, handle: _ResourceHandleType,
     @timeline.event
     @usage_lib.messages.usage.update_runtime('provision')
     def provision(
-            self,
-            task: 'task_lib.Task',
-            to_provision: Optional['resources.Resources'],
-            dryrun: bool,
-            stream_logs: bool,
-            cluster_name: Optional[str] = None,
-            retry_until_up: bool = False) -> Optional[_ResourceHandleType]:
+        self,
+        task: 'task_lib.Task',
+        to_provision: Optional['resources.Resources'],
+        dryrun: bool,
+        stream_logs: bool,
+        cluster_name: Optional[str] = None,
+        retry_until_up: bool = False,
+        skip_if_config_hash_matches: Optional[str] = None
+    ) -> Optional[_ResourceHandleType]:
         if cluster_name is None:
             cluster_name = sky.backends.backend_utils.generate_cluster_name()
         usage_lib.record_cluster_name_for_current_operation(cluster_name)
         usage_lib.messages.usage.update_actual_task(task)
         with rich_utils.safe_status(ux_utils.spinner_message('Launching')):
             return self._provision(task, to_provision, dryrun, stream_logs,
-                                   cluster_name, retry_until_up)
+                                   cluster_name, retry_until_up,
+                                   skip_if_config_hash_matches)

@@ -126,13 +129,15 @@ def register_info(self, **kwargs) -> None:

     # --- Implementations of the APIs ---
     def _provision(
-            self,
-            task: 'task_lib.Task',
-            to_provision: Optional['resources.Resources'],
-            dryrun: bool,
-            stream_logs: bool,
-            cluster_name: str,
-            retry_until_up: bool = False) -> Optional[_ResourceHandleType]:
+        self,
+        task: 'task_lib.Task',
+        to_provision: Optional['resources.Resources'],
+        dryrun: bool,
+        stream_logs: bool,
+        cluster_name: str,
+        retry_until_up: bool = False,
+        skip_if_config_hash_matches: Optional[str] = None
+    ) -> Optional[_ResourceHandleType]:
         raise NotImplementedError

     def _sync_workdir(self, handle: _ResourceHandleType, workdir: Path) -> None:
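The new keyword argument defaults to `None`, so existing backends keep working unchanged; a backend that supports it compares the caller's hash against the hash of the config it is about to provision. A minimal sketch of a subclass honoring the parameter (the class and helper below are hypothetical; only the parameter handling mirrors the diff):

```python
from typing import Optional

class SketchBackend:
    """Hypothetical backend showing how _provision consumes the new kwarg."""

    def _provision(self,
                   cluster_name: str,
                   retry_until_up: bool = False,
                   skip_if_config_hash_matches: Optional[str] = None) -> str:
        current_hash = self._compute_config_hash()  # hypothetical helper
        if skip_if_config_hash_matches == current_hash:
            # The existing cluster already runs this exact config.
            return f'{cluster_name}: provisioning skipped'
        return f'{cluster_name}: provisioned'

    def _compute_config_hash(self) -> str:
        # Stand-in for hashing the generated cluster config.
        return 'abc123'

backend = SketchBackend()
print(backend._provision('my-cluster', skip_if_config_hash_matches='abc123'))
```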

sky/backends/cloud_vm_ray_backend.py (+38 -13)

@@ -1314,6 +1314,7 @@ def _retry_zones(
         prev_cluster_status: Optional[status_lib.ClusterStatus],
         prev_handle: Optional['CloudVmRayResourceHandle'],
         prev_cluster_ever_up: bool,
+        skip_if_config_hash_matches: Optional[str],
     ) -> Dict[str, Any]:
         """The provision retry loop."""
         # Get log_path name
@@ -1424,8 +1425,16 @@ def _retry_zones(
                 raise exceptions.ResourcesUnavailableError(
                     f'Failed to provision on cloud {to_provision.cloud} due to '
                     f'invalid cloud config: {common_utils.format_exception(e)}')
+
+            if skip_if_config_hash_matches == config_dict['config_hash']:
+                logger.info('Skipping provisioning of cluster with matching '
+                            'config hash.')
+                return config_dict
+
             if dryrun:
                 return config_dict
+
             cluster_config_file = config_dict['ray']

             launched_resources = to_provision.copy(region=region.name)
@@ -1937,6 +1946,7 @@ def provision_with_retries(
         to_provision_config: ToProvisionConfig,
         dryrun: bool,
         stream_logs: bool,
+        skip_if_config_hash_matches: Optional[str],
     ) -> Dict[str, Any]:
         """Provision with retries for all launchable resources."""
         cluster_name = to_provision_config.cluster_name
@@ -1986,7 +1996,8 @@
                     cloud_user_identity=cloud_user,
                     prev_cluster_status=prev_cluster_status,
                     prev_handle=prev_handle,
-                    prev_cluster_ever_up=prev_cluster_ever_up)
+                    prev_cluster_ever_up=prev_cluster_ever_up,
+                    skip_if_config_hash_matches=skip_if_config_hash_matches)
                 if dryrun:
                     return config_dict
             except (exceptions.InvalidClusterNameError,
@@ -2687,13 +2698,15 @@ def check_resources_fit_cluster(
         return valid_resource

     def _provision(
-            self,
-            task: task_lib.Task,
-            to_provision: Optional[resources_lib.Resources],
-            dryrun: bool,
-            stream_logs: bool,
-            cluster_name: str,
-            retry_until_up: bool = False) -> Optional[CloudVmRayResourceHandle]:
+        self,
+        task: task_lib.Task,
+        to_provision: Optional[resources_lib.Resources],
+        dryrun: bool,
+        stream_logs: bool,
+        cluster_name: str,
+        retry_until_up: bool = False,
+        skip_if_config_hash_matches: Optional[str] = None
+    ) -> Optional[CloudVmRayResourceHandle]:
         """Provisions using 'ray up'.

         Raises:
@@ -2779,7 +2792,8 @@ def _provision(
                     rich_utils.force_update_status(
                         ux_utils.spinner_message('Launching', log_path))
                     config_dict = retry_provisioner.provision_with_retries(
-                        task, to_provision_config, dryrun, stream_logs)
+                        task, to_provision_config, dryrun, stream_logs,
+                        skip_if_config_hash_matches)
                     break
                 except exceptions.ResourcesUnavailableError as e:
                     # Do not remove the stopped cluster from the global state
@@ -2829,6 +2843,15 @@ def _provision(
             record = global_user_state.get_cluster_from_name(cluster_name)
             return record['handle'] if record is not None else None

+        config_hash = config_dict['config_hash']
+
+        if skip_if_config_hash_matches is not None:
+            record = global_user_state.get_cluster_from_name(cluster_name)
+            if (record is not None and skip_if_config_hash_matches ==
+                    config_hash == record['config_hash']):
+                logger.info('Skipping remaining provisioning steps: '
+                            'cluster config hash is unchanged.')
+                return record['handle']
+
         if 'provision_record' in config_dict:
             # New provisioner is used here.
             handle = config_dict['handle']
@@ -2868,7 +2891,7 @@ def _provision(
             self._update_after_cluster_provisioned(
                 handle, to_provision_config.prev_handle, task,
                 prev_cluster_status, handle.external_ips(),
-                handle.external_ssh_ports(), lock_path)
+                handle.external_ssh_ports(), lock_path, config_hash)
             return handle

         cluster_config_file = config_dict['ray']
@@ -2940,7 +2963,8 @@ def _get_zone(runner):

         self._update_after_cluster_provisioned(
             handle, to_provision_config.prev_handle, task,
-            prev_cluster_status, ip_list, ssh_port_list, lock_path)
+            prev_cluster_status, ip_list, ssh_port_list, lock_path,
+            config_hash)
         return handle

     def _open_ports(self, handle: CloudVmRayResourceHandle) -> None:
@@ -2958,8 +2982,8 @@ def _update_after_cluster_provisioned(
            prev_handle: Optional[CloudVmRayResourceHandle],
            task: task_lib.Task,
            prev_cluster_status: Optional[status_lib.ClusterStatus],
-           ip_list: List[str], ssh_port_list: List[int],
-           lock_path: str) -> None:
+           ip_list: List[str], ssh_port_list: List[int], lock_path: str,
+           config_hash: str) -> None:
         usage_lib.messages.usage.update_cluster_resources(
             handle.launched_nodes, handle.launched_resources)
         usage_lib.messages.usage.update_final_cluster_status(
@@ -3019,6 +3043,7 @@ def _update_after_cluster_provisioned(
                 handle,
                 set(task.resources),
                 ready=True,
+                config_hash=config_hash,
             )
         usage_lib.messages.usage.update_final_cluster_status(
             status_lib.ClusterStatus.UP)
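Note that `_provision` only takes the shortcut when three hashes agree: the hash the caller passed in, the hash just computed for the generated config, and the hash recorded for the existing cluster in `global_user_state`. A small sketch of that predicate (the function name and sample hashes are illustrative):

```python
from typing import Optional

def should_skip(requested: Optional[str], computed: str,
                stored: Optional[str]) -> bool:
    # Skip only when skipping was requested and all three hashes agree.
    return requested is not None and requested == computed == stored

assert should_skip('h1', 'h1', 'h1')      # unchanged config: skip
assert not should_skip(None, 'h1', 'h1')  # --fast not used: provision
assert not should_skip('h1', 'h2', 'h2')  # config changed: provision
assert not should_skip('h1', 'h1', None)  # no stored hash: provision
```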

sky/backends/local_docker_backend.py (+11 -7)

@@ -131,13 +131,14 @@ def check_resources_fit_cluster(self, handle: 'LocalDockerResourceHandle',
         pass

     def _provision(
-            self,
-            task: 'task_lib.Task',
-            to_provision: Optional['resources.Resources'],
-            dryrun: bool,
-            stream_logs: bool,
-            cluster_name: str,
-            retry_until_up: bool = False
+        self,
+        task: 'task_lib.Task',
+        to_provision: Optional['resources.Resources'],
+        dryrun: bool,
+        stream_logs: bool,
+        cluster_name: str,
+        retry_until_up: bool = False,
+        skip_if_config_hash_matches: Optional[str] = None
     ) -> Optional[LocalDockerResourceHandle]:
         """Builds docker image for the task and returns cluster name as handle.
@@ -153,6 +154,9 @@ def _provision(
             logger.warning(
                 f'Retrying until up is not supported in backend: {self.NAME}. '
                 'Ignored the flag.')
+        if skip_if_config_hash_matches is not None:
+            logger.warning(f'Config hashing is not supported in backend: '
+                           f'{self.NAME}. Ignored skip_if_config_hash_matches.')
         if stream_logs:
             logger.info(
                 'Streaming build logs is not supported in LocalDockerBackend. '

sky/execution.py (+33 -11)

@@ -108,6 +108,7 @@ def _execute(
         idle_minutes_to_autostop: Optional[int] = None,
         no_setup: bool = False,
         clone_disk_from: Optional[str] = None,
+        skip_unnecessary_provisioning: bool = False,
         # Internal only:
         # pylint: disable=invalid-name
         _is_launched_by_jobs_controller: bool = False,
@@ -128,8 +129,9 @@ def _execute(
       Note that if errors occur during provisioning/data syncing/setting up,
       the cluster will not be torn down for debugging purposes.
     stream_logs: bool; whether to stream all tasks' outputs to the client.
-    handle: Optional[backends.ResourceHandle]; if provided, execution will use
-      an existing backend cluster handle instead of provisioning a new one.
+    handle: Optional[backends.ResourceHandle]; if provided, execution will
+      attempt to use an existing backend cluster handle instead of
+      provisioning a new one.
     backend: Backend; backend to use for executing the tasks. Defaults to
       CloudVmRayBackend()
     retry_until_up: bool; whether to retry the provisioning until the cluster
@@ -150,6 +152,11 @@ def _execute(
     idle_minutes_to_autostop: int; if provided, the cluster will be set to
       autostop after this many minutes of idleness.
     no_setup: bool; whether to skip setup commands or not when (re-)launching.
+    clone_disk_from: Optional[str]; if set, clone the disk from the specified
+      cluster.
+    skip_unnecessary_provisioning: bool; if True, compare the calculated
+      cluster config to the current cluster's config. If they match, shortcut
+      provisioning even if Stage.PROVISION is in stages.

   Returns:
     job_id: Optional[int]; the job ID of the submitted job. None if the
@@ -179,9 +186,13 @@ def _execute(
                 f'{colorama.Style.RESET_ALL}')

     cluster_exists = False
+    existing_config_hash = None
     if cluster_name is not None:
         cluster_record = global_user_state.get_cluster_from_name(cluster_name)
-        cluster_exists = cluster_record is not None
+        if cluster_record is not None:
+            cluster_exists = True
+            if skip_unnecessary_provisioning:
+                existing_config_hash = cluster_record['config_hash']
         # TODO(woosuk): If the cluster exists, print a warning that
         # `cpus` and `memory` are not used as a job scheduling constraint,
         # unlike `gpus`.
@@ -279,13 +290,18 @@ def _execute(

     try:
         if Stage.PROVISION in stages:
-            if handle is None:
-                handle = backend.provision(task,
-                                           task.best_resources,
-                                           dryrun=dryrun,
-                                           stream_logs=stream_logs,
-                                           cluster_name=cluster_name,
-                                           retry_until_up=retry_until_up)
+            assert handle is None or skip_unnecessary_provisioning, (
+                'Provisioning requested, but handle is already set. PROVISION '
+                'should be excluded from stages or '
+                'skip_unnecessary_provisioning should be set.')
+            handle = backend.provision(
+                task,
+                task.best_resources,
+                dryrun=dryrun,
+                stream_logs=stream_logs,
+                cluster_name=cluster_name,
+                retry_until_up=retry_until_up,
+                skip_if_config_hash_matches=existing_config_hash)

         if handle is None:
             assert dryrun, ('If not dryrun, handle must be set or '
@@ -459,6 +475,7 @@ def launch(

     handle = None
     stages = None
+    skip_unnecessary_provisioning = False
     # Check if cluster exists and we are doing fast provisioning
     if fast and cluster_name is not None:
         maybe_handle = global_user_state.get_handle_from_cluster_name(
@@ -472,14 +489,18 @@ def launch(
                 check_cloud_vm_ray_backend=False,
                 dryrun=dryrun)
             handle = maybe_handle
-            # Get all stages
+            logger.info('Cluster is up; provisioning will be skipped if its '
+                        'config is unchanged.')
             stages = [
+                # Provisioning will be skipped if the existing cluster
+                # config hash matches the calculated one.
+                Stage.PROVISION,
                 Stage.SYNC_WORKDIR,
                 Stage.SYNC_FILE_MOUNTS,
                 Stage.PRE_EXEC,
                 Stage.EXEC,
                 Stage.DOWN,
             ]
+            skip_unnecessary_provisioning = True
         except exceptions.ClusterNotUpError:
             # Proceed with normal provisioning
             pass
@@ -500,6 +521,7 @@ def launch(
             idle_minutes_to_autostop=idle_minutes_to_autostop,
             no_setup=no_setup,
             clone_disk_from=clone_disk_from,
+            skip_unnecessary_provisioning=skip_unnecessary_provisioning,
             _is_launched_by_jobs_controller=_is_launched_by_jobs_controller,
             _is_launched_by_sky_serve_controller=
             _is_launched_by_sky_serve_controller,
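From the user's side, the feature surfaces through the `fast` flag on `sky.launch`, shown in the `launch()` hunk above. A usage sketch (assuming the public `sky.Task`/`sky.Resources` API; the cluster name is made up):

```python
import sky

task = sky.Task(run='echo hello')
task.set_resources(sky.Resources(cpus='2+'))

# First launch provisions the cluster and records its config hash.
sky.launch(task, cluster_name='demo-cluster', fast=True)

# A second launch with an unchanged task config finds a matching hash and
# skips provisioning, running only the later stages (sync, exec, ...).
sky.launch(task, cluster_name='demo-cluster', fast=True)
```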
