Pyobvector
A Python SDK for OceanBase Multimodal Store—enabling vector search, full-text search, and JSON table operations—offers both a Milvus-compatible API and a SQLAlchemy-based SQL mode, and supports both OceanBase and OceanBase seekdb.
Install / Use
/learn @oceanbase/PyobvectorREADME
pyobvector
A Python SDK for OceanBase Multimodal Store (Vector Store / Full Text Search / JSON Table), based on SQLAlchemy and compatible with the Milvus API.
Installation
- git clone this repo, then install with:
uv sync
- install with pip:
pip install pyobvector==0.2.25
- for embedded SeekDB support (local SeekDB without server):
pip install pyobvector[pyseekdb]
Build Doc
You can build the documentation locally with Sphinx:
mkdir build
make html
Release Notes
For detailed release notes and changelog, see RELEASE_NOTES.md.
Usage
pyobvector supports four modes:
- Milvus compatible mode: You can use the MilvusLikeClient class to use vector storage in a way similar to the Milvus API.
- SQLAlchemy hybrid mode: You can use the vector storage functions provided by the ObVecClient class and execute relational database statements with the SQLAlchemy library. In this mode, you can regard pyobvector as an extension of SQLAlchemy.
- Embedded SeekDB mode: Use ObVecClient or SeekdbRemoteClient with a local embedded SeekDB (no server). Same API as remote: create_table, insert, ann_search, etc. Requires the optional dependency: pip install pyobvector[pyseekdb].
- Hybrid Search mode: You can use the HybridSearch class to perform hybrid search that combines full-text search and vector similarity search, with Elasticsearch-compatible query syntax.
Milvus compatible mode
Refer to tests/test_milvus_like_client.py for more examples.
A simple workflow to perform ANN search with OceanBase Vector Store:
- setup a client:
from pyobvector import *
client = MilvusLikeClient(uri="127.0.0.1:2881", user="test@test")
- create a collection with vector index:
test_collection_name = "ann_test"
# define the schema of collection with optional partitions
range_part = ObRangePartition(False, range_part_infos = [
RangeListPartInfo('p0', 100),
RangeListPartInfo('p1', 'maxvalue'),
], range_expr='id')
schema = client.create_schema(partitions=range_part)
# define field schema of collection
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=3)
schema.add_field(field_name="meta", datatype=DataType.JSON, nullable=True)
# define index parameters
idx_params = client.prepare_index_params()
idx_params.add_index(
field_name='embedding',
index_type=VecIndexType.HNSW,
index_name='vidx',
metric_type="L2",
params={"M": 16, "efConstruction": 256},
)
# create collection
client.create_collection(
collection_name=test_collection_name,
schema=schema,
index_params=idx_params,
)
- insert data into your collection:
# prepare
vector_value1 = [0.748479,0.276979,0.555195]
vector_value2 = [0, 0, 0]
data1 = [{'id': i, 'embedding': vector_value1} for i in range(10)]
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(10, 13)])
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(111, 113)])
# insert data
client.insert(collection_name=test_collection_name, data=data1)
- do ann search:
res = client.search(collection_name=test_collection_name, data=[0,0,0], anns_field='embedding', limit=5, output_fields=['id'])
# For example, the result will be:
# [{'id': 112}, {'id': 111}, {'id': 10}, {'id': 11}, {'id': 12}]
SQLAlchemy hybrid mode
- setup a client:
from pyobvector import *
from sqlalchemy import Column, Integer, JSON
from sqlalchemy import func
client = ObVecClient(uri="127.0.0.1:2881", user="test@test")
- create a partitioned table with vector index:
# create partitioned table
range_part = ObRangePartition(False, range_part_infos = [
RangeListPartInfo('p0', 100),
RangeListPartInfo('p1', 'maxvalue'),
], range_expr='id')
cols = [
Column('id', Integer, primary_key=True, autoincrement=False),
Column('embedding', VECTOR(3)),
Column('meta', JSON)
]
client.create_table(test_collection_name, columns=cols, partitions=range_part)
# create vector index
client.create_index(
test_collection_name,
is_vec_index=True,
index_name='vidx',
column_names=['embedding'],
vidx_params='distance=l2, type=hnsw, lib=vsag',
)
- insert data into your collection:
# insert data
vector_value1 = [0.748479,0.276979,0.555195]
vector_value2 = [0, 0, 0]
data1 = [{'id': i, 'embedding': vector_value1} for i in range(10)]
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(10, 13)])
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(111, 113)])
client.insert(test_collection_name, data=data1)
- do ann search:
# perform ann search with basic column selection
res = client.ann_search(
test_collection_name,
vec_data=[0,0,0],
vec_column_name='embedding',
distance_func=l2_distance,
topk=5,
output_column_names=['id'] # Legacy parameter
)
# For example, the result will be:
# [(112,), (111,), (10,), (11,), (12,)]
# perform ann search with SQLAlchemy expressions (recommended)
from sqlalchemy import Table, text, func
table = Table(test_collection_name, client.metadata_obj, autoload_with=client.engine)
res = client.ann_search(
test_collection_name,
vec_data=[0,0,0],
vec_column_name='embedding',
distance_func=l2_distance,
topk=5,
output_columns=[
table.c.id,
table.c.meta,
(table.c.id + 1000).label('id_plus_1000'),
text("JSON_EXTRACT(meta, '$.key') as extracted_key")
]
)
# For example, the result will be:
# [(112, '{"key": "value"}', 1112, 'value'), ...]
# perform ann search with distance threshold (filter results by distance)
res = client.ann_search(
test_collection_name,
vec_data=[0,0,0],
vec_column_name='embedding',
distance_func=l2_distance,
with_dist=True,
topk=10,
output_column_names=['id'],
distance_threshold=0.5 # Only return results where distance <= 0.5
)
# Only returns results with distance <= 0.5
# For example, the result will be:
# [(10, 0.0), (11, 0.0), ...] # Only includes results with distance <= 0.5
ann_search Parameters
The ann_search method supports flexible output column selection through the output_columns parameter:
- output_columns (recommended): Accepts SQLAlchemy Column objects, expressions, or a mix of both
  - Column objects: table.c.id, table.c.name
  - Expressions: (table.c.age + 10).label('age_plus_10')
  - JSON queries: text("JSON_EXTRACT(meta, '$.key') as extracted_key")
  - String functions: func.concat(table.c.name, ' (', table.c.age, ')').label('name_age')
- output_column_names (legacy): Accepts a list of column name strings
  - Example: ['id', 'name', 'meta']
- Parameter Priority: output_columns takes precedence over output_column_names when both are provided
- distance_threshold (optional): Filter results by distance threshold
  - Type: Optional[float]
  - Only returns results where distance <= threshold
  - Example: distance_threshold=0.5 returns only results with distance <= 0.5
  - Use case: Quality control for similarity search, only return highly similar results
- If you want to use the pure SQLAlchemy API with the OceanBase dialect, you can just get an SQLAlchemy engine via client.engine. The engine can also be created as follows:
import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy import create_engine
uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.oceanbase", "pyobvector.schema.dialect", "OceanBaseDialect")
connection_str = (
f"mysql+oceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_engine(connection_str, **kwargs)
- Async engine is also supported:
import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy.ext.asyncio import create_async_engine
uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.aoceanbase", "pyobvector", "AsyncOceanBaseDialect")
connection_str = (
f"mysql+aoceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_async_engine(connection_str)
- For further usage in pure SQLAlchemy mode, please refer to the SQLAlchemy documentation.
Embedded SeekDB mode
Use the same ObClient/ObVecClient API with embedded SeekDB (local file, no server). Install the optional dependency:
pip install pyobvector[pyseekdb]
- connect with a path or with an existing pyseekdb.Client:
from pyobvector import SeekdbRemoteClient, ObVecClient
from pyobvector.client.ob_client import ObClient
# Option 1: path to SeekDB data directory
client = SeekdbRemoteClient(path="./seekdb_data", database="test")
# Option 2: use an existing pyseekdb.Client
import pyseekdb
pyseekdb_client = pyseekdb.Client(path="./seekdb_data", database="test")
client = SeekdbRemoteClient(pyseekdb_client=pyseekdb_client)
# Option 3: ObVecClient directly
client = ObVecClient(path="./seekdb_data", db_name="test")
assert isinstance(client, ObVecClient)
assert isinstance(client, ObClient)
- create table, insert, and ann search (same API as remote):
from sqlalchemy import Column, Integer, VARCHAR
from pyobvector import VECTOR, VectorIndex, l2_distance
client.drop_table_if_exist("vec_table")
client.create_table(
table_name="vec_table",
columns=[
Column("id", Integer, primary_key=True),
Column("title", VARCHAR(255)),
Column("vec", VECTOR(3)),
],
indexes=[VectorIndex("vec_idx", "vec",
