Перейти к содержанию

Получение данных из web-сервиса

Задача: заполнить таблицу объекта с названием covid данными из web-сервиса со статистикой по COVID-19.

import requests
import datetime
from pyspark.sql import Row


def after_load_covid(df, spark, app, *args, **kwargs):
    """ 
    Заполнение датафрейма с ковидными данными
    """
    r = requests.get('https://api.covidtracking.com/v1/us/daily.json')
    if not r.ok:
        raise Exception(f'Ошибка GET {r.url}. HTTP {r.status_code}: {r.text}')

    rows = []
    for line in r.json():
        rows.append(Row(
            at=datetime.datetime.strptime(str(line['date']), '%Y%m%d'),
            positive=line['positive'],
            hospitalizedcurrently=line['hospitalizedCurrently'],
            hospitalizedcumulative=line['hospitalizedCumulative'],
            onventilatorcurrently=line['onVentilatorCurrently'],
            onventilatorcumulative=line['onVentilatorCumulative'],
        ))

    return spark.createDataFrame(rows)

Кеширование данных с использованием Storage

Задача: запрашивать данные по COVID-19 из предыдущего примера не чаще 1 раза в сутки

import requests
import datetime
from pyspark.sql import Row


STORAGE_KEY = 'covid_data'


def before_all(spark, app, *args, **kwargs):
    """ """
    # Запрос метаданных ранее сохраненных данных
    stored_meta = app.storage.get_meta(STORAGE_KEY)

    if not stored_meta or (datetime.datetime.now() - stored_meta.last_modified).total_seconds() > 86400:
        # Запрашиваем данные с web-сервиса либо когда их нет в хранилище,
        # либо не чаще раза в сутки
        r = requests.get('https://api.covidtracking.com/v1/us/daily.json')
        if not r.ok:
            raise Exception(f'Ошибка GET {r.url}. HTTP {r.status_code}: {r.text}')

        # Cохраняем данные с web-сервиса в хранилище
        app.storage.save(STORAGE_KEY, r.json())


def after_load_covid(df, spark, app, *args, **kwargs):
    """ 
    Заполнение датафрейма с ковидными данными
    """
    covid_data = app.storage.read(STORAGE_KEY)

    rows = []
    for line in covid_data:
        rows.append(Row(
            at=datetime.datetime.strptime(str(line['date']), '%Y%m%d'),
            positive=line['positive'],
            hospitalizedcurrently=line['hospitalizedCurrently'],
            hospitalizedcumulative=line['hospitalizedCumulative'],
            onventilatorcurrently=line['onVentilatorCurrently'],
            onventilatorcumulative=line['onVentilatorCumulative'],
        ))

    return spark.createDataFrame(rows)