# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import paddle
import paddle.distributed as dist
from paddle.base import core

from ppfleetx.utils.log import logger


def is_fused_matmul_bias_supported():
    if (paddle.is_compiled_with_cuda() and not paddle.is_compiled_with_rocm()) or paddle.is_compiled_with_xpu():
        return hasattr(core.eager.ops.legacy, "fused_gemm_epilogue")
    else:
        return False
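
# For reference: is_fused_matmul_bias_supported() gates the fused_linear flag
# handled in process_model_configs below; per the warning there, the
# underlying fused_gemm_epilogue kernel ships only with CUDA >= 11.6 (or XPU)
# builds of Paddle, which is what the hasattr probe detects at runtime.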


def process_inference_configs(config):
    """
    process inference configs for hybrid parallel
    """
    if "Inference" not in config.keys():
        return

    configs = config["Inference"]

    if configs["model_dir"] is None:
        configs["model_dir"] = config["Engine"]["save_load"]["output_dir"]

    if configs["mp_degree"] is None:
        configs["mp_degree"] = config["Distributed"]["mp_degree"]


def process_model_configs(config):
    """
    process model configs for hybrid parallel
    """
    configs = config["Model"]
    if configs["ffn_hidden_size"] is None:
        configs["ffn_hidden_size"] = 4 * configs["hidden_size"]

    if configs["use_recompute"]:
        if not configs["recompute_granularity"]:
            configs["recompute_granularity"] = "full"
        if not configs["no_recompute_layers"]:
            configs["no_recompute_layers"] = []
        else:
            assert isinstance(configs["no_recompute_layers"], list), "no_recompute_layers should be a list"
            for i in configs["no_recompute_layers"]:
                assert isinstance(i, int), "all values in no_recompute_layers should be integers"
            assert min(configs["no_recompute_layers"]) >= 0, "the min value in no_recompute_layers should be >= 0"
            assert (
                max(configs["no_recompute_layers"]) < configs["num_layers"]
            ), "the max value in no_recompute_layers should be < num_layers"
            configs["no_recompute_layers"] = sorted(set(configs["no_recompute_layers"]))

    if configs["fused_linear"] and not is_fused_matmul_bias_supported():
        configs["fused_linear"] = False
        logger.warning(
            "The flag fused_linear is only valid for CUDA versions higher than 11.6, "
            "but Paddle is compiled with CUDA " + paddle.version.cuda() + "; alternatively, use the XPU build."
        )

    pp_degree = config.Distributed.pp_degree
    if pp_degree > 1:
        configs["virtual_pp_degree"] = (
            1 if configs.get("virtual_pp_degree", None) is None else configs["virtual_pp_degree"]
        )
        virtual_pp_degree = configs["virtual_pp_degree"]
        num_layers = configs.num_layers

        if not (num_layers % (virtual_pp_degree * pp_degree)) == 0:
            # Uneven layer split: only the plain (non-interleaved) pipeline is allowed.
            assert virtual_pp_degree == 1, "virtual pp doesn't support uneven layer split."
            logger.warning(
                "The num_layers of the model is not divisible by pp_degree. "
                "Received num_layers: {}, pp_degree: {}.".format(num_layers, pp_degree)
            )
        else:
            assert (num_layers % (virtual_pp_degree * pp_degree)) == 0, (
                "The num_layers of the model should be divisible by pp_degree * virtual_pp_degree. "
                "Received num_layers: {}, pp_degree: {}, virtual_pp_degree: {}.".format(
                    num_layers, pp_degree, virtual_pp_degree
                )
            )
        if virtual_pp_degree > 1:
            local_batch_size = config.Global.local_batch_size
            micro_batch_size = config.Global.micro_batch_size
            acc_steps = local_batch_size // micro_batch_size
            assert (
                acc_steps % pp_degree == 0
            ), "num of microbatches {} should be divisible by pp_degree {} when using interleave pipeline".format(
                acc_steps, pp_degree
            )
        if virtual_pp_degree > 2:
            logger.warning("Setting virtual_pp_degree > 2 may harm the throughput of the pipeline parallel.")
    else:
        if configs.get("virtual_pp_degree", None):
            logger.warning("virtual_pp_degree has no effect when pp_degree is 1.")


def process_optim_configs(config):
    """
    process optim configs for hybrid parallel
    """
    config["Optimizer"]["multi_precision"] = config["Engine"]["mix_precision"]["enable"]

    nranks = dist.get_world_size()
    dp_degree = config["Distributed"]["dp_degree"]
    sharding_degree = config["Distributed"]["sharding"]["sharding_degree"]
    if config["Optimizer"].get("tensor_fusion", None):
        assert (
            nranks == dp_degree * sharding_degree
        ), "tensor_fusion only supports single-card training or data/sharding parallel training"

    if config["Optimizer"]["lr"]["decay_steps"] is None:
        config["Optimizer"]["lr"]["decay_steps"] = config["Engine"]["max_steps"]
    config["Optimizer"]["lr"]["decay_steps"] *= config["Global"]["global_batch_size"]


def process_data_configs(config):
    """
    process data configs for hybrid parallel
    """
    cfg_global = config["Global"]
    cfg_data = config["Data"]

    mode_to_num_samples = {
        "Train": cfg_global["global_batch_size"] * config["Engine"]["max_steps"],
        "Eval": cfg_global["global_batch_size"]
        * (config["Engine"]["max_steps"] // config["Engine"]["eval_freq"] + 1)
        * config["Engine"]["eval_iters"],
        "Test": cfg_global["global_batch_size"] * config["Engine"]["test_iters"],
    }

    for mode in ("Train", "Eval", "Test"):
        if mode in cfg_data.keys():
            cfg_data[mode]["dataset"]["num_samples"] = mode_to_num_samples[mode]
            cfg_data[mode]["dataset"]["mode"] = mode
            cfg_data[mode]["dataset"]["seed"] = cfg_global["seed"]
            cfg_data[mode]["dataset"]["model_type"] = config["Model"]["name"]
            cfg_data[mode]["sampler"]["batch_size"] = cfg_global["local_batch_size"]


def process_configs(config):
    process_data_configs(config)
    process_model_configs(config)
    process_optim_configs(config)
    process_inference_configs(config)
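

if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the library API. ppfleetx loads
    # these configs from YAML into an attribute-accessible dict; the AttrDict
    # below is a hypothetical stand-in for that container, assumed here only
    # to keep the example self-contained.
    class AttrDict(dict):
        __getattr__ = dict.__getitem__

    config = AttrDict(
        Model=AttrDict(
            hidden_size=1024,
            ffn_hidden_size=None,  # left unset on purpose
            num_layers=24,
            use_recompute=False,
            fused_linear=False,
        ),
        Distributed=AttrDict(pp_degree=1),
    )
    process_model_configs(config)
    print(config["Model"]["ffn_hidden_size"])  # 4096, i.e. 4 * hidden_size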