"""
This script generates Buildkite pipelines from test files.

The script generates two pipelines:

tests/smoke_tests
├── test_*.py -> release pipeline
├── test_quick_tests_core.py -> quick tests run on each PR before merging

Run `PYTHONPATH=$(pwd)/tests:$PYTHONPATH python .buildkite/generate_pipeline.py`
to generate the pipelines for testing. The CI runs this script as a pre-step
and uses the generated pipelines to run the tests.

1. The release pipeline generates smoke tests for all clouds and runs all of
   them by default.
2. The pre-merge pipeline also generates smoke tests for all clouds; the
   author should specify which clouds to run by setting env variables in the
   step.

We currently only have credentials for aws/azure/gcp/kubernetes (see
CLOUD_QUEUE_MAP and SERVE_CLOUD_QUEUE_MAP), so smoke tests are only generated
for those clouds; other clouds are not supported yet, and no smoke tests are
generated for them.
"""

import ast
import os
import random
from typing import Any, Dict, List, Optional, Tuple

from conftest import cloud_to_pytest_keyword
from conftest import default_clouds_to_run
import yaml

DEFAULT_CLOUDS_TO_RUN = default_clouds_to_run
PYTEST_TO_CLOUD_KEYWORD = {v: k for k, v in cloud_to_pytest_keyword.items()}

QUEUE_GENERIC_CLOUD = 'generic_cloud'
QUEUE_GENERIC_CLOUD_SERVE = 'generic_cloud_serve'
QUEUE_KUBERNETES = 'kubernetes'
QUEUE_KUBERNETES_SERVE = 'kubernetes_serve'
# Only aws, gcp, azure, and kubernetes are supported for now.
# Other clouds do not have credentials.
CLOUD_QUEUE_MAP = {
    'aws': QUEUE_GENERIC_CLOUD,
    'gcp': QUEUE_GENERIC_CLOUD,
    'azure': QUEUE_GENERIC_CLOUD,
    'kubernetes': QUEUE_KUBERNETES
}
# Serve tests run long, and different test steps usually require locks.
# They are highly likely to fail if multiple serve tests run concurrently,
# so we use a separate queue that runs only one test at a time.
SERVE_CLOUD_QUEUE_MAP = {
    'aws': QUEUE_GENERIC_CLOUD_SERVE,
    'gcp': QUEUE_GENERIC_CLOUD_SERVE,
    'azure': QUEUE_GENERIC_CLOUD_SERVE,
    'kubernetes': QUEUE_KUBERNETES_SERVE
}

GENERATED_FILE_HEAD = ('# This is an auto-generated Buildkite pipeline by '
                       '.buildkite/generate_pipeline.py. Please do not '
                       'edit directly.\n')


def _get_full_decorator_path(decorator: ast.AST) -> str:
    """Recursively get the full path of a decorator."""
    if isinstance(decorator, ast.Attribute):
        return f'{_get_full_decorator_path(decorator.value)}.{decorator.attr}'
    elif isinstance(decorator, ast.Name):
        return decorator.id
    elif isinstance(decorator, ast.Call):
        return _get_full_decorator_path(decorator.func)
    raise ValueError(f'Unknown decorator type: {type(decorator)}')

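# Illustrative examples (assumed inputs, not executed): a decorator parsed
# from `@pytest.mark.no_aws` resolves to 'pytest.mark.no_aws' (an
# ast.Attribute chain), while a call-style decorator such as
# `@pytest.mark.parametrize(...)` resolves to 'pytest.mark.parametrize'
# (the ast.Call is unwrapped to its func first).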

def _extract_marked_tests(
        file_path: str) -> Dict[str, Tuple[List[str], List[str]]]:
    """Extract test functions and filter clouds using pytest.mark
    from a Python test file.

    We separate each test_function_{cloud} into different pipeline steps
    to maximize the parallelism of the tests via the buildkite CI job queue.
    This allows us to visualize the test results and rerun failures at the
    granularity of each test_function_{cloud}.

    If we made pytest --serve a single job, it could contain dozens of test
    functions and run for hours. This makes it hard to visualize the test
    results and rerun failures. Additionally, the parallelism would be
    controlled by pytest instead of the buildkite job queue.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        tree = ast.parse(file.read(), filename=file_path)

    for node in ast.walk(tree):
        for child in ast.iter_child_nodes(node):
            setattr(child, 'parent', node)

    function_cloud_map = {}
    for node in ast.walk(tree):
        if isinstance(node, ast.FunctionDef) and node.name.startswith('test_'):
            class_name = None
            if hasattr(node, 'parent') and isinstance(node.parent,
                                                      ast.ClassDef):
                class_name = node.parent.name

            clouds_to_include = []
            clouds_to_exclude = []
            is_serve_test = False
            for decorator in node.decorator_list:
                if isinstance(decorator, ast.Call):
                    # We only need to consider decorators with no arguments
                    # to extract clouds.
                    continue
                full_path = _get_full_decorator_path(decorator)
                if full_path.startswith('pytest.mark.'):
                    assert isinstance(decorator, ast.Attribute)
                    suffix = decorator.attr
                    if suffix.startswith('no_'):
                        clouds_to_exclude.append(suffix[3:])
                    else:
                        if suffix == 'serve':
                            is_serve_test = True
                            continue
                        if suffix not in PYTEST_TO_CLOUD_KEYWORD:
                            # This mark does not specify a cloud, so we skip it.
                            continue
                        clouds_to_include.append(
                            PYTEST_TO_CLOUD_KEYWORD[suffix])
            clouds_to_include = (clouds_to_include if clouds_to_include else
                                 DEFAULT_CLOUDS_TO_RUN)
            clouds_to_include = [
                cloud for cloud in clouds_to_include
                if cloud not in clouds_to_exclude
            ]
            cloud_queue_map = (SERVE_CLOUD_QUEUE_MAP
                               if is_serve_test else CLOUD_QUEUE_MAP)
            final_clouds_to_include = [
                cloud for cloud in clouds_to_include if cloud in cloud_queue_map
            ]
            if clouds_to_include and not final_clouds_to_include:
                print(f'Warning: {file_path}:{node.name} '
                      f'is marked to run on {clouds_to_include}, '
                      f'but we do not have credentials for those clouds. '
                      f'Skipped.')
                continue
            if clouds_to_include != final_clouds_to_include:
                excluded_clouds = set(clouds_to_include) - set(
                    final_clouds_to_include)
                print(
                    f'Warning: {file_path}:{node.name} '
                    f'is marked to run on {clouds_to_include}, '
                    f'but we only have credentials for {final_clouds_to_include}. '
                    f'Clouds {excluded_clouds} are skipped.')
            function_name = (f'{class_name}::{node.name}'
                             if class_name else node.name)
            function_cloud_map[function_name] = (final_clouds_to_include, [
                cloud_queue_map[cloud] for cloud in final_clouds_to_include
            ])
    return function_cloud_map

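# Illustrative mapping (assumed marks and default clouds): a test defined as
#     @pytest.mark.no_aws
#     @pytest.mark.serve
#     def test_example(): ...
# would map to every cloud in DEFAULT_CLOUDS_TO_RUN except 'aws', each paired
# with its serve queue, e.g.
#     {'test_example': (['gcp', ...], [QUEUE_GENERIC_CLOUD_SERVE, ...])}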

def _generate_pipeline(test_file: str) -> Dict[str, Any]:
    """Generate a Buildkite pipeline from test files."""
    steps = []
    function_cloud_map = _extract_marked_tests(test_file)
    for test_function, clouds_and_queues in function_cloud_map.items():
        for cloud, queue in zip(*clouds_and_queues):
            step = {
                'label': f'{test_function} on {cloud}',
                'command': f'pytest {test_file}::{test_function} --{cloud}',
                'agents': {
                    # Use a separate agent pool for each cloud, since they
                    # require different amounts of resources and concurrency
                    # control.
                    'queue': queue
                },
                'if': f'build.env("{cloud}") == "1"'
            }
            steps.append(step)
    return {'steps': steps}

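# A generated step serializes to YAML roughly as follows (illustrative values;
# key order may differ in the dump):
#     - label: test_example on gcp
#       command: pytest tests/smoke_tests/test_example.py::test_example --gcp
#       agents:
#         queue: generic_cloud_serve
#       if: build.env("gcp") == "1"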

def _dump_pipeline_to_file(yaml_file_path: str,
                           pipelines: List[Dict[str, Any]],
                           extra_env: Optional[Dict[str, str]] = None):
    default_env = {'LOG_TO_STDOUT': '1', 'PYTHONPATH': '${PYTHONPATH}:$(pwd)'}
    if extra_env:
        default_env.update(extra_env)
    with open(yaml_file_path, 'w', encoding='utf-8') as file:
        file.write(GENERATED_FILE_HEAD)
        all_steps = []
        for pipeline in pipelines:
            all_steps.extend(pipeline['steps'])
        # Shuffle the steps to avoid flakiness: consecutive runs of the same
        # kind of test may fail because they require locks on the same
        # resources.
        random.shuffle(all_steps)
        final_pipeline = {'steps': all_steps, 'env': default_env}
        yaml.dump(final_pipeline, file, default_flow_style=False)

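# The written file starts with GENERATED_FILE_HEAD, followed by the dumped
# pipeline, e.g. (illustrative; yaml.dump sorts keys alphabetically by
# default, so 'env' precedes 'steps'):
#     env:
#       LOG_TO_STDOUT: '1'
#       PYTHONPATH: ${PYTHONPATH}:$(pwd)
#     steps:
#     - ...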

def _convert_release(test_files: List[str]):
    yaml_file_path = '.buildkite/pipeline_smoke_tests_release.yaml'
    output_file_pipelines = []
    for test_file in test_files:
        print(f'Converting {test_file} to {yaml_file_path}')
        pipeline = _generate_pipeline(test_file)
        output_file_pipelines.append(pipeline)
        print(f'Converted {test_file} to {yaml_file_path}\n\n')
    # Enable all clouds by default for the release pipeline.
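    # Illustrative: with CLOUD_QUEUE_MAP above, extra_env becomes
    # {'aws': '1', 'gcp': '1', 'azure': '1', 'kubernetes': '1'}, so every
    # generated step's `if: build.env("<cloud>") == "1"` condition passes.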
    _dump_pipeline_to_file(yaml_file_path,
                           output_file_pipelines,
                           extra_env={cloud: '1' for cloud in CLOUD_QUEUE_MAP})


def _convert_quick_tests_core(test_files: List[str]):
    yaml_file_path = '.buildkite/pipeline_smoke_tests_quick_tests_core.yaml'
    output_file_pipelines = []
    for test_file in test_files:
        print(f'Converting {test_file} to {yaml_file_path}')
        # We want to enable all clouds by default for each test function
        # for pre-merge, and let the author control which clouds to run
        # via parameters.
        pipeline = _generate_pipeline(test_file)
        pipeline['steps'].append({
            'label': 'Backward compatibility test',
            'command': 'bash tests/backward_compatibility_tests.sh',
            'agents': {
                'queue': 'back_compat'
            }
        })
        output_file_pipelines.append(pipeline)
        print(f'Converted {test_file} to {yaml_file_path}\n\n')
    _dump_pipeline_to_file(yaml_file_path,
                           output_file_pipelines,
                           extra_env={'SKYPILOT_SUPPRESS_SENSITIVE_LOG': '1'})


def main():
    test_files = os.listdir('tests/smoke_tests')
    release_files = []
    quick_tests_core_files = []
    for test_file in test_files:
        if not test_file.startswith('test_'):
            continue
        test_file_path = os.path.join('tests/smoke_tests', test_file)
        if 'test_quick_tests_core' in test_file:
            quick_tests_core_files.append(test_file_path)
        else:
            release_files.append(test_file_path)

    _convert_release(release_files)
    _convert_quick_tests_core(quick_tests_core_files)


if __name__ == '__main__':
    main()