ray-llm

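# Serve application builders for RayLLM: the helpers below turn model configs
# (vLLM, embedding, and TensorRT-LLM apps) into Ray Serve deployments and wire
# them behind a single RouterDeployment.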
import collections
from typing import Any, List, Optional, Sequence

import ray._private.usage.usage_lib
from ray import serve

from rayllm.backend.llm.embedding.embedding_engine import EmbeddingEngine
from rayllm.backend.llm.embedding.embedding_models import EmbeddingApp
from rayllm.backend.llm.trtllm.trtllm_models import TRTLLMApp
from rayllm.backend.llm.vllm.vllm_engine import VLLMEngine
from rayllm.backend.llm.vllm.vllm_models import VLLMApp
from rayllm.backend.server.app import RouterDeployment
from rayllm.backend.server.embedding.embedding_deployment import EmbeddingDeployment
from rayllm.backend.server.models import EngineType, LLMApp, RouterArgs, ScalingConfig
from rayllm.backend.server.plugins.deployment_base_client import DeploymentBaseClient
from rayllm.backend.server.plugins.execution_hooks import ExecutionHooks
from rayllm.backend.server.plugins.multi_query_client import MultiQueryClient
from rayllm.backend.server.trtllm.trtllm_deployment import TRTLLMDeployment
from rayllm.backend.server.utils import parse_args
from rayllm.backend.server.vllm.vllm_deployment import VLLMDeployment


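# Build the Serve placement options for a replica: bundle 0 carries the replica
# actor's own resources, followed by the worker bundles derived from the model's
# ScalingConfig; the placement strategy is also taken from the ScalingConfig.
# A sketch of the resulting keys (illustrative values only):
#   deployment_config["placement_group_bundles"]  == [{"CPU": 1, "GPU": 0}, ...]
#   deployment_config["placement_group_strategy"] == scaling_config.placement_strategy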
def set_deployment_placement_options(
    deployment_config: dict, scaling_config: ScalingConfig
):
    scaling_config = scaling_config.as_air_scaling_config()
    deployment_config.setdefault("ray_actor_options", {})
    replica_actor_resources = {
        "CPU": deployment_config["ray_actor_options"].get("num_cpus", 1),
        "GPU": deployment_config["ray_actor_options"].get("num_gpus", 0),
        **deployment_config["ray_actor_options"].get("resources", {}),
    }
    if (
        "placement_group_bundles" in deployment_config
        or "placement_group_strategy" in deployment_config
    ):
        raise ValueError(
            "placement_group_bundles and placement_group_strategy must not be specified in deployment_config. "
            "Use scaling_config to configure the replica placement group."
        )
    try:
        bundles = scaling_config.as_placement_group_factory().bundles
    except ValueError:
        # May happen if all bundles are empty.
        bundles = []
    deployment_config["placement_group_bundles"] = [replica_actor_resources] + bundles
    deployment_config["placement_group_strategy"] = scaling_config.placement_strategy
    return deployment_config


def _clean_deployment_name(dep_name: str):
    return dep_name.replace("/", "--").replace(".", "_")


def get_deployment_name(app: LLMApp, name_prefix: str):
    return _clean_deployment_name(name_prefix + app.model_id)


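# Kwargs for Serve's `.options()`: the model's deployment_config with placement
# options applied and a deployment name derived from the model id.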
def get_serve_deployment_args(app: LLMApp, name_prefix: str):
    deployment_config = set_deployment_placement_options(
        app.deployment_config.copy(deep=True).dict(), app.scaling_config  # type: ignore
    )

    # Set the name of the deployment config to map to the model id
    deployment_config["name"] = get_deployment_name(app, name_prefix)
    return deployment_config


def _get_execution_hooks():
    hooks = ExecutionHooks()
    return hooks


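# Build one Serve deployment per base model, keyed by model_id, choosing the
# deployment and engine class from the model's engine_config.type, and wrap them
# all in a DeploymentBaseClient.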
def get_llm_base_client(
    llm_base_models: Optional[Sequence[LLMApp]] = None, deployment_kwargs=None
):
    if not llm_base_models:
        return None

    base_configs = {model.model_id: model for model in llm_base_models}

    if deployment_kwargs is None:
        deployment_kwargs = {}

    base_deployments = {}
    for m in llm_base_models:
        if m.engine_config.type == EngineType.VLLMEngine:
            deployment_kwargs.setdefault("engine_cls", VLLMEngine)
            base_deployments[m.model_id] = VLLMDeployment.options(
                **get_serve_deployment_args(m, name_prefix="VLLMDeployment:")
            ).bind(base_config=m, **deployment_kwargs)
        elif m.engine_config.type == EngineType.EmbeddingEngine:
            deployment_kwargs.setdefault("engine_cls", EmbeddingEngine)
            base_deployments[m.model_id] = EmbeddingDeployment.options(
                **get_serve_deployment_args(m, name_prefix="EmbeddingDeployment:")
            ).bind(base_config=m, **deployment_kwargs)
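        # TensorRT-LLM engines are launched through MPI: the runtime_env requests
        # one MPI process per GPU worker, with the worker entry point defined below.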
        elif m.engine_config.type == EngineType.TRTLLMEngine:
            num_gpus = m.scaling_config.num_workers
            path = "rayllm.backend.llm.trtllm.trtllm_mpi.create_server"
            runtime_env = {
                "mpi": {
                    "args": ["-n", f"{int(num_gpus)}"],
                    "worker_entry": path,
                }
            }
            ray_actor_options = {"num_gpus": num_gpus, "runtime_env": runtime_env}
            deployment_config = {"ray_actor_options": ray_actor_options}
            if m.deployment_config:
                deployment_config = m.deployment_config.dict()
                if "ray_actor_options" in deployment_config:
                    deployment_config["ray_actor_options"].update(ray_actor_options)
                else:
                    deployment_config["ray_actor_options"] = ray_actor_options
            deployment_config["name"] = get_deployment_name(
                m, name_prefix="TRTLLMDeployment:"
            )

            base_deployments[m.model_id] = TRTLLMDeployment.options(
                **deployment_config,
            ).bind(base_config=m, **deployment_kwargs)
        else:
            raise ValueError(f"Unknown engine type {m.engine_config.type}")

    return DeploymentBaseClient(base_deployments, base_configs)


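# Same pattern for standalone embedding models: one EmbeddingDeployment per
# model, returned as a DeploymentBaseClient tagged with model_type="embedding".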
def get_embedding_base_client(
    embedding_models: Optional[List[EmbeddingApp]] = None, deployment_kwargs=None
):
    if not embedding_models:
        return None

    embedding_base_configs = {model.model_id: model for model in embedding_models}
    if not deployment_kwargs:
        deployment_kwargs = dict(engine_cls=EmbeddingEngine)
    embedding_base_deployments = {
        m.model_id: EmbeddingDeployment.options(
            **get_serve_deployment_args(m, name_prefix="EmbeddingDeployment:")
        ).bind(base_config=m, **deployment_kwargs)
        for m in embedding_models
    }
    embedding_base_client = DeploymentBaseClient(
        embedding_base_deployments, embedding_base_configs, model_type="embedding"
    )
    return embedding_base_client


def router_deployment(
    llm_base_models: List[LLMApp],
    enable_duplicate_models=False,
):
    """Create a Router Deployment.

    The Router Deployment points to a Serve deployment for each specified base
    model and has a client to query each one.
    """
    if not enable_duplicate_models:
        ids = [
            model_deployment_config.engine_config.model_id
            for model_deployment_config in llm_base_models
        ]
        duplicate_models = {
            item for item, count in collections.Counter(ids).items() if count > 1
        }
        assert (
            not duplicate_models
        ), f"Found duplicate models {duplicate_models}. Please make sure all models have unique ids."

    hooks = _get_execution_hooks()

    llm_base_client = get_llm_base_client(llm_base_models)

    # Merged client
    merged_client = MultiQueryClient(llm_base_client, hooks=hooks)
    return RouterDeployment.bind(merged_client)


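# Serve application builder. `args` is validated into RouterArgs; judging from
# the attribute accesses below it accepts keys such as "models",
# "embedding_models", and "trtllm_models", each pointing at model configs that
# parse_args() turns into the corresponding *App objects.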
def router_application(args):
    ray._private.usage.usage_lib.record_library_usage("ray-llm")
    router_args = RouterArgs.parse_obj(args)
    vllm_apps = []
    embedding_apps = []
    trtllm_apps = []
    if router_args.models:
        ray._private.usage.usage_lib.record_library_usage("ray-llm-vllm")
        vllm_apps = parse_args(router_args.models, llm_app_cls=VLLMApp)
    if router_args.embedding_models:
        ray._private.usage.usage_lib.record_library_usage("ray-llm-embedding_models")
        embedding_apps = parse_args(
            router_args.embedding_models, llm_app_cls=EmbeddingApp
        )
    if router_args.trtllm_models:
        ray._private.usage.usage_lib.record_library_usage("ray-llm-tensorrt_llm")
        trtllm_apps = parse_args(router_args.trtllm_models, llm_app_cls=TRTLLMApp)
    return router_deployment(
        vllm_apps + embedding_apps + trtllm_apps, enable_duplicate_models=False
    )


def run(
    vllm_base_args: Optional[List[str]] = None,
    embedding_base_args: Optional[Any] = None,
    blocking: bool = False,
):
    """Run the LLM server on the local Ray cluster.

    Args:
        vllm_base_args: The paths of the vLLM model YAMLs to deploy.
        embedding_base_args: The paths of the embedding model YAMLs to deploy.
        blocking: Whether serve.run should block the calling thread.
    """
    assert (
        vllm_base_args or embedding_base_args
    ), "Either vllm args or embedding args must be provided."
    router_app = router_application(
        {"models": vllm_base_args, "embedding_models": embedding_base_args}
    )

    host = "0.0.0.0"

    serve.run(router_app, name="router", host=host, _blocking=blocking)

    deployment_address = f"http://{host}:8000"
    return deployment_address
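

# Minimal usage sketch (illustrative): the YAML path below is hypothetical and
# should point at an existing model config on your cluster.
if __name__ == "__main__":
    run(vllm_base_args=["./models/my-vllm-model.yaml"], blocking=True)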
