Using a Custom Adapter
opendbt provides the flexibility to register and utilize custom adapters. This capability allows users to extend
existing adapters, add new methods, or override existing ones. By introducing new methods to an adapter, you can expose
them to dbt macros and leverage them within your dbt models, for instance, by calling a new method during
materialization.
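The pattern can be illustrated with a small, self-contained sketch: a subclass adds a method, and a decorator flags it as callable from templates. The `available` decorator, `BaseAdapter`, `CustomAdapter`, and `macro_callable_methods` below are simplified stand-ins written for this illustration; they are not dbt's or opendbt's actual implementation.

```python
def available(func):
    """Flag a method as callable from macros (simplified stand-in, not dbt's decorator)."""
    func._is_available = True
    return func


class BaseAdapter:
    """Hypothetical base adapter with one exposed method."""

    @available
    def quote(self, identifier: str) -> str:
        return f'"{identifier}"'

    def _internal_helper(self) -> str:
        # Not decorated, so it stays hidden from macros in this sketch.
        return "hidden"


class CustomAdapter(BaseAdapter):
    """Extends the base adapter with a new macro-callable method."""

    @available
    def shout(self, text: str) -> str:
        return text.upper()


def macro_callable_methods(adapter) -> set:
    """Collect the names of methods flagged by the `available` decorator."""
    names = set()
    for name in dir(adapter):
        attr = getattr(adapter, name)
        if callable(attr) and getattr(attr, "_is_available", False):
            names.add(name)
    return names


exposed = macro_callable_methods(CustomAdapter())
print(sorted(exposed))
```

The subclass inherits every exposed method of its parent and adds its own, which is the same idea the steps below apply to a real dbt adapter.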
Let's walk through the process step by step.
1: Extend Existing Adapter
Create a new adapter class that inherits from the desired base adapter and add the necessary methods to it.
Here we create a method that runs a given Python model locally. Note that the @available decorator makes the
method available for use in dbt macros.
| import importlib.util
import logging
import sys
import tempfile
from multiprocessing.context import SpawnContext
from typing import Dict

from dbt.adapters.base import available
from dbt.adapters.duckdb import DuckDBAdapter


class DuckDBAdapterV2Custom(DuckDBAdapter):

    def __init__(self, config, mp_context: SpawnContext = None) -> None:
        print(f"WARNING: Using User Provided DBT Adapter: {type(self).__module__}.{type(self).__name__}")
        # pylint: disable=no-value-for-parameter
        if mp_context:
            # DBT 1.8 and above
            super().__init__(config=config, mp_context=mp_context)
        else:
            # DBT 1.7
            super().__init__(config=config)

    def _execute_python_model(self, model_name: str, compiled_code: str, **kwargs):
        try:
            with tempfile.NamedTemporaryFile(suffix='.py', delete=True) as model_file:
                try:
                    model_file.write(compiled_code.lstrip().encode('utf-8'))
                    model_file.flush()
                    print(f"Created temp py file {model_file.name}")
                    # Load the module spec
                    spec = importlib.util.spec_from_file_location(model_name, model_file.name)
                    # Create a module object
                    module = importlib.util.module_from_spec(spec)
                    # Load the module
                    sys.modules[model_name] = module
                    spec.loader.exec_module(module)
                    dbt_obj = module.dbtObj(None)
                    # Access and call `model` function of the model!
                    # IMPORTANT: here we are passing down dbt connection object from the adapter to the model
                    module.model(dbt=dbt_obj, **kwargs)
                except Exception as e:
                    raise Exception(
                        f"Failed to load or execute python model:{model_name} from file {model_file.name}") from e
                finally:
                    model_file.close()
        except Exception as e:
            raise Exception(f"Failed to create temp py file for model:{model_name}") from e

    @available
    def submit_local_python_job(self, parsed_model: Dict, compiled_code: str):
        connection = self.connections.get_if_exists()
        if not connection:
            connection = self.connections.get_thread_connection()
        # Assumed completion (the original snippet ends above): run the model locally,
        # passing the connection handle down to it as `session`.
        self._execute_python_model(model_name=parsed_model['name'], compiled_code=compiled_code,
                                   session=connection.handle)
|
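The core technique in _execute_python_model (write source code to a temporary file, import it as a module, then call its `model` function) can be tried in isolation. The sketch below is a stand-alone illustration using only the standard library; `run_model_source` and the toy model source are hypothetical names made up for this example, and it uses a temporary directory rather than a named temporary file so that the file can be reopened on every platform.

```python
import importlib.util
import sys
import tempfile
from pathlib import Path


def run_model_source(model_name: str, source: str, **kwargs):
    """Write `source` to a temporary .py file, import it, and call its `model` function."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        model_path = Path(tmp_dir) / f"{model_name}.py"
        model_path.write_text(source, encoding="utf-8")

        # Build a module spec from the file, create the module, then execute it.
        spec = importlib.util.spec_from_file_location(model_name, str(model_path))
        module = importlib.util.module_from_spec(spec)
        sys.modules[model_name] = module
        try:
            spec.loader.exec_module(module)
            # Extra keyword arguments (for example a session) are handed to the model.
            return module.model(**kwargs)
        finally:
            # Do not leave the throwaway module registered.
            sys.modules.pop(model_name, None)


toy_source = '''
def model(session):
    return f"ran with session={session}"
'''

result = run_model_source("toy_model", toy_source, session="fake-connection")
print(result)
```

In the adapter above, the keyword argument passed down is the live database connection rather than a placeholder string.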
2: Activate Custom Adapter
In your dbt_project.yml file, set the dbt_custom_adapter variable to the fully qualified name of your
custom adapter class. When this variable is defined, opendbt loads the given adapter and activates it.
| vars:
    dbt_custom_adapter: opendbt.examples.DuckDBAdapterV2Custom
|
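A fully qualified name is the module path followed by the class name, so opendbt.examples.DuckDBAdapterV2Custom refers to the class DuckDBAdapterV2Custom in the module opendbt.examples. How opendbt resolves the name internally is not shown here; the sketch below, with a hypothetical `load_class` helper, illustrates one common way to turn such a string into a class. It uses a standard-library class so that it runs without dbt installed.

```python
import importlib


def load_class(dotted_path: str):
    """Import the module part of `dotted_path` and return the named attribute."""
    module_path, _, class_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"Expected 'package.module.ClassName', got: {dotted_path!r}")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


# Standard-library stand-in for a custom adapter class.
cls = load_class("collections.OrderedDict")
print(cls.__module__, cls.__name__)
```

A name without a module part is rejected early, which gives a clearer error than a failed import later on.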
Alternatively, you can pass the variable with the run command:
| from opendbt import OpenDbtProject
dp = OpenDbtProject(project_dir="/dbt/project_dir", profiles_dir="/dbt/profiles_dir",
                    args=['--vars', 'dbt_custom_adapter: opendbt.examples.DuckDBAdapterV2Custom'])
dp.run(command="run", args=['--select', 'my_executedlt_model'])
|
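The value given to --vars is a YAML dictionary written as a string. When several variables are involved, writing that string by hand becomes error-prone; one option is to build it from a Python dict. The sketch below assumes a hypothetical `build_vars_args` helper and relies on a JSON object also being a valid YAML mapping.

```python
import json


def build_vars_args(variables: dict) -> list:
    """Return a ['--vars', '<mapping as string>'] pair built from a dict."""
    return ["--vars", json.dumps(variables)]


args = build_vars_args({"dbt_custom_adapter": "opendbt.examples.DuckDBAdapterV2Custom"})
print(args)
```

The resulting list can be passed wherever the hand-written `args` list is used above.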
3: Use new adapter in dbt macro
Call the new adapter method from a dbt macro or model.
| {% call noop_statement(name='main', message='Executed Python', code=compiled_code, rows_affected=-1, res=None) %}
{%- set res = adapter.submit_local_python_job(model, compiled_code) -%}
{% endcall %}
|
4: Final
Execute dbt commands as usual. dbt will now load and utilize your custom adapter class, allowing you to
access the newly defined methods within your Jinja macros.
| from opendbt import OpenDbtProject
dp = OpenDbtProject(project_dir="/dbt/project_dir", profiles_dir="/dbt/profiles_dir")
dp.run(command="run")
|