1
# Helper to get the id of the currently running job in a GitHub Actions
2
# workflow. GitHub does not provide this information to workflow runs, so we
3
# need to figure it out based on what they *do* provide.
14
from typing import Any, Callable, Dict, List, Optional, Tuple
15
from urllib.request import Request, urlopen
18
def parse_json_and_links(conn: Any) -> Tuple[Any, Dict[str, Dict[str, str]]]:
    """Decode a JSON response body and collect its ``Link`` header entries.

    GitHub uses the HTTP ``Link`` header for pagination, e.g.
    ``<https://api.github.com/...?page=2>; rel="next"``.
    See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link

    Args:
        conn: An open response object exposing a mapping-like ``headers``
            attribute and a ``read()`` method that ``json.load`` can consume.

    Returns:
        A ``(payload, links)`` tuple. ``payload`` is the decoded JSON body.
        ``links`` maps each ``rel`` value (``"next"``, ``"last"``, ...) to a
        dict of that link's parameters, with the link target stored under
        the additional ``"url"`` key. ``links`` is empty when the response
        carries no ``Link`` header.

    NOTE(review): the error handling around the ``;`` split and the ``rel``
    guard were restored from context after lines were lost from this
    block; confirm against the upstream version of the script.
    """
    links: Dict[str, Dict[str, str]] = {}
    if "Link" in conn.headers:
        # Entries are comma separated; splitting on ", <" avoids breaking on
        # commas that may appear inside a URL.
        for elem in re.split(", *<", conn.headers["Link"]):
            try:
                url, params_ = elem.split(";", 1)
            except ValueError:
                # No parameters at all, so there is no ``rel`` to key on.
                continue
            url = urllib.parse.unquote(url.strip("<> "))
            qparams = urllib.parse.parse_qs(params_.strip(), separator=";")
            # Keep the first value of every parameter, without the quotes.
            # Names are stripped because splitting on ";" leaves the space
            # that follows each separator attached to the next name.
            params = {
                k.strip(): v[0].strip('"')
                for k, v in qparams.items()
                if isinstance(v, list) and len(v) > 0
            }
            params["url"] = url
            if "rel" in params:
                links[params["rel"]] = params

    return json.load(conn), links
45
headers: Optional[Dict[str, str]] = None,
46
reader: Callable[[Any], Any] = lambda x: x.read(),
47
retries: Optional[int] = 3,
48
backoff_timeout: float = 0.5,
53
with urlopen(Request(url, headers=headers)) as conn:
55
except urllib.error.HTTPError as err:
56
if isinstance(retries, (int, float)) and retries > 0:
57
time.sleep(backoff_timeout)
63
backoff_timeout=backoff_timeout,
67
f"Recieved status code '{err.code}' when attempting to retrieve {url}:\n",
68
f"{err.reason}\n\nheaders={err.headers}",
70
raise RuntimeError(exception_message) from err
73
def parse_args() -> Any:
    """Parse the command line of this script.

    Returns:
        The ``argparse`` namespace with two string attributes:
        ``workflow_run_id`` and ``runner_name``.

    NOTE(review): ``runner_name`` is declared positional to mirror
    ``workflow_run_id``; its declaration line was lost from this block, so
    confirm against how the workflow invokes the script.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "workflow_run_id", help="The id of the workflow run, should be GITHUB_RUN_ID"
    )
    parser.add_argument(
        "runner_name",
        help="The name of the runner to retrieve the job id, should be RUNNER_NAME",
    )

    return parser.parse_args()
86
def fetch_jobs(url: str, headers: Dict[str, str]) -> List[Dict[str, str]]:
    """Collect the ``jobs`` entries from every page of a paginated listing.

    Each page is fetched with ``fetch_url`` using ``parse_json_and_links`` as
    the reader, which yields the decoded payload together with the
    pagination links. Pages are followed through the ``"next"`` link until
    a response no longer advertises one.

    Args:
        url: Address of the first page.
        headers: HTTP headers sent with every page request.

    Returns:
        The concatenated ``jobs`` lists of all pages, in page order.
    """
    response, links = fetch_url(url, headers=headers, reader=parse_json_and_links)
    jobs = response["jobs"]
    assert isinstance(jobs, list)
    while "next" in links:
        response, links = fetch_url(
            links["next"]["url"], headers=headers, reader=parse_json_and_links
        )
        jobs.extend(response["jobs"])

    return jobs
99
# Our strategy is to retrieve the parent workflow run, then filter its jobs on
100
# RUNNER_NAME to figure out which job we're currently running.
102
# Why RUNNER_NAME? Because it's the only thing that uniquely identifies a job within a workflow.
103
# GITHUB_JOB doesn't work, as it corresponds to the job yaml id
104
# (https://bit.ly/37e78oI), which has two problems:
105
# 1. It's not present in the workflow job JSON object, so we can't use it as a filter.
106
# 2. It isn't unique; for matrix jobs the job yaml id is the same for all jobs in the matrix.
108
# RUNNER_NAME on the other hand is unique across the pool of runners. Also,
109
# since only one job can be scheduled on a runner at a time, we know that
110
# looking for RUNNER_NAME will uniquely identify the job we're currently running.
114
def find_job_id_name(args: Any) -> Tuple[str, str]:
    """Return the id and name of the job running on ``args.runner_name``.

    Lists the jobs of workflow run ``args.workflow_run_id`` and picks the
    most recently started one whose ``runner_name`` matches.

    Args:
        args: Object with ``workflow_run_id`` and ``runner_name`` attributes.

    Returns:
        The ``(id, name)`` fields of the matching job.

    Raises:
        KeyError: If the ``GITHUB_TOKEN`` environment variable is not set.
        RuntimeError: If no job in the run was scheduled on that runner.
    """
    # From https://docs.github.com/en/actions/learn-github-actions/environment-variables
    PYTORCH_REPO = os.environ.get("GITHUB_REPOSITORY", "pytorch/pytorch")
    PYTORCH_GITHUB_API = f"https://api.github.com/repos/{PYTORCH_REPO}"
    GITHUB_TOKEN = os.environ["GITHUB_TOKEN"]
    REQUEST_HEADERS = {
        "Accept": "application/vnd.github.v3+json",
        "Authorization": "token " + GITHUB_TOKEN,
    }

    url = f"{PYTORCH_GITHUB_API}/actions/runs/{args.workflow_run_id}/jobs?per_page=100"
    jobs = fetch_jobs(url, REQUEST_HEADERS)

    # Sort the jobs list by start time, in descending order. We want to get the most
    # recently scheduled job on the runner.
    jobs.sort(key=operator.itemgetter("started_at"), reverse=True)

    for job in jobs:
        if job["runner_name"] == args.runner_name:
            return (job["id"], job["name"])

    raise RuntimeError(f"Can't find job id for runner {args.runner_name}")
138
def set_output(name: str, val: Any) -> None:
    """Publish ``name=val`` as a GitHub Actions step output.

    When the ``GITHUB_OUTPUT`` environment variable names a file, the pair
    is appended to that file and echoed to stdout. Otherwise the legacy
    ``::set-output`` workflow command is printed instead.

    Args:
        name: Output name.
        val: Output value; formatted with ``str()``.

    NOTE(review): the ``else`` separating the two modes was restored from
    context after a line was lost from this block; confirm against the
    upstream version of the script.
    """
    # Read the variable once so the check and the open() see the same value.
    output_path = os.getenv("GITHUB_OUTPUT")
    if output_path:
        # GitHub requires its environment files to be written as UTF-8.
        with open(output_path, "a", encoding="utf-8") as env:
            print(f"{name}={val}", file=env)
        print(f"setting {name}={val}")
    else:
        print(f"::set-output name={name}::{val}")
150
# Get both the job ID and job name because we have already spent a request
151
# here to get the job info
152
job_id, job_name = find_job_id_name(args)
153
set_output("job-id", job_id)
154
set_output("job-name", job_name)
155
except Exception as e:
156
print(repr(e), file=sys.stderr)
157
print(f"workflow-{args.workflow_run_id}")
160
if __name__ == "__main__":