ETL-скрипты
В Analytic Workspace есть возможность кастомизировать процесс трансформации данных с помощью ETL-скриптов
Процесс трансформации данных
Рассмотрим процесс трансформации модели с тремя таблицами Table1
, Table2
и Table3
.
Перед запуском трансформации, механизмы ETL обеспечивают выгрузку данных из источников и сохранение их в виде parquet-файлов во внутреннем хранилище системы. В процессе выполнения трансформации, система загружает данные каждой из таблиц в распределенные Spark-датафреймы (DataFrame) и выполняет над ними SQL-запрос, который соединяет их в единую денормализованную аналитическую таблицу.
Схематично, в базовой версии процесс выглядит так:
stateDiagram-v2
state fork_state <<fork>>
[*] --> fork_state
fork_state --> Load_Table1
fork_state --> Load_Table2
fork_state --> Load_Table3
state join_state <<join>>
Load_Table1 --> join_state
Load_Table2 --> join_state
Load_Table3 --> join_state
join_state --> Join_Tables
Join_Tables --> [*]
Теперь опишем, каким образом разработчик модели может встроится в данный процесс:
- Можно задать некоторые инициализирующие операции, которые будут выполнены в самом начале работы процесса;
- После загрузки данных (т.е., перед их соединением в единую таблицу) для каждого из объектов можно получить загруженный датафрейм и выполнить над ним некоторые операции.
- После того, как система выполнила объединение датафреймов всех объектов, можно получить датафрейм для итоговой таблицы и выполнить над ним некоторые операции.
На диаграмме ниже, указанные точки расширения обозначены: before_all()
, after_load_*()
и after_all()
stateDiagram-v2
[*] --> before_all()
state fork_state <<fork>>
before_all() --> fork_state
fork_state --> Load_Table1
fork_state --> Load_Table2
fork_state --> Load_Table3
Load_Table1 --> after_load_Table1(df_table1)
Load_Table2 --> after_load_Table2(df_table2)
Load_Table3 --> after_load_Table3(df_table3)
state join_state <<join>>
after_load_Table1(df_table1) --> join_state
after_load_Table2(df_table2) --> join_state
after_load_Table3(df_table3) --> join_state
join_state --> Join_Tables
Join_Tables --> after_all(df_final)
after_all(df_final) --> [*]
Названия точек расширения не случайно указаны со скобками. Они могут быть заданы python-функциями, которые выполняют операции над датафреймами.
Так, загруженные в текущую Spark-сессию датафреймы таблиц df_table1
, df_table2
, df_table3
передаются в параметры соответствующих функций
after_load_*()
, а итоговый датафрейм df_final
передается в параметры функции after_all()
.
Функция before_all()
, соответствующая начальной точке расширения,
не принимает никакого датафрейма (на этапе инициализации ещё нет загруженных данных), но она имеет доступ к запущенной Spark-сессии и может внести в неё
необходимые изменения.
Например, перед нами стоит задача добавления в конец каждой ячейки строкового столбца name
в итоговой таблице значения - ПРИМЕР
. Тогда скрипт модели может выглядеть так:
from pyspark.sql.functions import concat, col, lit
def after_all(df, spark, app, *args, **kwargs):
""" """
df = df.withColumn("name", concat(col('name'), lit('- ПРИМЕР')))
return df
Больше примеров вы можете найти в разделе Примеры.
Читать далее
- Скрипт модели - описание возможностей, предоставляемых системой, по управлению процессом трансформации данных модели;
- API - описание структур данных, доступных в процессе выполнения скрипта;
- Интерфейс редактора - описание возможностей редактора скрипта;
- Примеры - разные примеры, демонстрирующие возможности скрипта трансформации.