We start by importing a few libraries and a secret YouTube API key. If you do not have an API key, you can create one following this guide.
import requests
import json
import polars as pl
from my_sk import my_keyfrom youtube_transcript_api import YouTubeTranscriptApi
Next, we will define variables to help us extract video data from the YouTube API. Here, I specify the ID of my YouTube channel and the API URL, initialize page_token, and create a list for storing video data.
# define channel ID
channel_id = 'UCa9gErQ9AE5jT2DZLjXBIdA'# define url for API
url = 'https://www.googleapis.com/youtube/v3/search'
# initialize page token
page_token = None
# intialize list to store video data
video_record_list = []
The next chunk of code might be scary, so I will explain what’s happening first. We will perform GET requests to YouTube’s search API. This is just like searching for videos on YouTube, but instead of using the UI, we will perform searches programmatically.
Since search results are limited to 50 per page, we need to recursively perform searches to return every video that matches the search criteria. Here’s what that looks like in Python code.
# extract video data across multiple search result pageswhile page_token != 0:
# define parameters for API call
params = {'key': my_key, 'channelId': channel_id,
'part': ["snippet","id"], 'order': "date",
'maxResults':50, 'pageToken': page_token}
# make get request
response = requests.get(url, params=params)
# append video data from page results to list
video_record_list += getVideoRecords(response)
try:
# grab next page token
page_token = json.loads(response.text)['nextPageToken']
except:
# if no next page token kill while loop
page_token = 0
getVideoRecords() is a user-defined function that extracts the relevant information from an API response.
# extract video data from single search result pagedef getVideoRecords(response: requests.models.Response) -> list:
"""
Function to extract YouTube video data from GET request response
"""
# initialize list to store video data from page results
video_record_list = []
for raw_item in json.loads(response.text)['items']:
# only execute for youtube videos
if raw_item['id']['kind'] != "youtube#video":
continue
# extract relevant data
video_record = {}
video_record['video_id'] = raw_item['id']['videoId']
video_record['datetime'] = raw_item['snippet']['publishedAt']
video_record['title'] = raw_item['snippet']['title']
# append record to list
video_record_list.append(video_record)
return video_record_list
Now that we have information about all my YouTube videos let’s extract the automatically generated captions. To make the video IDs easier to access, I will store the video data in a Polars dataframe.
# store data in polars dataframe
df = pl.DataFrame(video_record_list)
print(df.head())
To pull the video captions, I will use the youtube_transcript_api Python library. I will loop through each video ID in the dataframe and extract the associated transcript.
# intialize list to store video captions
transcript_text_list = []# loop through each row of dataframe
for i in range(len(df)):
# try to extract captions
try:
# get transcript
transcript = YouTubeTranscriptApi.get_transcript(df['video_id'][i])
# extract text transcript
transcript_text = extract_text(transcript)
# if not captions available set as n/a
except:
transcript_text = "n/a"
# append transcript text to list
transcript_text_list.append(transcript_text)
Again, I use a user-defined function called extract_text() to extract the necessary information from the API.
def extract_text(transcript: list) -> str:
"""
Function to extract text from transcript dictionary
"""text_list = [transcript[i]['text'] for i in range(len(transcript))]
return ' '.join(text_list)
Then we can add the transcripts for each video to the dataframe.
# add transcripts to dataframe
df = df.with_columns(pl.Series(name="transcript", values=transcript_text_list))
print(df.head())
With the data extracted, we can transform it so it is ready for the downstream use case. This requires some exploratory data analysis (EDA).
Handing duplicates
A good starting point for EDA is to examine the number of unique rows and elements in each column. Here, we expected each row to be uniquely identified by the video_id. Additionally, each column should not have repeating elements except for videos for which no transcript was available, which we set as “n/a”.
Here’s some code to probe that information. We can see from the output the data match our expectations.
# shape + unique values
print("shape:", df.shape)
print("n unique rows:", df.n_unique())
for j in range(df.shape[1]):
print("n unique elements (" + df.columns[j] + "):", df[:,j].n_unique())### output
# shape: (84, 4)
# n unique rows: 84
# n unique elements (video_id): 84
# n unique elements (datetime): 84
# n unique elements (title): 84
# n unique elements (transcript): 82
Check dtypes
Next, we can examine the data types of each column. In the image above, we saw that all columns are strings.
While this is appropriate for the video_id, title, and transcript, this is not a good choice for the datetime column. We can change this type in the following way.
# change datetime to Datetime dtype
df = df.with_columns(pl.col('datetime').cast(pl.Datetime))
print(df.head())
Handling special characters
Since we are working with text data, it’s important to look out for special character strings. This requires a bit of manual skimming of the text, but after a few minutes, I found 2 special cases: ’ → ' and & → &
In the code below, I replaced these strings with the appropriate characters and changed “sha” to “Shaw”.
# list all special strings and their replacements
special_strings = [''', '&', 'sha ']
special_string_replacements = ["'", "&", "Shaw "]# replace each special string appearing in title and transcript columns
for i in range(len(special_strings)):
df = df.with_columns(df['title'].str.replace(special_strings[i],
special_string_replacements[i]).alias('title'))
df = df.with_columns(df['transcript'].str.replace(special_strings[i],
special_string_replacements[i]).alias('transcript'))
Since the dataset here is very small (84 rows and 4 columns, ~900k characters), we can store the data directly in the project directory. This can be done in one line of code using the write_parquet() method in Polars. The final file size is 341 KB.
# write data to file
df.write_parquet('data/video-transcripts.parquet')
Here, we discussed the basics of building data pipelines in the context of Full Stack Data Science and walked through a concrete example using real-world data.
In the next article of this series, we will continue going down the data science tech stack and discuss how we can use this data pipeline to develop a semantic search system for my YouTube videos.
More in this series 👇