Перейти к содержанию

ETL-скрипты

В Analytic Workspace есть возможность кастомизировать процесс трансформации данных с помощью ETL-скриптов

Процесс трансформации данных

Рассмотрим процесс трансформации модели с тремя таблицами Table1, Table2 и Table3.

Перед запуском трансформации, механизмы ETL обеспечивают выгрузку данных из источников и сохранение их в виде parquet-файлов во внутреннем хранилище системы. В процессе выполнения трансформации, система загружает данные каждой из таблиц в распределенные Spark-датафреймы (DataFrame) и выполняет над ними SQL-запрос, который соединяет их в единую денормализованную аналитическую таблицу.

Схематично, в базовой версии процесс выглядит так:

stateDiagram-v2
  state fork_state <<fork>>
    [*] --> fork_state
    fork_state --> Load_Table1
    fork_state --> Load_Table2
    fork_state --> Load_Table3

    state join_state <<join>>
    Load_Table1 --> join_state
    Load_Table2 --> join_state
    Load_Table3 --> join_state
    join_state --> Join_Tables
    Join_Tables --> [*]

Теперь опишем, каким образом разработчик модели может встроится в данный процесс:

  • Можно задать некоторые инициализирующие операции, которые будут выполнены в самом начале работы процесса;
  • После загрузки данных (т.е., перед их соединением в единую таблицу) для каждого из объектов можно получить загруженный датафрейм и выполнить над ним некоторые операции.
  • После того, как система выполнила объединение датафреймов всех объектов, можно получить датафрейм для итоговой таблицы и выполнить над ним некоторые операции.

На диаграмме ниже, указанные точки расширения обозначены: before_all(), after_load_*() и after_all()

stateDiagram-v2
  [*] --> before_all()

  state fork_state <<fork>>
    before_all() --> fork_state
    fork_state --> Load_Table1
    fork_state --> Load_Table2
    fork_state --> Load_Table3

    Load_Table1 --> after_load_Table1(df_table1)
    Load_Table2 --> after_load_Table2(df_table2)
    Load_Table3 --> after_load_Table3(df_table3)

    state join_state <<join>>
    after_load_Table1(df_table1) --> join_state
    after_load_Table2(df_table2) --> join_state
    after_load_Table3(df_table3) --> join_state

    join_state --> Join_Tables
    Join_Tables --> after_all(df_final)
    after_all(df_final) --> [*]

Названия точек расширения не случайно указаны со скобками. Они могут быть заданы python-функциями, которые выполняют операции над датафреймами.

Так, загруженные в текущую Spark-сессию датафреймы таблиц df_table1, df_table2, df_table3 передаются в параметры соответствующих функций after_load_*(), а итоговый датафрейм df_final передается в параметры функции after_all(). Функция before_all(), соответствующая начальной точке расширения, не принимает никакого датафрейма (на этапе инициализации ещё нет загруженных данных), но она имеет доступ к запущенной Spark-сессии и может внести в неё необходимые изменения.

Например, перед нами стоит задача добавления в конец каждой ячейки строкового столбца name в итоговой таблице значения - ПРИМЕР. Тогда скрипт модели может выглядеть так:

from pyspark.sql.functions import concat, col, lit


def after_all(df, spark, app, *args, **kwargs):
    """ """
    df = df.withColumn("name", concat(col('name'), lit('- ПРИМЕР')))
    return df

Больше примеров вы можете найти в разделе Примеры.

Читать далее

  • Скрипт модели - описание возможностей, предоставляемых системой, по управлению процессом трансформации данных модели;
  • API - описание структур данных, доступных в процессе выполнения скрипта;
  • Интерфейс редактора - описание возможностей редактора скрипта;
  • Примеры - разные примеры, демонстрирующие возможности скрипта трансформации.