import collections
from typing import Any, List, Optional, Sequence

import ray._private.usage.usage_lib
from ray import serve

from rayllm.backend.llm.embedding.embedding_engine import EmbeddingEngine
from rayllm.backend.llm.embedding.embedding_models import EmbeddingApp
from rayllm.backend.llm.trtllm.trtllm_models import TRTLLMApp
from rayllm.backend.llm.vllm.vllm_engine import VLLMEngine
from rayllm.backend.llm.vllm.vllm_models import VLLMApp
from rayllm.backend.server.app import RouterDeployment
from rayllm.backend.server.embedding.embedding_deployment import EmbeddingDeployment
from rayllm.backend.server.models import EngineType, LLMApp, RouterArgs, ScalingConfig
from rayllm.backend.server.plugins.deployment_base_client import DeploymentBaseClient
from rayllm.backend.server.plugins.execution_hooks import ExecutionHooks
from rayllm.backend.server.plugins.multi_query_client import MultiQueryClient
from rayllm.backend.server.trtllm.trtllm_deployment import TRTLLMDeployment
from rayllm.backend.server.utils import parse_args
from rayllm.backend.server.vllm.vllm_deployment import VLLMDeployment


def set_deployment_placement_options(
    deployment_config: dict, scaling_config: ScalingConfig
) -> dict:
    scaling_config = scaling_config.as_air_scaling_config()
    deployment_config.setdefault("ray_actor_options", {})
    replica_actor_resources = {
        "CPU": deployment_config["ray_actor_options"].get("num_cpus", 1),
        "GPU": deployment_config["ray_actor_options"].get("num_gpus", 0),
        **deployment_config["ray_actor_options"].get("resources", {}),
    }
    if (
        "placement_group_bundles" in deployment_config
        or "placement_group_strategy" in deployment_config
    ):
        raise ValueError(
            "placement_group_bundles and placement_group_strategy must not be specified in deployment_config. "
            "Use scaling_config to configure the replica placement group."
        )

    try:
        bundles = scaling_config.as_placement_group_factory().bundles
    except ValueError:
        # May happen if all bundles are empty.
        bundles = []

    deployment_config["placement_group_bundles"] = [replica_actor_resources] + bundles
    deployment_config["placement_group_strategy"] = scaling_config.placement_strategy
    return deployment_config
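
# For illustration only (hypothetical values): a deployment_config of
# {"ray_actor_options": {"num_cpus": 2}} combined with a scaling config whose
# placement group factory yields bundles [{"GPU": 1}] would come back roughly as:
#   {
#       "ray_actor_options": {"num_cpus": 2},
#       "placement_group_bundles": [{"CPU": 2, "GPU": 0}, {"GPU": 1}],
#       "placement_group_strategy": <scaling_config.placement_strategy>,
#   }
# The first bundle always describes the resources of the replica actor itself.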


def _clean_deployment_name(dep_name: str):
    return dep_name.replace("/", "--").replace(".", "_")


def get_deployment_name(app: LLMApp, name_prefix: str):
    return _clean_deployment_name(name_prefix + app.model_id)
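
# Example (hypothetical model ID): an app with model_id
# "meta-llama/Llama-2-7b-chat-hf" and the prefix "VLLMDeployment:" yields the
# deployment name "VLLMDeployment:meta-llama--Llama-2-7b-chat-hf"; slashes and
# dots are replaced so the result is safe to use as a Serve deployment name.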


def get_serve_deployment_args(app: LLMApp, name_prefix: str):
    deployment_config = set_deployment_placement_options(
        app.deployment_config.copy(deep=True).dict(), app.scaling_config  # type: ignore
    )

    # Set the name of the deployment config to map to the model ID.
    deployment_config["name"] = get_deployment_name(app, name_prefix)
    return deployment_config


def _get_execution_hooks():
    hooks = ExecutionHooks()
    return hooks


def get_llm_base_client(
    llm_base_models: Optional[Sequence[LLMApp]] = None, deployment_kwargs=None
):
    if not llm_base_models:
        return None

    base_configs = {model.model_id: model for model in llm_base_models}

    if deployment_kwargs is None:
        deployment_kwargs = {}

    # Bind one Serve deployment per model, keyed by model_id.
    base_deployments = {}
    for m in llm_base_models:
        if m.engine_config.type == EngineType.VLLMEngine:
            deployment_kwargs.setdefault("engine_cls", VLLMEngine)
            base_deployments[m.model_id] = VLLMDeployment.options(
                **get_serve_deployment_args(m, name_prefix="VLLMDeployment:")
            ).bind(base_config=m, **deployment_kwargs)
        elif m.engine_config.type == EngineType.EmbeddingEngine:
            deployment_kwargs.setdefault("engine_cls", EmbeddingEngine)
            base_deployments[m.model_id] = EmbeddingDeployment.options(
                **get_serve_deployment_args(m, name_prefix="EmbeddingDeployment:")
            ).bind(base_config=m, **deployment_kwargs)
        elif m.engine_config.type == EngineType.TRTLLMEngine:
            # TRT-LLM replicas are launched through MPI, one worker per GPU.
            num_gpus = m.scaling_config.num_workers
            path = "rayllm.backend.llm.trtllm.trtllm_mpi.create_server"
            runtime_env = {
                "mpi": {
                    "args": ["-n", f"{int(num_gpus)}"],
                    "worker_entry": path,
                }
            }
            ray_actor_options = {"num_gpus": num_gpus, "runtime_env": runtime_env}
            deployment_config = {"ray_actor_options": ray_actor_options}
            if m.deployment_config:
                deployment_config = m.deployment_config.dict()
                if "ray_actor_options" in deployment_config:
                    deployment_config["ray_actor_options"].update(ray_actor_options)
                else:
                    deployment_config["ray_actor_options"] = ray_actor_options
            deployment_config["name"] = get_deployment_name(
                m, name_prefix="TRTLLMDeployment:"
            )
            base_deployments[m.model_id] = TRTLLMDeployment.options(
                **deployment_config
            ).bind(base_config=m, **deployment_kwargs)
        else:
            raise ValueError(f"Unknown engine type {m.engine_config.type}")

    return DeploymentBaseClient(base_deployments, base_configs)
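
# Usage sketch (hypothetical YAML path; assumes the file parses into a VLLMApp):
#
#   apps = parse_args(["models/llama-2-7b.yaml"], llm_app_cls=VLLMApp)
#   client = get_llm_base_client(apps)
#   # `client` now wraps one bound Serve deployment per model_id.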


def get_embedding_base_client(
    embedding_models: Optional[List[EmbeddingApp]] = None, deployment_kwargs=None
):
    if not embedding_models:
        return None

    embedding_base_configs = {model.model_id: model for model in embedding_models}
    if not deployment_kwargs:
        deployment_kwargs = dict(engine_cls=EmbeddingEngine)
    embedding_base_deployments = {
        m.model_id: EmbeddingDeployment.options(
            **get_serve_deployment_args(m, name_prefix="EmbeddingDeployment:")
        ).bind(base_config=m, **deployment_kwargs)
        for m in embedding_models
    }
    embedding_base_client = DeploymentBaseClient(
        embedding_base_deployments, embedding_base_configs, model_type="embedding"
    )
    return embedding_base_client
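
# A minimal sketch (hypothetical YAML path) mirroring the LLM client above:
#
#   embedding_apps = parse_args(["models/embedder.yaml"], llm_app_cls=EmbeddingApp)
#   embedding_client = get_embedding_base_client(embedding_apps)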


def router_deployment(
    llm_base_models: List[LLMApp],
    enable_duplicate_models=False,
):
    """Create a Router Deployment.

    The Router Deployment points to a Serve Deployment for each specified base
    model and holds a client to query each one.
    """
    if not enable_duplicate_models:
        ids = [
            model_deployment_config.engine_config.model_id
            for model_deployment_config in llm_base_models
        ]
        duplicate_models = {
            item for item, count in collections.Counter(ids).items() if count > 1
        }
        assert (
            not duplicate_models
        ), f"Found duplicate models {duplicate_models}. Please make sure all models have unique ids."

    hooks = _get_execution_hooks()
    llm_base_client = get_llm_base_client(llm_base_models)

    # Merged client that can route a query to any of the base deployments.
    merged_client = MultiQueryClient(llm_base_client, hooks=hooks)
    return RouterDeployment.bind(merged_client)
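
# Note: with enable_duplicate_models=False, two apps sharing the same
# engine_config.model_id (e.g. both "meta-llama/Llama-2-7b-chat-hf") trip the
# assertion above before any deployments are bound.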


def router_application(args):
    ray._private.usage.usage_lib.record_library_usage("ray-llm")
    router_args = RouterArgs.parse_obj(args)

    vllm_apps = []
    embedding_apps = []
    trtllm_apps = []
    if router_args.models:
        ray._private.usage.usage_lib.record_library_usage("ray-llm-vllm")
        vllm_apps = parse_args(router_args.models, llm_app_cls=VLLMApp)
    if router_args.embedding_models:
        ray._private.usage.usage_lib.record_library_usage("ray-llm-embedding_models")
        embedding_apps = parse_args(
            router_args.embedding_models, llm_app_cls=EmbeddingApp
        )
    if router_args.trtllm_models:
        ray._private.usage.usage_lib.record_library_usage("ray-llm-tensorrt_llm")
        trtllm_apps = parse_args(router_args.trtllm_models, llm_app_cls=TRTLLMApp)

    return router_deployment(
        vllm_apps + embedding_apps + trtllm_apps, enable_duplicate_models=False
    )
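
# Sketch of the expected input shape (hypothetical paths), matching RouterArgs:
#
#   app = router_application(
#       {"models": ["models/llama-2-7b.yaml"], "embedding_models": None}
#   )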


def run(
    vllm_base_args: Optional[List[str]] = None,
    embedding_base_args: Optional[Any] = None,
    blocking: bool = False,
):
    """Run the LLM Server on the local Ray Cluster.

    Args:
        vllm_base_args: The paths of the vLLM model YAMLs to deploy.
        embedding_base_args: The paths of the embedding model YAMLs to deploy.
        blocking: Whether to block until the server is shut down.
    """
    assert (
        vllm_base_args or embedding_base_args
    ), "Neither vLLM args nor embedding args were provided."
    router_app = router_application(
        {"models": vllm_base_args, "embedding_models": embedding_base_args}
    )

    host = "0.0.0.0"
    serve.run(router_app, name="router", host=host, _blocking=blocking)

    deployment_address = f"http://{host}:8000"
    return deployment_address
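
# Minimal local usage sketch (hypothetical YAML path; assumes a running Ray
# cluster with enough GPUs for the model):
#
#   if __name__ == "__main__":
#       address = run(vllm_base_args=["models/llama-2-7b.yaml"], blocking=True)
#       print(f"Router available at {address}")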