Batch Functions
Batch Insert
- class relevanceai.api.batch.batch_insert.BatchInsertClient(project, api_key)
Bases: relevanceai.api.batch.batch_retrieve.BatchRetrieveClient, relevanceai.api.endpoints.client.APIClient, relevanceai.api.batch.chunk.Chunker, doc_utils.doc_utils.DocUtils
- api_key: str
- config: relevanceai.config.Config
- delete_pull_update_push_logs(dataset_id=False)
- insert_df(dataset_id, dataframe, *args, **kwargs)
Insert a dataframe, with each row inserted as a document.
- insert_documents(dataset_id, docs, bulk_fn=None, max_workers=8, retry_chunk_mult=0.5, show_progress_bar=False, chunksize=0, *args, **kwargs)
Insert a list of documents with multi-threading automatically enabled.
When inserting the document you can optionally specify your own id for a document by using the field name “_id”, if not specified a random id is assigned.
When inserting or specifying vectors in a document use the suffix (ends with) “_vector_” for the field name. e.g. “product_description_vector_”.
When inserting or specifying chunks in a document use the suffix (ends with) “_chunk_” for the field name. e.g. “products_chunk_”.
When inserting or specifying chunk vectors in a document’s chunks use the suffix (ends with) “_chunkvector_” for the field name. e.g. “products_chunk_.product_description_chunkvector_”.
Documentation can be found here: https://ingest-api-dev-aueast.relevance.ai/latest/documentation#operation/InsertEncode
- Parameters
dataset_id (string) – Unique name of dataset
docs (list) – A list of documents. A document is JSON-like data that stores metadata and vectors. To specify the id of a document, use the field ‘_id’; to specify a vector field, use the suffix ‘_vector_’.
bulk_fn (callable) – Function to apply to documents before uploading
max_workers (int) – Number of workers active for multi-threading
retry_chunk_mult (float) – Multiplier to apply to chunksize if upload fails
chunksize (int) – Number of documents to upload per worker. If None, it will default to the size specified in config.upload.target_chunk_mb
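The field-naming rules above can be illustrated with a small example document. This is only a minimal sketch: the field names and values are made up, and `classify_field` is a hypothetical helper (not part of relevanceai) that simply checks the suffixes described above.

```python
# Hypothetical example document following the suffix conventions described above.
doc = {
    "_id": "product-001",  # optional user-supplied id
    "product_description": "A red ceramic mug",
    "product_description_vector_": [0.1, 0.2, 0.3],  # vector field: ends with "_vector_"
    "products_chunk_": [  # chunk field: ends with "_chunk_"
        {
            "product_description": "handle",
            # chunk vector field: ends with "_chunkvector_"
            "product_description_chunkvector_": [0.4, 0.5, 0.6],
        }
    ],
}


def classify_field(name: str) -> str:
    """Classify a field name by its suffix (illustrative helper, not a library function)."""
    if name == "_id":
        return "id"
    if name.endswith("_chunkvector_"):
        return "chunk vector"
    if name.endswith("_vector_"):
        return "vector"
    if name.endswith("_chunk_"):
        return "chunk"
    return "metadata"


# Map every top-level field of the example document to its kind
field_kinds = {name: classify_field(name) for name in doc}
```

A list of documents shaped like `doc` is what would be passed as `docs`.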
- project: str
- pull_update_push(original_collection, update_function, updated_collection=None, log_file=None, updating_args={}, retrieve_chunk_size=100, max_workers=8, filters=[], select_fields=[], show_progress_bar=True)
Loops through every document in your collection and applies a function (that is specified by you) to the documents. These documents are then uploaded into either an updated collection, or back into the original collection.
- Parameters
original_collection (string) – The dataset_id of the collection where your original documents are
logging_collection (string) – The dataset_id of the collection which logs which documents have been updated. If ‘None’, then one will be created for you.
updated_collection (string) – The dataset_id of the collection where your updated documents are uploaded into. If ‘None’, then your original collection will be updated.
update_function (function) – A function created by you that converts documents in your original collection into the updated documents. The function must accept a parameter that takes a list of documents from the original collection, and it must return a list of updated documents.
updating_args (dict) – Additional arguments to your update_function, if they exist. They must be in the format of {‘Argument’: Value}
retrieve_chunk_size (int) – The number of documents that are received from the original collection with each loop iteration.
max_workers (int) – The number of processors you want to parallelize with
max_error – How many failed uploads before the function breaks
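As an illustration of the update_function contract described above (a list of documents in, a list of updated documents out), here is a minimal sketch. The function name `add_title_length`, the `title` field, and the way `updating_args` is unpacked as keyword arguments are assumptions made for this example, not confirmed library behaviour.

```python
from typing import Any, Dict, List


def add_title_length(docs: List[Dict[str, Any]], field: str = "title") -> List[Dict[str, Any]]:
    """Take a list of documents and return the list of updated documents."""
    for doc in docs:
        # Store the length of the chosen text field in a new field
        doc[field + "_length"] = len(doc.get(field, ""))
    return docs


# Extra arguments in the {'Argument': Value} format (assumed to be passed as keywords)
updating_args = {"field": "title"}

# Local dry run on hypothetical documents, without touching any collection
sample = [{"_id": "1", "title": "hello"}, {"_id": "2"}]
updated = add_title_length(sample, **updating_args)
```

Trying the function locally on a handful of documents like this is a cheap way to check it before passing it as update_function.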
- pull_update_push_to_cloud(original_collection, update_function, updated_collection=None, logging_collection=None, updating_args={}, retrieve_chunk_size=100, retrieve_chunk_size_failure_retry_multiplier=0.5, number_of_retrieve_retries=3, max_workers=8, max_error=1000, filters=[], select_fields=[], show_progress_bar=True)
Loops through every document in your collection and applies a function (that is specified by you) to the documents. These documents are then uploaded into either an updated collection, or back into the original collection.
- Parameters
original_collection (string) – The dataset_id of the collection where your original documents are
logging_collection (string) – The dataset_id of the collection which logs which documents have been updated. If ‘None’, then one will be created for you.
updated_collection (string) – The dataset_id of the collection where your updated documents are uploaded into. If ‘None’, then your original collection will be updated.
update_function (function) – A function created by you that converts documents in your original collection into the updated documents. The function must accept a parameter that takes a list of documents from the original collection, and it must return a list of updated documents.
updating_args (dict) – Additional arguments to your update_function, if they exist. They must be in the format of {‘Argument’: Value}
retrieve_chunk_size (int) – The number of documents that are received from the original collection with each loop iteration.
retrieve_chunk_size_failure_retry_multiplier (float) – If a retrieval fails, the chunk is retried with retrieve_chunk_size scaled by this multiplier
max_workers (int) – The number of processors you want to parallelize with
max_error (int) – How many failed uploads before the function breaks
- rename_fields(dataset_id, field_mappings, retrieve_chunk_size=100, max_workers=8, show_progress_bar=True)
Loops through every document in your collection and renames the specified fields by deleting the old field and creating a new one according to the provided mapping. These documents are then uploaded into either an updated collection, or back into the original collection.
Example:
rename_fields(dataset_id, field_mappings={'a.b.d': 'a.b.c'}) => doc['a']['b']['d'] => doc['a']['b']['c']
rename_fields(dataset_id, field_mappings={'a.b': 'a.c'}) => doc['a']['b'] => doc['a']['c']
- Parameters
dataset_id (string) – The dataset_id of the collection where your original documents are
field_mappings (dict) – A dictionary in the form {old_field_name1: new_field_name1, …}
retrieve_chunk_size (int) – The number of documents that are received from the original collection with each loop iteration.
retrieve_chunk_size_failure_retry_multiplier (float) – If a retrieval fails, the chunk is retried with retrieve_chunk_size scaled by this multiplier
max_workers (int) – The number of processors you want to parallelize with
show_progress_bar (bool) – Shows a progress bar if True
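The dotted-path renaming in the example above can be pictured with a small standalone function. This is a sketch of the idea only (delete the old field, create the new one); `rename_nested_field` is a hypothetical helper and is not the library's implementation.

```python
from typing import Any, Dict


def rename_nested_field(doc: Dict[str, Any], old: str, new: str) -> Dict[str, Any]:
    """Move the value at dotted path `old` to dotted path `new`, in place."""
    old_keys = old.split(".")
    new_keys = new.split(".")

    # Walk down to the parent of the old field; leave the doc alone if the path is missing
    parent = doc
    for key in old_keys[:-1]:
        if not isinstance(parent, dict) or key not in parent:
            return doc
        parent = parent[key]
    if not isinstance(parent, dict) or old_keys[-1] not in parent:
        return doc

    # Delete the old field and keep its value
    value = parent.pop(old_keys[-1])

    # Create intermediate dictionaries as needed, then set the new field
    target = doc
    for key in new_keys[:-1]:
        target = target.setdefault(key, {})
    target[new_keys[-1]] = value
    return doc


# The two mappings from the example: {'a.b.d': 'a.b.c'} and {'a.b': 'a.c'}
doc1 = rename_nested_field({"a": {"b": {"d": 1}}}, "a.b.d", "a.b.c")
doc2 = rename_nested_field({"a": {"b": {"d": 1}}}, "a.b", "a.c")
```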
- update_documents(dataset_id, docs, bulk_fn=None, max_workers=8, retry_chunk_mult=0.5, chunksize=0, show_progress_bar=False, *args, **kwargs)
Update a list of documents with multi-threading automatically enabled. Edits documents by providing key-value pairs of the fields you are adding or changing. Make sure to include the “_id” in each document.
>>> from relevanceai import Client
>>> url = "https://api-aueast.relevance.ai/v1/"
>>> collection = ""
>>> project = ""
>>> api_key = ""
>>> client = Client(project, api_key)
>>> # `model` is assumed to be an encoder defined elsewhere that provides encode_documents_in_bulk
>>> docs = client.datasets.documents.get_where(collection, select_fields=['product_name'])
>>> while len(docs['documents']) > 0:
...     docs['documents'] = model.encode_documents_in_bulk(['product_name'], docs['documents'])
...     client.update_documents(collection, docs['documents'])
...     docs = client.datasets.documents.get_where(collection, select_fields=['product_name'], cursor=docs['cursor'])
- Parameters
dataset_id (string) – Unique name of dataset
docs (list) – A list of documents. A document is JSON-like data that stores metadata and vectors. To specify the id of a document, use the field ‘_id’; to specify a vector field, use the suffix ‘_vector_’.
bulk_fn (callable) – Function to apply to documents before uploading
max_workers (int) – Number of workers active for multi-threading
retry_chunk_mult (float) – Multiplier to apply to chunksize if upload fails
chunksize (int) – Number of documents to upload per worker. If None, it will default to the size specified in config.upload.target_chunk_mb
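To make the interaction between chunksize and retry_chunk_mult more concrete, the sketch below shows one way a chunk-size multiplier could be applied after a failed upload. It is an assumption about the general technique, not the library's actual retry logic; `upload_with_retry` and `fake_upload` are hypothetical names.

```python
from typing import Callable, List


def upload_with_retry(
    docs: List[dict],
    upload_fn: Callable[[List[dict]], None],
    chunksize: int,
    retry_chunk_mult: float = 0.5,
) -> List[int]:
    """Upload docs in chunks, shrinking the chunk size after each failed upload.

    Returns the sizes of the chunks that were uploaded successfully.
    """
    uploaded_sizes = []
    start = 0
    while start < len(docs):
        chunk = docs[start:start + chunksize]
        try:
            upload_fn(chunk)
        except Exception:
            if chunksize == 1:
                raise  # cannot shrink further, give up
            # Apply the multiplier to the chunk size and retry the same documents
            chunksize = max(1, int(chunksize * retry_chunk_mult))
            continue
        uploaded_sizes.append(len(chunk))
        start += len(chunk)
    return uploaded_sizes


def fake_upload(chunk: List[dict]) -> None:
    """Stand-in for a real upload call: rejects chunks larger than 2 documents."""
    if len(chunk) > 2:
        raise ValueError("payload too large")


# Five hypothetical documents, starting with a chunk size that is too large
sizes = upload_with_retry([{"_id": str(i)} for i in range(5)], fake_upload, chunksize=4)
```

With a multiplier of 0.5, the first failed chunk of 4 is retried as chunks of 2, and the remaining documents continue at that smaller size.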
Batch Retrieve
- class relevanceai.api.batch.batch_retrieve.BatchRetrieveClient(project, api_key)
Bases: relevanceai.api.endpoints.client.APIClient, relevanceai.api.batch.chunk.Chunker
- api_key: str
- config: relevanceai.config.Config
- get_all_documents(dataset_id, chunk_size=1000, filters=[], sort=[], select_fields=[], include_vector=True, show_progress_bar=True)
Retrieve all documents with filters. Filters are used to retrieve documents that match the conditions set in a filter query. This is used in advanced search to filter the documents that are searched. For more details see documents.get_where.
Example
>>> client = Client()
>>> client.get_all_documents(dataset_id="sample_dataset")
- Parameters
dataset_id (string) – Unique name of dataset
chunk_size (int) – Number of documents to retrieve per retrieval
include_vector (bool) – Include vectors in the search results
sort (list) – Fields to sort by. For each field, sort by descending or ascending. If you are using descending by datetime, it will get the most recent ones.
filters (list) – Query for filtering the search results
select_fields (list) – Fields to include in the search results, empty array/list means all fields.
- get_documents(dataset_id, number_of_documents=20, filters=[], cursor=None, batch_size=1000, sort=[], select_fields=[], include_vector=True)
Retrieve documents with filters. Filters are used to retrieve documents that match the conditions set in a filter query. This is used in advanced search to filter the documents that are searched.
If you are looking to combine your filters with multiple ORs, simply add the following inside the query: {"strict": "must_or"}.
- Parameters
dataset_id (string) – Unique name of dataset
number_of_documents (int) – Number of documents to retrieve
select_fields (list) – Fields to include in the search results, empty array/list means all fields.
cursor (string) – Cursor to paginate the document retrieval
batch_size (int) – Number of documents to retrieve per iteration
include_vector (bool) – Include vectors in the search results
sort (list) – Fields to sort by. For each field, sort by descending or ascending. If you are using descending by datetime, it will get the most recent ones.
filters (list) – Query for filtering the search results
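The roles of number_of_documents, batch_size and cursor can be sketched with an in-memory stand-in for the API. Everything here is hypothetical: `fetch_page` is a stub whose cursor is just an offset string, and the response shape (`documents` plus `cursor`) is assumed for illustration rather than taken from the real endpoint.

```python
from typing import Dict, List, Optional

# In-memory stand-in for a dataset (hypothetical data)
_DATASET = [{"_id": str(i)} for i in range(7)]


def fetch_page(cursor: Optional[str] = None, batch_size: int = 3) -> Dict:
    """Stub for a paginated retrieval call; the cursor here is just an offset string."""
    offset = int(cursor) if cursor else 0
    page = _DATASET[offset:offset + batch_size]
    return {"documents": page, "cursor": str(offset + len(page))}


def fetch_documents(number_of_documents: int, batch_size: int = 3) -> List[dict]:
    """Collect up to number_of_documents by following the cursor page by page."""
    collected: List[dict] = []
    cursor = None
    while len(collected) < number_of_documents:
        page = fetch_page(cursor=cursor, batch_size=batch_size)
        if not page["documents"]:
            break  # no more documents to retrieve
        collected.extend(page["documents"])
        cursor = page["cursor"]
    return collected[:number_of_documents]


first_five = fetch_documents(5)      # stops once enough documents are collected
everything = fetch_documents(100)    # stops when the stub runs out of documents
```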
- get_number_of_documents(dataset_id, filters=[])
Get number of documents in a dataset. Filter can be used to select documents that match the conditions set in a filter query. For more details see documents.get_where.
- Parameters
dataset_id (string) – Unique name of dataset
filters (list) – Filters to select documents
- get_vector_fields(dataset_id)
Returns list of valid vector fields in dataset
- Parameters
dataset_id (string) – Unique name of dataset
- project: str
Chunk Helper functions
- class relevanceai.api.batch.chunk.Chunker
Bases: object
Update the chunk Mixins
- chunk(documents, chunksize=20)
Chunk an iterable object in Python.
Example:
>>> documents = [{...}]
>>> ViClient.chunk(documents)
- Parameters
documents (Union[DataFrame, List]) – List of dictionaries/Pandas dataframe
chunksize (int) – The chunk size of an object.
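Chunking a list of documents can be sketched in a few lines. This is a minimal illustration of the technique for plain lists only (it does not handle DataFrames) and is not the library's own code; the sample documents are made up.

```python
from typing import Iterator, List


def chunk(documents: List[dict], chunksize: int = 20) -> Iterator[List[dict]]:
    """Yield successive slices of at most `chunksize` documents."""
    for start in range(0, len(documents), chunksize):
        yield documents[start:start + chunksize]


# 45 hypothetical documents split with the default chunk size of 20
documents = [{"_id": str(i)} for i in range(45)]
chunk_sizes = [len(c) for c in chunk(documents)]
```

The last chunk is simply shorter when the number of documents is not a multiple of chunksize.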
Local logger for pull_update_push.
- class relevanceai.api.batch.local_logger.PullUpdatePushLocalLogger(filename)
Bases: relevanceai.logger.LoguruLogger
This logger class is specifically for pull_update_push to log failures locally as opposed to on the cloud.
- count_ids_in_fn()
Returns total count of failed IDs
- Return type
int
- critical: Callable
- debug: Callable
- error: Callable
- info: Callable
- log_ids(id_list, verbose=True)
Log the failed IDs to the file
- success: Callable
- warn: Callable
- warning: Callable