Пример блока "Декоратор"¶
Назначение блока¶
В этом примере мы создадим etl-блок, с помощью которого пользователи смогут реализовать произвольное поведение по обработке вложенных объектов.
Блок будет работать следующим образом:
- При создании модели пользователи будут добавлять блок декоратор
- В настройка блока пользователь указывает:
- Название функции из etl-скрипта модели, которая будет возвращать схему данных;
- Название функции из etl-скрипта модели, которая будет возвращать датафрейм с данными блока.
- При запуске etl-блока нам надо проверить наличие этих функций в etl-скрипте модели. Если функции есть, то будем передавать им управление. Если - нет, то выбрасывать исключение.
Метаданные¶
На форме настройки параметров блока мы должны запросить у пользователя названия двух функций:
- Какая функция из etl-скрипта отвечает за получение схемы данных;
- Какая функция из etl-скрипта отвечает за получение данных.
block_meta.json
{
"uid": "example_decorator",
"name": "Декоратор (пример)",
"version": "1.0",
"author": "Analytic Workspace",
"description": "",
"updated_at": "2023-07-30 09:00:00+00:00",
"params": [{
"code": "schema_func",
"name": "Функция для получения схемы",
"type": "string",
"required": true,
"mult": false,
"domain": null
}, {
"code": "data_func",
"name": "Функция для построения данных",
"type": "string",
"required": true,
"mult": false,
"domain": null
}]
}
Исходный код¶
При определении функций block_schema()
и block_data()
будем учитывать два момента:
- Для модели может быть a) не задан etl-скрипт или б) задан, но не содержать функций, которые пользователь указал как функции получения схемы или данных. Этот случай нужно корректно обработать и выбросить исключение с понятным пользователю текстом.
- Проаналзируем список параметров функций получения схемы и данных, которые указаны
пользователем в etl-скрипте. И передадим только те параметры, которые
объявлены для функции. Например, если для получения схемы данных не требуются
схемы вложенных в блок объектов, то не будем передавать
schemas
в функцию получения схемы из etl-скрипта.
block_code.py
import inspect # (1)
def block_schema(schemas, params, app):
"""
Получение схемы блока
"""
schema_func_name = params.get('schema_func')
if not schema_func_name:
raise Exception(
'Не указана функция для получения схемы'
)
if not app.model_module or schema_func_name not in app.model_module:
raise Exception(
f'Функция {schema_func_name} не найдена в etl-скрипте модели'
)
schema_func = app.model_module[schema_func_name] # (2)
declared_params = inspect.signature(schema_func).parameters # (3)
call_params = {} # (4)
if 'schemas' in declared_params:
call_params['schemas'] = schemas
if 'app' in declared_params:
call_params['app'] = app
return schema_func(**call_params)
def block_data(dfs, params, app):
"""
Получение данных блока
"""
data_func_name = params.get('data_func')
if not data_func_name:
raise Exception(
'Не указана функция для получения данных'
)
if not app.model_module or data_func_name not in app.model_module:
raise Exception(
f'Функция {data_func_name} не найдена в etl-скрипте модели'
)
data_func = app.model_module[data_func_name]
# Объявленные в скрипте модели параметры
declared_params = inspect.signature(data_func).parameters
# Параметры вызова функции
call_params = {}
if 'dfs' in declared_params:
call_params['dfs'] = dfs
if 'app' in declared_params:
call_params['app'] = app
return data_func(**call_params)
- Модуль inspect нужен для получения объявленных пользователем параметров функций в etl-скрипте.
- Получим функцию из etl-скрипта, которую пользователь указал в параметрах etl-блока.
- Используя возможности модуля
inspect
прочитаем список параметров функции. - В
call_params
будет содержаться фактический набор параметров, которые ожидаются функцией в etl-скрипте.
Тесты¶
В тестах сосредоточимся на проверке того, как вызываются функции из etl-скрипта модели. Для этого определим
тестовый скрипт в переменной SCRIPT_CODE
.
Также, проверим негативные варианты использования блока:
- Пользователь не указал название одной из функций в параметрах блока;
- Пользователь не указал etl-скрипт для модели;
- Пользователь указал в параметрах блока названия несуществующих в etl-скрипте функций.
test_example_json.py
from pathlib import Path
import pytest
from pyspark.sql.types import IntegerType, StringType
from aw_client.etl_blocks import get_etl_block_meta, \
get_etl_block_schema, get_etl_block_data
# ---------------------------------------------------
# Тестовые данные
# ---------------------------------------------------
SCRIPT_CODE = """
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, \
IntegerType, StringType
def custom_schema_func():
'''
Добавляем к первому вложенному объекту два поля: id и name
'''
return StructType(fields=[
StructField('id', IntegerType(), True),
StructField('name', StringType(), True),
])
def custom_data_func(app):
''' '''
rows = [
Row(id=1, name='Название')
]
return app.spark.createDataFrame(rows, schema=custom_schema_func())
"""
# ---------------------------------------------------
# Тест на проверку метаданных блока
# ---------------------------------------------------
def test_block_meta():
"""
Тест на корректность метаданных. Если в описании block_meta.json
есть ошибки, то вызов get_etl_block_meta вызовет ошибку
"""
get_etl_block_meta(block_path=Path(__file__).parent)
# ---------------------------------------------------
# Тесты на получение схемы данных
# ---------------------------------------------------
def test_block_schema():
"""
Получение схемы
"""
block_schema = get_etl_block_schema(
block_path=Path(__file__).parent,
test_data=[],
params={
'schema_func': 'custom_schema_func',
'data_func': 'custom_data_func'
},
model_script_code=SCRIPT_CODE
)
assert len(block_schema.fields) == 2
assert block_schema.fields[0].name == 'id'
assert block_schema.fields[0].dataType == IntegerType()
assert block_schema.fields[1].name == 'name'
assert block_schema.fields[1].dataType == StringType()
def test_block_schema_empty_function_name():
"""
Тест на получение схемы, когда в параметрах не указана
функция schema_func.
Должна появиться исключительная ситуация с текстом
про то, что функция не найдена в etl-скрипте модели
"""
with pytest.raises(Exception) as exc_info:
get_etl_block_schema(
block_path=Path(__file__).parent,
test_data=[],
params={},
model_script_code=SCRIPT_CODE
)
assert 'Не указана функция' in str(exc_info.value)
def test_block_schema_empty_script():
"""
Тест на получение схемы, когда скрипт модели пуст.
Должна появиться исключительная ситуация с текстом
про то, что функция не указана в скрипте модели.
"""
with pytest.raises(Exception) as exc_info:
get_etl_block_schema(
block_path=Path(__file__).parent,
test_data=[],
params={
'schema_func': 'custom_schema_func'
},
model_script_code=""
)
assert 'не найдена в etl-скрипте' in str(exc_info.value)
def test_block_schema_no_function_in_script():
"""
Тест на получение схемы, когда функции получения схемы
нет в etl-скрипте модели.
Должна появиться исключительная ситуация с текстом
про то, что функция не указана в скрипте модели.
"""
with pytest.raises(Exception) as exc_info:
get_etl_block_schema(
block_path=Path(__file__).parent,
test_data=[],
params={
'schema_func': 'some_function_name'
},
model_script_code=SCRIPT_CODE
)
assert 'не найдена в etl-скрипте' in str(exc_info.value)
# ---------------------------------------------------
# Тесты на получение данных
# ---------------------------------------------------
def test_block_data():
"""
Тест на получение данных
"""
block_data = get_etl_block_data(
block_path=Path(__file__).parent,
test_data=[],
params={
'schema_func': 'custom_schema_func',
'data_func': 'custom_data_func'
},
model_script_code=SCRIPT_CODE
)
rows = block_data.collect()
# Скрипт всегда возвращает только одну строку
assert len(rows) == 1
def test_block_data_empty_function_name():
"""
Тест на получение данных, когда в параметрах не указана
функция schema_func.
Должна появиться исключительная ситуация с текстом
про то, что функция не найдена в etl-скрипте модели
"""
with pytest.raises(Exception) as exc_info:
get_etl_block_data(
block_path=Path(__file__).parent,
test_data=[],
params={},
model_script_code=SCRIPT_CODE
)
assert 'Не указана функция' in str(exc_info.value)
def test_block_data_empty_script():
"""
Тест на получение данных, когда скрипт модели пуст.
Должна появиться исключительная ситуация с текстом
про то, что функция не указана в скрипте модели.
"""
with pytest.raises(Exception) as exc_info:
get_etl_block_data(
block_path=Path(__file__).parent,
test_data=[],
params={
'data_func': 'custom_data_func'
},
model_script_code=""
)
assert 'не найдена в etl-скрипте' in str(exc_info.value)
def test_block_data_no_function_in_script():
"""
Тест на получение данных, когда функции получения схемы
нет в etl-скрипте модели.
Должна появиться исключительная ситуация с текстом
про то, что функция не указана в скрипте модели.
"""
with pytest.raises(Exception) as exc_info:
get_etl_block_data(
block_path=Path(__file__).parent,
test_data=[],
params={
'data_func': 'some_function_name'
},
model_script_code=SCRIPT_CODE
)
assert 'не найдена в etl-скрипте' in str(exc_info.value)