Получение данных из web-сервиса
Задача: заполнить таблицу объекта с названием covid
данными из web-сервиса
со статистикой по COVID-19.
import requests
import datetime
from pyspark.sql import Row
def after_load_covid(df, spark, app, *args, **kwargs):
"""
Заполнение датафрейма с ковидными данными
"""
r = requests.get('https://api.covidtracking.com/v1/us/daily.json')
if not r.ok:
raise Exception(f'Ошибка GET {r.url}. HTTP {r.status_code}: {r.text}')
rows = []
for line in r.json():
rows.append(Row(
at=datetime.datetime.strptime(str(line['date']), '%Y%m%d'),
positive=line['positive'],
hospitalizedcurrently=line['hospitalizedCurrently'],
hospitalizedcumulative=line['hospitalizedCumulative'],
onventilatorcurrently=line['onVentilatorCurrently'],
onventilatorcumulative=line['onVentilatorCumulative'],
))
return spark.createDataFrame(rows)
Кеширование данных с использованием Storage
Задача: запрашивать данные по COVID-19 из предыдущего примера не чаще 1 раза в сутки
import requests
import datetime
from pyspark.sql import Row
STORAGE_KEY = 'covid_data'
def before_all(spark, app, *args, **kwargs):
""" """
# Запрос метаданных ранее сохраненных данных
stored_meta = app.storage.get_meta(STORAGE_KEY)
if not stored_meta or (datetime.datetime.now() - stored_meta.last_modified).total_seconds() > 86400:
# Запрашиваем данные с web-сервиса либо когда их нет в хранилище,
# либо не чаще раза в сутки
r = requests.get('https://api.covidtracking.com/v1/us/daily.json')
if not r.ok:
raise Exception(f'Ошибка GET {r.url}. HTTP {r.status_code}: {r.text}')
# Cохраняем данные с web-сервиса в хранилище
app.storage.save(STORAGE_KEY, r.json())
def after_load_covid(df, spark, app, *args, **kwargs):
"""
Заполнение датафрейма с ковидными данными
"""
covid_data = app.storage.read(STORAGE_KEY)
rows = []
for line in covid_data:
rows.append(Row(
at=datetime.datetime.strptime(str(line['date']), '%Y%m%d'),
positive=line['positive'],
hospitalizedcurrently=line['hospitalizedCurrently'],
hospitalizedcumulative=line['hospitalizedCumulative'],
onventilatorcurrently=line['onVentilatorCurrently'],
onventilatorcumulative=line['onVentilatorCumulative'],
))
return spark.createDataFrame(rows)