pyobvector

A Python SDK for OceanBase Vector Store, based on SQLAlchemy and compatible with the Milvus API.


Installation

  • clone this repository, then install with:
poetry install
  • or install from PyPI with pip:
pip install pyobvector==0.1.19

Build Doc

You can build the documentation locally with Sphinx:

mkdir build
make html

Usage

pyobvector supports two modes:

  • Milvus compatible mode: use the MilvusLikeClient class to work with vector storage through an API similar to the Milvus API.
  • SQLAlchemy hybrid mode: use the vector storage functions provided by the ObVecClient class and execute relational database statements with the SQLAlchemy library. In this mode, you can regard pyobvector as an extension of SQLAlchemy.

Milvus compatible mode

Refer to tests/test_milvus_like_client.py for more examples.

A simple workflow to perform ANN search with OceanBase Vector Store:

  • set up a client:
from pyobvector import *

client = MilvusLikeClient(uri="127.0.0.1:2881", user="test@test")
  • create a collection with vector index:
test_collection_name = "ann_test"
# define the schema of collection with optional partitions
range_part = ObRangePartition(False, range_part_infos = [
    RangeListPartInfo('p0', 100),
    RangeListPartInfo('p1', 'maxvalue'),
], range_expr='id')
schema = client.create_schema(partitions=range_part)
# define field schema of collection
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=3)
schema.add_field(field_name="meta", datatype=DataType.JSON, nullable=True)
# define index parameters
idx_params = client.prepare_index_params()
idx_params.add_index(
    field_name='embedding',
    index_type=VecIndexType.HNSW,
    index_name='vidx',
    metric_type="L2",
    params={"M": 16, "efConstruction": 256},
)
# create collection
client.create_collection(
    collection_name=test_collection_name,
    schema=schema,
    index_params=idx_params,
)
  • insert data into your collection:
# prepare
vector_value1 = [0.748479,0.276979,0.555195]
vector_value2 = [0, 0, 0]
data1 = [{'id': i, 'embedding': vector_value1} for i in range(10)]
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(10, 13)])
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(111, 113)])
# insert data
client.insert(collection_name=test_collection_name, data=data1)
  • run an ANN search:
res = client.search(collection_name=test_collection_name, data=[0,0,0], anns_field='embedding', limit=5, output_fields=['id'])
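# Example result: the five rows holding the all-zero vector are at distance 0
# from the query, so they are returned; the order among these ties may vary.
# [{'id': 112}, {'id': 111}, {'id': 10}, {'id': 11}, {'id': 12}]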
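
To sanity-check what the search step should return without a running OceanBase instance, the same sample rows can be ranked exactly in plain Python. This is only a sketch: `brute_force_l2` is a hypothetical helper, not part of pyobvector, and it performs an exact linear scan rather than the HNSW index lookup the server uses.

```python
import math

# Same sample rows as in the insert step above.
vector_value1 = [0.748479, 0.276979, 0.555195]
vector_value2 = [0, 0, 0]
data1 = [{'id': i, 'embedding': vector_value1} for i in range(10)]
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(10, 13)])
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(111, 113)])


def brute_force_l2(rows, query, limit):
    """Exact top-`limit` rows by Euclidean (L2) distance.

    Hypothetical helper for illustration only; not a pyobvector API.
    """
    ranked = sorted(rows, key=lambda row: math.dist(row['embedding'], query))
    return [{'id': row['id']} for row in ranked[:limit]]


expected = brute_force_l2(data1, [0, 0, 0], limit=5)
# The ids are sorted before printing because all five hits tie at distance 0.
print(sorted(hit['id'] for hit in expected))  # [10, 11, 12, 111, 112]
```

Because all five hits are at distance 0, compare the ids as a set (or sorted list) when checking the result of client.search against this reference.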

SQLAlchemy hybrid mode

  • set up a client:
from pyobvector import *
from sqlalchemy import Column, Integer, JSON
from sqlalchemy import func

client = ObVecClient(uri="127.0.0.1:2881", user="test@test")
  • create a partitioned table with vector index:
# table name (same value as in the Milvus compatible example)
test_collection_name = "ann_test"
# create partitioned table
range_part = ObRangePartition(False, range_part_infos = [
    RangeListPartInfo('p0', 100),
    RangeListPartInfo('p1', 'maxvalue'),
], range_expr='id')

cols = [
    Column('id', Integer, primary_key=True, autoincrement=False),
    Column('embedding', VECTOR(3)),
    Column('meta', JSON)
]
client.create_table(test_collection_name, columns=cols, partitions=range_part)

# create vector index
client.create_index(
    test_collection_name,
    is_vec_index=True,
    index_name='vidx',
    column_names=['embedding'],
    vidx_params='distance=l2, type=hnsw, lib=vsag',
)
  • insert data into your table:
# insert data
vector_value1 = [0.748479,0.276979,0.555195]
vector_value2 = [0, 0, 0]
data1 = [{'id': i, 'embedding': vector_value1} for i in range(10)]
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(10, 13)])
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(111, 113)])
client.insert(test_collection_name, data=data1)
  • run an ANN search:
# perform ann search
res = client.ann_search(
    test_collection_name,
    vec_data=[0,0,0],
    vec_column_name='embedding',
    distance_func=l2_distance,
    topk=5,
    output_column_names=['id']
)
# Example result (the order among the five distance-0 ties may vary):
# [(112,), (111,), (10,), (11,), (12,)]
  • If you want to use the pure SQLAlchemy API with the OceanBase dialect, you can get a SQLAlchemy engine via client.engine. The engine can also be created as follows:
import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy import create_engine

uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.oceanbase", "pyobvector.schema.dialect", "OceanBaseDialect")
connection_str = (
    f"mysql+oceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
# pass any extra create_engine keyword arguments (for example, pool_size) here
engine = create_engine(connection_str)
  • An async engine is also supported:
import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy.ext.asyncio import create_async_engine

uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.aoceanbase", "pyobvector", "AsyncOceanBaseDialect")
connection_str = (
    f"mysql+aoceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_async_engine(connection_str)
  • For further usage in pure SQLAlchemy mode, please refer to the SQLAlchemy documentation.
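
The connection strings above interpolate the password directly into the URL, which works for the empty password shown but can break once the password contains characters such as @ or /. SQLAlchemy's documentation recommends URL-escaping such passwords, for example with urllib.parse.quote_plus. A minimal sketch, assuming a hypothetical helper named `build_connection_str` (not part of pyobvector) and a made-up password:

```python
from urllib.parse import quote_plus


def build_connection_str(user, password, uri, db_name, driver="mysql+oceanbase"):
    """Hypothetical helper (not a pyobvector API).

    Escapes the password so URL-reserved characters do not break parsing.
    """
    return (
        f"{driver}://{user}:{quote_plus(password)}@{uri}/{db_name}?charset=utf8mb4"
    )


connection_str = build_connection_str(
    user="root@test",
    password="p@ss/word",  # made-up password containing URL-reserved characters
    uri="127.0.0.1:2881",
    db_name="test",
)
print(connection_str)
# mysql+oceanbase://root@test:p%40ss%2Fword@127.0.0.1:2881/test?charset=utf8mb4
```

For the async dialect registered above, the same helper could be called with driver="mysql+aoceanbase".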
