Airbyte CDK
Airbyte is a data integration platform for ELT pipelines from APIs, databases & files to warehouses & lakes. It has the largest catalog of ELT connectors to data warehouses and databases.
Many source connectors are implemented using the Airbyte CDK. This loader lets you run any of these connectors and return the data as documents.
Installation
First, you need to install the airbyte-cdk Python package.
#!pip install airbyte-cdk
Then, either install an existing connector from the Airbyte GitHub repository or create your own connector using the Airbyte CDK.
For example, to install the GitHub connector, run:
#!pip install "source_github@git+https://github.com/airbytehq/airbyte.git@master#subdirectory=airbyte-integrations/connectors/source-github"
Some sources are also published as regular packages on PyPI.
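Before constructing a loader, it can help to confirm that the packages are importable in the current environment. The following is a minimal sketch using only the standard library; the helper name `is_installed` is illustrative, and the module names `airbyte_cdk` and `source_github` are assumed from the install commands above.

```python
from importlib.util import find_spec


def is_installed(module_name: str) -> bool:
    """Return True if a top-level module can be found on the import path."""
    return find_spec(module_name) is not None


# Module names assumed from the pip commands above.
for name in ("airbyte_cdk", "source_github"):
    status = "found" if is_installed(name) else "missing"
    print(f"{name}: {status}")
```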
Example
Now you can create an AirbyteCDKLoader based on the imported source. It takes a config object that's passed to the connector. You also have to pick the stream you want to retrieve records from by name (stream_name). Check the connector's documentation page and spec definition for more information on the config object and available streams. For the GitHub connector these are:
- https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-github/source_github/spec.json
- https://docs.airbyte.com/integrations/sources/github/
from langchain.document_loaders.airbyte import AirbyteCDKLoader
from source_github.source import SourceGithub  # plug in your own source here

config = {
    # your github configuration
    "credentials": {"api_url": "api.github.com", "personal_access_token": "<token>"},
    "repository": "<repo>",
    "start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>",
}
issues_loader = AirbyteCDKLoader(
    source_class=SourceGithub, config=config, stream_name="issues"
)
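The start_date value has to be a timestamp in the format shown in the placeholder above (e.g. 2020-10-20T00:00:00Z). As a rough sketch, a relative start date could be computed with the standard library like this; the helper name `iso_start_date` and its parameters are illustrative and not part of the loader or the connector.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def iso_start_date(days_back: int, now: Optional[datetime] = None) -> str:
    """Format the UTC time `days_back` days before `now` like 2020-10-20T00:00:00Z."""
    # Default to the current time in UTC; a naive `now` is treated as local time.
    now = now or datetime.now(timezone.utc)
    start = now.astimezone(timezone.utc) - timedelta(days=days_back)
    return start.strftime("%Y-%m-%dT%H:%M:%SZ")


# Example: a start date 30 days in the past.
print(iso_start_date(30))
```

The resulting string could then be used as the "start_date" entry of the config dictionary.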
Now you can load documents the usual way:
docs = issues_loader.load()
Because load returns a list, it blocks until all documents are loaded. For more control over this process, you can also use the lazy_load method, which returns an iterator instead:
docs_iterator = issues_loader.lazy_load()
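One way to benefit from the iterator is to process documents in small batches instead of holding them all in memory. The sketch below is plain Python and does not depend on the loader: `batched` is an illustrative helper, and the generator of strings stands in for the iterator returned by lazy_load.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of up to `size` items, pulling from the iterable only as needed."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# Stand-in for issues_loader.lazy_load(); any iterator of documents works.
fake_docs = (f"doc-{i}" for i in range(5))
for batch in batched(fake_docs, 2):
    print(batch)
```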
Keep in mind that by default the page content is empty and the metadata object contains all the information from the record. To create documents in a different way, pass in a record_handler function when creating the loader:
from langchain.docstore.document import Document


def handle_record(record, id):
    return Document(
        page_content=record.data["title"] + "\n" + (record.data["body"] or ""),
        metadata=record.data,
    )


issues_loader = AirbyteCDKLoader(
    source_class=SourceGithub,
    config=config,
    stream_name="issues",
    record_handler=handle_record,
)
docs = issues_loader.load()
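Records do not always carry every field, so a handler may want to build the page content defensively. The following sketch isolates that step as a plain function that could be called from a record handler with record.data; the name `issue_text` is illustrative, and unlike the handler above it omits the trailing newline when the body is empty.

```python
from typing import Any, Mapping


def issue_text(data: Mapping[str, Any]) -> str:
    """Join title and body, treating missing or None values as empty strings."""
    title = data.get("title") or ""
    body = data.get("body") or ""
    # Skip empty parts so no stray newline is left behind.
    return "\n".join(part for part in (title, body) if part)


# Example with a record whose body is None.
print(issue_text({"title": "Crash on start", "body": None}))
```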
Incremental loads
Some streams support incremental loading: the source keeps track of synced records and won't load them again. This is useful for sources that have a high volume of data and are updated frequently.
To take advantage of this, store the last_state property of the loader and pass it in when creating the loader again. This ensures that only new records are loaded.
last_state = issues_loader.last_state  # store safely

incremental_issue_loader = AirbyteCDKLoader(
    source_class=SourceGithub, config=config, stream_name="issues", state=last_state
)
new_docs = incremental_issue_loader.load()
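Storing the state safely usually means persisting it between runs. The sketch below shows one possible approach with a JSON file and an atomic replace. It assumes the state is plain JSON-compatible data; if last_state is a richer object in your version, it would need to be converted first. The helper names, the file name, and the example state shape are all illustrative.

```python
import json
import os
import tempfile
from typing import Any, Optional


def save_state(path: str, state: Any) -> None:
    """Write state as JSON to a temp file, then atomically replace `path`."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(state, handle)
        os.replace(tmp_path, path)
    except BaseException:
        # Remove the temp file if writing or replacing failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def load_state(path: str) -> Optional[Any]:
    """Return the saved state, or None if no state file exists yet."""
    if not os.path.exists(path):
        return None
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)


# Round trip in a temporary directory; the state shape here is made up.
with tempfile.TemporaryDirectory() as workdir:
    state_file = os.path.join(workdir, "issues_state.json")
    print(load_state(state_file))  # nothing saved yet
    save_state(state_file, {"issues": {"updated_at": "2020-10-20T00:00:00Z"}})
    print(load_state(state_file))
```

On the next run, the value returned by `load_state` could be passed as the state argument when creating the loader, with None on the first run assumed to mean a full load.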