Skip to content

Commit ef6e9fd

Browse files
committed
[Mooncake] Add Basic Mooncake Class with PUT/GET API
1 parent d59def4 commit ef6e9fd

File tree

1 file changed

+90
-0
lines changed
  • vllm/distributed/kv_transfer/kv_lookup_buffer

1 file changed

+90
-0
lines changed

vllm/distributed/kv_transfer/kv_lookup_buffer/base.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
from abc import ABC, abstractmethod
1212
from typing import List, Optional
13+
from vllm.config import VllmConfig
14+
from vllm.distributed.kv_transfer.kv_pipe.mooncake_pipe import MooncakeTransferEngineConfig
1315

1416
import torch
1517

@@ -107,3 +109,91 @@ def close(self) -> None:
107109
NotImplementedError: This method must be implemented in subclasses.
108110
"""
109111
raise NotImplementedError
112+
113+
class MooncakeStore(KVLookupBufferBase):
114+
def __init__(
115+
self,
116+
url: str,
117+
local_tp_rank: int,
118+
config: VllmConfig,
119+
):
120+
"""
121+
from distributed_object_store import DistributedObjectStore
122+
"""
123+
try:
124+
import distributed_object_store as dos
125+
except ImportError as e:
126+
raise ImportError(
127+
"Please install mooncake by following the instructions at "
128+
"https://github.com/kvcache-ai/Mooncake/blob/main/doc/en/build.md "
129+
"to run vLLM with MooncakeConnector.") from e
130+
self.url = url
131+
self.local_tp_rank = local_tp_rank
132+
self.store = dos.DistributedObjectStore() #
133+
134+
try:
135+
self.config = MooncakeTransferEngineConfig.load_from_env()
136+
logger.info("Mooncake Configuration loaded successfully.")
137+
except ValueError as e:
138+
logger.error(e)
139+
raise
140+
except Exception as exc:
141+
logger.error(
142+
"An error occurred while loading the configuration: %s", exc)
143+
raise
144+
145+
self.store.initAll(self.config.protocol,
146+
self.config.device_name,
147+
3200 * 1024 * 1024) # Init ALL, 3200 workaround
148+
149+
def insert(self, input_tokens: torch.Tensor, roi: torch.Tensor,
150+
key: torch.Tensor, value: torch.Tensor,
151+
hidden: torch.Tensor) -> None:
152+
# V1 (pack and put all tensors): insert the tensors into MooncakeStore's buffer
153+
raise NotImplementedError("Insert method is not implemented")
154+
155+
def drop_select(
156+
self, input_tokens: Optional[torch.Tensor],
157+
roi: Optional[torch.Tensor]) -> List[Optional[torch.Tensor]]:
158+
# V1 (get and unpack all tensors): consume tensors from MooncakeStore's buffer
159+
raise NotImplementedError("Insert method is not implemented")
160+
161+
def put(
162+
self,
163+
key: str,
164+
value: Optional[torch.Tensor],
165+
) -> None:
166+
# submit asynchronous put thread
167+
if value is not None:
168+
self._put_impl(key, value)
169+
170+
def get(
171+
self,
172+
key: str,
173+
) -> Optional[torch.Tensor]:
174+
# submit asynchronous get thread
175+
value = self._get_impl(key)
176+
if len(value) > 0:
177+
return value
178+
return None
179+
180+
def _put_impl(
181+
self,
182+
key: str,
183+
value: torch.Tensor,
184+
) -> None:
185+
"""Put KVCache to Mooncake Store"""
186+
value_bytes = pickle.dumps(tensor)
187+
self.store.put(key, value_bytes)
188+
189+
def _get_impl(
190+
self,
191+
key: str,
192+
) -> Optional[torch.Tensor]:
193+
"""Put KVCache from Mooncake Store"""
194+
data = self.store.get(key)
195+
data.
196+
return pickle.loads(data)
197+
198+
199+

0 commit comments

Comments
 (0)