4
from pathlib import Path
5
from tempfile import TemporaryDirectory
6
from typing import Any, Dict, List
8
from tools.stats.upload_stats_lib import (
9
download_gha_artifacts,
10
download_s3_artifacts,
17
workflow_run_id: int, workflow_run_attempt: int
18
) -> List[Dict[str, Any]]:
19
with TemporaryDirectory() as temp_dir:
20
print("Using temporary directory:", temp_dir)
24
download_s3_artifacts("sccache-stats", workflow_run_id, workflow_run_attempt)
26
artifact_paths = download_gha_artifacts(
27
"sccache-stats", workflow_run_id, workflow_run_attempt
29
for path in artifact_paths:
33
for json_file in Path(".").glob("**/*.json"):
34
with open(json_file) as f:
35
stats_jsons.append(json.load(f))
39
if __name__ == "__main__":
40
parser = argparse.ArgumentParser(description="Upload test stats to Rockset")
45
help="id of the workflow to get artifacts from",
48
"--workflow-run-attempt",
51
help="which retry of the workflow this is",
53
args = parser.parse_args()
54
stats = get_sccache_stats(args.workflow_run_id, args.workflow_run_attempt)
55
upload_to_rockset("sccache_stats", stats)