Skip to content

Commit c45876c

Browse files
cydrainXuanYang-cn
authored and committed
Add client aws_opensearch
Signed-off-by: Cai Yudong <[email protected]>
1 parent 3fdb298 commit c45876c

File tree

9 files changed

+419
-12
lines changed

9 files changed

+419
-12
lines changed

README.md

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,19 @@ pip install vectordb-bench[pinecone]
2727
```
2828
All supported database clients
2929

30-
|Optional database client|install command|
31-
|---------------|---------------|
32-
|pymilvus(*default*)|`pip install vectordb-bench`|
33-
|all|`pip install vectordb-bench[all]`|
34-
|qdrant|`pip install vectordb-bench[qdrant]`|
35-
|pinecone|`pip install vectordb-bench[pinecone]`|
36-
|weaviate|`pip install vectordb-bench[weaviate]`|
37-
|elastic|`pip install vectordb-bench[elastic]`|
38-
|pgvector|`pip install vectordb-bench[pgvector]`|
39-
|pgvecto.rs|`pip install vectordb-bench[pgvecto_rs]`|
40-
|redis|`pip install vectordb-bench[redis]`|
41-
|chromadb|`pip install vectordb-bench[chromadb]`|
30+
| Optional database client | install command |
31+
|--------------------------|---------------------------------------------|
32+
| pymilvus(*default*) | `pip install vectordb-bench` |
33+
| all | `pip install vectordb-bench[all]` |
34+
| qdrant | `pip install vectordb-bench[qdrant]` |
35+
| pinecone | `pip install vectordb-bench[pinecone]` |
36+
| weaviate | `pip install vectordb-bench[weaviate]` |
37+
| elastic | `pip install vectordb-bench[elastic]` |
38+
| pgvector | `pip install vectordb-bench[pgvector]` |
39+
| pgvecto.rs | `pip install vectordb-bench[pgvecto_rs]` |
40+
| redis | `pip install vectordb-bench[redis]` |
41+
| chromadb | `pip install vectordb-bench[chromadb]` |
42+
| awsopensearch | `pip install vectordb-bench[awsopensearch]` |
4243

4344
### Run
4445

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ all = [
6262
"psycopg2",
6363
"psycopg",
6464
"psycopg-binary",
65+
"opensearch-dsl==2.1.0",
66+
"opensearch-py==2.6.0",
6567
]
6668

6769
qdrant = [ "qdrant-client" ]
@@ -72,6 +74,7 @@ pgvector = [ "psycopg", "psycopg-binary", "pgvector" ]
7274
pgvecto_rs = [ "psycopg2" ]
7375
redis = [ "redis" ]
7476
chromadb = [ "chromadb" ]
77+
# The AWS OpenSearch client imports `opensearchpy`; pins mirror the `all` extra.
awsopensearch = [ "opensearch-dsl==2.1.0", "opensearch-py==2.6.0" ]
7578
zilliz_cloud = []
7679

7780
[project.urls]

vectordb_bench/backend/clients/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class DB(Enum):
3232
PgVectoRS = "PgVectoRS"
3333
Redis = "Redis"
3434
Chroma = "Chroma"
35+
AWSOpenSearch = "OpenSearch"
3536
Test = "test"
3637

3738

@@ -78,6 +79,10 @@ def init_cls(self) -> Type[VectorDB]:
7879
from .chroma.chroma import ChromaClient
7980
return ChromaClient
8081

82+
if self == DB.AWSOpenSearch:
83+
from .aws_opensearch.aws_opensearch import AWSOpenSearch
84+
return AWSOpenSearch
85+
8186
@property
8287
def config_cls(self) -> Type[DBConfig]:
8388
"""Import while in use"""
@@ -121,6 +126,10 @@ def config_cls(self) -> Type[DBConfig]:
121126
from .chroma.config import ChromaConfig
122127
return ChromaConfig
123128

129+
if self == DB.AWSOpenSearch:
130+
from .aws_opensearch.config import AWSOpenSearchConfig
131+
return AWSOpenSearchConfig
132+
124133
def case_config_cls(self, index_type: IndexType | None = None) -> Type[DBCaseConfig]:
125134
if self == DB.Milvus:
126135
from .milvus.config import _milvus_case_config
@@ -150,6 +159,10 @@ def case_config_cls(self, index_type: IndexType | None = None) -> Type[DBCaseCon
150159
from .pgvecto_rs.config import _pgvecto_rs_case_config
151160
return _pgvecto_rs_case_config.get(index_type)
152161

162+
if self == DB.AWSOpenSearch:
163+
from .aws_opensearch.config import AWSOpenSearchIndexConfig
164+
return AWSOpenSearchIndexConfig
165+
153166
# DB.Pinecone, DB.Chroma, DB.Redis
154167
return EmptyDBCaseConfig
155168

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
import logging
2+
from contextlib import contextmanager
3+
import time
4+
from typing import Iterable, Type
5+
from ..api import VectorDB, DBCaseConfig, DBConfig, IndexType
6+
from .config import AWSOpenSearchConfig, AWSOpenSearchIndexConfig
7+
from opensearchpy import OpenSearch
8+
from opensearchpy.helpers import bulk
9+
10+
log = logging.getLogger(__name__)
11+
12+
13+
class AWSOpenSearch(VectorDB):
    """Benchmark client that stores and searches vectors in an OpenSearch k-NN index.

    The constructor only prepares the index (optionally dropping and
    re-creating it). The client used for inserts and searches is created by
    the ``init()`` context manager and removed again when that context exits.
    """

    # Bulk inserts are retried a bounded number of times so that a permanently
    # failing cluster surfaces an error instead of retrying forever.
    _INSERT_MAX_ATTEMPTS = 5
    _INSERT_RETRY_DELAY_S = 10

    def __init__(
        self,
        dim: int,
        db_config: dict,
        db_case_config: AWSOpenSearchIndexConfig,
        index_name: str = "vdb_bench_index",  # must be lowercase
        id_col_name: str = "id",
        vector_col_name: str = "embedding",
        drop_old: bool = False,
        **kwargs,
    ):
        """Store the settings and, when ``drop_old`` is set, rebuild the index.

        Args:
            dim: dimension of the vectors stored in the index.
            db_config: keyword arguments forwarded verbatim to ``OpenSearch(...)``.
            db_case_config: index configuration; its ``index_param()`` supplies
                the k-NN ``method`` section of the mapping.
            index_name: name of the index to use (must be lowercase).
            id_col_name: name of the integer id field declared in the mapping.
            vector_col_name: name of the ``knn_vector`` field.
            drop_old: when true, delete the index if it exists and create it anew.
            **kwargs: accepted for interface compatibility; not used here.
        """
        self.dim = dim
        self.db_config = db_config
        self.case_config = db_case_config
        self.index_name = index_name
        self.id_col_name = id_col_name
        self.category_col_names = [
            f"scalar-{categoryCount}" for categoryCount in [2, 5, 10, 100, 1000]
        ]
        self.vector_col_name = vector_col_name

        # "http_auth" carries the plaintext password, so it is masked before
        # the configuration is written to the log.
        loggable_config = {
            key: ("******" if key == "http_auth" else value)
            for key, value in self.db_config.items()
        }
        log.info(f"AWS_OpenSearch client config: {loggable_config}")
        client = OpenSearch(**self.db_config)
        if drop_old:
            log.info(f"AWS_OpenSearch client drop old index: {self.index_name}")
            is_existed = client.indices.exists(index=self.index_name)
            if is_existed:
                client.indices.delete(index=self.index_name)
            self._create_index(client)

    @classmethod
    def config_cls(cls) -> Type[AWSOpenSearchConfig]:
        """Return the connection-config class for this client."""
        return AWSOpenSearchConfig

    @classmethod
    def case_config_cls(
        cls, index_type: IndexType | None = None
    ) -> Type[AWSOpenSearchIndexConfig]:
        """Return the index-config class; ``index_type`` is not consulted."""
        return AWSOpenSearchIndexConfig

    def _create_index(self, client: OpenSearch):
        """Create the k-NN index with id, category and vector fields.

        Raises:
            Exception: whatever ``client.indices.create`` raised, re-raised
                after a warning has been logged.
        """
        # Only k-NN is switched on; shard count and refresh interval are not
        # set here, so the server-side defaults apply.
        settings = {"index": {"knn": True}}
        properties = {self.id_col_name: {"type": "integer"}}
        for category_col in self.category_col_names:
            properties[category_col] = {"type": "keyword"}
        properties[self.vector_col_name] = {
            "type": "knn_vector",
            "dimension": self.dim,
            "method": self.case_config.index_param(),
        }
        body = {"settings": settings, "mappings": {"properties": properties}}
        try:
            client.indices.create(index=self.index_name, body=body)
        except Exception as e:
            log.warning(f"Failed to create index: {self.index_name} error: {str(e)}")
            raise e from None

    @contextmanager
    def init(self) -> None:
        """Connect to OpenSearch for the duration of the ``with`` block.

        ``self.client`` exists only inside the block. It is removed on exit,
        including when the block raises, so the instance never keeps a live
        client attribute afterwards.
        """
        self.client = OpenSearch(**self.db_config)
        try:
            yield
        finally:
            # NOTE(review): the transport is not closed explicitly here;
            # confirm whether opensearch-py needs an explicit close.
            del self.client

    def insert_embeddings(
        self,
        embeddings: Iterable[list[float]],
        metadata: list[int],
        **kwargs,
    ) -> tuple[int, Exception | None]:
        """Insert the embeddings into the OpenSearch index with one bulk request.

        Each vector is indexed under the document ``_id`` taken from the
        matching entry of ``metadata``. A failed request is retried up to
        ``_INSERT_MAX_ATTEMPTS`` times, sleeping ``_INSERT_RETRY_DELAY_S``
        seconds between attempts.

        Args:
            embeddings: vectors to insert.
            metadata: document ids, one per vector, in the same order.
            **kwargs: accepted for interface compatibility; not used here.

        Returns:
            ``(count, None)`` on success, where ``count`` is the number of
            vectors submitted; ``(0, error)`` with the last exception once all
            attempts have failed.

        Raises:
            ValueError: if ``embeddings`` and ``metadata`` differ in length.
        """
        assert self.client is not None, "should self.init() first"

        # Build the bulk body once: it is reused unchanged by every retry and
        # also works when ``embeddings`` is a one-shot iterable.
        insert_data = []
        for doc_id, vector in zip(metadata, embeddings, strict=True):
            insert_data.append({"index": {"_index": self.index_name, "_id": doc_id}})
            insert_data.append({self.vector_col_name: vector})
        submitted = len(insert_data) // 2
        if submitted == 0:
            # Nothing to send; an empty bulk body would only produce an error.
            return (0, None)

        last_error = None
        for attempt in range(1, self._INSERT_MAX_ATTEMPTS + 1):
            try:
                resp = self.client.bulk(insert_data)
                log.info(f"AWS_OpenSearch adding documents: {len(resp['items'])}")
                if resp.get("errors"):
                    # The bulk API reports per-item failures through this flag
                    # instead of raising.
                    log.warning(
                        f"AWS_OpenSearch bulk response reported item errors: {self.index_name}"
                    )
                resp = self.client.indices.stats(self.index_name)
                log.info(f"Total document count in index: {resp['_all']['primaries']['indexing']['index_total']}")
                return (submitted, None)
            except Exception as e:
                last_error = e
                log.warning(f"Failed to insert data: {self.index_name} error: {str(e)}")
                if attempt < self._INSERT_MAX_ATTEMPTS:
                    time.sleep(self._INSERT_RETRY_DELAY_S)
        return (0, last_error)

    def search_embedding(
        self,
        query: list[float],
        k: int = 100,
        filters: dict | None = None,
    ) -> list[int]:
        """Get the ids of the k most similar embeddings to the query vector.

        Args:
            query(list[float]): query embedding to look up documents similar to.
            k(int): Number of most similar embeddings to return. Defaults to 100.
            filters(dict, optional): accepted for interface compatibility; this
                implementation does not apply it to the query.

        Returns:
            list[int]: document ids of the hits, in the order returned by OpenSearch.

        Raises:
            Exception: whatever the search call raised, re-raised after a
                warning has been logged.
        """
        assert self.client is not None, "should self.init() first"

        body = {
            "size": k,
            "query": {"knn": {self.vector_col_name: {"vector": query, "k": k}}},
        }
        try:
            resp = self.client.search(index=self.index_name, body=body)
            log.info(f'Search took: {resp["took"]}')
            log.info(f'Search shards: {resp["_shards"]}')
            log.info(f'Search hits total: {resp["hits"]["total"]}')
            return [int(hit["_id"]) for hit in resp["hits"]["hits"]]
        except Exception as e:
            log.warning(f"Failed to search: {self.index_name} error: {str(e)}")
            raise e from None

    def optimize(self):
        """optimize will be called between insertion and search in performance cases."""
        pass

    def ready_to_load(self):
        """ready_to_load will be called before load in load cases."""
        pass
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from typing import Annotated, TypedDict, Unpack
2+
3+
import click
4+
from pydantic import SecretStr
5+
6+
from ....cli.cli import (
7+
CommonTypedDict,
8+
HNSWFlavor2,
9+
cli,
10+
click_parameter_decorators_from_typed_dict,
11+
run,
12+
)
13+
from .. import DB
14+
15+
16+
class AWSOpenSearchTypedDict(TypedDict):
    """Connection options for the AWS OpenSearch command.

    Every field is annotated with the click option that supplies its value.
    """

    host: Annotated[
        str,
        click.option("--host", type=str, help="Db host", required=True),
    ]
    port: Annotated[
        int,
        click.option("--port", type=int, default=443, help="Db Port"),
    ]
    user: Annotated[
        str,
        click.option("--user", type=str, default="admin", help="Db User"),
    ]
    # No default and not required, so the value may be absent.
    password: Annotated[
        str,
        click.option("--password", type=str, help="Db password"),
    ]
23+
24+
25+
class AWSOpenSearchHNSWTypedDict(
    CommonTypedDict,
    AWSOpenSearchTypedDict,
    HNSWFlavor2,
):
    """Full option set of the command: common, connection and HNSW options combined."""
27+
28+
29+
@cli.command()
@click_parameter_decorators_from_typed_dict(AWSOpenSearchHNSWTypedDict)
def AWSOpenSearch(**parameters: Unpack[AWSOpenSearchHNSWTypedDict]):
    # Comments are used instead of a docstring on purpose: click would show a
    # docstring as the command's help text.

    # The config classes are imported only when the command actually runs.
    from .config import AWSOpenSearchConfig, AWSOpenSearchIndexConfig

    connection_config = AWSOpenSearchConfig(
        host=parameters["host"],
        port=parameters["port"],
        user=parameters["user"],
        password=SecretStr(parameters["password"]),
    )
    # NOTE(review): the index config is built with its defaults; the parsed
    # options reach run() only through **parameters. Confirm this is intended.
    index_config = AWSOpenSearchIndexConfig()

    run(
        db=DB.AWSOpenSearch,
        db_config=connection_config,
        db_case_config=index_config,
        **parameters,
    )
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
from enum import Enum
2+
from pydantic import SecretStr, BaseModel
3+
4+
from ..api import DBConfig, DBCaseConfig, MetricType, IndexType
5+
6+
7+
class AWSOpenSearchConfig(DBConfig, BaseModel):
    # Connection settings for an OpenSearch endpoint; to_dict() turns them
    # into the keyword arguments given to the OpenSearch client.
    host: str = ""
    port: int = 443
    user: str = ""
    # The default must be a real SecretStr: to_dict() calls
    # get_secret_value() on it, which a plain "" default does not provide.
    password: SecretStr = SecretStr("")

    def to_dict(self) -> dict:
        """Return the keyword arguments used to construct the OpenSearch client.

        The password is unwrapped into plaintext inside ``http_auth``, so the
        returned dict must not be logged or persisted as-is.
        """
        return {
            "hosts": [{'host': self.host, 'port': self.port}],
            "http_auth": (self.user, self.password.get_secret_value()),
            "use_ssl": True,
            "http_compress": True,
            "verify_certs": True,
            # NOTE(review): certificates are verified but hostname assertion
            # is switched off; confirm this is intended.
            "ssl_assert_hostname": False,
            "ssl_show_warn": False,
            "timeout": 600,
        }
24+
25+
26+
class AWSOS_Engine(Enum):
    """Engines selectable for the k-NN index; the value is sent as the method's ``engine``."""

    nmslib = "nmslib"
    faiss = "faiss"
    # NOTE(review): capitalised unlike the other two values; confirm that
    # OpenSearch accepts this spelling before normalising it.
    lucene = "Lucene"
30+
31+
32+
class AWSOpenSearchIndexConfig(BaseModel, DBCaseConfig):
    # HNSW index settings for the OpenSearch k-NN field: distance metric,
    # engine and the two HNSW build parameters.
    metric_type: MetricType = MetricType.L2
    engine: AWSOS_Engine = AWSOS_Engine.nmslib
    efConstruction: int = 360
    M: int = 30

    def parse_metric(self) -> str:
        """Translate ``metric_type`` into the space-type string placed in the index method."""
        metric = self.metric_type
        if metric == MetricType.IP:
            # only support faiss / nmslib, not for Lucene.
            return "innerproduct"
        if metric == MetricType.COSINE:
            return "cosinesimil"
        # Every other metric falls back to Euclidean distance.
        return "l2"

    def index_param(self) -> dict:
        """Build the ``method`` section of the knn_vector mapping."""
        hnsw_parameters = {
            "ef_construction": self.efConstruction,
            "m": self.M,
        }
        return {
            "name": "hnsw",
            "space_type": self.parse_metric(),
            "engine": self.engine.value,
            "parameters": hnsw_parameters,
        }

    def search_param(self) -> dict:
        """Return search-time parameters; none are defined, so this is an empty dict."""
        return dict()

0 commit comments

Comments
 (0)