from anthropic import AsyncAnthropic
from anthropic.types import Message, Usage

from metagpt.configs.llm_config import LLMConfig, LLMType
from metagpt.const import USE_CONFIG_TIMEOUT
from metagpt.logs import log_llm_stream
from metagpt.provider.base_llm import BaseLLM
from metagpt.provider.llm_provider_registry import register_provider

@register_provider([LLMType.ANTHROPIC, LLMType.CLAUDE])
15
class AnthropicLLM(BaseLLM):
16
def __init__(self, config: LLMConfig):
18
self.__init_anthropic()
20
def __init_anthropic(self):
21
self.model = self.config.model
22
self.aclient: AsyncAnthropic = AsyncAnthropic(api_key=self.config.api_key, base_url=self.config.base_url)
24
def _const_kwargs(self, messages: list[dict], stream: bool = False) -> dict:
28
"max_tokens": self.config.max_token,
31
if self.use_system_prompt:
33
if messages[0]["role"] == "system":
34
kwargs["messages"] = messages[1:]
35
kwargs["system"] = messages[0]["content"]
38
def _update_costs(self, usage: Usage, model: str = None, local_calc_usage: bool = True):
39
usage = {"prompt_tokens": usage.input_tokens, "completion_tokens": usage.output_tokens}
40
super()._update_costs(usage, model)
42
def get_choice_text(self, resp: Message) -> str:
43
return resp.content[0].text
45
async def _achat_completion(self, messages: list[dict], timeout: int = USE_CONFIG_TIMEOUT) -> Message:
46
resp: Message = await self.aclient.messages.create(**self._const_kwargs(messages))
47
self._update_costs(resp.usage, self.model)
50
async def acompletion(self, messages: list[dict], timeout: int = USE_CONFIG_TIMEOUT) -> Message:
51
return await self._achat_completion(messages, timeout=self.get_timeout(timeout))
53
async def _achat_completion_stream(self, messages: list[dict], timeout: int = USE_CONFIG_TIMEOUT) -> str:
54
stream = await self.aclient.messages.create(**self._const_kwargs(messages, stream=True))
55
collected_content = []
56
usage = Usage(input_tokens=0, output_tokens=0)
57
async for event in stream:
58
event_type = event.type
59
if event_type == "message_start":
60
usage.input_tokens = event.message.usage.input_tokens
61
usage.output_tokens = event.message.usage.output_tokens
62
elif event_type == "content_block_delta":
63
content = event.delta.text
64
log_llm_stream(content)
65
collected_content.append(content)
66
elif event_type == "message_delta":
67
usage.output_tokens = event.usage.output_tokens
70
self._update_costs(usage)
71
full_content = "".join(collected_content)