
Executing Python Models Locally

You can extend dbt to execute Python models locally, inside the dbt process, by combining a customized adapter with a custom materialization.

This approach lets you integrate tasks such as ingesting data from external APIs directly into your dbt workflows, so that end-to-end data ingestion pipelines can be built entirely within dbt.

⚠NOTE: With this method, the Python code runs and the data is processed within the dbt environment itself. For instance, if dbt is deployed on an Airflow server, the processing happens on that server. This is generally fine for moderate data volumes, but it may not be suitable for very large datasets or resource-intensive processing.

1: Extend Adapter

Opendbt already comes with this feature implemented!

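The submit_local_python_job method below executes the compiled Python model inside the current dbt process: it writes the compiled code to a temporary file, loads that file as a Python module with importlib, and calls the module's model() function. Note the connection argument passed down to the Python model; the model can use it to save data to the destination database.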

import importlib.util
import sys
import tempfile
from typing import Dict

from dbt.adapters.base import available, BaseAdapter

from opendbt.runtime_patcher import PatchClass


@PatchClass(module_name="dbt.adapters.base", target_name="BaseAdapter")
class OpenDbtBaseAdapter(BaseAdapter):

    def _execute_python_model(self, model_name: str, compiled_code: str, **kwargs):
        # Write the compiled model to a temporary .py file (deleted when the block exits)
        with tempfile.NamedTemporaryFile(suffix='.py', delete=True) as model_file:
            try:
                model_file.write(compiled_code.lstrip().encode('utf-8'))
                model_file.flush()
                print(f"Created temp py file {model_file.name}")
            except Exception as e:
                raise Exception(f"Failed to create temp py file for model:{model_name}") from e
            try:
                # Load the module spec from the temp file
                spec = importlib.util.spec_from_file_location(model_name, model_file.name)
                # Create a module object
                module = importlib.util.module_from_spec(spec)
                # Register and execute the module in the current (dbt) process
                sys.modules[model_name] = module
                spec.loader.exec_module(module)
                # dbtObj is generated by dbt in the compiled code; no dataframe loader is supplied
                dbt_obj = module.dbtObj(None)
                # Call the `model` function of the model.
                # IMPORTANT: extra kwargs (here the adapter connection) are passed down to the model
                module.model(dbt=dbt_obj, **kwargs)
            except Exception as e:
                raise Exception(
                    f"Failed to load or execute python model:{model_name} from file {model_file.name}") from e

    @available
    def submit_local_python_job(self, parsed_model: Dict, compiled_code: str):
        # Prefer the connection already open for this thread, else fall back to get_thread_connection()
        connection = self.connections.get_if_exists()
        if not connection:
            connection = self.connections.get_thread_connection()
        self._execute_python_model(model_name=parsed_model['name'],
                                   compiled_code=compiled_code,
                                   # following args are passed to the model
                                   connection=connection)
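To see the loading mechanism in isolation, here is a minimal standalone sketch that uses only the standard library. `run_code_as_module` and `FAKE_COMPILED_CODE` are hypothetical names for illustration; the fake code only mimics the two things the adapter relies on in dbt's compiled output (a `dbtObj` class and a `model()` function) and is not what dbt actually generates.

```python
import importlib.util
import os
import sys
import tempfile


def run_code_as_module(module_name: str, code: str, **kwargs):
    """Write `code` to a temp .py file, import it as `module_name`, and call its model()."""
    # delete=False so the file can be reopened by name on every platform; removed in `finally`
    with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as f:
        f.write(code.lstrip().encode("utf-8"))
        path = f.name
    try:
        spec = importlib.util.spec_from_file_location(module_name, path)
        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module
        spec.loader.exec_module(module)  # runs the code in the current interpreter
        dbt_obj = module.dbtObj(None)
        # Extra keyword arguments (e.g. a connection) are forwarded to model()
        return module.model(dbt=dbt_obj, **kwargs)
    finally:
        os.remove(path)


# Hypothetical stand-in for compiled model code: a dbtObj class plus the user's model().
FAKE_COMPILED_CODE = '''
class dbtObj:
    def __init__(self, load_df_function):
        self.load_df_function = load_df_function

    def config(self, **kwargs):
        pass

def model(dbt, connection):
    dbt.config(materialized="executepython")
    return f"model ran with connection={connection!r}"
'''

if __name__ == "__main__":
    print(run_code_as_module("demo_model", FAKE_COMPILED_CODE, connection="fake-conn"))
```

Unlike the adapter code, this sketch uses `delete=False` and removes the file itself, because reopening a still-open `NamedTemporaryFile` by name is not supported on Windows.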

2: Python Execution from Macro

Create a new materialization named executepython. This materialization will call the newly added submit_local_python_job method to execute the compiled Python code.

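{% materialization executepython, supported_languages=['python'] %}
  {% call noop_statement(name='main', message='Executed Python', code=compiled_code, rows_affected=-1, res=None) %}
      {%- set res = adapter.submit_local_python_job(model, compiled_code) -%}
  {% endcall %}
  {{ return({'relations': []}) }}
{% endmaterialization %}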

3: Example Python Model

Let's create a sample Python model that will be executed locally by dbt using the executepython materialization.

import os
import platform

from dbt import version
from dbt.adapters.contracts.connection import Connection


def print_info():
    _str = f"name:{os.name}, system:{platform.system()} release:{platform.release()}"
    _str += f"\npython version:{platform.python_version()}, dbt:{version.__version__}"
    print(_str)


def model(dbt, connection: Connection):
    dbt.config(materialized="executepython")
    print("==================================================")
    print("========IM LOCALLY EXECUTED PYTHON MODEL==========")
    print("==================================================")
    print_info()
    print("==================================================")
    print("===============MAKE DBT GREAT AGAIN===============")
    print("==================================================")
    return None
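Because the adapter passes its connection to the model, a model using this materialization can also load data, which is the ingestion use case described at the top. The sketch below assumes the adapter's `connection.handle` behaves like a DB-API connection with `cursor()`, `executemany()` and `commit()`, and that the driver uses `?` placeholders; both are adapter-specific (true for sqlite3, not for psycopg2, which uses `%s`), so check your adapter's handle. `fetch_records` and the `raw_items` table are hypothetical, and the smoke test swaps in sqlite3 for the real adapter handle.

```python
import sqlite3
from types import SimpleNamespace


def fetch_records():
    # Hypothetical stand-in for an external API call returning JSON-like rows.
    return [
        {"id": 1, "name": "alpha"},
        {"id": 2, "name": "beta"},
    ]


def model(dbt, connection):
    dbt.config(materialized="executepython")
    records = fetch_records()
    # ASSUMPTION: connection.handle exposes a DB-API style connection; this is adapter-specific.
    cur = connection.handle.cursor()
    cur.execute("CREATE TABLE IF NOT EXISTS raw_items (id INTEGER, name TEXT)")
    cur.executemany(
        "INSERT INTO raw_items (id, name) VALUES (?, ?)",
        [(r["id"], r["name"]) for r in records],
    )
    connection.handle.commit()
    return None


if __name__ == "__main__":
    # Local smoke test with stand-ins: sqlite3 instead of the real adapter handle,
    # and a minimal object that accepts dbt.config(...) calls.
    handle = sqlite3.connect(":memory:")
    model(SimpleNamespace(config=lambda **kwargs: None), SimpleNamespace(handle=handle))
    print(handle.execute("SELECT COUNT(*) FROM raw_items").fetchone()[0])
```

Since the table is created with IF NOT EXISTS and rows are always inserted, every run appends; a real pipeline would need a replace or deduplication strategy.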