Пример создания SQL-блока¶
Описание¶
SQL блок позволяет выполнить произвольное sql-выражение над любой частью модели данных Analytic Workspace.
Пользователь добавляет SQL-блок в модель и вкладывает внутрь блока один или несколько объектов.
В настройках блока указывается sql-выражение, которое должно быть выполнено над вложенными объектами. Названия
полей (для SELECT/WHERE/ORDER BY/GROUP BY
) и таблиц (для FROM
) в sql-выражении указываются в соответствие со структурой
вложенных объектов
После выполнения блока, его структура и данные соответствуют указанному sql-выражению.
Метаданные¶
В параметрах блока будет только одно поле с названием sql
. При вводе sql-выражения должна быть подсветка синтаксиса и подсказки поля,
поэтому будем использовать тип параметраsql_text
.
Кроме этого, sql-выражения могут быть многострочными, поэтому сразу расширим высоту поля до 240 пикселей. Для этого, добавляем
extra-значение height: 240
.
{
"uid": "example_sql",
"name": "SQL-блок (пример)",
"version": "1.0",
"description": "Выполнение SQL выражения над любой частью модели. Используется Spark SQL https://spark.apache.org/docs/latest/sql-ref.html",
"author": "Analytic Workspace",
"updated_at": "2023-07-30 09:00:00+00:00",
"params": [{
"code": "sql",
"name": "SQL выражение",
"type": "sql_text",
"description": "Таблица дочернего объекта называется child. Пример запроса: SELECT * FROM child.",
"required": true,
"mult": false,
"domain": null,
"extra": {
"height": 240
}
}]
}
Исходный код¶
Параметры
"""
ETL блок для выполнения SQL-выражений над произвольными частями модели
"""
from dataclasses import dataclass
from aw_etl.etl_blocks import InvalidEtlBlock
@dataclass
class BlockParams: # (1)!
"""
Параметры блока
"""
# текст SQL запроса
sql: str
def get_block_params(params):
"""
Возвращает параметры блока в виде объекта BlockParams.
Если sql-выражение не указано, то выбросим исключение c сообщением
для пользователя
"""
if not params.get('sql'):
raise InvalidEtlBlock('Не указан текст SQL запроса') # (2)!
return BlockParams(
sql=params.get('sql')
)
def block_schema(app, schemas, params):
"""
Возвращает схему блока
"""
block_params = get_block_params(params)
# Для каждого вложенного в блок объекта зарегистрируем
# в каталоге Spark пустой датафрейм с схемой этого обхекта
for obj_name, schema in schemas.items():
df_obj = app.spark.createDataFrame([], schema=schema) # (3)!
df_obj.createOrReplaceTempView(obj_name)
# К первому вложенному объекту можно обращаться с именем child, поэтому
# для него зарегистрируем в каталоге Spark отдельный датафрейм
first_schema = schemas.first()
if first_schema:
df_first = app.spark.createDataFrame([], schema=schema)
df_first.createOrReplaceTempView('child')
try:
# Выполняем sql-выражение
df_result = app.spark.sql(block_params.sql)
finally:
# Почистим за собой каталог сессии Spark
for component_name, _ in schemas.items():
app.spark.catalog.dropTempView(component_name)
if first_schema:
app.spark.catalog.dropTempView('child')
# Возвращаем схему датафрейма, которая получается после
# выполнения sql-выражения
return df_result.schema
def block_data(app, dfs, params):
"""
Возвращаает DataFrame с данными блока
"""
block_params = get_block_params(params)
# Регистрируем датафреймы вложенных объектов в каталоге сессии Spark.
# Имя датафрейма (для использования в FROM) совпадает с именем
# объекта из структуры вложенных объектов.
for df_name, upstream_df in dfs.items():
upstream_df.createOrReplaceTempView(df_name)
# Данные первого вложенного в блок объекта должны быть доступны
# по имени "child" (select * from child)
if dfs:
dfs[0].createOrReplaceTempView('child')
try:
# Выполняем sql-выражение
df = app.spark.sql(block_params.sql)
finally:
# Почистим за собой каталог сессии Spark
for df_name, upstream_df in dfs.items():
upstream_df.createOrReplaceTempView(df_name)
app.spark.catalog.dropTempView('child')
# Возвращаем датафрейм, который получается после выполнения sql-выражения
return df
- Рекомендуется завести отдельный класс, в атрибуты которого будут считываться параметры блока.
- Чтобы указать системе, что работа блока превращена из-за того, что некорректно сконфигурированы его параметры,
нужно выбросить исключение
InvalidEtlBlock
из пакетаaw_etl.etl_blocks
. - Сессия Spark доступа в функциях etl-блока через параметр
app
. В этом объекте содержится информация о контексте текущего выполняемого приложения.
Тесты¶
from pathlib import Path
import pytest
from pyspark.sql.types import (
StructType,
LongType,
StringType
)
from aw_client.etl_blocks import (
get_etl_block_meta,
get_etl_block_data,
get_etl_block_schema,
ModelObjectTestData
)
# ---------------------------------------------------------
# Тестовые данные
# ---------------------------------------------------------
TEST_OBJECT = ModelObjectTestData( # (1)!
model_name='test_object',
rows=[
{'id': 1, 'name': 'Название 1'},
{'id': 2, 'name': 'Название 2'}
],
schema=[
{'model_name': 'id', 'simple_type': 'number'},
{'model_name': 'name', 'simple_type': 'string'}
]
)
# ---------------------------------------------------------
# Тесты
# ---------------------------------------------------------
def test_block_meta(): # (2)!
"""
Тест на получение метаданных блока. Позволяет проверить,
что block_meta.json не содержит ошибок
"""
get_etl_block_meta(block_path=Path(__file__).parent)
def test_block_schema():
"""
Проверяем, как определяется схема данных результатов выполнения
SQL запроса
"""
block_params = {
'sql': 'select id, name from test_object'
}
block_schema = get_etl_block_schema(
block_path=Path(__file__).parent,
test_data=[TEST_OBJECT],
params=block_params
)
# Проверяем, что в схеме блока для SQL-запроса будет
# одно целочисленное поле с названием "id"
assert isinstance(block_schema, StructType)
assert len(block_schema.fields) == 2
assert block_schema.fields[0].name == 'id'
assert block_schema.fields[0].dataType == LongType()
assert block_schema.fields[1].name == 'name'
assert block_schema.fields[1].dataType == StringType()
def test_block_schema_no_sql():
"""
Проверяем, как функция получения схемы данных реагирует на то, что
параметр sql не указан
"""
with pytest.raises(Exception) as exc_info:
get_etl_block_schema(
block_path=Path(__file__).parent,
test_data=[TEST_OBJECT],
params={
"sql": ""
},
)
assert 'не указан' in str(exc_info.value).lower() and \
'sql' in str(exc_info.value).lower()
def test_block_data():
"""
Проверяем, как возвращаются данные SQL запроса
"""
block_params = {
'sql': 'select id, name from test_object'
}
df = get_etl_block_data(
block_path=Path(__file__).parent,
test_data=[TEST_OBJECT],
params=block_params
)
rows = df.collect()
assert len(rows) == 2
for i in range(len(rows)):
assert (rows[i]['id'], rows[i]['name']) == (TEST_OBJECT.rows[i]['id'], TEST_OBJECT.rows[i]['name'])
def test_block_data_no_sql():
"""
Проверяем, как функция получения данных реагирует на то, что
параметр sql не указан
"""
with pytest.raises(Exception) as exc_info:
get_etl_block_data(
block_path=Path(__file__).parent,
test_data=[TEST_OBJECT],
params={
"sql": ""
},
)
assert 'не указан' in str(exc_info.value).lower() and \
'sql' in str(exc_info.value).lower()