Skip to content

Using a Custom Adapter

opendbt provides the flexibility to register and utilize custom adapters. This capability allows users to extend existing adapters, add new methods, or override existing ones. By introducing new methods to an adapter, you can expose them to dbt macros and leverage them within your dbt models, for instance, by calling a new method during materialization.

Let's walk through the process step by step.

lets see it ste by step

1: Extend Existing Adapter

Create a new adapter class that inherits from the desired base adapter. Add the necessary methods to this class.

here we are creating a method which will run given pyhon model locally. Notice @available decorator is making it available for use in dbt macros.

import importlib
import logging
import sys
import tempfile
from multiprocessing.context import SpawnContext
from typing import Dict

from dbt.adapters.base import available
from dbt.adapters.duckdb import DuckDBAdapter


class DuckDBAdapterV2Custom(DuckDBAdapter):
    def __init__(self, config, mp_context: SpawnContext = None) -> None:
        print(f"WARNING: Using User Provided DBT Adapter: {type(self).__module__}.{type(self).__name__}")
        # pylint: disable=no-value-for-parameter
        if mp_context:
            # DBT 1.8 and above
            super().__init__(config=config, mp_context=mp_context)
        else:
            # DBT 1.7
            super().__init__(config=config)

    def _execute_python_model(self, model_name: str, compiled_code: str, **kwargs):
        try:
            with tempfile.NamedTemporaryFile(suffix=f'.py', delete=True) as model_file:
                try:
                    model_file.write(compiled_code.lstrip().encode('utf-8'))
                    model_file.flush()
                    print(f"Created temp py file {model_file.name}")
                    # Load the module spec
                    spec = importlib.util.spec_from_file_location(model_name, model_file.name)
                    # Create a module object
                    module = importlib.util.module_from_spec(spec)
                    # Load the module
                    sys.modules[model_name] = module
                    spec.loader.exec_module(module)
                    dbt_obj = module.dbtObj(None)
                    # Access and call `model` function of the model!
                    # IMPORTANT: here we are passing down dbt connection object from the adapter to the model
                    module.model(dbt=dbt_obj, **kwargs)
                except Exception as e:
                    raise Exception(
                        f"Failed to load or execute python model:{model_name} from file {model_file.as_posix()}") from e
                finally:
                    model_file.close()
        except Exception as e:
            raise Exception(f"Failed to create temp py file for model:{model_name}") from e

    @available
    def submit_local_python_job(self, parsed_model: Dict, compiled_code: str):
        connection = self.connections.get_if_exists()
        if not connection:
            connection = self.connections.get_thread_connection()

2: Activate Custom Adapter

In your dbt_project.yml file, set the dbt_custom_adapter variable to the fully qualified name of your custom adapter class. when defined opendbt will take this adapter and activates it.

vars:
  dbt_custom_adapter: opendbt.examples.DuckDBAdapterV2Custom

Optionally you could provide this with run command

1
2
3
4
5
from opendbt import OpenDbtProject

dp = OpenDbtProject(project_dir="/dbt/project_dir", profiles_dir="/dbt/profiles_dir",
                    args=['--vars', 'dbt_custom_adapter: opendbt.examples.DuckDBAdapterV2Custom'])
dp.run(command="run", args=['--select', 'my_executedlt_model'])

3: Use new adapter in dbt macro

Call new adapter method from dbt macro/model.

1
2
3
  {% call noop_statement(name='main', message='Executed Python', code=compiled_code, rows_affected=-1, res=None) %}
      {%- set res = adapter.submit_local_python_job(model, compiled_code) -%}
  {% endcall %}

4: Final

Execute dbt commands as usual. dbt will now load and utilize your custom adapter class, allowing you to access the newly defined methods within your Jinja macros.

1
2
3
4
from opendbt import OpenDbtProject

dp = OpenDbtProject(project_dir="/dbt/project_dir", profiles_dir="/dbt/profiles_dir")
dp.run(command="run")