Implement general Searcher to search and handle search results

This commit is contained in:
Markus Nyman 2023-01-15 04:14:52 +02:00
parent 8e92ce7934
commit 8a5453c8b2

View file

@ -8,10 +8,11 @@ import sys
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Callable, TypeVar, Union, List, Literal
from typing import Optional, Callable, TypeVar, Union
import trakt.core
from tinydb import Query, TinyDB
from tinydb.table import Table
from trakt import init
from trakt.movies import Movie
from trakt.tv import TVShow
@ -97,6 +98,12 @@ def init_trakt_auth() -> bool:
# and then return this value, with the title and year removed to improve
# the accuracy of Trakt results.
TraktTVShow = TypeVar("TraktTVShow")
TraktMovie = TypeVar("TraktMovie")
SearchResult = Union[TraktTVShow, TraktMovie]
@dataclass
class Title:
name: str
@ -104,6 +111,10 @@ class Title:
year: Optional[int]
def __init__(self, title: str):
"""
Parse the title's name for year.
:param title:
"""
try:
# Use a regex expression to get the value within the brackets e.g. The Americans (2017)
year_search = re.search(r"\(([A-Za-z0-9_]+)\)", title)
@ -121,133 +132,99 @@ class Title:
self.without_year = title
self.year = None
def items_with_same_name(self, items: list[SearchResult]) -> list[SearchResult]:
with_same_name = []
def get_year_from_title(title) -> Title:
return Title(title)
for item in items:
if self.matches(item.title):
# If the title included the year of broadcast, then we can be more picky in the results
# to look for an item with a broadcast year that matches
if self.year:
# If the item title is a 1:1 match, with the same broadcast year, then bingo!
if (self.name == item.title) and (item.year == self.year):
# Clear previous results, and only use this one
with_same_name = [item]
break
# Otherwise, only add the item if the broadcast year matches
if item.year == self.year:
with_same_name.append(item)
# If the item doesn't have the broadcast year, then add all the results
else:
with_same_name.append(item)
return with_same_name
def matches(self, other: str) -> bool:
"""
Shows in TV Time are often different to Trakt.TV - in order to improve results and automation,
calculate how many words are in the title, and return true if more than 50% of the title is a match,
It seems to improve automation, and reduce manual selection...
"""
# If the name is a complete match, then don't bother comparing them!
if self.name == other:
return True
# Split the TvTime title
tv_time_title_split = self.name.split()
# Create an array of words which are found in the Trakt title
words_matched = []
# Go through each word of the TV Time title, and check if it's in the Trakt title
for word in tv_time_title_split:
if word in other:
words_matched.append(word)
# Then calculate what percentage of words matched
quotient = len(words_matched) / len(other.split())
percentage = quotient * 100
# If more than 50% of words in the TV Time title exist in the Trakt title,
# then return the title as a possibility to use
return percentage > 50
# Shows in TV Time are often different to Trakt.TV - in order to improve results and automation,
# calculate how many words are in the title, and return true if more than 50% of the title is a match,
# It seems to improve automation, and reduce manual selection....
class Searcher:
def __init__(self, user_matched_table: Table, trakt_search: Callable[[str], list[SearchResult]],
manual_selection_prompt: Callable[[list[SearchResult]], None]):
self.name = ""
self.items_with_same_name = None
self._user_matched_table = user_matched_table
self.trakt_search = trakt_search
self._manual_selection_prompt = manual_selection_prompt
def search(self, title: Title) -> Optional[SearchResult]:
self.name = title.name
# If the title contains a year, then replace the local variable with the stripped version.
if title.year:
self.name = title.without_year
self.items_with_same_name = title.items_with_same_name(self._search_trakt(self.name))
def check_title_name_match(tv_time_title: str, trakt_title: str) -> bool:
# If the name is a complete match, then don't bother comparing them!
if tv_time_title == trakt_title:
return True
single_result = self._check_single_result()
if single_result:
return single_result
# Split the TvTime title
tv_time_title_split = tv_time_title.split()
# Create an array of words which are found in the Trakt title
words_matched = []
# Go through each word of the TV Time title, and check if it's in the Trakt title
for word in tv_time_title_split:
if word in trakt_title:
words_matched.append(word)
# Then calculate what percentage of words matched
quotient = len(words_matched) / len(trakt_title.split())
percentage = quotient * 100
# If more than 50% of words in the TV Time title exist in the Trakt title,
# then return the title as a possibility to use
return percentage > 50
# Using TV Time data (Name of Show, Season No and Episode) - find the corresponding show
# in Trakt.TV either by automation, or asking the user to confirm.
TraktTVShow = TypeVar("TraktTVShow")
TraktMovie = TypeVar("TraktMovie")
SearchResult = Union[TraktTVShow, TraktMovie]
def get_items_with_same_name(title: Title, items: List[SearchResult]) -> List[SearchResult]:
shows_with_same_name = []
for item in items:
if check_title_name_match(title.name, item.title):
# If the title included the year of broadcast, then we can be more picky in the results
# to look for an item with a broadcast year that matches
if title.year:
# If the item title is a 1:1 match, with the same broadcast year, then bingo!
if (title.name == item.title) and (item.year == title.year):
# Clear previous results, and only use this one
shows_with_same_name = [item]
break
# Otherwise, only add the item if the broadcast year matches
if item.year == title.year:
shows_with_same_name.append(item)
# If the item doesn't have the broadcast year, then add all the results
else:
shows_with_same_name.append(item)
return shows_with_same_name
@dataclass
class GetItemInput:
name: str
@dataclass
class GetTVShowInput(GetItemInput):
season_number: str
episode_number: str
@dataclass
class GetMovieInput(GetItemInput):
pass
def get_item(get_item_input: GetItemInput) -> Optional[SearchResult]:
if isinstance(get_item_input, GetTVShowInput):
return get_show_by_name(get_item_input.name, get_item_input.season_number, get_item_input.episode_number)
elif isinstance(get_item_input, GetMovieInput):
return get_movie_by_name(get_item_input.name)
else:
logging.warning("Invalid get item input type")
return None
def find_single_result(name: str, items_with_same_name: List[SearchResult]) -> Optional[SearchResult]:
complete_match_names = [name_from_search for name_from_search in items_with_same_name if
name_from_search.title == name]
if len(complete_match_names) == 1:
return complete_match_names[0]
elif len(items_with_same_name) == 1:
return items_with_same_name[0]
elif len(items_with_same_name) < 1:
return None
def get_show_by_name(name: str, season_number: str, episode_number: str):
# Parse the TV Show's name for year, if one is present in the string
title = get_year_from_title(name)
# If the title contains a year, then replace the local variable with the stripped version
if title.year:
name = title.without_year
shows_with_same_name = get_items_with_same_name(title, TVShow.search(name))
single_result = find_single_result(name, shows_with_same_name)
if single_result:
return single_result
else:
# If the search contains multiple results, then we need to confirm with the user which show
# the script should use, or access the local database to see if the user has already provided
# a manual selection
# Query the local database for existing selection
user_matched_query = Query()
query_result = userMatchedShowsTable.search(user_matched_query.ShowName == name)
should_return, query_result = self._search_local()
if should_return:
return query_result
# If the user has not provided a manual selection already in the process
# then prompt the user to make a selection
else:
self._handle_multiple_manually()
def _search_trakt(self, name: str) -> list[SearchResult]:
return self.trakt_search(name)
def _search_local(self) -> tuple[bool, SearchResult]:
user_matched_query = Query()
query_result = self._user_matched_table.search(user_matched_query.Name == self.name)
# If the local database already contains an entry for a manual selection
# then don't bother prompting the user to select it again!
if len(query_result) == 1:
@ -256,88 +233,128 @@ def get_show_by_name(name: str, season_number: str, episode_number: str):
# Get the value contains the selection index
first_match_selected_index = int(first_match.get("UserSelectedIndex"))
# Check if the user previously requested to skip the show
skip_show = first_match.get("SkipShow")
skip_show = first_match.get("Skip")
# If the user did not skip, but provided an index selection, get the
# matching show
if not skip_show:
return shows_with_same_name[first_match_selected_index]
# Otherwise, return None, which will trigger the script to skip
# and move onto the next show
return True, self.items_with_same_name[first_match_selected_index]
else:
return None
# If the user has not provided a manual selection already in the process
# then prompt the user to make a selection
# Otherwise, return None, which will trigger the script to skip
# and move onto the next show
return True, None
else:
print(
f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Show '{name}' (Season {season_number},"
f"Episode {episode_number}) has {len(shows_with_same_name)} matching Trakt shows with the same name.\a "
return False, None
def _handle_multiple_manually(self) -> Optional[SearchResult]:
self._manual_selection_prompt(self.items_with_same_name)
while True:
try:
# Get the user's selection, either a numerical input, or a string 'SKIP' value
index_selected = input(
"Please make a selection from above (or enter SKIP):"
)
# Exit the loop
if index_selected == "SKIP":
break
# Since the value isn't 'skip', check that the result is numerical
index_selected = int(index_selected) - 1
break
# Still allow the user to provide the exit input, and kill the program
except KeyboardInterrupt:
sys.exit("Cancel requested...")
# Otherwise, the user has entered an invalid value, warn the user to try again
except Exception:
logging.error(
f"Sorry! Please select a value between 0 to {len(self.items_with_same_name)}"
)
# If the user entered 'SKIP', then exit from the loop with no selection, which
# will trigger the program to move onto the next episode
if index_selected == "SKIP":
# Record that the user has skipped the TV Show for import, so that
# manual input isn't required everytime
self._user_matched_table.insert(
{"Name": self.name, "UserSelectedIndex": 0, "Skip": True}
)
# Output each show for manual selection
for idx, item in enumerate(shows_with_same_name):
# Display the show's title, broadcast year, amount of seasons and a link to the Trakt page.
# This will provide the user with enough information to make a selection.
print(
f" ({idx + 1}) {item.title} - {item.year} - {len(item.seasons)} "
f"Season(s) - More Info: https://trakt.tv/{item.ext}"
)
return None
# Otherwise, return the selection which the user made from the list
else:
selected_show = self.items_with_same_name[int(index_selected)]
while True:
try:
# Get the user's selection, either a numerical input, or a string 'SKIP' value
index_selected = input(
"Please make a selection from above (or enter SKIP):"
)
self._user_matched_table.insert(
{
"Name": self.name,
"UserSelectedIndex": index_selected,
"Skip": False,
}
)
# Exit the loop
if index_selected == "SKIP":
break
return selected_show
# Since the value isn't 'skip', check that the result is numerical
index_selected = int(index_selected) - 1
# Exit the selection loop
break
# Still allow the user to provide the exit input, and kill the program
except KeyboardInterrupt:
sys.exit("Cancel requested...")
# Otherwise, the user has entered an invalid value, warn the user to try again
except Exception:
logging.error(
f"Sorry! Please select a value between 0 to {len(shows_with_same_name)}"
)
# If the user entered 'SKIP', then exit from the loop with no selection, which
# will trigger the program to move onto the next episode
if index_selected == "SKIP":
# Record that the user has skipped the TV Show for import, so that
# manual input isn't required everytime
userMatchedShowsTable.insert(
{"ShowName": name, "UserSelectedIndex": 0, "SkipShow": True}
)
return None
# Otherwise, return the selection which the user made from the list
else:
selected_show = shows_with_same_name[int(index_selected)]
userMatchedShowsTable.insert(
{
"ShowName": name,
"UserSelectedIndex": index_selected,
"SkipShow": False,
}
)
return selected_show
def _check_single_result(self) -> Optional[SearchResult]:
complete_match_names = [name_from_search for name_from_search in self.items_with_same_name if
name_from_search.title == self.name]
if len(complete_match_names) == 1:
return complete_match_names[0]
elif len(self.items_with_same_name) == 1:
return self.items_with_same_name[0]
elif len(self.items_with_same_name) < 1:
return None
# Since the Trakt.Py starts the indexing of seasons in the array from 0 (e.g. Season 1 in Index 0), then
# subtract the TV Time numerical value by 1, so it starts from 0 as well. However, when a TV series includes
# a 'special' season, Trakt.Py will place this as the first season in the array - so, don't subtract, since
# this will match TV Time's existing value.
class TVShowSearcher(Searcher):
def __init__(self, season_number: str, episode_number: str):
super().__init__(userMatchedShowsTable, TVShow.search, self._print_manual_selection)
self.season_number = season_number
self.episode_number = episode_number
def _print_manual_selection(self, items_with_same_name: list[SearchResult]) -> None:
print(
f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Show '{self.name}' (Season {self.season_number},"
f"Episode {self.episode_number}) has {len(items_with_same_name)} matching Trakt shows with the same name.\a "
)
# Output each show for manual selection
for idx, item in enumerate(items_with_same_name):
# Display the show's title, broadcast year, amount of seasons and a link to the Trakt page.
# This will provide the user with enough information to make a selection.
print(
f" ({idx + 1}) {item.title} - {item.year} - {len(item.seasons)} "
f"Season(s) - More Info: https://trakt.tv/{item.ext}"
)
@dataclass
class MovieSearcher(Searcher):
def __init__(self):
super().__init__(userMatchedMoviesTable, Movie.search, self._print_manual_selection)
def _print_manual_selection(self, items_with_same_name: list[SearchResult]) -> None:
print(
f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Movie '{self.name}' has {len(items_with_same_name)} "
f"matching Trakt movies with the same name.\a"
)
# Output each movie for manual selection
for idx, item in enumerate(items_with_same_name):
# Display the movie's title, broadcast year, amount of seasons and a link to the Trakt page.
# This will provide the user with enough information to make a selection.
print(
f" ({idx + 1}) {item.title} - {item.year} - More Info: https://trakt.tv/{item.ext}"
)
def parse_season_number(season_number, trakt_show_obj):
"""
Since the Trakt.Py starts the indexing of seasons in the array from 0 (e.g. Season 1 in Index 0), then
subtract the TV Time numerical value by 1, so it starts from 0 as well. However, when a TV series includes
a 'special' season, Trakt.Py will place this as the first season in the array - so, don't subtract, since
this will match TV Time's existing value.
"""
# Parse the season number into a numerical value
season_number = int(season_number)
@ -414,11 +431,9 @@ def process_watched_shows() -> None:
# Other developers share the service, for free - so be considerate of your usage.
time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS)
# Search Trakt for the TV show matching TV Time's title value
trakt_show = get_item(
GetTVShowInput(
tv_show_name, tv_show_season_number, tv_show_episode_number
)
)
trakt_show = TVShowSearcher(tv_show_season_number,
tv_show_episode_number).search(Title(tv_show_name))
# If the method returned 'None', then this is an indication to skip the episode, and
# move onto the next one
if not trakt_show:
@ -496,114 +511,6 @@ def process_watched_shows() -> None:
)
# Using TV Time data (Name of Movie) - find the corresponding movie
# in Trakt.TV either by automation, or asking the user to confirm.
def get_movie_by_name(name: str) -> Optional[TraktMovie]:
# Parse the Movie's name for year, if one is present in the string
title = get_year_from_title(name)
# If the title contains a year, then replace the local variable with the stripped version
if title.year:
name = title.without_year
movies_with_same_name = get_items_with_same_name(title, Movie.search(name))
single_result = find_single_result(name, movies_with_same_name)
if single_result:
return single_result
else:
# If the search contains multiple results, then we need to confirm with the user which movie
# the script should use, or access the local database to see if the user has already provided
# a manual selection
# Query the local database for existing selection
user_matched_query = Query()
query_result = userMatchedMoviesTable.search(user_matched_query.MovieName == name)
# If the local database already contains an entry for a manual selection
# then don't bother prompting the user to select it again!
if len(query_result) == 1:
# Get the first result from the query
first_match = query_result[0]
# Get the value contains the selection index
first_match_selected_index = int(first_match.get("UserSelectedIndex"))
# Check if the user previously requested to skip the movie
skip_movie = first_match.get("SkipMovie")
# If the user did not skip, but provided an index selection, get the
# matching movie
if not skip_movie:
return movies_with_same_name[first_match_selected_index]
# Otherwise, return None, which will trigger the script to skip
# and move onto the next movie
else:
return None
# If the user has not provided a manual selection already in the process
# then prompt the user to make a selection
else:
print(
f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Movie '{name}' has {len(movies_with_same_name)} "
f"matching Trakt movies with the same name.\a"
)
# Output each movie for manual selection
for idx, item in enumerate(movies_with_same_name):
# Display the movie's title, broadcast year, amount of seasons and a link to the Trakt page.
# This will provide the user with enough information to make a selection.
print(
f" ({idx + 1}) {item.title} - {item.year} - More Info: https://trakt.tv/{item.ext}"
)
while True:
try:
# Get the user's selection, either a numerical input, or a string 'SKIP' value
index_selected = input(
"Please make a selection from above (or enter SKIP):"
)
if index_selected != "SKIP":
# Since the value isn't 'skip', check that the result is numerical
index_selected = int(index_selected) - 1
# Exit the selection loop
break
# Otherwise, exit the loop
else:
break
# Still allow the user to provide the exit input, and kill the program
except KeyboardInterrupt:
sys.exit("Cancel requested...")
# Otherwise, the user has entered an invalid value, warn the user to try again
except Exception:
logging.error(
f"Sorry! Please select a value between 0 to {len(movies_with_same_name)}"
)
# If the user entered 'SKIP', then exit from the loop with no selection, which
# will trigger the program to move onto the next episode
if index_selected == "SKIP":
# Record that the user has skipped the Movie for import, so that
# manual input isn't required everytime
userMatchedMoviesTable.insert(
{"MovieName": name, "UserSelectedIndex": 0, "SkipMovie": True}
)
return None
# Otherwise, return the selection which the user made from the list
else:
selected_movie = movies_with_same_name[int(index_selected)]
userMatchedMoviesTable.insert(
{
"MovieName": name,
"UserSelectedIndex": index_selected,
"SkipMovie": False,
}
)
return selected_movie
def process_movies():
# Total amount of rows which have been processed in the CSV file
# Total amount of rows in the CSV file
@ -676,7 +583,7 @@ def process_movies():
# Other developers share the service, for free - so be considerate of your usage.
time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS)
# Search Trakt for the Movie matching TV Time's title value
trakt_movie_obj = get_item(GetMovieInput(movie_name))
trakt_movie_obj = MovieSearcher().search(Title(movie_name))
# If the method returned 'None', then this is an indication to skip the episode, and
# move onto the next one
if trakt_movie_obj is None: