Перейти к содержанию

Пример создания SQL-блока

Описание

SQL блок позволяет выполнить произвольное sql-выражение над любой частью модели данных Analytic Workspace.

Пользователь добавляет SQL-блок в модель и вкладывает внутрь блока один или несколько объектов.

В настройках блока указывается sql-выражение, которое должно быть выполнено над вложенными объектами. Названия полей (для SELECT/WHERE/ORDER BY/GROUP BY) и таблиц (для FROM) в sql-выражении указываются в соответствие со структурой вложенных объектов

После выполнения блока, его структура и данные соответствуют указанному sql-выражению.

Метаданные

В параметрах блока будет только одно поле с названием sql. При вводе sql-выражения должна быть подсветка синтаксиса и подсказки поля, поэтому будем использовать тип параметраsql_text.

Кроме этого, sql-выражения могут быть многострочными, поэтому сразу расширим высоту поля до 240 пикселей. Для этого, добавляем extra-значение height: 240.

block_meta.json
{
  "uid": "example_sql",
  "name": "SQL-блок (пример)",
  "version": "1.0",
  "description": "Выполнение SQL выражения над любой частью модели. Используется Spark SQL https://spark.apache.org/docs/latest/sql-ref.html",
  "author": "Analytic Workspace",
  "updated_at": "2023-07-30 09:00:00+00:00",
  "params": [{
    "code": "sql",
    "name": "SQL выражение",
    "type": "sql_text",
    "description": "Таблица дочернего объекта называется child. Пример запроса: SELECT * FROM child.",
    "required": true,
    "mult": false,
    "domain": null,
    "extra": {
      "height": 240
    }
  }]
}

Исходный код

Параметры

block_code.py
"""
ETL блок для выполнения SQL-выражений над произвольными частями модели
"""
from dataclasses import dataclass

from aw_etl.etl_blocks import InvalidEtlBlock


@dataclass
class BlockParams:  # (1)!
    """ 
    Параметры блока 
    """
    # текст SQL запроса
    sql: str


def get_block_params(params):
    """ 
    Возвращает параметры блока в виде объекта BlockParams. 

    Если sql-выражение не указано, то выбросим исключение c сообщением
    для пользователя
    """
    if not params.get('sql'):
        raise InvalidEtlBlock('Не указан текст SQL запроса')  # (2)!

    return BlockParams(
        sql=params.get('sql')
    )


def block_schema(app, schemas, params):
    """
    Возвращает схему блока
    """
    block_params = get_block_params(params)

    # Для каждого вложенного в блок объекта зарегистрируем
    # в каталоге Spark пустой датафрейм с схемой этого обхекта
    for obj_name, schema in schemas.items():
        df_obj = app.spark.createDataFrame([], schema=schema)  # (3)!
        df_obj.createOrReplaceTempView(obj_name)

    # К первому вложенному объекту можно обращаться с именем child, поэтому
    # для него зарегистрируем в каталоге Spark отдельный датафрейм
    first_schema = schemas.first()
    if first_schema:
        df_first = app.spark.createDataFrame([], schema=schema)
        df_first.createOrReplaceTempView('child')

    try:
        # Выполняем sql-выражение
        df_result = app.spark.sql(block_params.sql)
    finally:
        # Почистим за собой каталог сессии Spark
        for component_name, _ in schemas.items():
            app.spark.catalog.dropTempView(component_name)
        if first_schema:
            app.spark.catalog.dropTempView('child')

    # Возвращаем схему датафрейма, которая получается после 
    # выполнения sql-выражения
    return df_result.schema


def block_data(app, dfs, params):
    """
    Возвращаает DataFrame с данными блока
    """
    block_params = get_block_params(params)

    # Регистрируем датафреймы вложенных объектов в каталоге сессии Spark.
    # Имя датафрейма (для использования в FROM) совпадает с именем
    # объекта из структуры вложенных объектов.
    for df_name, upstream_df in dfs.items():
        upstream_df.createOrReplaceTempView(df_name)

    # Данные первого вложенного в блок объекта должны быть доступны
    # по имени "child" (select * from child)
    if dfs:
        dfs[0].createOrReplaceTempView('child')

    try:
        # Выполняем sql-выражение
        df = app.spark.sql(block_params.sql)
    finally:
        # Почистим за собой каталог сессии Spark
        for df_name, upstream_df in dfs.items():
            upstream_df.createOrReplaceTempView(df_name)
        app.spark.catalog.dropTempView('child')

    # Возвращаем датафрейм, который получается после выполнения sql-выражения
    return df
  1. Рекомендуется завести отдельный класс, в атрибуты которого будут считываться параметры блока.
  2. Чтобы указать системе, что работа блока превращена из-за того, что некорректно сконфигурированы его параметры, нужно выбросить исключение InvalidEtlBlock из пакета aw_etl.etl_blocks.
  3. Сессия Spark доступа в функциях etl-блока через параметр app. В этом объекте содержится информация о контексте текущего выполняемого приложения.

Тесты

test_example_sql.py
from pathlib import Path
import pytest

from pyspark.sql.types import (
    StructType,
    LongType,
    StringType
)

from aw_client.etl_blocks import (
    get_etl_block_meta, 
    get_etl_block_data,
    get_etl_block_schema,
    ModelObjectTestData
)


# ---------------------------------------------------------
# Тестовые данные
# ---------------------------------------------------------
TEST_OBJECT = ModelObjectTestData(  # (1)!
    model_name='test_object',
    rows=[
        {'id': 1, 'name': 'Название 1'},
        {'id': 2, 'name': 'Название 2'}
    ],
    schema=[
        {'model_name': 'id', 'simple_type': 'number'}, 
        {'model_name': 'name', 'simple_type': 'string'}
    ]
)

# ---------------------------------------------------------
# Тесты
# ---------------------------------------------------------
def test_block_meta():  # (2)!
    """ 
    Тест на получение метаданных блока. Позволяет проверить, 
    что block_meta.json не содержит ошибок
    """
    get_etl_block_meta(block_path=Path(__file__).parent)


def test_block_schema():
    """ 
    Проверяем, как определяется схема данных результатов выполнения
    SQL запроса
    """
    block_params = {
        'sql': 'select id, name from test_object'
    }

    block_schema = get_etl_block_schema(
        block_path=Path(__file__).parent, 
        test_data=[TEST_OBJECT],
        params=block_params
    )

    # Проверяем, что в схеме блока для SQL-запроса будет
    # одно целочисленное поле с названием "id"
    assert isinstance(block_schema, StructType)
    assert len(block_schema.fields) == 2
    assert block_schema.fields[0].name == 'id'
    assert block_schema.fields[0].dataType == LongType()
    assert block_schema.fields[1].name == 'name'
    assert block_schema.fields[1].dataType == StringType()


def test_block_schema_no_sql():
    """ 
    Проверяем, как функция получения схемы данных реагирует на то, что
    параметр sql не указан
    """
    with pytest.raises(Exception) as exc_info:
        get_etl_block_schema(
            block_path=Path(__file__).parent,
            test_data=[TEST_OBJECT],
            params={
                "sql": ""
            },
        )

    assert 'не указан' in str(exc_info.value).lower() and \
        'sql' in str(exc_info.value).lower()


def test_block_data():
    """
    Проверяем, как возвращаются данные SQL запроса
    """
    block_params = {
        'sql': 'select id, name from test_object'
    }

    df = get_etl_block_data(
        block_path=Path(__file__).parent, 
        test_data=[TEST_OBJECT],
        params=block_params
    )

    rows = df.collect()

    assert len(rows) == 2
    for i in range(len(rows)):
        assert (rows[i]['id'], rows[i]['name']) == (TEST_OBJECT.rows[i]['id'], TEST_OBJECT.rows[i]['name'])


def test_block_data_no_sql():
    """ 
    Проверяем, как функция получения данных реагирует на то, что
    параметр sql не указан
    """
    with pytest.raises(Exception) as exc_info:
        get_etl_block_data(
            block_path=Path(__file__).parent,
            test_data=[TEST_OBJECT],
            params={
                "sql": ""
            },
        )

    assert 'не указан' in str(exc_info.value).lower() and \
        'sql' in str(exc_info.value).lower()