Датафреймы Spark¶
Основной структурой данных в Spark является распределенный датафрейм (DataFrame).
Операции с датафреймами¶
Все перечисленные ниже операции с датафреймами являются иммутабельными, т.е. они не изменяют текущий объект, а возвращают новый. Для того, чтобы продолжить работу с датафреймом после применения операции необходимо сохранить результат выполнения в переменную.
Неправильно
После применения операции в первой строке датафрейфdf
остался прежним, а результат фильтрации по id > 10
не был никуда сохранен.
Поэтому, операция во второй строке будет применена к изначальному датафрейму, а не к отфильтрованному.
Создание датафрейма¶
Создание датафрейма выполняется с помощью метода createDataFrame
объекта SparkSession
. При вызове
этого метода необходимо передать строки датафрейма в виде списка объектов Row.
from pyspark.sql import Row
rows = [
Row(id=1, name='Название 1'),
Row(id=2, name='Название 2')
]
df = spark.createDataFrame(rows)
При создании датафреймов крайне желательно явно указывать схему данных будущего датафрейма. Без этого, типы столбцов будущего датафрейма будут созданы на основе типов указанных в Row значений, что в некоторых случаях не является однозначной операцией. Явное указание схемы позволит убрать возможные двусмысленности в определении типов.
Явное указание схемы выполняется через указание параметра schema
метода createDataFrame
.
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType(fields=[
StructField('id', IntegerType(), True),
StructField('name', StringType(), True)
])
rows = [
Row(id=1, name='Название 1'),
Row(id=2, name='Название 2')
]
df = spark.createDataFrame(rows, schema=schema)
Если уже есть датафрейм с нужной вам схемой данных, то можно использовать его схему при создании нового.
# Фрагмент etl-скрипта модели
def after_all(df, spark, app):
""" """
rows = [
Row(id=1, name='Название 1'),
Row(id=2, name='Название 2')
]
return spark.createDataFrame(rows, schema=df.schema) # (1)
- По смыслу функции after_all, нужно из неё вернуть датафрейм в точности той же структуры, что и
df
из параметров. Поэтому, мы используемdf.schema
, чтобы явно задать схему данных возвращаемого из функции датафрейма.
Более подробно про схему данных можно почитать здесь.
Фильтрация¶
Для фильтрации строк датафрейма используется функция
filter
(или её псевдоним where
).
В функцию передается условие, которое должно выполняться для отфильтрованных столбцов
Если необходимо отфильтровать сразу по нескольким столбцам, то условия для столбцов должны быть соединены через &
(AND) или |
(OR).
Обратите внимание
Условия для каждого из столбцов должны быть заключены в скобки. Если этого не сделать и написать df.age >= 30 & df.salary < 5000
,
то вы получите ошибку вида
An error occurred while calling o2248.and. Trace:
py4j.Py4JException: Method and([class java.lang.Integer]) does not exist
Это связано с приоритетностью выполнения операций внутри Python: &
имеет более высокий приоритет и попытается выполниться первым.
Поэтому, самым первым действием в этом выражении будет 30 & df.salary
, что является бессмысленным и вызовет вышеуказанную ошибку:
"Метод and (это у нас &
) для класса Integer (это у нас 30
) не существует".
При записи условий фильтрации к столбцам датафрейма можно обращаться в двух формах:
- через атрибут датафрейма:
df.age
- через индекс датафрейма
df['age']
Однако, если название столбца датафрейма не является корректным идентификатором Python, то форму обращения через атрибут объекта использовать не получится. Возникнет синтаксическая ошибка.
# Пусть в датафрейме есть два столбца: "1age" и "unit price"
# Ошибка! (1)
df.filter(df.1age > 30)
# Правильно
df.filter(df['1age'] > 30)
# Ошибка! (2)
df.filter(df.unit price > 1000)
# Правильно
df.filter(df.['unit price'] > 1000)
- Идентификаторы в Python не могут начинаться с цифры.
- price отделен от unit пробелом, и для Python price - это отдельное имя, которое не задано в текущем контексте
Также, есть возможность указывать условия фильтрации в виде SQL выражений.