# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import importlib.util
import logging
import re
import time
from collections import defaultdict
from graphlib import TopologicalSorter
from inspect import getsource
from pathlib import Path
from types import ModuleType
from typing import Any

import click
from flask import current_app
from flask_appbuilder import Model
from flask_migrate import downgrade, upgrade
from progress.bar import ChargingBar
from sqlalchemy import create_engine, inspect
from sqlalchemy.ext.automap import automap_base

from superset import db
from superset.utils.mock_data import add_sample_rows

logger = logging.getLogger(__name__)
def import_migration_script(filepath: Path) -> ModuleType:
    """
    Import a migration script as if it were a module.

    :param filepath: path to the Alembic migration script
    :return: the script loaded as a standalone Python module
    :raises Exception: if a module spec cannot be created for the file
    """
    spec = importlib.util.spec_from_file_location(filepath.stem, filepath)
    if spec:
        module = importlib.util.module_from_spec(spec)
        # execute the script's top level so revision/upgrade/downgrade exist
        spec.loader.exec_module(module)  # type: ignore
        return module
    raise Exception(f"No module spec found in location: `{str(filepath)}`")


def extract_modified_tables(module: ModuleType) -> set[str]:
    """
    Extract the tables being modified by a migration script.

    This function uses a simple approach of looking at the source code of
    the migration script looking for patterns. It could be improved by
    actually traversing the AST.

    :param module: an imported migration script
    :return: names of tables referenced by ``alter_table``, ``add_column``
        or ``drop_column`` calls in its ``upgrade``/``downgrade`` functions
    """

    tables: set[str] = set()
    for function in {"upgrade", "downgrade"}:
        source = getsource(getattr(module, function))
        tables.update(re.findall(r'alter_table\(\s*"(\w+?)"\s*\)', source, re.DOTALL))
        tables.update(re.findall(r'add_column\(\s*"(\w+?)"\s*,', source, re.DOTALL))
        tables.update(re.findall(r'drop_column\(\s*"(\w+?)"\s*,', source, re.DOTALL))

    return tables


def find_models(module: ModuleType) -> list[type[Model]]:
    """
    Find all models in a migration script.

    :param module: an imported migration script
    :return: automapped model classes for every table the migration touches
        (plus tables reachable through foreign keys), topologically sorted so
        that referenced tables come before the tables referencing them
    """
    models: list[type[Model]] = []
    tables = extract_modified_tables(module)

    # add models defined explicitly in the migration script
    queue = list(module.__dict__.values())
    while queue:
        obj = queue.pop()
        if hasattr(obj, "__tablename__"):
            tables.add(obj.__tablename__)
        elif isinstance(obj, list):
            queue.extend(obj)
        elif isinstance(obj, dict):
            queue.extend(obj.values())

    # build models by automapping the existing tables, instead of using current
    # code; this is needed for migrations that modify schemas (eg, add a column),
    # where the current model is out-of-sync with the existing table after a
    # downgrade
    sqlalchemy_uri = current_app.config["SQLALCHEMY_DATABASE_URI"]
    engine = create_engine(sqlalchemy_uri)
    Base = automap_base()
    Base.prepare(engine, reflect=True)
    seen = set()
    while tables:
        table = tables.pop()
        seen.add(table)
        try:
            model = getattr(Base.classes, table)
        except AttributeError:
            # table has no automapped class (eg, no primary key); skip it
            continue
        model.__tablename__ = table
        models.append(model)

        # add other models referenced in foreign keys
        inspector = inspect(model)
        for column in inspector.columns.values():
            for foreign_key in column.foreign_keys:
                table = foreign_key.column.table.name
                if table not in seen:
                    tables.add(table)

    # sort topologically so we can create entities in order and
    # maintain relationships (eg, create a database before creating
    # a slice)
    sorter: TopologicalSorter[Any] = TopologicalSorter()
    for model in models:
        inspector = inspect(model)
        dependent_tables: list[str] = []
        for column in inspector.columns.values():
            for foreign_key in column.foreign_keys:
                # self-referential keys would create a cycle; skip them
                if foreign_key.column.table.name != model.__tablename__:
                    dependent_tables.append(foreign_key.column.table.name)
        sorter.add(model.__tablename__, *dependent_tables)
    order = list(sorter.static_order())
    models.sort(key=lambda model: order.index(model.__tablename__))

    return models


@click.command()
@click.argument("filepath")
@click.option("--limit", default=1000, help="Maximum number of entities.")
@click.option("--force", is_flag=True, help="Do not prompt for confirmation.")
@click.option("--no-auto-cleanup", is_flag=True, help="Do not remove created models.")
def main(
    filepath: str, limit: int = 1000, force: bool = False, no_auto_cleanup: bool = False
) -> None:
    """
    Benchmark an Alembic migration script.

    Runs the migration against the current DB, then repeatedly downgrades,
    pads the affected tables with sample rows (10, 100, ... up to ``limit``
    entities each), and re-runs the upgrade, timing each pass.
    """
    auto_cleanup = not no_auto_cleanup
    print(f"Importing migration script: {filepath}")
    module = import_migration_script(Path(filepath))

    revision: str = getattr(module, "revision", "")
    down_revision: str = getattr(module, "down_revision", "")
    if not revision or not down_revision:
        raise Exception(
            "Not a valid migration script, couldn't find down_revision/revision"
        )

    print(f"Migration goes from {down_revision} to {revision}")
    current_revision = db.engine.execute(
        "SELECT version_num FROM alembic_version"
    ).scalar()
    print(f"Current version of the DB is {current_revision}")

    if current_revision != down_revision:
        if not force:
            click.confirm(
                "\nRunning benchmark will downgrade the Superset DB to "
                f"{down_revision} and upgrade to {revision} again. There may "
                "be data loss in downgrades. Continue?",
                abort=True,
            )
        downgrade(revision=down_revision)

    print("\nIdentifying models used in the migration:")
    models = find_models(module)
    model_rows: dict[type[Model], int] = {}
    for model in models:
        rows = db.session.query(model).count()
        print(f"- {model.__name__} ({rows} rows in table {model.__tablename__})")
        model_rows[model] = rows

    print("Benchmarking migration")
    results: dict[str, float] = {}
    start = time.time()
    upgrade(revision=revision)
    duration = time.time() - start
    results["Current"] = duration
    print(f"Migration on current DB took: {duration:.2f} seconds")

    min_entities = 10
    new_models: dict[type[Model], list[Model]] = defaultdict(list)
    while min_entities <= limit:
        downgrade(revision=down_revision)
        print(f"Running with at least {min_entities} entities of each model")
        for model in models:
            missing = min_entities - model_rows[model]
            if missing > 0:
                entities: list[Model] = []
                print(f"- Adding {missing} entities to the {model.__name__} model")
                bar = ChargingBar("Processing", max=missing)
                try:
                    for entity in add_sample_rows(model, missing):
                        entities.append(entity)
                        bar.next()
                except Exception:
                    # roll back the partial batch before propagating
                    db.session.rollback()
                    raise
                bar.finish()
                model_rows[model] = min_entities
                db.session.add_all(entities)
                db.session.commit()

                if auto_cleanup:
                    new_models[model].extend(entities)
        start = time.time()
        upgrade(revision=revision)
        duration = time.time() - start
        print(f"Migration for {min_entities}+ entities took: {duration:.2f} seconds")
        results[f"{min_entities}+"] = duration
        min_entities *= 10

    print("\nResults:\n")
    for label, duration in results.items():
        print(f"{label}: {duration:.2f} s")

    if auto_cleanup:
        print("Cleaning up DB")
        # delete in reverse order of creation to handle relationships
        for model, entities in list(new_models.items())[::-1]:
            db.session.query(model).filter(
                model.id.in_(entity.id for entity in entities)
            ).delete(synchronize_session=False)
        db.session.commit()

    # NOTE(review): at this point the DB is already at ``revision``, so this
    # upgrade is a no-op; reverting to the original ``current_revision`` may
    # have been intended — confirm before changing.
    if current_revision != revision and not force:
        click.confirm(f"\nRevert DB to {revision}?", abort=True)
        upgrade(revision=revision)
        print("Reverted")


if __name__ == "__main__":
    from superset.app import create_app

    app = create_app()
    # the CLI command needs an active Flask app context (current_app, db)
    with app.app_context():
        # pylint: disable=no-value-for-parameter
        main()