Введение
В этой статье я создам конвейер данных для передачи и анализа данных фильмов с IMDb.
Конвейер данных будет создан с использованием следующих инструментов:
- Прием данных: веб-скрапинг из IMDB с использованием Python
- Хранение данных: Google BigQuery
- Анализ данных: DBT
- Визуализация данных: Power BI
- Организация данных: Apache Airflow
- Развертывание контейнера: Docker
Проект будет иметь следующую структуру:
Проверьте мой репозиторий git для получения полного кода и пояснений.
1. Развертывание воздушного потока
В этом проекте я использовал docker compose для развертывания воздушного потока с использованием различных контейнеров. Я подготовил файл bash, чтобы помочь с развертыванием. Просто запустите следующий файл в папке вашего проекта, чтобы начать:
#!/bin/bash ### Clean up Environment ### if [ "$1" == "--clean-up" ]; then # Remove airflow folder rm -r $(pwd)/airflow # Stop and remove all containers docker stop $(docker ps -a -q) docker rm $(docker ps -a -q) # Remove all containers, volumes and images related to the environment #docker-compose down --volumes --rmi all # Remove all process listening to port 8080 lsof -i tcp:8080 | grep root | awk '{print $2}' | xargs kill -9 fi ### Install Airflow locally ### # Set Airflow's home mkdir $(pwd)/airflow export AIRFLOW_HOME=$(pwd)/airflow # Set airflow and python version AIRFLOW_VERSION=2.5.0 PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)" # pipe install airflow based on the constaints file CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt" pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}" ### Deploy Airflow using docker-compose ### # Move to airflow folder cd airflow # fetch the file docker-compose.yaml curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.6.1/docker-compose.yaml' curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.6.1/airflow.sh' chmod +x airflow.sh # Create an empty .env file > .env # Add an airflow user id to the file echo AIRFLOW_UID=50000 >> .env echo PYTHONPATH=$(pwd) >> .env export PYTHONPATH=$(pwd) # Add directories to the dags folder cd airflow/dags mkdir data mkdir helper # Start airflow docker-compose up -d
Перейдите на localhost:8080 и войдите, используя:
- имя пользователя: воздушный поток
- пароль: поток воздуха
2. Прием данных
Первым шагом является получение данных. Я буду использовать python для веб-скрейпинга IMDB и получения данных о различных списках фильмов в диаграммах IMDb, таких как 250 лучших фильмов с рейтингом, самые популярные фильмы, лучшие английские фильмы и т. д.
Я буду кодировать эту часть в каталоге airflow/dags/helper/scrape_imdb_charts.py. Docker позаботится о том, чтобы все, что находится в каталоге airflow/dags, автоматически переносилось в каждый из контейнеров. Это поможет синхронизировать работу, которую мы выполняем во всех контейнерах.
Я создал файл с именем scrape_imdb_charts.py, первой функцией которого будет отправка запроса GET и анализ ответа с помощью библиотеки BeautifulSoup Python:
# web_scraping helper import requests from bs4 import BeautifulSoup import os import sys from google.cloud import bigquery import datetime import pandas as pd # Creating an Environmental Variable for the service key configuration os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/opt/airflow/dags/configs/ServiceKey_GoogleCloud.json' # Create a client bigquery_client = bigquery.Client() def _get_soup(chart): ''' Get the BeautifulSoup object from a url. Args: - chart(str) = chart to scrape Options: 'most_popular_movies', 'top_250_movies', 'top_english_movies', 'top_250_tv' Returns: - soup(BeautifulSoup) = BeautifulSoup object ''' # Send a get request and parse using BeautifulSoup if chart == 'most_popular_movies': url = 'https://www.imdb.com/chart/moviemeter?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=5V6VAGPEK222QB9E0SZ8&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=toptv&ref_=chttvtp_ql_2' if chart == 'top_250_movies': url = 'https://www.imdb.com/chart/top?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=5V6VAGPEK222QB9E0SZ8&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=toptv&ref_=chttvtp_ql_3' if chart == 'top_english_movies': url = 'https://www.imdb.com/chart/top-english-movies?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=3YMHR1ECWH2NNG5TPH1C&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=boxoffice&ref_=chtbo_ql_4' if chart == 'top_250_tv': url = 'https://www.imdb.com/chart/tvmeter?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=J9H259QR55SJJ93K51B2&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=topenglish&ref_=chttentp_ql_5' response = requests.get(url) soup = BeautifulSoup(response.text, 'html.parser') return soup
Следующей функцией вспомогательного файла будет обработка html-данных из объекта супа и создание кадра данных Panda, содержащего все соответствующие данные:
# web_scraping helper import requests from bs4 import BeautifulSoup import os import sys from google.cloud import bigquery import datetime import pandas as pd # Creating an Environmental Variable for the service key configuration os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/opt/airflow/dags/configs/ServiceKey_GoogleCloud.json' # Create a client bigquery_client = bigquery.Client() def _get_soup(chart): ''' Get the BeautifulSoup object from a url. Args: - chart(str) = chart to scrape Options: 'most_popular_movies', 'top_250_movies', 'top_english_movies', 'top_250_tv' Returns: - soup(BeautifulSoup) = BeautifulSoup object ''' # Send a get request and parse using BeautifulSoup if chart == 'most_popular_movies': url = 'https://www.imdb.com/chart/moviemeter?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=5V6VAGPEK222QB9E0SZ8&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=toptv&ref_=chttvtp_ql_2' if chart == 'top_250_movies': url = 'https://www.imdb.com/chart/top?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=5V6VAGPEK222QB9E0SZ8&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=toptv&ref_=chttvtp_ql_3' if chart == 'top_english_movies': url = 'https://www.imdb.com/chart/top-english-movies?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=3YMHR1ECWH2NNG5TPH1C&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=boxoffice&ref_=chtbo_ql_4' if chart == 'top_250_tv': url = 'https://www.imdb.com/chart/tvmeter?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=J9H259QR55SJJ93K51B2&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=topenglish&ref_=chttentp_ql_5' response = requests.get(url) soup = BeautifulSoup(response.text, 'html.parser') return soup def _scrape_movies(soup): ''' Scrape the most popular titles and ratings from the IMDB website. Args: - soup(BeautifulSoup) = BeautifulSoup object Returns: - movie_dict(dict) = Dictionary of movie names and ratings ''' # Find all movie names in the url movie_names = [] movie_years = [] movie_ratings = [] user_votings = [] # Find all movie in the url titlesRefs = soup.find_all('td', {'class':'titleColumn'}) ratingsRefs = soup.find_all('td', {'class':'ratingColumn imdbRating'}) # Collect movie title, release year, ratings and user votings for title in titlesRefs: try: movie_names.append(title.find("a").text) except: print('Missing title. Replacing with -1') movie_names.append(-1) try: movie_years.append(int(title.find("span").text[1:-1])) except: print('Missing year. Replacing with -1') movie_years.append(-1) for rating in ratingsRefs: try: movie_ratings.append(float(rating.find("strong").text)) except: print('Missing rating. Replacing with -1') movie_ratings.append(-1) try: votes_str = rating.find("strong").attrs['title'] votes_str = votes_str.split(' ')[3] votes_int = int(votes_str.replace(',', '')) user_votings.append(votes_int) except: user_votings.append(-1) # Create a dataframe movie_df = pd.DataFrame({'movie_name': movie_names, 'movie_year': movie_years, 'movie_rating': movie_ratings, 'user_votings': user_votings}) # Add movie_id movie_df['movie_id'] = movie_df.index + 1 # set date movie_df['update_date'] = datetime.datetime.today().strftime('%Y-%m-%d') # reorder columns movie_df = movie_df[['movie_id', 'movie_name', 'movie_year', 'movie_rating', 'user_votings' ,'update_date']] return movie_df
Давайте посмотрим, как эти две функции работают до сих пор:
def main(): soup = _get_soup(chart='top_250_movies') movies_df = _scrape_movies(soup) print(movies_df.head()) if __name__ == '__main__': main()
Выход:
movie_id movie_name movie_year movie_rating user_votings update_date 0 1 The Shawshank Redemption 1994 9.2 2747079 2023-06-04 1 2 The Godfather 1972 9.2 1910576 2023-06-04 2 3 The Dark Knight 2008 9.0 2719685 2023-06-04 3 4 The Godfather Part II 1974 9.0 1301552 2023-06-04 4 5 12 Angry Men 1957 9.0 813395 2023-06-04
Как мы видим, единственный аргумент в объекте супа используется для указания диаграммы на веб-сайте IMDb. Согласно диаграмме, функция _get_soup будет знать, какой URL нужно очистить. Вы можете проверить строку документа этой функции для получения списка доступных диаграмм.
3. Хранение данных
Теперь, когда у нас есть данные, давайте загрузим их в BigQuery. Для этого я использовал следующие три функции.
- _getOrCreate_dataset(dataset_name): получение набора данных BigQuery. Если набор данных не существует, создайте его.
- _getOrCreate_table(dataset_name, table_name): после получения/создания набора данных получите или создайте таблицу.
- _load_to_bigQuery(movie_names, chart, dataset_name=’imdb’): после получения/создания наборов данных и таблиц эта функция загрузит данные в правильную таблицу в соответствии с диаграммой, использованной для очистки данных. Эта функция усекает и перезаписывает существующие данные в BigQuery.
Примечание:
Я предполагаю, что у меня есть некоторые знания о GCP и BigQuery, но для выполнения этой работы вам необходимо создать проект и учетную запись службы в GCP, загрузить файл ключа JSON учетной записи службы и добавить его в airflow/dags/configs. .
Полный файл scrape_imdb_charts.py теперь выглядит так:
# web_scraping helper import requests from bs4 import BeautifulSoup import os import sys from google.cloud import bigquery import datetime import pandas as pd # Creating an Environmental Variable for the service key configuration os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/opt/airflow/dags/configs/ServiceKey_GoogleCloud.json' # Create a client bigquery_client = bigquery.Client() def _get_soup(chart): ''' Get the BeautifulSoup object from a url. Args: - chart(str) = chart to scrape Options: 'most_popular_movies', 'top_250_movies', 'top_english_movies', 'top_250_tv' Returns: - soup(BeautifulSoup) = BeautifulSoup object ''' # Send a get request and parse using BeautifulSoup if chart == 'most_popular_movies': url = 'https://www.imdb.com/chart/moviemeter?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=5V6VAGPEK222QB9E0SZ8&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=toptv&ref_=chttvtp_ql_2' if chart == 'top_250_movies': url = 'https://www.imdb.com/chart/top?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=5V6VAGPEK222QB9E0SZ8&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=toptv&ref_=chttvtp_ql_3' if chart == 'top_english_movies': url = 'https://www.imdb.com/chart/top-english-movies?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=3YMHR1ECWH2NNG5TPH1C&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=boxoffice&ref_=chtbo_ql_4' if chart == 'top_250_tv': url = 'https://www.imdb.com/chart/tvmeter?pf_rd_m=A2FGELUUNOQJNL&pf_rd_p=470df400-70d9-4f35-bb05-8646a1195842&pf_rd_r=J9H259QR55SJJ93K51B2&pf_rd_s=right-4&pf_rd_t=15506&pf_rd_i=topenglish&ref_=chttentp_ql_5' response = requests.get(url) soup = BeautifulSoup(response.text, 'html.parser') return soup def _scrape_movies(soup): ''' Scrape the most popular titles and ratings from the IMDB website. Args: - soup(BeautifulSoup) = BeautifulSoup object Returns: - movie_dict(dict) = Dictionary of movie names and ratings ''' # Find all movie names in the url movie_names = [] movie_years = [] movie_ratings = [] user_votings = [] # Find all movie in the url titlesRefs = soup.find_all('td', {'class':'titleColumn'}) ratingsRefs = soup.find_all('td', {'class':'ratingColumn imdbRating'}) # Collect movie title, release year, ratings and user votings for title in titlesRefs: try: movie_names.append(title.find("a").text) except: print('Missing title. Replacing with -1') movie_names.append(-1) try: movie_years.append(int(title.find("span").text[1:-1])) except: print('Missing year. Replacing with -1') movie_years.append(-1) for rating in ratingsRefs: try: movie_ratings.append(float(rating.find("strong").text)) except: print('Missing rating. Replacing with -1') movie_ratings.append(-1) try: votes_str = rating.find("strong").attrs['title'] votes_str = votes_str.split(' ')[3] votes_int = int(votes_str.replace(',', '')) user_votings.append(votes_int) except: user_votings.append(-1) # Create a dataframe movie_df = pd.DataFrame({'movie_name': movie_names, 'movie_year': movie_years, 'movie_rating': movie_ratings, 'user_votings': user_votings}) # Add movie_id movie_df['movie_id'] = movie_df.index + 1 # set date movie_df['update_date'] = datetime.datetime.today().strftime('%Y-%m-%d') # reorder columns movie_df = movie_df[['movie_id', 'movie_name', 'movie_year', 'movie_rating', 'user_votings' ,'update_date']] return movie_df # Create a dataset called test_dataset def _getOrCreate_dataset(dataset_name :str) -> bigquery.dataset.Dataset: ''' Get dataset. If the dataset does not exists, create it. Args: - dataset_name(str) = Name of the new/existing data set. - project_id(str) = project id(default = The project id of the bigquery_client object) Returns: - dataset(google.cloud.bigquery.dataset.Dataset) = Google BigQuery Dataset ''' print('Fetching Dataset...') try: # get and return dataset if exist dataset = bigquery_client.get_dataset(dataset_name) print('Done') print(dataset.self_link) return dataset except Exception as e: # If not, create and return dataset if e.code == 404: print('Dataset does not exist. Creating a new one.') bigquery_client.create_dataset(dataset_name) dataset = bigquery_client.get_dataset(dataset_name) print('Done') print(dataset.self_link) return dataset else: print(e) def _getOrCreate_table(dataset_name:str, table_name:str) -> bigquery.table.Table: ''' Create a table. If the table already exists, return it. Args: - table_name(str) = Name of the new/existing table. - dataset_name(str) = Name of the new/existing data set. - project_id(str) = project id(default = The project id of the bigquery_client object) Returns: - table(google.cloud.bigquery.table.Table) = Google BigQuery table ''' # Grab prerequisites for creating a table dataset = _getOrCreate_dataset(dataset_name) project = dataset.project dataset = dataset.dataset_id table_id = project + '.' + dataset + '.' + table_name print('\nFetching Table...') try: # Get table if exists table = bigquery_client.get_table(table_id) print('Done') print(table.self_link) except Exception as e: # If not, create and get table if e.code == 404: print('Table does not exist. Creating a new one.') bigquery_client.create_table(table_id) table = bigquery_client.get_table(table_id) print(table.self_link) else: print(e) finally: return table def _load_to_bigQuery(movie_names, chart, dataset_name='imdb'): ''' Load data into BigQuery table. Args: - movie_names(list) = List of movie names - chart(str) = Name of the chart Options: most_popular_movies, top_250_movies, top_english_movies, top_250_tv - dataset_name(str) = Name of the new/existing data set. - project_id(str) = project id(default = The project id of the bigquery_client object) - date_to_load(datetime.datetime) = Date to load into the table Returns: - None Notes: - The function will create a new dataset and table if they do not exist. - The function will overwrite the table if it already exists. ''' if chart == 'most_popular_movies': table_name = 'most_popular_movies' if chart == 'top_250_movies': table_name = 'top_250_movies' if chart == 'top_english_movies': table_name = 'top_english_movies' if chart == 'top_250_tv': table_name = 'top_250_tv' # Create a table table = _getOrCreate_table(dataset_name, table_name) # Create a job config job_config = bigquery.LoadJobConfig( source_format=bigquery.SourceFormat.CSV, schema=[ bigquery.SchemaField("movie_id", bigquery.enums.SqlTypeNames.INT64), bigquery.SchemaField("movie_name", bigquery.enums.SqlTypeNames.STRING), bigquery.SchemaField("movie_year", bigquery.enums.SqlTypeNames.INT64), bigquery.SchemaField("movie_rating", bigquery.enums.SqlTypeNames.FLOAT64), bigquery.SchemaField("user_votings", bigquery.enums.SqlTypeNames.INT64), bigquery.SchemaField("update_date", bigquery.enums.SqlTypeNames.DATE), ], write_disposition="WRITE_TRUNCATE", ) # Load data into the table job = bigquery_client.load_table_from_dataframe( movie_names, table, job_config=job_config ) # Wait for the job to complete job.result() # Check if the job is done print("Loaded {} rows into {}:{}.".format(job.output_rows, dataset_name, table_name))
Запустим файл от начала до конца, чтобы загрузить данные из диаграммы «250 лучших фильмов»:
def main(): soup = _get_soup(chart='top_250_movies') movies_df = _scrape_movies(soup) _load_to_bigQuery(movies_df, chart='top_250_movies') if __name__ == '__main__': main()
Конечным результатом должна стать новая таблица с именем «top_250_movies» в bigquery, содержащая данные из диаграммы IMDb «250 лучших фильмов с рейтингом».
4. Оркестрация с использованием Apache Airflow
После реализации этого конвейера данных вручную я быстро добавлю эти задачи в качестве задач воздушного потока. Это будет легко, мы можем просто импортировать эти функции из файла scrape_imdb_charts.py и использовать их как задачи с помощью API задач airflow.
Первый даг воздушного потока будет называться «top_250_movies.py», чтобы создать этот файл в разделе airflow/dags и добавить следующий код:
import airflow from airflow import DAG from airflow.decorators import task from airflow.operators.python import PythonOperator import datetime import os import sys from helper.scrape_imdb_charts import _get_soup, _scrape_movies , _load_to_bigQuery # Creating an Environmental Variable for the service key configuration os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/opt/airflow/dags/configs/ServiceKey_GoogleCloud.json' default_args = { 'start_date': datetime.datetime.today(), 'schedule_interval': '0 0 * * *' # Run every day at midnight } with DAG(dag_id = 'top_250_movies', default_args = default_args, catchup=False) as dag: # Dag #1: Get the most popular movies @task def scrape_movies(): soup = _get_soup(chart='top_250_movies') movie_df = _scrape_movies(soup) return movie_df # Dag #2: Load the most popular movies @task def load_movies(movies_df): _load_to_bigQuery(movies_df, chart='top_250_movies') # Dependencies movies_df = scrape_movies() load_movies(movies_df)
Как мы видим, все, что нам нужно было здесь сделать, это импортировать функции из helper.scrape_imdb_charts и использовать их внутри нового дага airflow. Давайте посмотрим, как это выглядит в пользовательском интерфейсе воздушного потока, и запустим его:
Похоже, он работает.
Пока что это касается одной таблицы внутри BigQuery, чтобы создать больше, нам просто нужно создать другие dags в airflow/dags и просто изменить диаграмму с «top_250_movies» на некоторые другие доступные диаграммы, такие как: «most_popular_movies», «top_250_tv». и «top_english_movies».
Например, давайте проделаем это еще раз для диаграммы «самые_популярные_фильмы». Я создам даг с именем most_popular_movies.py в airflow/dags и использую следующий код:
import airflow from airflow import DAG from airflow.decorators import task from airflow.operators.python import PythonOperator import datetime import os import sys from helper.scrape_imdb_charts import _get_soup, _scrape_movies , _load_to_bigQuery # Creating an Environmental Variable for the service key configuration os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/opt/airflow/dags/configs/ServiceKey_GoogleCloud.json' default_args = { 'start_date': datetime.datetime.today(), 'schedule_interval': '0 0 * * *' # Run every day at midnight } with DAG(dag_id = 'most_popular_movies', default_args = default_args, catchup=False) as dag: # Dag #1: Get the most popular movies @task def scrape_movies(): soup = _get_soup(chart='most_popular_movies') movie_df = _scrape_movies(soup) return movie_df # Dag #2: Load the most popular movies @task def load_movies(movies_df): _load_to_bigQuery(movies_df, chart='most_popular_movies') # Dependencies movie_df = scrape_movies() load_movies(movie_df)
После запуска этого второго дага у меня должно быть две таблицы в BigQuery:
5. Анализ данных с использованием DBT
После создания четырех таблиц в BigQuery я создам из них несколько моделей с помощью DBT. DBT будет работать поверх BigQuery и позволит нам повысить его производительность.
Я не буду вдаваться в подробности развертывания и подключения BigQuery к DBT, для этого используйте следующее руководство:
Я сгенерировал три представления, содержащие различный анализ данных. После использования dbt run
BigQuery создаст каждый из следующих запросов в виде представлений. Это позволит нам проводить различный анализ данных отдельно от реальных таблиц BigQuery, а также поможет упростить следующий этап конвейера — визуализацию данных с помощью Power BI.
В первом просмотре будут представлены фильмы с самым высоким рейтингом за год:
/*Top rated movies per year*/ select movie_year, movie_name, movie_rating from ( SELECT m1.movie_year, m1.movie_name, m1.movie_rating, row_number() over (partition by m1.movie_year order by m1.movie_rating desc) as rank FROM imdb.top_250_movies as m1 full join imdb.most_popular_movies as m2 on m1.movie_name = m2.movie_name full join imdb.top_english_movies as m3 on m1.movie_name = m3.movie_name order by m1.movie_year desc ) as aaa where aaa.rank = 1
Во второй покажут самые рейтинговые фильмы за десятилетие:
/*Top rated movies per decade*/ select * from ( select movie_year, movie_name, decade, movie_rating, row_number() over (partition by decade order by movie_rating desc) as rank from ( SELECT m1.movie_year, m1.movie_name, m1.movie_rating, case when m1.movie_year between 1920 and 1930 then '20s' when m1.movie_year between 1930 and 1940 then '30s' when m1.movie_year between 1940 and 1950 then '40s' when m1.movie_year between 1950 and 1960 then '50s' when m1.movie_year between 1960 and 1970 then '60s' when m1.movie_year between 1970 and 1980 then '70s' when m1.movie_year between 1980 and 1990 then '80s' when m1.movie_year between 1990 and 2000 then '90s' when m1.movie_year between 2000 and 2010 then '2000s' when m1.movie_year between 2010 and 2020 then '2010s' when m1.movie_year between 2020 and 2030 then '2020s' end as decade FROM imdb.top_250_movies as m1 full join imdb.most_popular_movies as m2 on m1.movie_name = m2.movie_name full join imdb.top_english_movies as m3 on m1.movie_name = m3.movie_name order by m1.movie_year desc ) as aaa where aaa.movie_name is not null ) bbb where bbb.rank = 1 order by bbb.movie_year desc
А третья реализует скользящее среднее по общему количеству голосов пользователей за год.
/*User voting trend*/ select movie_year, total_votes, round((total_votes + (LAG(total_votes, 2) over(ORDER BY movie_year)) + (LAG(total_votes, 1) over(ORDER BY movie_year ))) / 3, 2) as Moving_Average_3 from ( SELECT m1.movie_year, sum(m1.user_votings) as total_votes FROM imdb.top_250_movies as m1 full join imdb.most_popular_movies as m2 on m1.movie_name = m2.movie_name full join imdb.top_english_movies as m3 on m1.movie_name = m3.movie_name group by m1.movie_year ) as aaa order by movie_year
Далее я запущу dbt run
, чтобы реализовать эти запросы как представление в BigQuery:
Как мы видим, теперь у нас есть четыре таблицы и три представления в BigQuery. Далее я буду использовать эти представления для создания трех простых отчетов Power BI.
6. Визуализация данных: Power BI
Первый отчет: фильм с самым высоким рейтингом в каждом году:
Второй отчет: фильмы с самым высоким рейтингом в каждом десятилетии:
Третий отчет: динамика голосования пользователей по годам с использованием скользящего среднего:
Это для статьи, полный код можно найти в моем репозитории GitHub.