7
from typing import Any, List, Union
9
import rockset # type: ignore[import]
11
from tools.stats.upload_stats_lib import upload_to_s3
14
# Resolve the oncall owners for a test file by looking for a line of the form
# `# Owner(s): [...]` and parsing the bracketed list as a Python literal.
#
# NOTE(review): this excerpt has lost its indentation, and the bare integer
# lines look like leftover source line numbers. Those numbers skip values, so
# some statements (the `try:`, whatever produces `line`, and at least one
# `return`) are not visible here -- confirm against the full file.
def get_oncall_from_testfile(testfile: str) -> Union[List[str], None]:
15
# Test files are looked up relative to a `test/` directory.
path = f"test/{testfile}"
16
# The body of this check is not visible; presumably it appends ".py" so that
# callers may pass the name without an extension -- TODO confirm.
if not path.endswith(".py"):
18
# Find the owners header and extract the oncall list from it.
# `line` is not bound in the visible lines; presumably it iterates over the
# lines of the file at `path` -- verify.
22
if line.startswith("# Owner(s): "):
23
# NOTE(review): the pattern is greedy, so within one line it spans from the
# first "[" to the last "]" and findall yields at most one match. The
# "> 1" branch below therefore looks unreachable unless `line` can contain
# embedded newlines -- confirm.
possible_lists = re.findall(r"\[.*\]", line)
24
if len(possible_lists) > 1:
25
raise Exception("More than one list found")
26
elif len(possible_lists) == 0:
27
raise Exception("No oncalls found or file is badly formatted")
28
# literal_eval only evaluates Python literals, so the header text is parsed
# without executing arbitrary code.
oncalls = ast.literal_eval(possible_lists[0])
30
# NOTE(review): `e` is not used in the visible lines. If the raises above sit
# inside the matching `try`, this broad handler also swallows them and turns
# them into one of the fallback values below -- confirm that is intended.
except Exception as e:
32
# Fallback 1: use the text before the first "." in `testfile` as the module
# name. The condition choosing between the two fallbacks is not visible.
return [f"module: {testfile.split('.')[0]}"]
34
# Fallback 2: no module could be derived from the name.
return ["module: unmarked"]
38
# Run the "test_insights_per_daily_upload" Rockset query lambda with `date`
# (ISO formatted) as its "startTime" parameter, then tag every result row with
# the oncalls of its "test_file".
#
# NOTE(review): this excerpt has lost its indentation, and the bare integer
# lines look like leftover source line numbers. Those numbers skip values, so
# some statements (e.g. the `query_parameters` binding, the closing of the
# lambda call, and the `return`) are not visible here -- confirm against the
# full file.
def get_test_stat_aggregates(date: datetime.date) -> Any:
39
# Create the Rockset client; the API key comes from the environment.
# Raises KeyError if ROCKSET_API_KEY is not set.
40
rockset_api_key = os.environ["ROCKSET_API_KEY"]
41
# NOTE(review): `rockset_api_server` is not referenced in the visible lines,
# and the client below is built with a different host literal -- confirm which
# endpoint is intended.
rockset_api_server = "api.rs2.usw2.rockset.com"
42
iso_date = date.isoformat()
43
rs = rockset.RocksetClient(host="api.usw2a1.rockset.com", api_key=rockset_api_key)
45
# Names of the Rockset collection and the query lambda.
# NOTE(review): `collection_name` is not referenced in the visible lines.
46
collection_name = "commons"
47
lambda_function_name = "test_insights_per_daily_upload"
49
# As shown, this constructor result is discarded; presumably it is an element
# of the `query_parameters` list whose opening line is missing -- verify.
rockset.models.QueryParameter(name="startTime", type="string", value=iso_date)
51
api_response = rs.QueryLambdas.execute_query_lambda(
52
query_lambda=lambda_function_name,
53
# Fixed version string passed for the query lambda.
version="692684fa5b37177f",
54
parameters=query_parameters,
56
# Annotate each result row in place with the oncalls for its test file.
for i in range(len(api_response["results"])):
57
oncalls = get_oncall_from_testfile(api_response["results"][i]["test_file"])
58
api_response["results"][i]["oncalls"] = oncalls
60
# default=str stringifies any value json cannot serialize natively.
# NOTE(review): the dumped string is not bound to anything in the visible
# lines; presumably it is wrapped by a `return` that is not shown -- confirm.
json.dumps(api_response["results"], indent=4, sort_keys=True, default=str)
64
# Script entry point: parse a date from the command line, reject dates older
# than 30 days, fetch the aggregates for that date, and pass them on for
# upload.
#
# NOTE(review): this excerpt has lost its indentation, and the bare integer
# lines look like leftover source line numbers. Those numbers skip values, so
# some statements (the `add_argument(` call and the opening/closing of the
# final upload call) are not visible here -- confirm against the full file.
if __name__ == "__main__":
65
parser = argparse.ArgumentParser(
66
# NOTE(review): the description says "Rockset", but the keyword arguments at
# the end of this block (`bucket_name`, `key`) and the file's import of
# `upload_to_s3` suggest the destination is S3 -- confirm and align the text.
description="Upload test stat aggregates to Rockset."
70
# The next two keyword arguments belong to an argument definition whose opening
# line is not visible; since `args.date` is read below, it is presumably a
# date option -- verify.
type=datetime.date.fromisoformat,
71
help="Date to upload test stat aggregates for (YYYY-MM-DD). Must be in the last 30 days",
74
args = parser.parse_args()
75
# Enforce the 30-day window described in the help text.
# NOTE(review): "today" comes from naive local time via datetime.now().
if args.date < datetime.datetime.now().date() - datetime.timedelta(days=30):
76
raise ValueError("date must be in the last 30 days")
77
data = get_test_stat_aggregates(date=args.date)
79
# Keyword arguments of a call whose opening line is not visible (presumably
# `upload_to_s3`); the key is namespaced by the ISO date string.
bucket_name="torchci-aggregated-stats",
80
key=f"test_data_aggregates/{str(args.date)}",