Пример JSON блока¶
Метаданные¶
block_meta.json
{
"uid": "example_json",
"name": "JSON-блок (пример)",
"version": "1.0",
"description": "Блок позволяет преобразовать значения из JSON-поля в новые строки или столбцы. Работает со значениями полей как в виде объектов, так и массивов.",
"author": "AnalyticWorkspace",
"updated_at": "2023-11-05 11:00:00+00:00",
"params":[{
"code": "json_field",
"name": "JSON-поле",
"type": "select",
"description": "Выберите столбец из вложенного в блок объекта, в котором хранятся JSON-значения",
"required": true,
"mult": false,
"domain": "child_fields"
}, {
"code": "json_schema",
"name": "Схема JSON",
"description": "Описание структуры JSON-объекта в выбранном поле. Можете указать вручную, а можете нажать кнопку \"Заполнить схему JSON автоматически\", чтобы система попыталась вывести схему из данных вложенного в блок объекта",
"type": "text",
"required": false,
"mult": false,
"domain": "",
"extra": {
"height": 100,
"placeholder": "Нажмите кнопку \"Заполнить схему JSON автоматически\" и система попробует определить схему автоматически. Как описывать схему вручную - в описании (ссылка \"Подробнее о блоке\" выше).",
"monospaced": true
}
}, {
"code": "autofill_schema",
"name": "Заполнить схему JSON автоматически",
"type": "action",
"action": "autofill_schema"
},
{
"code": "transform_type",
"name": "Тип преобразования",
"type": "select",
"description": "Выберите тип преобразования JSON-поля. \"Значения из объекта в столбцы\" - выводит значения из JSON объекта в отдельные столбцы. \"Значения из массива в столбцы\" - выводит указанные значения из массива в столбцы. \"Значения из массива в строки\" - множит строки по значениям из массива",
"required": true,
"mult": false,
"domain": [
{"id": "object_to_columns", "name": "Атрибуты объекта -> Столбцы"},
{"id": "array_to_columns", "name": "Элементы массива -> Столбцы"},
{"id": "array_to_rows", "name": "Элементы массива -> Строки"}
],
"extra": {
"placeholder": "Выберите тип преобразования"
}
},
{
"code": "path_object_to_columns",
"name": "Путь к объекту",
"type": "string",
"description": "Укажите последовательность ключей, которые позволяют идентифицировать объект. Если вы собираетесь извлекать значения непосредственно из самого поля, то оставьте значение пустым.",
"required": false,
"mult": false,
"extra": {
"visible_if": {"transform_type": "object_to_columns"},
"placeholder": "Заполните, если объект хранится во вложенном атрибуте JSON-поля"
}
},
{
"code": "rules_object_to_columns",
"name": "Правила создания столбцов",
"type": "group",
"description": "",
"mult": true,
"extra": {
"compact": false,
"visible_if": {"transform_type": "object_to_columns"},
"columns": 2,
"empty_row": false,
"add_row_label": "столбец",
"actions": [
{"code": "autofill_attrs_object_to_columns", "name": "Заполнить автоматически", "icon": "ic_etl_block_16"}
]
},
"params": [{
"code": "attr_name",
"name": "Значение из объекта",
"type": "string",
"description":"",
"required": false,
"mult": false
}, {
"code": "column_name",
"name": "Столбец в модели",
"type": "string",
"description": "Как будет называться столбец в модели. Можно оставить пустым, и система сфомирует название столбца автоматически",
"required": false,
"mult": false
}]
},
{
"code": "path_array_to_columns",
"name": "Путь к массиву",
"type": "string",
"description": "Укажите последовательность ключей, которые позволяют идентифицировать массив. Если массив хранится в самом JSON-поле (без дополнительной вложенности), то оставьте значение пустым.",
"required": false,
"mult": false,
"extra": {
"visible_if": {"transform_type": "array_to_columns"},
"placeholder": "Заполните, если массив хранится во вложенном атрибуте JSON-поля"
}
},
{
"code": "column_value_array_to_columns",
"name": "Что указывать в значениях новых столбцов",
"type": "select",
"description": "Укажите, какие значения должны содержаться в добавляемых столбцах",
"required": false,
"mult": false,
"domain": [
{"id": "true_false", "name": "true / false"},
{"id": "one_hot", "name": "1 / 0"},
{"id": "value_null", "name": "значение / null"}
],
"extra": {
"visible_if": {"transform_type": "array_to_columns"},
"placeholder": "По умолчанию, указывается true, если значение столбца есть в массиве, и false в противном случае "
}
},
{
"code": "rules_array_to_columns",
"name": "Правила создания столбцов",
"type": "group",
"description": "Укажите, какие значения из массива необходимо перенести в столбцы",
"mult": true,
"extra": {
"compact": false,
"visible_if": {"transform_type": "array_to_columns"},
"columns": 3,
"empty_row": false,
"add_row_label": "столбец",
"actions": [
{"code": "autofill_array_to_columns", "name": "Заполнить автоматически", "icon": "ic_etl_block_16"}
]
},
"params": [{
"code": "array_value",
"name": "Значение массива",
"type": "string",
"description":"",
"required": false,
"mult": false
}, {
"code": "column_name",
"name": "Столбец в модели",
"type": "string",
"description": "Как будет называться столбец в модели. Можно оставить пустым, и система сфомирует название столбца автоматически",
"required": false,
"mult": false,
"extra": {
"placeholder": "Сформируется автоматически"
}
}]
},
{
"code": "path_array_to_rows",
"name": "Путь к массиву",
"type": "string",
"description": "Укажите последовательность ключей, которые позволяют идентифицировать массив. Если массив хранится в самом JSON-поле (без дополнительной вложенности), то оставьте значение пустым.",
"extra": {
"visible_if": {"transform_type": "array_to_rows"},
"placeholder": "Заполните, если массив хранится во вложенном атрибуте JSON-поля"
},
"required": false,
"mult": false,
"domain": null
}, {
"code": "array_to_rows_column_name",
"name": "Столбец в модели",
"type": "string",
"description": "",
"required": false,
"mult": false,
"extra": {
"visible_if": {"transform_type": "array_to_rows"},
"placeholder": "Укажите название столбца, в котором будут находиться значения массива"
}
}]
}
Код блока¶
block_code.py
from typing import Optional, Any, List, Tuple, Union
import json
import uuid
import datetime
from collections import namedtuple
from dataclasses import dataclass, field
from collections import OrderedDict
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import StructType, StructField, DataType, StringType, IntegerType, LongType, DoubleType, BooleanType, TimestampType, ArrayType
from pyspark.sql.functions import col, from_json, explode, explode_outer, when, array_contains
from pyparsing import ParserElement, CaselessKeyword, Word, ZeroOrMore, Literal, Forward, Opt, ParseResults, MatchFirst, identchars
from aw_etl.etl_blocks import InvalidEtlBlock, ETLBlockApplication, ETLBlockActionResult
from aw_etl.models import ModelObject
@dataclass
class BlockParams:
json_field: str # название поля из модели с JSON значением
json_schema: str # схема данных JSON объекта
transform_type: str # тип трансформации
# настройки преобразования "Объект в столбцы"
path_object_to_columns: Optional[str]
rules_object_to_columns: List['RuleObjectToColumns']
# настройки преобразования "Массив в столбцы"
column_value_array_to_columns: Optional[str]
path_array_to_columns: Optional[str]
rules_array_to_columns: List['RuleArrayToColumns']
# настройки преобразования "Массив в строки"
path_array_to_rows: Optional[str]
array_to_rows_column_name: Optional[str]
@dataclass
class RuleObjectToColumns:
""" """
attr_name: str
column_name: Optional[str]
@dataclass
class RuleArrayToColumns:
""" """
array_value: str
column_name: Optional[str]
def get_block_params(params: dict) -> BlockParams:
"""
Возвращает параметры блока в структурированном виде
"""
return BlockParams(
json_field=params.get('json_field'),
json_schema=params.get('json_schema'),
transform_type=params.get('transform_type'),
# Правила для "Объект -> Столбцы"
path_object_to_columns=params.get('path_object_to_columns'),
rules_object_to_columns=[
RuleObjectToColumns(attr_name=r.get('attr_name'), column_name=r.get('column_name')) \
for r in (params.get('rules_object_to_columns') or []) if r.get('attr_name')
],
# Правила для "Массив -> Столбцы"
path_array_to_columns=params.get('path_array_to_columns'),
column_value_array_to_columns=params.get('column_value_array_to_columns') or 'true_false',
rules_array_to_columns=[
RuleArrayToColumns(array_value=r.get('array_value'),
column_name=r.get('column_name')) \
for r in (params.get('rules_array_to_columns') or []) if r.get('array_value')
],
path_array_to_rows=params.get('path_array_to_rows'),
array_to_rows_column_name=params.get('array_to_rows_column_name'),
)
def block_schema(upstream_schema: StructType,
params: dict,
upstream_dataframe: Optional[DataFrame],
model_object: ModelObject) -> StructType:
"""
Возвращает схему блока.
"""
if upstream_dataframe is None:
# В блоке нет вложенных объектов, ничего не делаем
return upstream_schema
block_params = get_block_params(params)
if not block_params.json_field:
return upstream_schema
df = upstream_dataframe
json_field = find_json_field(upstream_dataframe, block_params, model_object)
if not json_field:
raise InvalidEtlBlock(f'Столбец "{block_params.json_field}", указанный как JSON-поле, не найден во вложенных в блок объекте')
df = prepare_json_parsed_dataframe(upstream_dataframe, block_params, json_field)
if block_params.transform_type == 'object_to_columns':
target_object_path = json_field.name
if block_params.path_object_to_columns:
target_object_path += f'.{block_params.path_object_to_columns}'
target_object_path += '.*'
df = df.select(col(target_object_path))
schema = StructType(fields=[])
for column_rule in block_params.rules_object_to_columns:
field = next((f for f in df.schema.fields if f.name == column_rule.attr_name), None)
schema.fields.append(StructField(
name=column_rule.column_name or model_column_name(json_field.name, column_rule.attr_name),
dataType=field.dataType if field else StringType(),
nullable=True
))
return StructType(
fields=upstream_schema.fields + schema.fields)
elif block_params.transform_type == 'array_to_columns':
target_array_path = json_field.name
if block_params.path_array_to_columns:
target_array_path += f'.{block_params.path_array_to_columns}'
df = df.select(col(target_array_path))
schema = StructType(fields=[])
column_type = None
if block_params.column_value_array_to_columns == 'one_hot':
column_type = IntegerType()
elif block_params.column_value_array_to_columns == 'true_false':
column_type = BooleanType()
elif block_params.column_value_array_to_columns == 'value_null':
column_type = df.schema.fields[0].dataType.elementType
else:
raise InvalidEtlBlock(f'Неизвестное значение для типа новых столбцов: {block_params.column_value_array_to_columns}')
for column_rule in block_params.rules_array_to_columns:
schema.fields.append(StructField(
name=column_rule.column_name or model_column_name(json_field.name, column_rule.array_value),
dataType=column_type,
nullable=True
))
return StructType(
fields=upstream_schema.fields + schema.fields)
elif block_params.transform_type == 'array_to_rows':
# значения из массива в строки
target_array_path = json_field.name
if block_params.path_array_to_rows:
target_array_path += f'.{block_params.path_array_to_rows}'
df = df.select(explode(col(target_array_path)))
return StructType(fields=upstream_schema.fields + [StructField(
name=block_params.array_to_rows_column_name or f'{json_field.name}__expl',
dataType=df.schema.fields[0].dataType,
nullable=True
)])
return upstream_schema
def block_data(app: ETLBlockApplication,
upstream_dataframe: DataFrame,
params: dict,
model_object: ModelObject) -> DataFrame:
"""
Возвращает данные блока.
"""
if upstream_dataframe is None:
return upstream_dataframe
block_params = get_block_params(params)
if not block_params.json_field:
return upstream_dataframe
json_field = find_json_field(upstream_dataframe, block_params, model_object)
if not json_field:
raise InvalidEtlBlock(f'Столбец "{block_params.json_field}", указанный как JSON-поле, не найден во вложенных в блок объекте')
df = prepare_json_parsed_dataframe(upstream_dataframe, block_params, json_field)
if block_params.transform_type == 'object_to_columns':
target_object_path = json_field.name
if block_params.path_object_to_columns:
target_object_path += f'.{block_params.path_object_to_columns}'
cols = []
for column_rule in block_params.rules_object_to_columns:
column_name = column_rule.column_name or model_column_name(json_field.name, column_rule.attr_name)
cols.append(col(f'{target_object_path}.{column_rule.attr_name}').alias(column_name))
df = df.select(col('*'), *cols)
elif block_params.transform_type == 'array_to_columns':
target_array_path = json_field.name
if block_params.path_array_to_columns:
target_array_path += f'.{block_params.path_array_to_columns}'
for column_rule in block_params.rules_array_to_columns:
if block_params.column_value_array_to_columns == 'one_hot':
in_value, out_value = 1, 0
elif block_params.column_value_array_to_columns == 'true_false':
in_value, out_value = True, False
elif block_params.column_value_array_to_columns == 'value_null':
in_value, out_value = column_rule.array_value, None
else:
raise InvalidEtlBlock(f'Неизвестное значение для типа новых столбцов: {block_params.column_value_array_to_columns}')
df = df.withColumn(
column_rule.column_name or model_column_name(json_field.name, column_rule.array_value),
when(array_contains(col(target_array_path), column_rule.array_value), in_value).otherwise(out_value)
)
elif block_params.transform_type == 'array_to_rows':
target_array_path = json_field.name
if block_params.path_array_to_rows:
target_array_path += f'.{block_params.path_array_to_rows}'
explode_column_name = block_params.array_to_rows_column_name or f'{json_field.name}__expl'
df = df.select(col('*'), explode(col(target_array_path)).alias(explode_column_name))
return df
# -------------------------------------------------------------------------------------------------------------
# Действия блока
# -------------------------------------------------------------------------------------------------------------
def block_action_autofill_schema(upstream_dataframe: DataFrame,
params: dict,
model_object: ModelObject) -> Optional[ETLBlockActionResult]:
"""
Экшен блока: "Заполнить схему JSON автоматически"
Анализирует данные в указанном поле и пытается вывести схему JSON объекта автоматически
"""
block_params = get_block_params(params)
if not block_params.json_field or not upstream_dataframe:
return None
json_field = find_json_field(upstream_dataframe, block_params, model_object)
if not json_field:
raise InvalidEtlBlock(f'Столбец "{block_params.json_field}", указанный как JSON-поле, не найден во вложенных в блок объекте')
# собираем данные для автоматического инференса схемы JSON-поля
json_schema = None
jsons = []
df = upstream_dataframe.select(col(json_field.name))
for row in df.limit(100).collect():
if row[json_field.name]:
try:
json_obj = json.loads(row[json_field.name])
except (TypeError, ValueError):
# Не удалось привести значения поля к JSON, просто пропускаем
continue
jsons.append(json_obj)
if jsons:
json_schema = infer_json_def(jsons=jsons, max_lvl=2)
return ETLBlockActionResult(params_patch={'json_schema': json_schema}) if json_schema else None
def block_action_autofill_attrs_object_to_columns(upstream_dataframe: DataFrame,
params: dict,
model_object: ModelObject) -> Optional[ETLBlockActionResult]:
"""
Автоматически заполняет список атрибутов объекта
"""
block_params = get_block_params(params)
if not block_params.json_field or not upstream_dataframe:
# Не заданы параметры, либо нет вложенных объектов - ничего не делаем.
return None
json_field = find_json_field(upstream_dataframe, block_params, model_object)
if not json_field:
raise InvalidEtlBlock(f'Столбец "{block_params.json_field}", указанный как JSON-поле, не найден во вложенных в блок объекте')
df = prepare_json_parsed_dataframe(upstream_dataframe, block_params, json_field)
target_object_path = json_field.name
if block_params.path_object_to_columns:
target_object_path += f'.{block_params.path_object_to_columns}'
target_object_path += '.*'
df = df.select(col(target_object_path))
params_patch = {
'rules_object_to_columns': [{'attr_name': f.name, 'column_name': None} for f in df.schema.fields]
}
return ETLBlockActionResult(
params_patch=params_patch
)
def block_action_autofill_array_to_columns(upstream_dataframe: DataFrame,
params: dict,
model_object: ModelObject) -> Optional[ETLBlockActionResult]:
""" Действие по автоматическому заполнению списка значений массива, которые нужно перевести в столбцы """
block_params = get_block_params(params)
if not block_params.json_field or not upstream_dataframe:
# Не заданы параметры, либо нет вложенных объектов - ничего не делаем.
return None
json_field = find_json_field(upstream_dataframe, block_params, model_object)
if not json_field:
raise InvalidEtlBlock(f'Столбец "{block_params.json_field}", указанный как JSON-поле, не найден во вложенных в блок объекте')
df = prepare_json_parsed_dataframe(upstream_dataframe, block_params, json_field)
target_array_path = json_field.name
if block_params.path_array_to_columns:
target_array_path += f'.{block_params.path_array_to_columns}'
df = df.select(col(target_array_path).alias('arr'))
if not isinstance(df.schema.fields[0].dataType, ArrayType):
raise InvalidEtlBlock(f'Элемент "{block_params.path_array_to_columns}" не является массивом')
array_values = set()
for row in df.limit(1000).collect():
if 'arr' in row and row['arr']:
if isinstance(row['arr'], list) and row['arr']:
if isinstance(row['arr'][0], (list, dict, Row)):
raise InvalidEtlBlock('Преобразование "Элементы массива -> Столбцы" возможно только для элементов массива с простыми типами (строки, числа и т.п.)')
array_values.update(set(row['arr']))
params_patch = {
'rules_array_to_columns': [{'array_value': v, 'column_name': None, 'column_value': None} for v in array_values]
}
return ETLBlockActionResult(
params_patch=params_patch
)
# --------------------------------------------------------------------------------------------------------
# Обработка JSON поля
# --------------------------------------------------------------------------------------------------------
def find_json_field(df: DataFrame, block_params: BlockParams, model_object: ModelObject) -> Optional[StructField]:
""" Возвращает поле схемы вложенного датафрейма, соответствующее названию json-поля из настроек пользователя """
json_field = next((f for f in df.schema.fields if f.name == block_params.json_field), None)
if not json_field:
child_model_object = model_object.childs[0] if model_object.childs else None
if child_model_object:
model_object_field = next((f for f in child_model_object.fields if f.name == block_params.json_field), None)
if model_object_field:
json_field = next((f for f in df.schema.fields if f.name == model_object_field.model_name), None)
return json_field
def prepare_json_parsed_dataframe(df: DataFrame, block_params: BlockParams, json_field: StructField) -> DataFrame:
"""
Возвращает датафрейм с подготовленным для дальнейших операций полем JSON
"""
if not isinstance(json_field.dataType, (StructType, ArrayType)):
if not block_params.json_schema:
raise InvalidEtlBlock('Не указана схема JSON-объекта')
try:
jsondef = parse_jsondef(block_params.json_schema)
json_schema = jsondef.as_spark()
except Exception as e:
raise InvalidEtlBlock(f'JSON схема содержит ошибки: {str(e)[:100]}')
df = df.withColumn(json_field.name, from_json(col=json_field.name, schema=json_schema))
return df
# --------------------------------------------------------------------------------------------------------
# Автоматическое построение схемы
# --------------------------------------------------------------------------------------------------------
def infer_json_def(jsons: List[Union[list, dict]], max_lvl: int = 2):
""" Автоматическое распознавание схемы """
result = ''
element = next((item for item in jsons if item is not None), None)
if isinstance(element, list):
result = infer_json_def_array(element, 1, max_lvl)
elif isinstance(element, dict):
result = infer_json_def_object(jsons, 1, max_lvl)
return result
def infer_json_def_array(jsons: List[list], lvl: int, max_lvl: int = 2):
""" """
element_type = ''
indent = ' ' * lvl
element = next((item for item in jsons if item is not None), None) # первое непустое значение массива
if element is not None:
element_type = infer_simple_element_type(element)
element_type = (indent + ' ' + element_type) if element_type else None
if not element_type: # тип элемента какой-то составной
if isinstance(element, list):
element_type = infer_json_def_array(element, lvl + 1, max_lvl) if lvl < max_lvl else '[]'
elif isinstance(element, dict):
if lvl <= max_lvl:
element_type = infer_json_def_object([o for o in jsons[:10] if o and isinstance(o, dict)], lvl + 1, max_lvl)
else:
element_type = '{}' # достигнут макс. уровень погружения
return f'[\n{element_type}\n]' if element_type else '[]'
def infer_json_def_object(jsons: List[dict], lvl, max_lvl: int = 2):
""" """
def wrap_attr_name(attr_name: str) -> str:
attr_name = attr_name.strip()
return f'\"{attr_name}\"' if ' ' in attr_name else f"{attr_name}"
attrs = OrderedDict()
for obj in [o for o in jsons if o and isinstance(o, dict)]:
for attr_name, attr_value in obj.items():
if attr_name not in attrs or attrs[attr_name][1] in ('{}', '[]'):
element_type = infer_simple_element_type(attr_value)
if not element_type:
if isinstance(attr_value, list):
element_type = infer_json_def_array(attr_value, lvl + 1, max_lvl) if lvl < max_lvl else '[]'
elif isinstance(attr_value, dict):
if lvl <= max_lvl:
element_type = infer_json_def_object([o.get(attr_name) for o in jsons[:10] if o and o.get(attr_name)], lvl + 1, max_lvl)
else:
element_type = '{}'
if element_type:
attrs[attr_name] = type(attr_value), element_type
if attrs:
attrs_inferred = ',\n'.join([f'{wrap_attr_name(attr_name)}: {attr_type[1]}' for attr_name, attr_type in attrs.items()])
return f'{{\n{" " * lvl + wrap_indent(attrs_inferred)}\n}}'
return '{}'
def infer_simple_element_type(element: Any) -> Optional[str]:
""" Попытка распознавания типа значений. Если не получается, возвращается None """
element_type = None
if isinstance(element, bool):
element_type = 'логическое'
elif isinstance(element, int):
element_type = 'целое'
elif isinstance(element, float):
element_type = 'дробное'
elif isinstance(element, str):
try:
datetime.datetime.fromisoformat(element)
element_type = 'датавремя'
except:
element_type = 'строка'
return element_type
def wrap_indent(jsondef: str) -> str:
return jsondef.replace('\n', '\n ')
def unique_field_name(schema: StructType, field_name: str) -> str:
""" """
if next((f for f in schema.fields if f.name == field_name), None) is not None:
field_name = f'{field_name}_{str(uuid.uuid4()[:4])}'
return field_name
# --------------------------------------------------------------------------------------------------------
# Вспомогательные функции
# --------------------------------------------------------------------------------------------------------
def value_type(value: Any) -> DataType:
"""
Возвращает спарковский тип для значения value.
"""
if isinstance(value, bool):
return BooleanType()
elif isinstance(value, int):
return LongType()
elif isinstance(value, float):
return DoubleType()
return StringType()
def aw_type_to_spark(type_name: str) -> DataType:
"""
Возвращает Spark-тип на основе AW-типа.
"""
if type_name == 'number':
return LongType()
elif type_name == 'float':
return DoubleType()
elif type_name == 'bool':
return BooleanType()
elif type_name == 'datetime':
return TimestampType()
return StringType()
def model_column_name(json_field_name: str, json_attr_name: str) -> str:
"""
Возвращает автоматически генерируемое название столбца
"""
return f'{json_field_name}__{json_attr_name}'
# --------------------------------------------------------------------------------------------------
# Грамматика для описания JSON-схемы
# --------------------------------------------------------------------------------------------------
@dataclass
class SimpleTypeNode:
def as_spark(self) -> str:
return self.spark_name
@dataclass
class StringTypeNode(SimpleTypeNode):
spark_name: str = 'string'
keywords = ['строка', 'текст', 'string', 'text', 'str']
@dataclass
class IntegerTypeNode(SimpleTypeNode):
spark_name: str = 'long'
keywords = ['целое', 'число', 'integer', 'int', 'long', 'number']
@dataclass
class FloatTypeNode(SimpleTypeNode):
spark_name: str = 'double'
keywords = ['дробное', 'float', 'double']
@dataclass
class BooleanTypeNode(SimpleTypeNode):
""" """
spark_name: str = 'boolean'
keywords = ['логическое', 'boolean', 'bool']
@dataclass
class DatetimeTypeNode(SimpleTypeNode):
spark_name: str = 'timestamp'
format: str = ""
keywords = ['датавремя', 'датаивремя', 'дата', 'datetime', 'date', 'timestamp']
@dataclass
class ArrayTypeNode:
element_type: Union['SimpleTypeNode', 'ObjectTypeNode', 'ArrayTypeNode']
spark_name: str = 'array'
keywords = ['список', 'массив', 'list', 'array']
def as_spark(self):
if not self.element_type:
return f'array<struct<>>'
#raise Exception('Не указан тип элемента массива')
return f'array<{self.element_type.as_spark()}>'
@dataclass
class ObjectTypeNode:
fields: List['ObjectFieldNode'] = field(default_factory=list)
keywords = ['объект', 'object', 'obj']
def as_spark(self):
return 'struct<' + ', '.join([f.as_spark() for f in self.fields]) + '>'
@dataclass
class ObjectFieldNode:
name: str
type: Union['SimpleTypeNode', 'ObjectTypeNode', 'ArrayTypeNode']
def as_spark(self):
return f"`{self.name}` {self.type.as_spark()}"
def get_jsondef_grammar() -> ParserElement:
"""
Возвращает грамматику для разбора определений схемы jsondef
"""
string_type = MatchFirst(
(CaselessKeyword(k) for k in StringTypeNode.keywords)).set_parse_action(lambda t: StringTypeNode())
integer_type = MatchFirst(
(CaselessKeyword(k) for k in IntegerTypeNode.keywords)).set_parse_action(lambda t: IntegerTypeNode())
float_type = MatchFirst(
(CaselessKeyword(k) for k in FloatTypeNode.keywords)).set_parse_action(lambda t: FloatTypeNode())
boolean_type = MatchFirst(
(CaselessKeyword(k) for k in BooleanTypeNode.keywords)).set_parse_action(lambda t: BooleanTypeNode())
datetime_type = MatchFirst(
(CaselessKeyword(k) for k in DatetimeTypeNode.keywords)).set_parse_action(lambda t: DatetimeTypeNode())
simple_type = string_type | integer_type | float_type | boolean_type | datetime_type
array_type_keyword = MatchFirst(
(CaselessKeyword(k) for k in ArrayTypeNode.keywords))
object_type_keyword = MatchFirst((CaselessKeyword(k) for k in ObjectTypeNode.keywords))
any_keyword = simple_type | array_type_keyword | object_type_keyword
letters = identchars + 'АБВГДЕЁЖЗИЙКЛМНОПРСТУФХЦЧШЩЪЫЬЭЮЯабвгдеёжзийклмнопрстуфхцчшщъыьэюя'
column_name = (~any_keyword + Word(letters, '1234567890' + letters + '_')) | \
(Opt(Literal('"')).suppress() + Word(letters, '1234567890' + letters + '_ .- ') + Opt(Literal('"')).suppress())
array_type = Forward()
object_type = Forward()
any_type = simple_type | array_type | object_type
array_type <<= (
(array_type_keyword.suppress() + Literal('(').suppress() + Opt(any_type) + Literal(')').suppress()) |
(array_type_keyword.suppress() + Literal('<').suppress() + Opt(any_type) + Literal('>').suppress()) |
(Literal('[').suppress() + Opt(any_type) + Literal(']').suppress())
).set_parse_action(lambda t: [ArrayTypeNode(element_type=t[0] if t else None)])
object_field = (
column_name + Opt(Literal(':')).suppress() + any_type + Opt(Literal(',')).suppress()
).set_parse_action(lambda t: [ObjectFieldNode(name=t[0], type=t[1])])
object_type <<= (
(object_type_keyword.suppress() + Literal('(').suppress() + ZeroOrMore(object_field) + Literal(')').suppress()) |
(object_type_keyword.suppress() + Literal('<').suppress() + ZeroOrMore(object_field) + Literal('>').suppress()) |
(Literal('{').suppress() + ZeroOrMore(object_field) + Literal('}').suppress())
).set_parse_action(lambda t: [ObjectTypeNode(fields=t)])
return object_type | array_type
def parse_jsondef(jsondef: str) -> Union[ObjectTypeNode, ArrayTypeNode]:
""" Возвращает рутовую ноду для jsondef выражения """
return get_jsondef_grammar().parse_string(jsondef)[0]
Тесты¶
test_example_json.py
import datetime
from pathlib import Path
from pyspark.sql.types import (
LongType,
StringType,
BooleanType,
)
from aw_client.etl_blocks import (
get_etl_block_meta,
get_etl_block_data,
get_etl_block_schema,
run_block_action,
ModelObjectTestData,
)
# ---------------------------------------------------------
# Тестовые данные
# ---------------------------------------------------------
JSON_OBJECT_DATA = ModelObjectTestData( # (1)!
model_name='json_object',
rows=[
{'id': 1, 'json_object': '{"x": 1, "y": "Название 1"}'},
{'id': 2, 'json_object': '{"x": 2, "y": "Название 2"}'},
{'id': 3, 'json_object': '{"x": 2, "z": true}'}
],
schema=[
{'model_name': 'id', 'simple_type': 'number'},
{'model_name': 'json_object', 'simple_type': 'string'}
]
)
JSON_ARRAY_DATA = ModelObjectTestData( # (2)!
model_name='json_array',
rows=[
{'id': 1, 'json_array': '["tag1", "tag2", "tag3"]'},
{'id': 2, 'json_array': '["tag1", "tag2"]'},
{'id': 3, 'json_array': "['tag1', 'tag3']"},
]
)
# ---------------------------------------------------------
# Тест на проверку метаданных
# ---------------------------------------------------------
def test_block_meta():
"""
Тест на получение метаданных блока. Позволяет проверить,
что block_meta.json не содержит ошибок
"""
get_etl_block_meta(block_path=Path(__file__).parent)
# ---------------------------------------------------------
# Тесты на "Атрибуты объекта -> Столбцы"
# ---------------------------------------------------------
def test_object_to_columns_infer_json_schema():
"""
Проверяем, как определяется схема данных для JSON данных
"""
block_params = {
'json_field': 'json_object'
}
action_result = run_block_action(
block_path=Path(__file__).parent,
action_name='autofill_schema', # (3)!
params=block_params,
test_data=[JSON_OBJECT_DATA]
)
json_schema = action_result.params_patch.get('json_schema') # (4)!
assert json_schema is not None
assert json_schema.replace('\n', '').replace(' ', '') == \
'{x:целое,y:строка,z:логическое}'
def test_object_to_columns_schema():
"""
Проверяем, как происходит преобразование JSON объекта в
столбцы таблицы
"""
block_params = {
'json_field': 'json_object',
'json_schema': '{x: целое, y: строка, z: логическое}',
'transform_type': 'object_to_columns',
'rules_object_to_columns': [
{'attr_name': 'x', 'column_name': 'column_1'},
{'attr_name': 'y', 'column_name': 'column_2'},
{'attr_name': 'z', 'column_name': 'column_3'}
]
}
block_schema = get_etl_block_schema(
block_path=Path(__file__).parent,
test_data=[JSON_OBJECT_DATA],
params=block_params
)
assert len(block_schema.fields) == 5
assert block_schema.fields[0].name == 'id'
assert block_schema.fields[0].dataType == LongType()
assert block_schema.fields[1].name == 'json_object'
assert block_schema.fields[1].dataType == StringType()
assert block_schema.fields[2].name == 'column_1'
assert block_schema.fields[2].dataType == LongType()
assert block_schema.fields[3].name == 'column_2'
assert block_schema.fields[3].dataType == StringType()
assert block_schema.fields[4].name == 'column_3'
assert block_schema.fields[4].dataType == BooleanType()
def test_object_to_columns_data():
"""
Проверяем, как происходит преобразование JSON объекта в столбцы таблицы
"""
block_params = {
'json_field': 'json_object',
'json_schema': '{x: целое, y: строка, z: логическое}',
'transform_type': 'object_to_columns',
'rules_object_to_columns': [
{'attr_name': 'x', 'column_name': 'column_1'},
{'attr_name': 'y', 'column_name': 'column_2'},
{'attr_name': 'z', 'column_name': 'column_3'}
]
}
df = get_etl_block_data(
block_path=Path(__file__).parent,
test_data=[JSON_OBJECT_DATA],
params=block_params
)
rows = df.collect()
assert len(rows) == 3
assert rows[0]['column_2'] == 'Название 1'
assert rows[1]['column_2'] == 'Название 2'
assert rows[2]['column_2'] == None
# ---------------------------------------------------------
# Тесты на "Элементы массива -> Столбцы"
# ---------------------------------------------------------
def test_array_to_columns_infer_json_schema():
"""
Тест на получение внутренней JSON схемы при преобразовании
JSON массива в столбцы модели
"""
block_params = {
'json_field': 'json_array'
}
action_result = run_block_action(
block_path=Path(__file__).parent,
action_name='autofill_schema',
params=block_params,
test_data=[JSON_ARRAY_DATA]
)
json_schema = action_result.params_patch.get('json_schema')
assert json_schema is not None
assert json_schema.replace('\n', '').replace(' ', '') == '[строка]'
def test_array_to_columns_schema():
"""
Тест на получение схемы данных при преобразовании
JSON массива в столбцы
"""
block_params = {
'json_field': 'json_array',
'json_schema': '[строка]',
'transform_type': 'array_to_columns',
'column_value_array_to_columns': 'true_false',
'rules_array_to_columns': [
{'array_value': 'tag1', 'column_name': 'column_tag1'},
{'array_value': 'tag2', 'column_name': 'column_tag2'},
{'array_value': 'tag3', 'column_name': 'column_tag3'}
]
}
block_schema = get_etl_block_schema(
block_path=Path(__file__).parent,
test_data=[JSON_ARRAY_DATA],
params=block_params
)
assert len(block_schema.fields) == 5
assert block_schema.fields[0].name == 'id'
assert block_schema.fields[0].dataType == LongType()
assert block_schema.fields[1].name == 'json_array'
assert block_schema.fields[1].dataType == StringType()
assert block_schema.fields[2].name == 'column_tag1'
assert block_schema.fields[2].dataType == BooleanType()
assert block_schema.fields[3].name == 'column_tag2'
assert block_schema.fields[3].dataType == BooleanType()
assert block_schema.fields[4].name == 'column_tag3'
assert block_schema.fields[4].dataType == BooleanType()
def test_array_to_columns_data():
"""
Тест на получение схемы данных при преобразовании
JSON массива в столбцы
"""
block_params = {
'json_field': 'json_array',
'json_schema': '[строка]',
'transform_type': 'array_to_columns',
'column_value_array_to_columns': 'true_false',
'rules_array_to_columns': [
{'array_value': 'tag1', 'column_name': 'column_tag1'},
{'array_value': 'tag2', 'column_name': 'column_tag2'},
{'array_value': 'tag3', 'column_name': 'column_tag3'}
]
}
df = get_etl_block_data(
block_path=Path(__file__).parent,
test_data=[JSON_ARRAY_DATA],
params=block_params
)
rows = df.collect()
assert len(rows) == 3
assert rows[0]['column_tag1']
assert rows[0]['column_tag2']
assert rows[0]['column_tag3']
assert rows[1]['column_tag1']
assert rows[1]['column_tag2']
assert not rows[1]['column_tag3']
assert rows[2]['column_tag1']
assert not rows[2]['column_tag2']
assert rows[2]['column_tag3']
# ---------------------------------------------------------
# Тесты на "Элементы массива -> Строки"
# ---------------------------------------------------------
def test_array_to_rows_schema():
"""
Тест на получение схемы данных при преобразовании
JSON массива в строки
"""
block_params = {
'json_field': 'json_array',
'json_schema': '[строка]',
'transform_type': 'array_to_rows',
'array_to_rows_column_name': 'tag'
}
block_schema = get_etl_block_schema(
block_path=Path(__file__).parent,
test_data=[JSON_ARRAY_DATA],
params=block_params
)
assert len(block_schema.fields) == 3
assert block_schema.fields[0].name == 'id'
assert block_schema.fields[0].dataType == LongType()
assert block_schema.fields[1].name == 'json_array'
assert block_schema.fields[1].dataType == StringType()
assert block_schema.fields[2].name == 'tag'
assert block_schema.fields[2].dataType == StringType()
def test_array_to_rows_data():
"""
Тест на получение схемы данных при преобразовании
JSON массива в строки
"""
block_params = {
'json_field': 'json_array',
'json_schema': '[строка]',
'transform_type': 'array_to_rows',
'array_to_rows_column_name': 'tag'
}
df = get_etl_block_data(
block_path=Path(__file__).parent,
test_data=[JSON_ARRAY_DATA],
params=block_params
)
rows = df.collect()
assert len(rows) == 7
assert (rows[0]['id'], rows[0]['tag']) == (1, 'tag1')
assert (rows[1]['id'], rows[1]['tag']) == (1, 'tag2')
assert (rows[2]['id'], rows[2]['tag']) == (1, 'tag3')
assert (rows[3]['id'], rows[3]['tag']) == (2, 'tag1')
assert (rows[4]['id'], rows[4]['tag']) == (2, 'tag2')
assert (rows[5]['id'], rows[5]['tag']) == (3, 'tag1')
assert (rows[6]['id'], rows[6]['tag']) == (3, 'tag3')
-
Эти тестовые данные будут использоваться для проверки того, как блок обрабатывает поле с json-объектом
-
Эти тестовые данные будут использоваться для проверки того, как блок обрабатывает поле с json-массивом
-
Действие, которое срабатывает при нажатии на кнопку "Заполнить схему JSON автоматически"
-
Действие возвращает объект типа EtlBlockActionResult, в атрибуте
params_patch
будет указано, какие значения должны быть выставлены для параметраjson_schema
.
Документация¶
README.md
# Блок «JSON-блок»
<div class="divider" style="width:25%; margin:30px 0;"></div>
## Описание
JSON-блок позволяет преобразовать значения, хранящееся как JSON-поле, в новые столбцы или строки модели.
Порядок работы с блоком:
1. Выбрать поле, в котором хранятся JSON-значения;
2. Определить схему хранения данных в JSON-объектах. Это можно сделать автоматически, а затем подправить
предложенную системой схему;
3. Выбрать тип трансформации: извлечь атрибуты из JSON объекта в столбцы модели, либо преобразовать значения
массив в новые столбцы или строки.
4. Если для трансформации выбран какой-либо внутренний (вложенный внутрь JSON-значения) объект или массив,
то указать путь к нему;
5. В зависимости от выбранного типа трансформации выбрать правила создания новых столбцов или строк.
<div class="divider" style="width:25%; margin:30px 0;"></div>
### Схема JSON
Для корректной обработки JSON-поля необходимо явно задать схему хранения данных в JSON-поле.
Именно в соответствии с данной схемой происходит анализ и разбор JSON-объектов.
Схема JSON может быть построена автоматически на основе первых 100 строк из вложенного объекта.
Для этого, нажмите кнопку "Заполнить схему JSON автоматически". При необходимости, дополните
автоматически полученную схему.
Правила задания схемы:
* `[]` - используется для указания массива;
* `{}` - используется для указания объекта;
* Типы значений: `целое`, `дробное`, `датавремя`, `логическое`, `строка`;
* `[целое]` - описывает массив из целых значений. Вместо `целое` можно указывать любой другой тип значения;
* `{a: целое, b: строка}` - описывает объект с двумя атрибутыми и указанными типами значений;
* Если имя атрибута содержит пробелы, то его нужно заключить в двойные кавычки: `"имя с пробелом": целое`;
* Объекты и массивы могут вкладываться друг в друга:
* `{"a": {"b":1}}` - `{a: {b: целое}}`
* `{"a": [1, 2]}` - `{a: [целое]}`
* `[{"a": 1}, {"b": "привет"}]` - `[{a: целое, b: строка}]`.
Примеры:
|JSON значение|Схема JSON|
| --- | --- |
|`{"a": 1, "b": "строка", "c": "2023-01-01", "d": false}`|`{a: целое, b: строка, c: датавремя, d: логическое}`|
|`[1.1, 2.2, 3.3]`|`[дробное]`|
|`["a", "b", "c"]`|`[строка]`|
|`["a": 1.1, "b": [1, 2, 3], "c": {"x": 1}]`|`{a: дробное, b: [целое], c: {x: целое}}`|
|`{"название с пробелами": 1}`|`{"название с пробелами": целое}`|
<div class="divider" style="width:25%; margin:30px 0;"></div>
### Виды преобразования
Есть три вида преобразований, которые могут быть реализованы в JSON-блоке:
Атрибуты объекта -> Столбцы
: Извлекает значения из атрибутов объектов в JSON-поле и помещает их в отдельные столбцы модели.
Элементы массива -> Столбцы
: Извлекает значения из массива и создает для каждого такого значения отдельный столбец модели.
В значениях новых столбцов указывается true/false (или 1/0, или значение/null) в зависимости
от того, присутствует ли указанное значение в массиве текущей строки.
Элементы массива -> Строки
: Множит текущую строки столько раз, сколько есть элементов в массиве. Вместе с этим, в модель
добавляется отдельная строка, в которой указывается значение массива, для которого была
размножена текущая строка.
Ниже приведены примеры этих преобразований.
**Пример «Атрибуты объекта -> Столбцы»**
До преобразования:
|JSON-объект|
| --- |
|`{"Дата": "2023-01-01", "Магазин": "001", "Продажи": 15.4}`|
|`{"Дата": "2023-01-02", "Магазин": "002"}`|
После преобразования:
|JSON-объект|Дата|Магазин|Продажи|
| --- | --- | --- | --- |
|`{"Дата": "2023-01-01", "Магазин": "001", "Продажи": 15.4}`| 01.01.2023 | 001 | 15.4 |
|`{"Дата": "2023-01-02", "Магазин": "001"}`| 02.01.2023 | 002 | null |
**Пример «Элементы массива -> Столбцы»**
До преобразования:
|JSON-массив|
| --- |
|`["Pork", "Beef", "Shellfish"]`|
|`["Beef", "Lamb"]`|
После преобразования:
|JSON-массив|Pork|Beef|Shellfish|Lamb|
| --- | --- | --- | --- | --- |
|`["Pork", "Beef", "Shellfish"]`| true | true | true | false |
|`["Beef", "Lamb"]`| false | true | false | true |
**Пример «Элементы массива -> Строки»**
До преобразования:
|ID|JSON-массив|
| --- | --- |
|1|`["Pork", "Beef", "Shellfish"]`|
|2|`["Beef", "Lamb"]`|
После преобразования
|ID|JSON-массив|Значение|
| --- | --- | -- |
|1|`["Pork", "Beef", "Shellfish"]`|Pork|
|1|`["Pork", "Beef", "Shellfish"]`|Beef|
|1|`["Pork", "Beef", "Shellfish"]`|Shellfish|
|2|`["Beef", "Lamb"]`|Beef|
|2|`["Beef", "Lamb"]`|Lamb|
<div class="divider" style="width:25%; margin:30px 0;"></div>
## Параметры блока
### Общие параметры
JSON-поле
: Указывается поле вложенного в блок объекта с JSON-значениями.
Схема JSON
: Схема, в соответствие с которой внутри JSON-значений хранятся данные. Правила заполнения приведены <a href="#json">выше</a>.
Если используется каскад JSON-блоков (когда схема JSON уже была применена во вложенном JSON-блоке), то поле можно не
заполнять.
Заполнить схему JSON автоматически
: В большинстве случаев, схему JSON можно получить автоматически, анализируя данные в первых 100 записях вложенного
объекта. Нажмите данную кнопку, чтобы сделать это.
<div class="divider" style="width:25%; margin:30px 0;"></div>
### Объект в столбцы
Путь к объекту
: Если объект, атрибуты которого нужно выгрузить в столбцы, находится не на самом верхнем уровне JSON-поля, то
необходимо указать последовательность атрибутов, как добраться до этого вложенного объекта. Например,
если в JSON-поле хранится объект `{"a": {"b": {"c": {"d": 1}}}}` и нужно преобразовать самый вложенный
объект, то в данном поле указывается значение `a.b.c`
Правила создания столбцов
: Здесь указываются атрибуты объекта, которые нужно перенести в столбцы (поле "Значение из объекта") и
как будет называться этот столбец (поле "Столбец модели"). Название нового столбца можно не заполнять,
система сама присвоит ему имя вида "НазваниеJSONполя__АтрибутОбъекта"
Правила создания столбцов можно заполнить автоматически. Для этого есть кнопка "Заполнить автоматически" под последним правилом.
Система просмотрит первые 100 записей из вложенной в блок таблица и сама добавит все доступные атрибуты в список правил.
<div class="divider" style="width:25%; margin:30px 0;"></div>
### Массив в столбцы
Путь к массиву
: Если массив, элементы которого нужно выгрузить в столбцы, находится не на самом верхнем уровне JSON-поля, то
необходимо указать последовательность атрибутов, как добраться до этого вложенного массива. Например,
если в JSON-поле хранится объект `{"a": {"b": {"c": [1, 2, 3]}}}` и нужно преобразовать самый вложенный
массив, то в данном поле указывается значение `a.b.c`
Что указывать в значениях новых столбцов
: Нужно указать, что должно находиться в добавляемых столбцах, когда значение присутствует / отсутствует в массиве
для текущей строки. Варианты: `true` / `false`; `1` / `0`; само значение / `None`.
Правила создания столбцов
: Здесь указываются значения из массива, для которых нужно создавать новые столбцы в модели. Названия столбцов
(поле "Столбец модели") можно не заполнять, система сама присвоит ему имя вида "НазваниеJSONполя__ЗначениеИзМассива"
<div class="divider" style="width:25%; margin:30px 0;"></div>
### Массив в строки
Путь к массиву
: Если массив, по элементам которого нужно размножить строки, находится не на самом верхнем уровне JSON-поля, то
необходимо указать последовательность атрибутов, как добраться до этого вложенного массива. Например,
если в JSON-поле хранится объект `{"a": {"b": {"c": [1, 2, 3]}}}` и нужно преобразовать самый вложенный
массив, то в данном поле указывается значение `a.b.c`
Столбец в модели
: Укажите, как будет называться столбец со значениями массива.
<div class="divider" style="width:25%; margin:30px 0;"></div>
## Прочее
<div class="divider" style="width:25%; margin:30px 0;"></div>
### Обратная связь
Автор: <a href="https://analyticworkspace.ru" target="_blank">Analytic Workspace</a>.
Вопросы по работе JSON-блока вы можете задать в нашей
<a href="https://t.me/awcommunity" target="_blank">телеграм группе</a>.
<div class="divider" style="width:25%; margin:30px 0;"></div>