Batch Functions

Batch Insert

class relevanceai.api.batch.batch_insert.BatchInsertClient(project, api_key)

Bases: relevanceai.api.batch.batch_retrieve.BatchRetrieveClient, relevanceai.api.endpoints.client.APIClient, relevanceai.api.batch.chunk.Chunker, doc_utils.doc_utils.DocUtils

api_key: str
config: relevanceai.config.Config
delete_pull_update_push_logs(dataset_id=False)
insert_df(dataset_id, dataframe, *args, **kwargs)

Insert a dataframe, treating each row as a document.

insert_documents(dataset_id, docs, bulk_fn=None, max_workers=8, retry_chunk_mult=0.5, show_progress_bar=False, chunksize=0, *args, **kwargs)

Insert a list of documents with multi-threading automatically enabled.

  • When inserting a document, you can optionally specify your own id by using the field name “_id”. If not specified, a random id is assigned.

  • When inserting or specifying vectors in a document, use the suffix (ends with) “_vector_” for the field name, e.g. “product_description_vector_”.

  • When inserting or specifying chunks in a document, use the suffix (ends with) “_chunk_” for the field name, e.g. “products_chunk_”.

  • When inserting or specifying chunk vectors in a document’s chunks, use the suffix (ends with) “_chunkvector_” for the field name, e.g. “products_chunk_.product_description_chunkvector_”.

Documentation can be found here: https://ingest-api-dev-aueast.relevance.ai/latest/documentation#operation/InsertEncode

Parameters
  • dataset_id (string) – Unique name of dataset

  • docs (list) – A list of documents. A document is JSON-like data in which metadata and vectors are stored. To specify the id of a document, use the field ‘_id’; to specify a vector field, use the suffix ‘_vector_’.

  • bulk_fn (callable) – Function to apply to documents before uploading

  • max_workers (int) – Number of workers active for multi-threading

  • retry_chunk_mult (float) – Multiplier to apply to chunksize if upload fails

  • chunksize (int) – Number of documents to upload per worker. If None, it will default to the size specified in config.upload.target_chunk_mb
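The field naming conventions above can be illustrated with a small document. This is only a sketch: the field values and the helper `vector_fields` are hypothetical and are not part of the relevanceai package.

```python
# A hypothetical document that follows the naming conventions described above.
doc = {
    "_id": "product-001",                              # optional custom id
    "product_description": "Red running shoes",
    "product_description_vector_": [0.1, 0.2, 0.3],    # vector field: ends with "_vector_"
    "products_chunk_": [                               # chunk field: ends with "_chunk_"
        {
            "product_description": "Lightweight sole",
            # chunk vector field: ends with "_chunkvector_"
            "product_description_chunkvector_": [0.4, 0.5, 0.6],
        }
    ],
}


def vector_fields(document):
    """Return the top-level field names that end with the "_vector_" suffix."""
    return [name for name in document if name.endswith("_vector_")]


print(vector_fields(doc))
```

A list of such documents is what would be passed as `docs` to `insert_documents(dataset_id, docs)`.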

project: str
pull_update_push(original_collection, update_function, updated_collection=None, log_file=None, updating_args={}, retrieve_chunk_size=100, max_workers=8, filters=[], select_fields=[], show_progress_bar=True)

Loops through every document in your collection and applies a function (that is specified by you) to the documents. These documents are then uploaded into either an updated collection, or back into the original collection.

Parameters
  • original_collection (string) – The dataset_id of the collection where your original documents are

  • logging_collection (string) – The dataset_id of the collection which logs which documents have been updated. If ‘None’, then one will be created for you.

  • updated_collection (string) – The dataset_id of the collection where your updated documents are uploaded into. If ‘None’, then your original collection will be updated.

  • update_function (function) – A function created by you that converts documents in your original collection into the updated documents. The function must accept a parameter that takes a list of documents from the original collection, and it must return a list of updated documents.

  • updating_args (dict) – Additional arguments to your update_function, if they exist. They must be in the format of {‘Argument’: Value}

  • retrieve_chunk_size (int) – The number of documents that are received from the original collection with each loop iteration.

  • max_workers (int) – The number of processors you want to parallelize with

  • max_error – How many failed uploads before the function breaks

pull_update_push_to_cloud(original_collection, update_function, updated_collection=None, logging_collection=None, updating_args={}, retrieve_chunk_size=100, retrieve_chunk_size_failure_retry_multiplier=0.5, number_of_retrieve_retries=3, max_workers=8, max_error=1000, filters=[], select_fields=[], show_progress_bar=True)

Loops through every document in your collection and applies a function (that is specified by you) to the documents. These documents are then uploaded into either an updated collection, or back into the original collection.

Parameters
  • original_collection (string) – The dataset_id of the collection where your original documents are

  • logging_collection (string) – The dataset_id of the collection which logs which documents have been updated. If ‘None’, then one will be created for you.

  • updated_collection (string) – The dataset_id of the collection where your updated documents are uploaded into. If ‘None’, then your original collection will be updated.

  • update_function (function) – A function created by you that converts documents in your original collection into the updated documents. The function must accept a parameter that takes a list of documents from the original collection, and it must return a list of updated documents.

  • updating_args (dict) – Additional arguments to your update_function, if they exist. They must be in the format of {‘Argument’: Value}

  • retrieve_chunk_size (int) – The number of documents that are received from the original collection with each loop iteration.

  • retrieve_chunk_size_failure_retry_multiplier (float) – Multiplier applied to retrieve_chunk_size when retrieving a chunk fails and the retrieval is retried

  • max_workers (int) – The number of processors you want to parallelize with

  • max_error (int) – How many failed uploads before the function breaks
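As an illustration of the update_function contract described above (a list of documents in, a list of updated documents out), here is a minimal sketch. The function name `add_title_length`, the `field` argument, and the sample documents are hypothetical, and it assumes that `updating_args` is forwarded to the function as keyword arguments.

```python
def add_title_length(documents, field="title"):
    """Hypothetical update function: receives a list of documents and
    returns the same list with a new "title_length" field on each one."""
    for document in documents:
        document["title_length"] = len(document.get(field, ""))
    return documents


# Try the function locally on sample documents before running it on a collection.
sample = [{"_id": "1", "title": "Batch insert"}, {"_id": "2"}]
updated = add_title_length(sample, field="title")
print(updated)

# With a client, the call would then look roughly like this (not run here):
# client.pull_update_push_to_cloud(
#     original_collection, add_title_length, updating_args={"field": "title"}
# )
```

Testing the function on a handful of documents first is cheap and avoids discovering a bug halfway through a large collection.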

rename_fields(dataset_id, field_mappings, retrieve_chunk_size=100, max_workers=8, show_progress_bar=True)

Loops through every document in your collection and renames specified fields by deleting the old one and creating a new field using the provided mapping. These documents are then uploaded into either an updated collection, or back into the original collection.

Example:

rename_fields(dataset_id, field_mappings={'a.b.d': 'a.b.c'}) => doc['a']['b']['d'] => doc['a']['b']['c']

rename_fields(dataset_id, field_mappings={'a.b': 'a.c'}) => doc['a']['b'] => doc['a']['c']

Parameters
  • dataset_id (string) – The dataset_id of the collection where your original documents are

  • field_mappings (dict) – A dictionary of the form {old_field_name1 : new_field_name1, …}

  • retrieve_chunk_size (int) – The number of documents that are received from the original collection with each loop iteration.

  • retrieve_chunk_size_failure_retry_multiplier (float) – Multiplier applied to retrieve_chunk_size when retrieving a chunk fails and the retrieval is retried

  • max_workers (int) – The number of processors you want to parallelize with

  • show_progress_bar (bool) – Shows a progress bar if True
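The effect of a dotted field mapping on a single document can be sketched in plain Python. The helper `rename_field` below is hypothetical and only illustrates the delete-then-create behaviour described above; it is not the library's implementation.

```python
def rename_field(document, old_name, new_name):
    """Move the value at dotted path `old_name` to dotted path `new_name`, in place."""
    # Walk down to the parent of the old field and remove the old key.
    *parents, last = old_name.split(".")
    node = document
    for key in parents:
        node = node[key]  # raises KeyError if the path does not exist
    value = node.pop(last)

    # Walk down to the parent of the new field, creating dicts as needed.
    *new_parents, new_last = new_name.split(".")
    target = document
    for key in new_parents:
        target = target.setdefault(key, {})
    target[new_last] = value
    return document


doc = {"a": {"b": {"d": 1}}}
rename_field(doc, "a.b.d", "a.b.c")
print(doc)

doc2 = {"a": {"b": 2}}
rename_field(doc2, "a.b", "a.c")
print(doc2)
```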

update_documents(dataset_id, docs, bulk_fn=None, max_workers=8, retry_chunk_mult=0.5, chunksize=0, show_progress_bar=False, *args, **kwargs)

Update a list of documents with multi-threading automatically enabled. Edits documents by providing key-value pairs of the fields you are adding or changing. Make sure to include the “_id” in each document.

>>> from relevanceai import Client
>>> url = "https://api-aueast.relevance.ai/v1/"
>>> collection = ""
>>> project = ""
>>> api_key = ""
>>> client = Client(project, api_key)
>>> # `model` is an encoder that you provide; it is not defined by this client.
>>> docs = client.datasets.documents.get_where(collection, select_fields=['product_name'])
>>> # Keep encoding and updating until a page comes back empty.
>>> while len(docs['documents']) > 0:
...     docs['documents'] = model.encode_documents_in_bulk(['product_name'], docs['documents'])
...     client.update_documents(collection, docs['documents'])
...     docs = client.datasets.documents.get_where(collection, select_fields=['product_name'], cursor=docs['cursor'])
Parameters
  • dataset_id (string) – Unique name of dataset

  • docs (list) – A list of documents. A document is JSON-like data in which metadata and vectors are stored. To specify the id of a document, use the field ‘_id’; to specify a vector field, use the suffix ‘_vector_’.

  • bulk_fn (callable) – Function to apply to documents before uploading

  • max_workers (int) – Number of workers active for multi-threading

  • retry_chunk_mult (float) – Multiplier to apply to chunksize if upload fails

  • chunksize (int) – Number of documents to upload per worker. If None, it will default to the size specified in config.upload.target_chunk_mb
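One way to picture how retry_chunk_mult interacts with chunksize is the sketch below. It is an assumption about the general technique (shrink the chunk and retry after a failure), not the library's actual retry logic; `upload_with_retry` and `flaky_upload` are hypothetical names.

```python
def upload_with_retry(documents, upload, chunksize, retry_chunk_mult=0.5):
    """Upload `documents` in chunks. When a chunk fails, multiply the chunk
    size by `retry_chunk_mult` and retry from the same position."""
    start = 0
    while start < len(documents):
        batch = documents[start:start + chunksize]
        try:
            upload(batch)
            start += len(batch)
        except Exception:
            smaller = max(1, int(chunksize * retry_chunk_mult))
            if smaller >= chunksize:
                raise  # the chunk cannot shrink any further, so give up
            chunksize = smaller
    return chunksize


sent = []


def flaky_upload(batch):
    """Stand-in for a real upload call that rejects batches larger than 2."""
    if len(batch) > 2:
        raise RuntimeError("payload too large")
    sent.extend(batch)


final_size = upload_with_retry(list(range(10)), flaky_upload, chunksize=8)
print(final_size, sent)
```

Here the chunk size shrinks from 8 to 4 to 2 before the uploads start succeeding.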

Batch Retrieve

class relevanceai.api.batch.batch_retrieve.BatchRetrieveClient(project, api_key)

Bases: relevanceai.api.endpoints.client.APIClient, relevanceai.api.batch.chunk.Chunker

api_key: str
config: relevanceai.config.Config
get_all_documents(dataset_id, chunk_size=1000, filters=[], sort=[], select_fields=[], include_vector=True, show_progress_bar=True)

Retrieve all documents with filters. Filters are used to retrieve documents that match the conditions set in a filter query. This is used in advanced search to filter the documents that are searched. For more details see documents.get_where.

Example

>>> from relevanceai import Client
>>> client = Client()
>>> client.get_all_documents(dataset_id="sample_dataset")
Parameters
  • dataset_id (string) – Unique name of dataset

  • chunk_size (int) – Number of documents to retrieve per retrieval

  • include_vector (bool) – Include vectors in the search results

  • sort (list) – Fields to sort by. For each field, sort by descending or ascending. If you are using descending by datetime, it will get the most recent ones.

  • filters (list) – Query for filtering the search results

  • select_fields (list) – Fields to include in the search results, empty array/list means all fields.

get_documents(dataset_id, number_of_documents=20, filters=[], cursor=None, batch_size=1000, sort=[], select_fields=[], include_vector=True)

Retrieve documents with filters. Filters are used to retrieve documents that match the conditions set in a filter query. This is used in advanced search to filter the documents that are searched.

If you are looking to combine your filters with multiple ORs, simply add the following inside the query: {"strict": "must_or"}.

Parameters
  • dataset_id (string) – Unique name of dataset

  • number_of_documents (int) – Number of documents to retrieve

  • select_fields (list) – Fields to include in the search results, empty array/list means all fields.

  • cursor (string) – Cursor to paginate the document retrieval

  • batch_size (int) – Number of documents to retrieve per iteration

  • include_vector (bool) – Include vectors in the search results

  • sort (list) – Fields to sort by. For each field, sort by descending or ascending. If you are using descending by datetime, it will get the most recent ones.

  • filters (list) – Query for filtering the search results

get_number_of_documents(dataset_id, filters=[])

Get number of documents in a dataset. Filter can be used to select documents that match the conditions set in a filter query. For more details see documents.get_where.

Parameters
  • dataset_id (string) – Unique name of dataset

  • filters (list) – Filters to select documents

get_vector_fields(dataset_id)

Returns a list of valid vector fields in the dataset.

Parameters
  • dataset_id (string) – Unique name of dataset

project: str

Chunk Helper functions

class relevanceai.api.batch.chunk.Chunker

Bases: object

Update the chunk Mixins

chunk(documents, chunksize=20)

Chunk an iterable object in Python.

Example:

>>> documents = [{...}]
>>> ViClient.chunk(documents)
Parameters
  • documents (Union[DataFrame, List]) – List of dictionaries/Pandas dataframe

  • chunksize (int) – The chunk size of an object.
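For a plain list, chunking can be sketched as below. This is a minimal stand-alone illustration, not the Chunker implementation, which also accepts Pandas dataframes and may differ in detail.

```python
def chunk(documents, chunksize=20):
    """Yield successive slices of at most `chunksize` items from a list."""
    for start in range(0, len(documents), chunksize):
        yield documents[start:start + chunksize]


# 45 hypothetical documents split with the default chunk size of 20.
documents = [{"_id": str(i)} for i in range(45)]
sizes = [len(batch) for batch in chunk(documents)]
print(sizes)
```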

Local logger for pull_update_push.

class relevanceai.api.batch.local_logger.PullUpdatePushLocalLogger(filename)

Bases: relevanceai.logger.LoguruLogger

This logger class is specifically for pull_update_push to log failures locally as opposed to on the cloud.

count_ids_in_fn()

Returns total count of failed IDs

Return type

int

critical: Callable
debug: Callable
error: Callable
info: Callable
log_ids(id_list, verbose=True)

Log the failed IDs to the file

success: Callable
warn: Callable
warning: Callable
