Airbyte Hubspot
Airbyte is a data integration platform for ELT pipelines from APIs, databases & files to warehouses & lakes. It has the largest catalog of ELT connectors to data warehouses and databases.
This loader exposes the Hubspot connector as a document loader, allowing you to load various Hubspot objects as documents.
Installationβ
First, you need to install the airbyte-source-hubspot
python package.
#!pip install airbyte-source-hubspot
Exampleβ
Check out the Airbyte documentation page for details about how to configure the reader. The JSON schema the config object should adhere to can be found on Github: https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-hubspot/source_hubspot/spec.yaml.
The general shape looks like this:
{
"start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>",
"credentials": {
"credentials_title": "Private App Credentials",
"access_token": "<access token of your private app>"
}
}
By default all fields are stored as metadata in the documents and the text is set to an empty string. Construct the text of the document by transforming the documents returned by the reader.
from langchain.document_loaders.airbyte import AirbyteHubspotLoader
config = {
# your hubspot configuration
}
loader = AirbyteHubspotLoader(
config=config, stream_name="products"
) # check the documentation linked above for a list of all streams
Now you can load documents the usual way
docs = loader.load()
As load
returns a list, it will block until all documents are loaded.
To have better control over this process, you can also you the
lazy_load
method which returns an iterator instead:
docs_iterator = loader.lazy_load()
Keep in mind that by default the page content is empty and the metadata
object contains all the information from the record. To process
documents, create a class inheriting from the base loader and implement
the _handle_records
method yourself:
from langchain.docstore.document import Document
def handle_record(record, id):
return Document(page_content=record.data["title"], metadata=record.data)
loader = AirbyteHubspotLoader(
config=config, record_handler=handle_record, stream_name="products"
)
docs = loader.load()
Incremental loadsβ
Some streams allow incremental loading, this means the source keeps track of synced records and wonβt load them again. This is useful for sources that have a high volume of data and are updated frequently.
To take advantage of this, store the last_state
property of the loader
and pass it in when creating the loader again. This will ensure that
only new records are loaded.
last_state = loader.last_state # store safely
incremental_loader = AirbyteHubspotLoader(
config=config, stream_name="products", state=last_state
)
new_docs = incremental_loader.load()