pyobvector is the Python SDK for OceanBase's vector storage functionality. It provides two usage modes:
pymilvus compatible mode: Uses the MilvusLikeClient object to interact with the database, offering commonly used interfaces compatible with the lightweight MilvusClient.
SQLAlchemy extension mode: Uses the ObVecClient object to interact with the database, providing an extension to the Python SDK for relational databases.
This topic describes the interfaces and provides examples for both modes.
MilvusLikeClient
Constructor
def __init__(
    self,
    uri: str = "127.0.0.1:2881",
    user: str = "root@test",
    password: str = "",
    db_name: str = "test",
    **kwargs,
)
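The following is a minimal sketch of supplying these constructor arguments from the environment. The `OB_*` variable names and both helper names are hypothetical; only the parameter names and defaults come from the signature above.

```python
import os


def connection_kwargs_from_env(env=None):
    """Collect the constructor arguments, falling back to the documented defaults."""
    env = os.environ if env is None else env
    return {
        "uri": env.get("OB_URI", "127.0.0.1:2881"),          # hypothetical variable name
        "user": env.get("OB_USER", "root@test"),             # hypothetical variable name
        "password": env.get("OB_PASSWORD", ""),              # hypothetical variable name
        "db_name": env.get("OB_DB_NAME", "test"),            # hypothetical variable name
    }


def make_client(env=None):
    """Create a MilvusLikeClient. Requires pyobvector and a reachable OceanBase instance."""
    # Imported lazily so that the helper above stays free of third-party dependencies.
    from pyobvector import MilvusLikeClient

    return MilvusLikeClient(**connection_kwargs_from_env(env))
```

Calling `make_client()` with no environment variables set is intended to be equivalent to calling the constructor with its defaults.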
Collection related interfaces
| API | Description | Example |
|---|---|---|
| def create_schema(self, **kwargs) -> CollectionSchema | | |
| | Creates a table. | |
| | Retrieves the number of records in the table. | |
| | Checks if the table exists. | |
| | Renames the table. | |
| | Loads table metadata into the SQLAlchemy metadata cache. | |
CollectionSchema & FieldSchema
MilvusLikeClient uses CollectionSchema to describe the schema of a table. A CollectionSchema contains multiple FieldSchema objects, each of which describes the schema of one column in the table.
Create a CollectionSchema using MilvusLikeClient
def __init__(
    self,
    fields: Optional[List[FieldSchema]] = None,
    partitions: Optional[ObPartition] = None,
    description: str = "",  # ignored in oceanbase
    **kwargs,
)
The parameters are described as follows:
fields: An optional list of FieldSchema objects.
partitions: The partitioning rules (see the section on defining partitioning rules using ObPartition for details).
description: This is for compatibility with Milvus and has no effect in OceanBase.
Create a FieldSchema and register it to a CollectionSchema
def add_field(self, field_name: str, datatype: DataType, **kwargs)
field_name: The name of the column.
datatype: The data type of the column (see Compatibility for supported data types).
kwargs: Additional parameters for configuring column properties, as follows:
def __init__(
    self,
    name: str,
    dtype: DataType,
    description: str = "",
    is_primary: bool = False,
    auto_id: bool = False,
    nullable: bool = False,
    **kwargs,
)
The parameters are described as follows:
is_primary: Whether the column is a primary key.
auto_id: Whether the column is an auto-increment column.
nullable: Whether the column allows null values.
Example
schema = self.client.create_schema()
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="title", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(
    field_name="title_vector", datatype=DataType.FLOAT_VECTOR, dim=768
)
schema.add_field(field_name="link", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="reading_time", datatype=DataType.INT64)
schema.add_field(
    field_name="publication", datatype=DataType.VARCHAR, max_length=512
)
schema.add_field(field_name="claps", datatype=DataType.INT64)
schema.add_field(field_name="responses", datatype=DataType.INT64)
self.client.create_collection(
    collection_name="medium_articles_2020", schema=schema
)
Index related
| API | Description | Example or Note |
|---|---|---|
| | Creates a vector index table based on the constructed IndexParams (for more details on using IndexParams, see the prepare_index_params and add_index interfaces). | |
| | Drops an index table. | |
| | Refreshes the vector index table to improve read performance, which can be understood as migrating incremental data. | An additional OceanBase interface that is not compatible with Milvus. |
| | Rebuilds the vector index table to improve read performance, which can be understood as merging incremental data into the base index data. | An additional OceanBase interface that is not compatible with Milvus. |
| | Executes a vector approximate nearest neighbor search. Returns a list of records, each of which is a dictionary mapping column names to column values. | |
| | Reads data records using the specified filter condition. Returns a list of records, each of which is a dictionary mapping column names to column values. | |
| | Retrieves records with the specified primary key ids. Returns a list of records, each of which is a dictionary mapping column names to column values. | |
| | Deletes data from the collection. | |
| | Inserts data into the table. | |
| | Updates data in the table. If the primary key exists, the corresponding record is updated; otherwise, a new record is inserted. | |
| | Executes an SQL statement directly. Returns an iterator of result sets provided by SQLAlchemy. | |
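The insert and search interfaces described in the table can be combined as in the following sketch. The signatures are not reproduced on this page, so the keyword arguments used here (collection_name, data, anns_field, limit, output_fields) are assumptions based on the pymilvus-style interface, and the helper name is hypothetical. The column names come from the schema example earlier in this topic.

```python
def insert_and_search(client, collection_name, rows, query_vector, limit=5):
    """Insert rows, then run an approximate nearest neighbor search on title_vector.

    `client` is expected to be a MilvusLikeClient. The keyword names below are
    assumed from the pymilvus-style interface, not taken from this page.
    """
    client.insert(collection_name=collection_name, data=rows)
    return client.search(
        collection_name=collection_name,
        data=query_vector,            # one query vector with the same dim as the column
        anns_field="title_vector",    # vector column from the schema example
        limit=limit,                  # number of nearest neighbors to return
        output_fields=["id", "title"],
    )
```

Per the table, the result is expected to be a list of dictionaries, one per record, keyed by column name.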
ObVecClient
Constructor
def __init__(
    self,
    uri: str = "127.0.0.1:2881",
    user: str = "root@test",
    password: str = "",
    db_name: str = "test",
    **kwargs,
)
Table mode operations
| API | Description | Example or Notes |
|---|---|---|
| | Checks if a table exists. | |
| | Creates a table. | |
| | Creates an IndexParams object that records the schema definition of vector index tables. IndexParams stores a mapping from (column name, index name) tuples to IndexParam structures. After calling prepare_index_params, you can register an IndexParam using the add_index interface. | The prepare_index_params function is recommended for use with MilvusLikeClient rather than ObVecClient. In ObVecClient mode, use the create_index interface to define vector index tables (see the create_index interface for details). |
| | Creates a table and simultaneously creates vector indexes using the optional index_params. | Recommended for use with MilvusLikeClient; not recommended with ObVecClient. |
| | Supports creating both regular indexes and vector indexes. | For vector indexes, the parameters include type=hnsw and lib=vsag. Please retain these settings. The distance can be set to l2 or inner_product. |
| | Creates a vector index using vector index parameters. | |
| | Drops a table. | |
| | Drops an index. | |
| | Refreshes the vector index table to improve read performance, which can be understood as migrating incremental data. | |
| | Rebuilds the vector index table to improve read performance, which can be understood as merging incremental data into the base index data. | |
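The vector index settings noted in the table (type=hnsw, lib=vsag, and a distance of l2 or inner_product) can be assembled as in the following sketch. Both helper names are hypothetical, and the create_index keyword arguments (is_vec_index, index_name, column_names, vidx_params) are assumptions because the signature is not reproduced on this page.

```python
ALLOWED_DISTANCES = ("l2", "inner_product")


def build_vidx_params(distance="l2"):
    """Build the vector index parameter string, keeping type=hnsw and lib=vsag fixed."""
    if distance not in ALLOWED_DISTANCES:
        raise ValueError(f"distance must be one of {ALLOWED_DISTANCES}, got {distance!r}")
    return f"distance={distance}, type=hnsw, lib=vsag"


def create_hnsw_index(client, table_name, column_name, index_name="vidx", distance="l2"):
    """Create a vector index on one column.

    `client` is expected to be an ObVecClient. The keyword names below are
    assumptions, not taken from this page.
    """
    client.create_index(
        table_name,
        is_vec_index=True,
        index_name=index_name,
        column_names=[column_name],
        vidx_params=build_vidx_params(distance),
    )
```

Validating the distance before the call surfaces a typo as a Python error instead of a failed SQL statement.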
DML operations
| API | Parameter Description | Example or Remarks |
|---|---|---|
| | Inserts data into the table. | |
| | Inserts or updates data in the table. If the primary key exists, the corresponding record is updated; otherwise, a new record is inserted. | |
| | Updates data in the table. If the primary key is duplicated, it will be replaced. | |
| | Deletes data from the table. | |
| | Retrieves records with the specified primary key ids. Unlike MilvusLikeClient, the return value of ObVecClient is a list of tuples, where each tuple represents a record. | |
| | Sets the efSearch parameter for the HNSW index. This is a session-level variable. A higher ef_search value improves recall but decreases query performance. | |
| | Retrieves the efSearch parameter for the HNSW index. | |
| | Performs an approximate nearest neighbor search. Unlike MilvusLikeClient, the return value of ObVecClient is a list of tuples, where each tuple represents a record. | |
| | Performs a precise nearest neighbor search. Unlike MilvusLikeClient, the return value of ObVecClient is a list of tuples, where each tuple represents a record. | |
| | Executes an SQL statement directly. Returns an iterator of result sets provided by SQLAlchemy. | |
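Because ObVecClient returns tuples where MilvusLikeClient returns dictionaries, code shared between the two modes may need a conversion step. The following sketch uses a hypothetical helper, rows_to_dicts, and assumes you know the order of the output columns you requested.

```python
def rows_to_dicts(rows, column_names):
    """Pair each tuple in `rows` with `column_names`, in order."""
    names = list(column_names)
    result = []
    for row in rows:
        # Fail loudly on a mismatch rather than silently dropping values.
        if len(row) != len(names):
            raise ValueError(f"expected {len(names)} values per row, got {len(row)}")
        result.append(dict(zip(names, row)))
    return result


# Example with literal tuples standing in for records returned by ObVecClient.
records = rows_to_dicts([(1, "first"), (2, "second")], ["id", "title"])
```

Here `records` has the same shape as the result described for MilvusLikeClient: a list of dictionaries keyed by column name.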
Define partitioning rules using ObPartition
pyobvector provides the following types to support range/range columns, list/list columns, hash, key, and subpartitioning:
ObRangePartition: A range partition. Set is_range_columns = True during construction to create a range columns partition.
ObListPartition: A list partition. Set is_list_columns = True during construction to create a list columns partition.
ObHashPartition: A hash partition.
ObKeyPartition: A key partition.
ObSubRangePartition: A subpartition for range partitioning. Set is_range_columns = True during construction to create a range columns subpartition.
ObSubListPartition: A subpartition for list partitioning. Set is_list_columns = True during construction to create a list columns subpartition.
ObSubHashPartition: A subpartition for hash partitioning.
ObSubKeyPartition: A subpartition for key partitioning.
Example of range partitioning
range_part = ObRangePartition(
    False,  # is_range_columns
    range_part_infos=[
        RangeListPartInfo("p0", 100),
        RangeListPartInfo("p1", "maxvalue"),
    ],
    range_expr="id",
)
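To see which partition a given id would land in, the following sketch models the range rule above in plain Python. It assumes the bounds behave like MySQL-style VALUES LESS THAN clauses, where each upper bound is exclusive and "maxvalue" accepts everything else. The helper is hypothetical and does not call pyobvector.

```python
MAXVALUE = "maxvalue"


def route_to_range_partition(value, part_infos):
    """Return the name of the first partition whose upper bound is greater than `value`.

    `part_infos` is an ordered list of (partition_name, upper_bound) pairs.
    Upper bounds are assumed to be exclusive.
    """
    for name, upper_bound in part_infos:
        if upper_bound == MAXVALUE or value < upper_bound:
            return name
    raise ValueError(f"no partition accepts {value!r}")


# Same bounds as the range partitioning example: p0 below 100, p1 for everything else.
parts = [("p0", 100), ("p1", MAXVALUE)]
```

Under this assumption, an id of 99 is routed to p0, and an id of 100 is routed to p1.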
Example of list partitioning
list_part = ObListPartition(
    False,
    list_part_infos=[
        RangeListPartInfo("p0", [1, 2, 3]),
        RangeListPartInfo("p1", [5, 6]),
        RangeListPartInfo("p2", "DEFAULT"),
    ],
    list_expr="col1",
)
Example of hash partitioning
hash_part = ObHashPartition("col1", part_count=60)
Example of multi-level partitioning
# Primary range partition
range_columns_part = ObRangePartition(
    True,
    range_part_infos=[
        RangeListPartInfo("p0", 100),
        RangeListPartInfo("p1", 200),
        RangeListPartInfo("p2", 300),
    ],
    col_name_list=["col1"],
)
# Subpartition for range partitioning
range_sub_part = ObSubRangePartition(
    False,
    range_part_infos=[
        RangeListPartInfo("mp0", 1000),
        RangeListPartInfo("mp1", 2000),
        RangeListPartInfo("mp2", 3000),
    ],
    range_expr="col3",
)
range_columns_part.add_subpartition(range_sub_part)
Pure SQLAlchemy API mode
If you want to use the vector search feature of OceanBase Database with the pure SQLAlchemy API, you can obtain the synchronous database engine in the following two ways:
Method 1: Use ObVecClient to create the database engine.
from pyobvector import ObVecClient
client = ObVecClient(uri="127.0.0.1:2881", user="test@test")
engine = client.engine
# You can then proceed to create a session using SQLAlchemy and use the SQLAlchemy API.
Method 2: Use the create_engine interface of SQLAlchemy to create the database engine.
import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy import create_engine
uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.oceanbase", "pyobvector.schema.dialect", "OceanBaseDialect")
connection_str = (
    # mysql+oceanbase indicates selecting the mysql standard and using the synchronous driver of OceanBase Database.
    f"mysql+oceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
# Pass any additional SQLAlchemy engine options to create_engine as keyword arguments.
engine = create_engine(connection_str)
# You can then proceed to create a session using SQLAlchemy and use the SQLAlchemy API.
If you want to use the asynchronous interface of SQLAlchemy, you can use the asynchronous driver of OceanBase Database:
import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy.ext.asyncio import create_async_engine
uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.aoceanbase", "pyobvector", "AsyncOceanBaseDialect")
connection_str = (
    # mysql+aoceanbase indicates selecting the mysql standard and using the asynchronous driver of OceanBase Database.
    f"mysql+aoceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_async_engine(connection_str)
# You can then proceed to create a session using SQLAlchemy and use the SQLAlchemy API.
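The synchronous and asynchronous connection strings differ only in the driver name, so they can be produced by one helper. The following sketch uses a hypothetical function, build_connection_url, and additionally percent-encodes the password, which is an assumption about what you want when the password contains characters such as @ or /.

```python
from urllib.parse import quote


def build_connection_url(user, password, uri, db_name, driver="oceanbase"):
    """Build a SQLAlchemy connection string.

    `driver` is "oceanbase" for the synchronous driver or "aoceanbase" for the
    asynchronous driver, matching the dialect names registered in this topic.
    """
    if driver not in ("oceanbase", "aoceanbase"):
        raise ValueError(f"unknown driver: {driver!r}")
    # Percent-encode the password so reserved characters do not break URL parsing.
    encoded_password = quote(password, safe="")
    return f"mysql+{driver}://{user}:{encoded_password}@{uri}/{db_name}?charset=utf8mb4"


sync_url = build_connection_url("root@test", "", "127.0.0.1:2881", "test")
async_url = build_connection_url("root@test", "", "127.0.0.1:2881", "test", driver="aoceanbase")
```

With an empty password, both results match the connection strings built in the examples above.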
More examples
For more examples, visit the pyobvector code repository.