Введение

В этой статье я создам конвейер данных для передачи и анализа данных фильмов с IMDb.

Конвейер данных будет создан с использованием следующих инструментов:

  1. Прием данных: веб-скрапинг из IMDB с использованием Python
  2. Хранение данных: Google BigQuery
  3. Анализ данных: DBT
  4. Визуализация данных: Power BI
  5. Организация данных: Apache Airflow
  6. Развертывание контейнера: Docker

Проект будет иметь следующую структуру:

Проверьте мой репозиторий git для получения полного кода и пояснений.



1. Развертывание воздушного потока

В этом проекте я использовал docker compose для развертывания воздушного потока с использованием различных контейнеров. Я подготовил файл bash, чтобы помочь с развертыванием. Просто запустите следующий файл в папке вашего проекта, чтобы начать:

#!/bin/bash

### Clean up Environment ###

if [ "$1" == "--clean-up" ]; then

    # Remove airflow folder
    rm -r $(pwd)/airflow

    # Stop and remove all containers
    docker stop $(docker ps -a -q)
    docker rm $(docker ps -a -q)

    # Remove all containers, volumes and images related to the environment
    #docker-compose down --volumes --rmi all

    # Remove all process listening to port 8080
    lsof -i tcp:8080 | grep root | awk '{print $2}' | xargs kill -9
fi

### Install Airflow locally ###

# Set Airflow's home
mkdir $(pwd)/airflow
export AIRFLOW_HOME=$(pwd)/airflow

# Set airflow and python version 
AIRFLOW_VERSION=2.5.0
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"

# pipe install airflow based on the constaints file
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

### Deploy Airflow using docker-compose ###

# Move to airflow folder
cd airflow

# fetch the file docker-compose.yaml 
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.6.1/docker-compose.yaml'

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.6.1/airflow.sh'
chmod +x airflow.sh

# Create an empty .env file
> .env

# Add an airflow user id to the file
echo AIRFLOW_UID=50000 >> .env
echo PYTHONPATH=$(pwd) >> .env

export PYTHONPATH=$(pwd)

# Add directories to the dags folder
cd airflow/dags

mkdir data
mkdir helper

# Start airflow
docker-compose up -d

Перейдите на localhost:8080 и войдите, используя:

  • имя пользователя: воздушный поток
  • пароль: поток воздуха

2. Прием данных

Первым шагом является получение данных. Я буду использовать python для веб-скрейпинга IMDB и получения данных о различных списках фильмов в диаграммах IMDb, таких как 250 лучших фильмов с рейтингом, самые популярные фильмы, лучшие английские фильмы и т. д.

Я буду кодировать эту часть в каталоге airflow/dags/helper/scrape_imdb_charts.py. Docker позаботится о том, чтобы все, что находится в каталоге airflow/dags, автоматически переносилось в каждый из контейнеров. Это поможет синхронизировать работу, которую мы выполняем во всех контейнерах.

Я создал файл с именем scrape_imdb_charts.py, первой функцией которого будет отправка запроса GET и анализ ответа с помощью библиотеки BeautifulSoup Python:

# web_scraping helper
import requests
from bs4 import BeautifulSoup
import os
import sys
from google.cloud import bigquery
import datetime
import pandas as pd

# Creating an Environmental Variable for the service key configuration
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/opt/airflow/dags/configs/ServiceKey_GoogleCloud.json'

# Create a client
bigquery_client = bigquery.Client()

def _get_soup(chart):

    '''
    Get the BeautifulSoup object from a url.
    Args:
        - chart(str) = chart to scrape
            Options: 'most_popular_movies', 'top_250_movies', 'top_english_movies', 'top_250_tv'
    Returns:
        - soup(BeautifulSoup) = BeautifulSoup object
    '''
    
    # Send a get request and parse using BeautifulSoup
    if chart == 'most_popular_movies':
        url = 'https://www.imdb.com/chart/moviemeter?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=5V6VAGPEK222QB9E0SZ8&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=toptv&ref_=chttvtp_ql_2'
    
    if chart == 'top_250_movies':
        url = 'https://www.imdb.com/chart/top?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=5V6VAGPEK222QB9E0SZ8&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=toptv&ref_=chttvtp_ql_3'
    
    if chart == 'top_english_movies':
        url = 'https://www.imdb.com/chart/top-english-movies?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=3YMHR1ECWH2NNG5TPH1C&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=boxoffice&ref_=chtbo_ql_4'
    
    if chart == 'top_250_tv':
        url = 'https://www.imdb.com/chart/tvmeter?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=J9H259QR55SJJ93K51B2&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=topenglish&ref_=chttentp_ql_5'

    response = requests.get(url)
    soup = BeautifulSoup(response.text, 'html.parser')
    return soup

Следующей функцией вспомогательного файла будет обработка html-данных из объекта супа и создание кадра данных Panda, содержащего все соответствующие данные:

# web_scraping helper
import requests
from bs4 import BeautifulSoup
import os
import sys
from google.cloud import bigquery
import datetime
import pandas as pd

# Creating an Environmental Variable for the service key configuration
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/opt/airflow/dags/configs/ServiceKey_GoogleCloud.json'

# Create a client
bigquery_client = bigquery.Client()

def _get_soup(chart):

    '''
    Get the BeautifulSoup object from a url.
    Args:
        - chart(str) = chart to scrape
            Options: 'most_popular_movies', 'top_250_movies', 'top_english_movies', 'top_250_tv'
    Returns:
        - soup(BeautifulSoup) = BeautifulSoup object
    '''
    
    # Send a get request and parse using BeautifulSoup
    if chart == 'most_popular_movies':
        url = 'https://www.imdb.com/chart/moviemeter?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=5V6VAGPEK222QB9E0SZ8&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=toptv&ref_=chttvtp_ql_2'
    
    if chart == 'top_250_movies':
        url = 'https://www.imdb.com/chart/top?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=5V6VAGPEK222QB9E0SZ8&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=toptv&ref_=chttvtp_ql_3'
    
    if chart == 'top_english_movies':
        url = 'https://www.imdb.com/chart/top-english-movies?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=3YMHR1ECWH2NNG5TPH1C&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=boxoffice&ref_=chtbo_ql_4'
    
    if chart == 'top_250_tv':
        url = 'https://www.imdb.com/chart/tvmeter?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=J9H259QR55SJJ93K51B2&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=topenglish&ref_=chttentp_ql_5'

    response = requests.get(url)
    soup = BeautifulSoup(response.text, 'html.parser')
    return soup


def _scrape_movies(soup):

    '''
    Scrape the most popular titles and ratings from the IMDB website.
    Args:
        - soup(BeautifulSoup) = BeautifulSoup object
    Returns:
        - movie_dict(dict) = Dictionary of movie names and ratings
    '''
    # Find all movie names in the url
    movie_names = []
    movie_years = []
    movie_ratings = []
    user_votings = []

    # Find all movie in the url
    titlesRefs = soup.find_all('td', {'class':'titleColumn'})
    ratingsRefs = soup.find_all('td', {'class':'ratingColumn imdbRating'})

    # Collect movie title, release year, ratings and user votings
    for title in titlesRefs:
        try:
            movie_names.append(title.find("a").text)
        except:
            print('Missing title. Replacing with -1')
            movie_names.append(-1)
        
        try:
            movie_years.append(int(title.find("span").text[1:-1]))
        except:
            print('Missing year. Replacing with -1')
            movie_years.append(-1)

    for rating in ratingsRefs:
        try:
            movie_ratings.append(float(rating.find("strong").text))    
        except:
            print('Missing rating. Replacing with -1')
            movie_ratings.append(-1)

        try:
            votes_str = rating.find("strong").attrs['title']
            votes_str = votes_str.split(' ')[3]
            votes_int = int(votes_str.replace(',', ''))
            user_votings.append(votes_int)
        except:
            user_votings.append(-1)

    # Create a dataframe
    movie_df = pd.DataFrame({'movie_name': movie_names, 'movie_year': movie_years, 'movie_rating': movie_ratings, 'user_votings': user_votings})

    # Add movie_id
    movie_df['movie_id'] = movie_df.index + 1

    # set date
    movie_df['update_date'] = datetime.datetime.today().strftime('%Y-%m-%d')

    # reorder columns
    movie_df = movie_df[['movie_id', 'movie_name', 'movie_year', 'movie_rating', 'user_votings' ,'update_date']]

    return movie_df

Давайте посмотрим, как эти две функции работают до сих пор:

def main():
    soup = _get_soup(chart='top_250_movies')
    movies_df = _scrape_movies(soup)
    print(movies_df.head())

if __name__ == '__main__':
    main()

Выход:

movie_id                movie_name  movie_year  movie_rating  user_votings update_date
0         1  The Shawshank Redemption        1994           9.2       2747079  2023-06-04
1         2             The Godfather        1972           9.2       1910576  2023-06-04
2         3           The Dark Knight        2008           9.0       2719685  2023-06-04
3         4     The Godfather Part II        1974           9.0       1301552  2023-06-04
4         5              12 Angry Men        1957           9.0        813395  2023-06-04

Как мы видим, единственный аргумент в объекте супа используется для указания диаграммы на веб-сайте IMDb. Согласно диаграмме, функция _get_soup будет знать, какой URL нужно очистить. Вы можете проверить строку документа этой функции для получения списка доступных диаграмм.

3. Хранение данных

Теперь, когда у нас есть данные, давайте загрузим их в BigQuery. Для этого я использовал следующие три функции.

  1. _getOrCreate_dataset(dataset_name): получение набора данных BigQuery. Если набор данных не существует, создайте его.
  2. _getOrCreate_table(dataset_name, table_name): после получения/создания набора данных получите или создайте таблицу.
  3. _load_to_bigQuery(movie_names, chart, dataset_name=’imdb’): после получения/создания наборов данных и таблиц эта функция загрузит данные в правильную таблицу в соответствии с диаграммой, использованной для очистки данных. Эта функция усекает и перезаписывает существующие данные в BigQuery.

Примечание:

Я предполагаю, что у меня есть некоторые знания о GCP и BigQuery, но для выполнения этой работы вам необходимо создать проект и учетную запись службы в GCP, загрузить файл ключа JSON учетной записи службы и добавить его в airflow/dags/configs. .

Полный файл scrape_imdb_charts.py теперь выглядит так:

# web_scraping helper
import requests
from bs4 import BeautifulSoup
import os
import sys
from google.cloud import bigquery
import datetime
import pandas as pd

# Creating an Environmental Variable for the service key configuration
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/opt/airflow/dags/configs/ServiceKey_GoogleCloud.json'

# Create a client
bigquery_client = bigquery.Client()

def _get_soup(chart):

    '''
    Get the BeautifulSoup object from a url.
    Args:
        - chart(str) = chart to scrape
            Options: 'most_popular_movies', 'top_250_movies', 'top_english_movies', 'top_250_tv'
    Returns:
        - soup(BeautifulSoup) = BeautifulSoup object
    '''
    
    # Send a get request and parse using BeautifulSoup
    if chart == 'most_popular_movies':
        url = 'https://www.imdb.com/chart/moviemeter?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=5V6VAGPEK222QB9E0SZ8&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=toptv&ref_=chttvtp_ql_2'
    
    if chart == 'top_250_movies':
        url = 'https://www.imdb.com/chart/top?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=5V6VAGPEK222QB9E0SZ8&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=toptv&ref_=chttvtp_ql_3'
    
    if chart == 'top_english_movies':
        url = 'https://www.imdb.com/chart/top-english-movies?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=3YMHR1ECWH2NNG5TPH1C&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=boxoffice&ref_=chtbo_ql_4'
    
    if chart == 'top_250_tv':
        url = 'https://www.imdb.com/chart/tvmeter?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=J9H259QR55SJJ93K51B2&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=topenglish&ref_=chttentp_ql_5'

    response = requests.get(url)
    soup = BeautifulSoup(response.text, 'html.parser')
    return soup


def _scrape_movies(soup):

    '''
    Scrape the most popular titles and ratings from the IMDB website.
    Args:
        - soup(BeautifulSoup) = BeautifulSoup object
    Returns:
        - movie_dict(dict) = Dictionary of movie names and ratings
    '''
    # Find all movie names in the url
    movie_names = []
    movie_years = []
    movie_ratings = []
    user_votings = []

    # Find all movie in the url
    titlesRefs = soup.find_all('td', {'class':'titleColumn'})
    ratingsRefs = soup.find_all('td', {'class':'ratingColumn imdbRating'})

    # Collect movie title, release year, ratings and user votings
    for title in titlesRefs:
        try:
            movie_names.append(title.find("a").text)
        except:
            print('Missing title. Replacing with -1')
            movie_names.append(-1)
        
        try:
            movie_years.append(int(title.find("span").text[1:-1]))
        except:
            print('Missing year. Replacing with -1')
            movie_years.append(-1)

    for rating in ratingsRefs:
        try:
            movie_ratings.append(float(rating.find("strong").text))    
        except:
            print('Missing rating. Replacing with -1')
            movie_ratings.append(-1)

        try:
            votes_str = rating.find("strong").attrs['title']
            votes_str = votes_str.split(' ')[3]
            votes_int = int(votes_str.replace(',', ''))
            user_votings.append(votes_int)
        except:
            user_votings.append(-1)

    # Create a dataframe
    movie_df = pd.DataFrame({'movie_name': movie_names, 'movie_year': movie_years, 'movie_rating': movie_ratings, 'user_votings': user_votings})

    # Add movie_id
    movie_df['movie_id'] = movie_df.index + 1

    # set date
    movie_df['update_date'] = datetime.datetime.today().strftime('%Y-%m-%d')

    # reorder columns
    movie_df = movie_df[['movie_id', 'movie_name', 'movie_year', 'movie_rating', 'user_votings' ,'update_date']]

    return movie_df

# Create a dataset called test_dataset
def _getOrCreate_dataset(dataset_name :str) -> bigquery.dataset.Dataset:

    '''
    Get dataset. If the dataset does not exists, create it.
    
    Args:
        - dataset_name(str) = Name of the new/existing data set.
        - project_id(str) = project id(default = The project id of the bigquery_client object)

    Returns:
        - dataset(google.cloud.bigquery.dataset.Dataset) = Google BigQuery Dataset
    '''

    print('Fetching Dataset...')

    try:
        # get and return dataset if exist
        dataset = bigquery_client.get_dataset(dataset_name)
        print('Done')
        print(dataset.self_link)
        return dataset

    except Exception as e:
        # If not, create and return dataset
        if e.code == 404:
            print('Dataset does not exist. Creating a new one.')
            bigquery_client.create_dataset(dataset_name)
            dataset = bigquery_client.get_dataset(dataset_name)
            print('Done')
            print(dataset.self_link)
            return dataset
        else:
            print(e)

def _getOrCreate_table(dataset_name:str, table_name:str) -> bigquery.table.Table:


    '''
    Create a table. If the table already exists, return it.
    
    Args:
        - table_name(str) = Name of the new/existing table.
        - dataset_name(str) = Name of the new/existing data set.
        - project_id(str) = project id(default = The project id of the bigquery_client object)

    Returns:
        - table(google.cloud.bigquery.table.Table) = Google BigQuery table
    '''

    # Grab prerequisites for creating a table
    dataset = _getOrCreate_dataset(dataset_name)
    project = dataset.project
    dataset = dataset.dataset_id
    table_id = project + '.' + dataset + '.' + table_name

    print('\nFetching Table...')

    try:
        # Get table if exists
        table = bigquery_client.get_table(table_id)
        print('Done')
        print(table.self_link)
    except Exception as e:

        # If not, create and get table
        if e.code == 404:
            print('Table does not exist. Creating a new one.')
            bigquery_client.create_table(table_id)
            table = bigquery_client.get_table(table_id)
            print(table.self_link)
        else:
            print(e)
    finally:
        return table

def _load_to_bigQuery(movie_names, chart, dataset_name='imdb'):

    '''
    Load data into BigQuery table.
    Args:
        - movie_names(list) = List of movie names
        - chart(str) = Name of the chart
            Options: most_popular_movies, top_250_movies, top_english_movies, top_250_tv
        - dataset_name(str) = Name of the new/existing data set.
        - project_id(str) = project id(default = The project id of the bigquery_client object)
        - date_to_load(datetime.datetime) = Date to load into the table
    Returns:
        - None
    Notes:
        - The function will create a new dataset and table if they do not exist.
        - The function will overwrite the table if it already exists.
    '''

    if chart == 'most_popular_movies':
       table_name = 'most_popular_movies'
    
    if chart == 'top_250_movies':
        table_name = 'top_250_movies'

    if chart == 'top_english_movies':
        table_name = 'top_english_movies'

    if chart == 'top_250_tv':
        table_name = 'top_250_tv'

    # Create a table
    table = _getOrCreate_table(dataset_name, table_name)

    # Create a job config
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        schema=[
            bigquery.SchemaField("movie_id", bigquery.enums.SqlTypeNames.INT64),
            bigquery.SchemaField("movie_name", bigquery.enums.SqlTypeNames.STRING),
            bigquery.SchemaField("movie_year", bigquery.enums.SqlTypeNames.INT64),
            bigquery.SchemaField("movie_rating", bigquery.enums.SqlTypeNames.FLOAT64),
            bigquery.SchemaField("user_votings", bigquery.enums.SqlTypeNames.INT64),
            bigquery.SchemaField("update_date", bigquery.enums.SqlTypeNames.DATE),
        ],
        write_disposition="WRITE_TRUNCATE",
    )

    # Load data into the table
    job = bigquery_client.load_table_from_dataframe(
        movie_names, table, job_config=job_config
    )

    # Wait for the job to complete
    job.result()

    # Check if the job is done
    print("Loaded {} rows into {}:{}.".format(job.output_rows, dataset_name, table_name))

Запустим файл от начала до конца, чтобы загрузить данные из диаграммы «250 лучших фильмов»:

def main():
    soup = _get_soup(chart='top_250_movies')
    movies_df = _scrape_movies(soup)
    _load_to_bigQuery(movies_df, chart='top_250_movies')

if __name__ == '__main__':
    main()

Конечным результатом должна стать новая таблица с именем «top_250_movies» в bigquery, содержащая данные из диаграммы IMDb «250 лучших фильмов с рейтингом».

4. Оркестрация с использованием Apache Airflow

После реализации этого конвейера данных вручную я быстро добавлю эти задачи в качестве задач воздушного потока. Это будет легко, мы можем просто импортировать эти функции из файла scrape_imdb_charts.py и использовать их как задачи с помощью API задач airflow.

Первый даг воздушного потока будет называться «top_250_movies.py», чтобы создать этот файл в разделе airflow/dags и добавить следующий код:

import airflow
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
import datetime
import os
import sys
from helper.scrape_imdb_charts import _get_soup, _scrape_movies , _load_to_bigQuery

# Creating an Environmental Variable for the service key configuration
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/opt/airflow/dags/configs/ServiceKey_GoogleCloud.json'

default_args = {
    'start_date': datetime.datetime.today(),
    'schedule_interval': '0 0 * * *' # Run every day at midnight    
}

with DAG(dag_id = 'top_250_movies', default_args = default_args, catchup=False) as dag:

    # Dag #1: Get the most popular movies
    @task
    def scrape_movies():
        soup = _get_soup(chart='top_250_movies')
        movie_df = _scrape_movies(soup)
        return movie_df
    
    # Dag #2: Load the most popular movies
    @task
    def load_movies(movies_df):
        _load_to_bigQuery(movies_df, chart='top_250_movies')

    # Dependencies
    movies_df = scrape_movies()
    load_movies(movies_df)

Как мы видим, все, что нам нужно было здесь сделать, это импортировать функции из helper.scrape_imdb_charts и использовать их внутри нового дага airflow. Давайте посмотрим, как это выглядит в пользовательском интерфейсе воздушного потока, и запустим его:

Похоже, он работает.

Пока что это касается одной таблицы внутри BigQuery, чтобы создать больше, нам просто нужно создать другие dags в airflow/dags и просто изменить диаграмму с «top_250_movies» на некоторые другие доступные диаграммы, такие как: «most_popular_movies», «top_250_tv». и «top_english_movies».

Например, давайте проделаем это еще раз для диаграммы «самые_популярные_фильмы». Я создам даг с именем most_popular_movies.py в airflow/dags и использую следующий код:

import airflow
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
import datetime
import os
import sys
from helper.scrape_imdb_charts import _get_soup, _scrape_movies , _load_to_bigQuery

# Creating an Environmental Variable for the service key configuration
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/opt/airflow/dags/configs/ServiceKey_GoogleCloud.json'

default_args = {
    'start_date': datetime.datetime.today(),
    'schedule_interval': '0 0 * * *' # Run every day at midnight    
}

with DAG(dag_id = 'most_popular_movies', default_args = default_args, catchup=False) as dag:

    # Dag #1: Get the most popular movies
    @task
    def scrape_movies():
        soup = _get_soup(chart='most_popular_movies')
        movie_df = _scrape_movies(soup)
        return movie_df
    
    # Dag #2: Load the most popular movies
    @task
    def load_movies(movies_df):
        _load_to_bigQuery(movies_df, chart='most_popular_movies')

    # Dependencies
    movie_df = scrape_movies()
    load_movies(movie_df)

После запуска этого второго дага у меня должно быть две таблицы в BigQuery:

5. Анализ данных с использованием DBT

После создания четырех таблиц в BigQuery я создам из них несколько моделей с помощью DBT. DBT будет работать поверх BigQuery и позволит нам повысить его производительность.

Я не буду вдаваться в подробности развертывания и подключения BigQuery к DBT, для этого используйте следующее руководство:



Я сгенерировал три представления, содержащие различный анализ данных. После использования dbt run BigQuery создаст каждый из следующих запросов в виде представлений. Это позволит нам проводить различный анализ данных отдельно от реальных таблиц BigQuery, а также поможет упростить следующий этап конвейера — визуализацию данных с помощью Power BI.

В первом просмотре будут представлены фильмы с самым высоким рейтингом за год:

/*Top rated movies per year*/
select 
  movie_year,
  movie_name,
  movie_rating
from
(
  SELECT 
    m1.movie_year,
    m1.movie_name,
    m1.movie_rating,
    row_number() over (partition by m1.movie_year order by m1.movie_rating desc) as rank
  FROM imdb.top_250_movies as m1
  full join imdb.most_popular_movies as m2 on
    m1.movie_name = m2.movie_name 
  full join imdb.top_english_movies as m3 on
    m1.movie_name = m3.movie_name 
  order by m1.movie_year desc
) as aaa
where 
  aaa.rank = 1

Во второй покажут самые рейтинговые фильмы за десятилетие:

/*Top rated movies per decade*/
select *
from
(
  select 
    movie_year,
    movie_name,
    decade,
    movie_rating,
    row_number() over (partition by decade order by movie_rating desc) as rank
  from
  (
    SELECT 
      m1.movie_year,
      m1.movie_name,
      m1.movie_rating,
      case
        when m1.movie_year between 1920 and 1930 then '20s'
        when m1.movie_year between 1930 and 1940 then '30s'
        when m1.movie_year between 1940 and 1950 then '40s'
        when m1.movie_year between 1950 and 1960 then '50s'
        when m1.movie_year between 1960 and 1970 then '60s'
        when m1.movie_year between 1970 and 1980 then '70s'
        when m1.movie_year between 1980 and 1990 then '80s'
        when m1.movie_year between 1990 and 2000 then '90s'
        when m1.movie_year between 2000 and 2010 then '2000s'
        when m1.movie_year between 2010 and 2020 then '2010s'
        when m1.movie_year between 2020 and 2030 then '2020s'
      end as decade
    FROM imdb.top_250_movies as m1
    full join imdb.most_popular_movies as m2 on
      m1.movie_name = m2.movie_name 
    full join imdb.top_english_movies as m3 on
      m1.movie_name = m3.movie_name 
    order by m1.movie_year desc
  ) as aaa
  where
    aaa.movie_name is not null
) bbb
where 
  bbb.rank = 1
order by 
  bbb.movie_year desc

А третья реализует скользящее среднее по общему количеству голосов пользователей за год.

/*User voting trend*/

select 
  movie_year,
  total_votes,
  round((total_votes + (LAG(total_votes, 2) over(ORDER BY movie_year)) + (LAG(total_votes, 1) over(ORDER BY movie_year ))) / 3, 2) as Moving_Average_3
from
(
    SELECT 
      m1.movie_year,
      sum(m1.user_votings) as total_votes
    FROM imdb.top_250_movies as m1
    full join imdb.most_popular_movies as m2 on
      m1.movie_name = m2.movie_name 
    full join imdb.top_english_movies as m3 on
      m1.movie_name = m3.movie_name 
    group by
      m1.movie_year
) as aaa
order by movie_year

Далее я запущу dbt run, чтобы реализовать эти запросы как представление в BigQuery:

Как мы видим, теперь у нас есть четыре таблицы и три представления в BigQuery. Далее я буду использовать эти представления для создания трех простых отчетов Power BI.

6. Визуализация данных: Power BI

Первый отчет: фильм с самым высоким рейтингом в каждом году:

Второй отчет: фильмы с самым высоким рейтингом в каждом десятилетии:

Третий отчет: динамика голосования пользователей по годам с использованием скользящего среднего:

Это для статьи, полный код можно найти в моем репозитории GitHub.