Commit d0b414e
Breaking change: Rename Stage to Volume

Signed-off-by: lentitude2tk <[email protected]>
1 parent: e93d548

12 files changed: +372 -376 lines
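
The rename is mechanical but breaks the public API: every stage-flavored module, class, parameter, and request key becomes the volume-flavored equivalent. A minimal before/after sketch of the affected surface (names taken from the hunks below; the schema value is assumed to be built elsewhere):

# Before this commit (now removed):
#   from pymilvus.bulk_writer.stage_bulk_writer import StageBulkWriter
#   writer = StageBulkWriter(..., stage_name="my_stage")
#   writer.get_stage_upload_result()          # {"stage_name": ..., "path": ...}
#   bulk_import(..., stage_name="my_stage")   # sent as "stageName"

# After this commit:
from pymilvus.bulk_writer.volume_bulk_writer import VolumeBulkWriter

writer = VolumeBulkWriter(
    schema=schema,                  # a CollectionSchema, built elsewhere
    remote_path="bulk_data",
    cloud_endpoint="https://api.cloud.zilliz.com",
    api_key="YOUR_API_KEY",
    volume_name="my_volume",        # was: stage_name
)
result = writer.get_volume_upload_result()    # was: get_stage_upload_result()
# bulk_import(..., volume_name="my_volume")   # sent as "volumeName"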

examples/bulk_import/example_bulkwriter_stage.py renamed to examples/bulk_import/example_bulkwriter_volume.py

Lines changed: 19 additions & 19 deletions
@@ -16,7 +16,7 @@
 import numpy as np
 
 from examples.bulk_import.data_gengerator import *
-from pymilvus.bulk_writer.stage_bulk_writer import StageBulkWriter
+from pymilvus.bulk_writer.volume_bulk_writer import VolumeBulkWriter
 from pymilvus.orm import utility
 
 logging.basicConfig(level=logging.INFO)
@@ -46,7 +46,7 @@
 API_KEY = "_api_key_for_cluster_org_"
 
 # This is currently a private preview feature. If you need to use it, please submit a request and contact us.
-STAGE_NAME = "_stage_name_for_project_"
+VOLUME_NAME = "_volume_name_for_project_"
 
 CLUSTER_ID = "_your_cloud_cluster_id_"
 DB_NAME = "" # If db_name is not specified, use ""
@@ -93,12 +93,12 @@ def build_all_type_schema():
     return schema
 
 
-def example_collection_remote_stage(file_type: BulkFileType):
+def example_collection_remote_volume(file_type: BulkFileType):
     schema = build_all_type_schema()
     print(f"\n===================== all field types ({file_type.name}) ====================")
     create_collection(schema, False)
-    stage_upload_result = stage_remote_writer(file_type, schema)
-    call_stage_import(stage_upload_result['stage_name'], stage_upload_result['path'])
+    volume_upload_result = volume_remote_writer(file_type, schema)
+    call_volume_import(volume_upload_result['volume_name'], volume_upload_result['path'])
     retrieve_imported_data()
 
 
@@ -111,16 +111,16 @@ def create_collection(schema: CollectionSchema, drop_if_exist: bool):
     print(f"Collection '{collection.name}' created")
 
 
-def stage_remote_writer(file_type, schema):
-    with StageBulkWriter(
+def volume_remote_writer(file_type, schema):
+    with VolumeBulkWriter(
         schema=schema,
         remote_path="bulk_data",
        file_type=file_type,
         chunk_size=512 * 1024 * 1024,
         cloud_endpoint=CLOUD_ENDPOINT,
         api_key=API_KEY,
-        stage_name=STAGE_NAME,
-    ) as stage_bulk_writer:
+        volume_name=VOLUME_NAME,
+    ) as volume_bulk_writer:
         print("Append rows")
         batch_count = 10000
         for i in range(batch_count):
@@ -146,12 +146,12 @@ def stage_remote_writer(file_type, schema):
                 "array_int": [k for k in range(10)],
                 "sparse_vector": gen_sparse_vector(False),
             }
-            stage_bulk_writer.append_row(row)
+            volume_bulk_writer.append_row(row)
 
         # append rows by numpy type
         for i in range(batch_count):
             id = i + batch_count
-            stage_bulk_writer.append_row({
+            volume_bulk_writer.append_row({
                 "id": np.int64(id),
                 "bool": True if i % 3 == 0 else False,
                 "int8": np.int8(id % 128),
@@ -174,12 +174,12 @@ def stage_remote_writer(file_type, schema):
                 "sparse_vector": gen_sparse_vector(True),
             })
 
-        print(f"{stage_bulk_writer.total_row_count} rows appends")
-        print(f"{stage_bulk_writer.buffer_row_count} rows in buffer not flushed")
+        print(f"{volume_bulk_writer.total_row_count} rows appends")
+        print(f"{volume_bulk_writer.buffer_row_count} rows in buffer not flushed")
         print("Generate data files...")
-        stage_bulk_writer.commit()
-        print(f"Data files have been uploaded: {stage_bulk_writer.batch_files}")
-        return stage_bulk_writer.get_stage_upload_result()
+        volume_bulk_writer.commit()
+        print(f"Data files have been uploaded: {volume_bulk_writer.batch_files}")
+        return volume_bulk_writer.get_volume_upload_result()
 
 
 def retrieve_imported_data():
@@ -217,15 +217,15 @@ def retrieve_imported_data():
         print(item)
 
 
-def call_stage_import(stage_name: str, path: str):
+def call_volume_import(volume_name: str, path: str):
     print(f"\n===================== import files to cluster ====================")
     resp = bulk_import(
         url=CLOUD_ENDPOINT,
         api_key=API_KEY,
         cluster_id=CLUSTER_ID,
         db_name=DB_NAME,
         collection_name=COLLECTION_NAME,
-        stage_name=stage_name,
+        volume_name=volume_name,
         data_paths=[[path]]
     )
     print(resp.json())
@@ -270,4 +270,4 @@ def call_stage_import(stage_name: str, path: str):
 
 if __name__ == '__main__':
     create_connection()
-    example_collection_remote_stage(file_type=BulkFileType.PARQUET)
+    example_collection_remote_volume(file_type=BulkFileType.PARQUET)

examples/bulk_import/example_stage_file_manager.py

Lines changed: 0 additions & 12 deletions
This file was deleted.

examples/bulk_import/example_stage_manager.py

Lines changed: 0 additions & 20 deletions
This file was deleted.

examples/bulk_import/example_volume_file_manager.py

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+from pymilvus.bulk_writer.constants import ConnectType
+from pymilvus.bulk_writer.volume_file_manager import VolumeFileManager
+
+if __name__ == "__main__":
+    volume_file_manager = VolumeFileManager(
+        cloud_endpoint='https://api.cloud.zilliz.com',
+        api_key='_api_key_for_cluster_org_',
+        volume_name='_volume_name_for_project_',
+        connect_type=ConnectType.AUTO,
+    )
+    result = volume_file_manager.upload_file_to_volume("/Users/zilliz/data/", "data/")
+    print(f"\nuploadFileToVolume results: {result}")

examples/bulk_import/example_volume_manager.py

Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
+from pymilvus.bulk_writer.volume_manager import VolumeManager
+
+PROJECT_ID = "_id_for_project_"
+REGION_ID = "_id_for_region_"
+VOLUME_NAME = "_volume_name_for_project_"
+
+if __name__ == "__main__":
+    volume_manager = VolumeManager(
+        cloud_endpoint="https://api.cloud.zilliz.com",
+        api_key="_api_key_for_cluster_org_",
+    )
+
+    volume_manager.create_volume(PROJECT_ID, REGION_ID, VOLUME_NAME)
+    print(f"\nVolume {VOLUME_NAME} created")
+
+    volume_list = volume_manager.list_volumes(PROJECT_ID, 1, 10)
+    print(f"\nlistVolumes results: ", volume_list.json()['data'])
+
+    volume_manager.delete_volume(VOLUME_NAME)
+    print(f"\nVolume {VOLUME_NAME} deleted")

pymilvus/bulk_writer/bulk_import.py

Lines changed: 5 additions & 5 deletions
@@ -114,7 +114,7 @@ def bulk_import(
     access_key: str = "",
     secret_key: str = "",
     token: str = "",
-    stage_name: str = "",
+    volume_name: str = "",
     data_paths: [List[List[str]]] = None,
     verify: Optional[Union[bool, str]] = True,
     cert: Optional[Union[str, tuple]] = None,
@@ -139,7 +139,7 @@
         secret_key (str): secret key to access the object storage(cloud)
         token (str): access token to access the object storage(cloud)
 
-        stage_name (str): name of the stage to import(cloud)
+        volume_name (str): name of the volume to import(cloud)
         data_paths (list of list of str): The paths of files that contain the data to import(cloud)
         verify (bool, str, optional): Either a boolean, to verify the server's TLS certificate
             or a string, which must be server's certificate path. Defaults to `True`.
@@ -181,15 +181,15 @@
         ...     token="your-token" # for short-term credentials, also include `token`
         ... )
 
-        >>> # 3. Import multiple files or folders from a Zilliz Stage into a Zilliz Cloud instance
+        >>> # 3. Import multiple files or folders from a Zilliz Volume into a Zilliz Cloud instance
         >>> bulk_import(
         ...     url="https://api.cloud.zilliz.com", # If regions in China, it is: https://api.cloud.zilliz.com.cn
         ...     api_key="YOUR_API_KEY",
         ...     cluster_id="in0x-xxx",
         ...     db_name="", # Only For Dedicated deployments: this parameter can be specified.
         ...     collection_name="my_collection",
         ...     partition_name="", # If Collection not enable partitionKey, can be specified.
-        ...     stage_name="my_stage",
+        ...     volume_name="my_volume",
         ...     data_paths=[
         ...         ["parquet-folder/1.parquet"],
         ...         ["parquet-folder-2/"]
@@ -210,7 +210,7 @@
         "accessKey": access_key,
         "secretKey": secret_key,
         "token": token,
-        "stageName": stage_name,
+        "volumeName": volume_name,
         "dataPaths": data_paths,
     }
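
Taken together, the renamed pieces compose into a three-step flow: create a volume, upload files into it, then import from it into a cluster. A rough end-to-end sketch, assuming the module paths shown in this commit and placeholder IDs:

from pymilvus.bulk_writer.bulk_import import bulk_import
from pymilvus.bulk_writer.constants import ConnectType
from pymilvus.bulk_writer.volume_file_manager import VolumeFileManager
from pymilvus.bulk_writer.volume_manager import VolumeManager

ENDPOINT = "https://api.cloud.zilliz.com"
API_KEY = "YOUR_API_KEY"

# 1. Create the volume (project and region IDs are placeholders).
manager = VolumeManager(cloud_endpoint=ENDPOINT, api_key=API_KEY)
manager.create_volume("_id_for_project_", "_id_for_region_", "my_volume")

# 2. Upload local files into the volume.
files = VolumeFileManager(
    cloud_endpoint=ENDPOINT,
    api_key=API_KEY,
    volume_name="my_volume",
    connect_type=ConnectType.AUTO,
)
files.upload_file_to_volume("/path/to/local/data/", "data/")

# 3. Import the uploaded folder into a collection.
resp = bulk_import(
    url=ENDPOINT,
    api_key=API_KEY,
    cluster_id="in0x-xxx",
    collection_name="my_collection",
    volume_name="my_volume",
    data_paths=[["data/"]],
)
print(resp.json())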

pymilvus/bulk_writer/stage_manager.py

Lines changed: 0 additions & 39 deletions
This file was deleted.

pymilvus/bulk_writer/stage_bulk_writer.py renamed to pymilvus/bulk_writer/volume_bulk_writer.py

Lines changed: 12 additions & 12 deletions
@@ -3,7 +3,7 @@
 from pathlib import Path
 from typing import Any, Dict, List, Optional
 
-from pymilvus.bulk_writer.stage_file_manager import StageFileManager
+from pymilvus.bulk_writer.volume_file_manager import VolumeFileManager
 from pymilvus.orm.schema import CollectionSchema
 
 from .constants import MB, BulkFileType, ConnectType
@@ -12,16 +12,16 @@
 logger = logging.getLogger(__name__)
 
 
-class StageBulkWriter(LocalBulkWriter):
-    """StageBulkWriter handles writing local bulk files to a remote stage."""
+class VolumeBulkWriter(LocalBulkWriter):
+    """VolumeBulkWriter handles writing local bulk files to a remote volume."""
 
     def __init__(
         self,
         schema: CollectionSchema,
         remote_path: str,
         cloud_endpoint: str,
         api_key: str,
-        stage_name: str,
+        volume_name: str,
         chunk_size: int = 1024 * MB,
         file_type: BulkFileType = BulkFileType.PARQUET,
         config: Optional[dict] = None,
@@ -33,11 +33,11 @@ def __init__(
         remote_dir_path = Path(remote_path) / super().uuid
         self._remote_path = str(remote_dir_path) + "/"
         self._remote_files: List[List[str]] = []
-        self._stage_name = stage_name
-        self._stage_file_manager = StageFileManager(
+        self._volume_name = volume_name
+        self._volume_file_manager = VolumeFileManager(
             cloud_endpoint=cloud_endpoint,
             api_key=api_key,
-            stage_name=stage_name,
+            volume_name=volume_name,
             connect_type=ConnectType.AUTO,
         )
 
@@ -50,7 +50,7 @@ def append_row(self, row: Dict[str, Any], **kwargs):
         super().append_row(row, **kwargs)
 
     def commit(self, **kwargs):
-        """Commit local bulk files and upload to remote stage."""
+        """Commit local bulk files and upload to remote volume."""
         super().commit(call_back=self._upload)
 
     @property
@@ -61,8 +61,8 @@ def data_path(self) -> str:
     def batch_files(self) -> List[List[str]]:
         return self._remote_files
 
-    def get_stage_upload_result(self) -> Dict[str, str]:
-        return {"stage_name": self._stage_name, "path": str(self._remote_path)}
+    def get_volume_upload_result(self) -> Dict[str, str]:
+        return {"volume_name": self._volume_name, "path": str(self._remote_path)}
 
     def __exit__(self, exc_type: object, exc_val: object, exc_tb: object):
         super().__exit__(exc_type, exc_val, exc_tb)
@@ -84,7 +84,7 @@ def _local_rm(self, file_path: str):
             logger.warning(f"Failed to delete local file: {file_path}")
 
     def _upload(self, file_list: List[str]) -> List[str]:
-        """Upload files to remote stage and remove local copies."""
+        """Upload files to remote volume and remove local copies."""
         uploaded_files: List[str] = []
 
         for file_path in file_list:
@@ -105,5 +105,5 @@
 
     def _upload_object(self, file_path: str, object_name: str):
         logger.info(f"Prepare to upload '{file_path}' to '{object_name}'")
-        self._stage_file_manager.upload_file_to_stage(file_path, self._remote_path)
+        self._volume_file_manager.upload_file_to_volume(file_path, self._remote_path)
         logger.info(f"Uploaded file '{file_path}' to '{object_name}'")
