Перейти к содержанию

Датафреймы Spark

Основной структурой данных в Spark является распределенный датафрейм (DataFrame).

Операции с датафреймами

Все перечисленные ниже операции с датафреймами являются иммутабельными, т.е. они не изменяют текущий объект, а возвращают новый. Для того, чтобы продолжить работу с датафреймом после применения операции необходимо сохранить результат выполнения в переменную.

Неправильно

df.filter(df['id'] > 10)
df.select('id', 'name)
После применения операции в первой строке датафрейф df остался прежним, а результат фильтрации по id > 10 не был никуда сохранен. Поэтому, операция во второй строке будет применена к изначальному датафрейму, а не к отфильтрованному.

Правильно

df = df.select(df['id'] > 10)
df = df.select('id', 'name')

Создание датафрейма

Создание датафрейма выполняется с помощью метода createDataFrame объекта SparkSession. При вызове этого метода необходимо передать строки датафрейма в виде списка объектов Row.

from pyspark.sql import Row

rows = [
    Row(id=1, name='Название 1'),
    Row(id=2, name='Название 2')
]

df = spark.createDataFrame(rows)

При создании датафреймов крайне желательно явно указывать схему данных будущего датафрейма. Без этого, типы столбцов будущего датафрейма будут созданы на основе типов указанных в Row значений, что в некоторых случаях не является однозначной операцией. Явное указание схемы позволит убрать возможные двусмысленности в определении типов.

Явное указание схемы выполняется через указание параметра schema метода createDataFrame.

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType(fields=[
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True)
])

rows = [
    Row(id=1, name='Название 1'),
    Row(id=2, name='Название 2')
]

df = spark.createDataFrame(rows, schema=schema)

Если уже есть датафрейм с нужной вам схемой данных, то можно использовать его схему при создании нового.

# Фрагмент etl-скрипта модели
def after_all(df, spark, app):
    """ """
    rows = [
        Row(id=1, name='Название 1'),
        Row(id=2, name='Название 2')
    ]

    return spark.createDataFrame(rows, schema=df.schema)  # (1)
  1. По смыслу функции after_all, нужно из неё вернуть датафрейм в точности той же структуры, что и df из параметров. Поэтому, мы используем df.schema, чтобы явно задать схему данных возвращаемого из функции датафрейма.

Более подробно про схему данных можно почитать здесь.

Фильтрация

Для фильтрации строк датафрейма используется функция filter (или её псевдоним where).

В функцию передается условие, которое должно выполняться для отфильтрованных столбцов

df.filter(df.age >= 30)

Если необходимо отфильтровать сразу по нескольким столбцам, то условия для столбцов должны быть соединены через & (AND) или | (OR).

df.filter((df.age >= 30) & (df.salary < 5000))

Обратите внимание

Условия для каждого из столбцов должны быть заключены в скобки. Если этого не сделать и написать df.age >= 30 & df.salary < 5000, то вы получите ошибку вида

An error occurred while calling o2248.and. Trace:
    py4j.Py4JException: Method and([class java.lang.Integer]) does not exist

Это связано с приоритетностью выполнения операций внутри Python: & имеет более высокий приоритет и попытается выполниться первым. Поэтому, самым первым действием в этом выражении будет 30 & df.salary, что является бессмысленным и вызовет вышеуказанную ошибку: "Метод and (это у нас &) для класса Integer (это у нас 30) не существует".

При записи условий фильтрации к столбцам датафрейма можно обращаться в двух формах:

  • через атрибут датафрейма: df.age
  • через индекс датафрейма df['age']
# Следующие две записи эквивалентны

df.filter(df.age > 30)
df.filter(df['age'] > 30)

Однако, если название столбца датафрейма не является корректным идентификатором Python, то форму обращения через атрибут объекта использовать не получится. Возникнет синтаксическая ошибка.

# Пусть в датафрейме есть два столбца: "1age" и "unit price"

# Ошибка! (1) 
df.filter(df.1age > 30)  

# Правильно
df.filter(df['1age'] > 30)  

# Ошибка! (2)
df.filter(df.unit price > 1000)

# Правильно
df.filter(df.['unit price'] > 1000)
  1. Идентификаторы в Python не могут начинаться с цифры.
  2. price отделен от unit пробелом, и для Python price - это отдельное имя, которое не задано в текущем контексте

Также, есть возможность указывать условия фильтрации в виде SQL выражений.

df.filter('age > 30')
df.filter('age >= 30 and salary < 5000')