Перейти к содержанию

Пример блока "Декоратор"

Назначение блока

В этом примере мы создадим etl-блок, с помощью которого пользователи смогут реализовать произвольное поведение по обработке вложенных объектов.

Блок будет работать следующим образом:

  1. При создании модели пользователи будут добавлять блок декоратор
  2. В настройка блока пользователь указывает:
  3. Название функции из etl-скрипта модели, которая будет возвращать схему данных;
  4. Название функции из etl-скрипта модели, которая будет возвращать датафрейм с данными блока.
  5. При запуске etl-блока нам надо проверить наличие этих функций в etl-скрипте модели. Если функции есть, то будем передавать им управление. Если - нет, то выбрасывать исключение.

Метаданные

На форме настройки параметров блока мы должны запросить у пользователя названия двух функций:

  • Какая функция из etl-скрипта отвечает за получение схемы данных;
  • Какая функция из etl-скрипта отвечает за получение данных.
block_meta.json
{
    "uid": "example_decorator",
    "name": "Декоратор (пример)",
    "version": "1.0",
    "author": "Analytic Workspace",
    "description": "",
    "updated_at": "2023-07-30 09:00:00+00:00",
    "params": [{
        "code": "schema_func",
        "name": "Функция для получения схемы",
        "type": "string",
        "required": true,
        "mult": false,
        "domain": null
    }, {
        "code": "data_func",
        "name": "Функция для построения данных",
        "type": "string",
        "required": true,
        "mult": false,
        "domain": null
    }]
}

Исходный код

При определении функций block_schema() и block_data() будем учитывать два момента:

  1. Для модели может быть a) не задан etl-скрипт или б) задан, но не содержать функций, которые пользователь указал как функции получения схемы или данных. Этот случай нужно корректно обработать и выбросить исключение с понятным пользователю текстом.
  2. Проаналзируем список параметров функций получения схемы и данных, которые указаны пользователем в etl-скрипте. И передадим только те параметры, которые объявлены для функции. Например, если для получения схемы данных не требуются схемы вложенных в блок объектов, то не будем передавать schemas в функцию получения схемы из etl-скрипта.
block_code.py
import inspect  # (1) 


def block_schema(schemas, params, app):
    """ 
    Получение схемы блока
    """
    schema_func_name = params.get('schema_func')
    if not schema_func_name:
        raise Exception(
            'Не указана функция для получения схемы'
        )

    if not app.model_module or schema_func_name not in app.model_module:
        raise Exception(
            f'Функция {schema_func_name} не найдена в etl-скрипте модели'
        )

    schema_func = app.model_module[schema_func_name]  # (2)

    declared_params = inspect.signature(schema_func).parameters  # (3)

    call_params = {}  # (4)
    if 'schemas' in declared_params:
        call_params['schemas'] = schemas
    if 'app' in declared_params:
        call_params['app'] = app

    return schema_func(**call_params)


def block_data(dfs, params, app):
    """ 
    Получение данных блока
    """
    data_func_name = params.get('data_func')
    if not data_func_name:
        raise Exception(
            'Не указана функция для получения данных'
        )

    if not app.model_module or data_func_name not in app.model_module:
        raise Exception(
            f'Функция {data_func_name} не найдена в etl-скрипте модели'
        )

    data_func = app.model_module[data_func_name]

    # Объявленные в скрипте модели параметры
    declared_params = inspect.signature(data_func).parameters

    # Параметры вызова функции
    call_params = {}
    if 'dfs' in declared_params:
        call_params['dfs'] = dfs
    if 'app' in declared_params:
        call_params['app'] = app

    return data_func(**call_params)
  1. Модуль inspect нужен для получения объявленных пользователем параметров функций в etl-скрипте.
  2. Получим функцию из etl-скрипта, которую пользователь указал в параметрах etl-блока.
  3. Используя возможности модуля inspect прочитаем список параметров функции.
  4. В call_params будет содержаться фактический набор параметров, которые ожидаются функцией в etl-скрипте.

Тесты

В тестах сосредоточимся на проверке того, как вызываются функции из etl-скрипта модели. Для этого определим тестовый скрипт в переменной SCRIPT_CODE.

Также, проверим негативные варианты использования блока:

  • Пользователь не указал название одной из функций в параметрах блока;
  • Пользователь не указал etl-скрипт для модели;
  • Пользователь указал в параметрах блока названия несуществующих в etl-скрипте функций.
test_example_json.py
from pathlib import Path

import pytest

from pyspark.sql.types import IntegerType, StringType

from aw_client.etl_blocks import get_etl_block_meta, \
    get_etl_block_schema, get_etl_block_data


# ---------------------------------------------------
# Тестовые данные
# ---------------------------------------------------
SCRIPT_CODE = """
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, \
    IntegerType, StringType

def custom_schema_func():
    ''' 
    Добавляем к первому вложенному объекту два поля: id и name
    '''
    return StructType(fields=[
        StructField('id', IntegerType(), True),
        StructField('name', StringType(), True),
    ])

def custom_data_func(app):
    ''' '''
    rows = [
        Row(id=1, name='Название')
    ]
    return app.spark.createDataFrame(rows, schema=custom_schema_func())
"""


# ---------------------------------------------------
# Тест на проверку метаданных блока
# ---------------------------------------------------
def test_block_meta():
    """ 
    Тест на корректность метаданных. Если в описании block_meta.json
    есть ошибки, то вызов get_etl_block_meta вызовет ошибку
    """
    get_etl_block_meta(block_path=Path(__file__).parent)


# ---------------------------------------------------
# Тесты на получение схемы данных
# ---------------------------------------------------
def test_block_schema():
    """ 
    Получение схемы
    """
    block_schema = get_etl_block_schema(
        block_path=Path(__file__).parent,
        test_data=[],
        params={
            'schema_func': 'custom_schema_func',
            'data_func': 'custom_data_func'
        },
        model_script_code=SCRIPT_CODE
    )

    assert len(block_schema.fields) == 2
    assert block_schema.fields[0].name == 'id'
    assert block_schema.fields[0].dataType == IntegerType()
    assert block_schema.fields[1].name == 'name'
    assert block_schema.fields[1].dataType == StringType()


def test_block_schema_empty_function_name():
    """ 
    Тест на получение схемы, когда в параметрах не указана
    функция schema_func.

    Должна появиться исключительная ситуация с текстом
    про то, что функция не найдена в etl-скрипте модели
    """
    with pytest.raises(Exception) as exc_info:
        get_etl_block_schema(
            block_path=Path(__file__).parent,
            test_data=[],
            params={},
            model_script_code=SCRIPT_CODE
        )

    assert 'Не указана функция' in str(exc_info.value)


def test_block_schema_empty_script():
    """ 
    Тест на получение схемы, когда скрипт модели пуст.

    Должна появиться исключительная ситуация с текстом
    про то, что функция не указана в скрипте модели.
    """
    with pytest.raises(Exception) as exc_info:
        get_etl_block_schema(
            block_path=Path(__file__).parent,
            test_data=[],
            params={
                'schema_func': 'custom_schema_func'
            },
            model_script_code=""
        )

    assert 'не найдена в etl-скрипте' in str(exc_info.value)

def test_block_schema_no_function_in_script():
    """ 
    Тест на получение схемы, когда функции получения схемы 
    нет в etl-скрипте модели.

    Должна появиться исключительная ситуация с текстом
    про то, что функция не указана в скрипте модели.
    """
    with pytest.raises(Exception) as exc_info:
        get_etl_block_schema(
            block_path=Path(__file__).parent,
            test_data=[],
            params={
                'schema_func': 'some_function_name'
            },
            model_script_code=SCRIPT_CODE
        )

    assert 'не найдена в etl-скрипте' in str(exc_info.value)

# ---------------------------------------------------
# Тесты на получение данных
# ---------------------------------------------------
def test_block_data():
    """ 
    Тест на получение данных
    """
    block_data = get_etl_block_data(
        block_path=Path(__file__).parent,
        test_data=[],
        params={
            'schema_func': 'custom_schema_func',
            'data_func': 'custom_data_func'
        },
        model_script_code=SCRIPT_CODE
    )

    rows = block_data.collect()

    # Скрипт всегда возвращает только одну строку
    assert len(rows) == 1

def test_block_data_empty_function_name():
    """ 
    Тест на получение данных, когда в параметрах не указана
    функция schema_func.

    Должна появиться исключительная ситуация с текстом
    про то, что функция не найдена в etl-скрипте модели
    """
    with pytest.raises(Exception) as exc_info:
        get_etl_block_data(
            block_path=Path(__file__).parent,
            test_data=[],
            params={},
            model_script_code=SCRIPT_CODE
        )

    assert 'Не указана функция' in str(exc_info.value)


def test_block_data_empty_script():
    """ 
    Тест на получение данных, когда скрипт модели пуст.

    Должна появиться исключительная ситуация с текстом
    про то, что функция не указана в скрипте модели.
    """
    with pytest.raises(Exception) as exc_info:
        get_etl_block_data(
            block_path=Path(__file__).parent,
            test_data=[],
            params={
                'data_func': 'custom_data_func'
            },
            model_script_code=""
        )

    assert 'не найдена в etl-скрипте' in str(exc_info.value)

def test_block_data_no_function_in_script():
    """ 
    Тест на получение данных, когда функции получения схемы 
    нет в etl-скрипте модели.

    Должна появиться исключительная ситуация с текстом
    про то, что функция не указана в скрипте модели.
    """
    with pytest.raises(Exception) as exc_info:
        get_etl_block_data(
            block_path=Path(__file__).parent,
            test_data=[],
            params={
                'data_func': 'some_function_name'
            },
            model_script_code=SCRIPT_CODE
        )

    assert 'не найдена в etl-скрипте' in str(exc_info.value)