From 53433ad2ed4388850c75e90abbec3901381cca54 Mon Sep 17 00:00:00 2001 From: Luke Arran Date: Fri, 9 Apr 2021 22:05:46 +0100 Subject: [PATCH] refactoring, and improved comments --- README.md | 6 +- TimeToTrackt.py | 270 +++++++++++++++++++++++++++++++++--------------- 2 files changed, 190 insertions(+), 86 deletions(-) diff --git a/README.md b/README.md index e85be30..94bbc59 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,14 @@ # TV Time to Trakt - Import Script ![](https://loch.digital/image_for_external_apps/4342799-01.png) -A Python script to import TV Time tracked episode data into Trakt.TV - using data provided by Whip Media Company via a GDPR request. +A Python script to import TV Time tracked episode data into Trakt.TV - using data export provided by TV Time through a GDPR request. # Issues They'll be a few! This was quickly put together within a few hours or so for personal usage. If you come across anything then let me know in the 'Issue' section, and I'll provide support where possible. # Notes 1. The script is using limited data provided from a GDPR request - so the accuracy isn't 100%. But you will be prompted to manually pick the Trakt show, when it can't be determined automatically. -2. A delay of 5 seconds is added between each episode to ensure fair use of Trakt's API server - especially with my import of 6,500 episodes. You should adjust this for your own import, but make sure it's at least 1 second to remain within the rate limit. +2. A delay of 5 seconds is added between each episode to ensure fair use of Trakt's API server. You should adjust this for your own import, but make sure it's at least 1 second to remain within the rate limit. 3. Episodes which have been processed will be saved to a TinyDB file `localStorage.json` - when you restart the script, the program will skip those episodes which have been marked 'imported'. # Setup @@ -46,7 +46,7 @@ Create a new file named `config.json` in the same directory of `TimeToTrakt.py`, } ``` -Then, execute the program using `./python3 TimeToTrakt.py` - make sure to pop back during a long import to provide the correct Trakt TV Show selections. +Once the config is in place, execute the program using `python TimeToTrakt.py`. The process isn't 100% automated - you will need to pop back, especially with large imports, to check if the script requires a manual user input. ##### Credit City vector created by freepik - www.freepik.com \ No newline at end of file diff --git a/TimeToTrackt.py b/TimeToTrackt.py index 010dfc0..b0371f2 100644 --- a/TimeToTrackt.py +++ b/TimeToTrackt.py @@ -28,12 +28,6 @@ class Expando(object): pass -def initTraktAuth(): - # Set the method of authentication - trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH - return init(getConfiguration().TRAKT_USERNAME, store=True, client_id=getConfiguration().CLIENT_ID, client_secret=getConfiguration().CLIENT_SECRET) - - def getConfiguration(): configEx = Expando() @@ -45,90 +39,126 @@ def getConfiguration(): configEx.CLIENT_SECRET = data["CLIENT_SECRET"] configEx.GDPR_WORKSPACE_PATH = data["GDPR_WORKSPACE_PATH"] - return configEx + CONFIG_SINGLETON = configEx + + return CONFIG_SINGLETON + + +config = getConfiguration() + +# Return the path to the CSV file contain the watched episode data from TV Time + + +def getWatchedShowsPath(): + return config.GDPR_WORKSPACE_PATH + "/seen_episode.csv" + + +def initTraktAuth(): + # Set the method of authentication + trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH + return init(config.TRAKT_USERNAME, store=True, client_id=config.CLIENT_ID, client_secret=config.CLIENT_SECRET) + +# With a given title, check if it contains a year (e.g Doctor Who (2005)) +# and then return this value, with the title and year removed to improve +# the accuracy of Trakt results. def getYearFromTitle(title): ex = Expando() try: - # Get the year + # Use a regex expression to get the value within the brackets e.g The Americans (2017) yearSearch = re.search(r"\(([A-Za-z0-9_]+)\)", title) yearValue = yearSearch.group(1) - # Get the value outside the title + # Then, get the title without the year value included titleValue = title.split('(')[0].strip() - + # Put this together into an object ex.titleWithoutYear = titleValue ex.yearValue = int(yearValue) return ex except: + # If the above failed, then the title doesn't include a year + # so return the object as is. ex.titleWithoutYear = title ex.yearValue = -1 return ex +# Shows in TV Time are often different to Trakt.TV - in order to improve results and automation, +# calculate how many words are in the title, and return true if more than 50% of the title is a match, +# It seems to improve automation, and reduce manual selection.... + def checkTitleNameMatch(tvTimeTitle, traktTitle): - # If a name is simply a 1:1 match, then return true + # If the name is a complete match, then don't bother comparing them! if tvTimeTitle == traktTitle: return True # Split the TvTime title tvTimeTitleSplit = tvTimeTitle.split() - # Get all the words which were in the title + # Create an array of words which are found in the Trakt title wordsMatched = [] - # Go through each word of the title, and confirm if the is a match + # Go through each word of the TV Time title, and check if it's in the Trakt title for word in tvTimeTitleSplit: if word in traktTitle: wordsMatched.append(word) - # Calculate the accuracy of the match + # Then calculate what percentage of words matched quotient = len(wordsMatched) / len(traktTitle.split()) percentage = quotient * 100 - # If the title contains more than 50% of the words, then bring it up for use + # If more than 50% of words in the TV Time title exist in the Trakt title, + # then return the title as a possibility to use return percentage > 50 +# Using TV Time data (Name of Show, Season No and Episode) - find the corresponding show +# in Trakt.TV either by automation, or asking the user to confirm. + def getShowByName(name, seasonNo, episodeNo): - # Get the 'year' from the title - if one is present + # Parse the TV Show's name for year, if one is present in the string titleObj = getYearFromTitle(name) - # Title includes a year value, which helps with ensuring accuracy of pick + # Create a boolean to indicate if the title contains a year, + # this is used later on to improve the accuracy of picking + # from search results doesTitleIncludeYear = titleObj.yearValue != -1 - # If the title included the year, then replace the name value - # with the string without the year - which helps with ensuring - # Trakt search accuracy + # If the title contains a year, then replace the local variable with the stripped version if doesTitleIncludeYear: name = titleObj.titleWithoutYear - # Search for a show with the name + # Request the Trakt API for search results, using the name tvSearch = TVShow.search(name) + # Create an array of shows which have been matched showsWithSameName = [] - # Check if the search returned more than 1 result with the same name + # Go through each result from the search for show in tvSearch: + # Check if the title is a match, based on our conditions (e.g over 50% of words match) if checkTitleNameMatch(name, show.title): - # If the TV Time title included a year, then only add results with matching broadcast year + # If the title included the year of broadcast, then we can be more picky in the results + # to look for a show with a broadcast year that matches if doesTitleIncludeYear == True: - # If the show title is a 1:1 match, with the year, then don't bother with checking the rest - - # since this will be a complete match + # If the show title is a 1:1 match, with the same broadcast year, then bingo! if (name == show.title) and (show.year == titleObj.yearValue): + # Clear previous results, and only use this one showsWithSameName = [] showsWithSameName.append(show) break - # Otherwise, check if the year is a match + # Otherwise, only add the show if the broadcast year matches if show.year == titleObj.yearValue: showsWithSameName.append(show) - # Otherwise, just add all options + # If the program doesn't have the broadcast year, then add all the results else: showsWithSameName.append(show) - # Filter down the results further to results containing a 1:1 match on title + # Sweep through the results once more for 1:1 title name matches, + # then if the list contains one entry with a 1:1 match, then clear the array + # and only use this one! completeMatchNames = [] for nameFromSearch in showsWithSameName: if nameFromSearch.title == name: @@ -137,60 +167,78 @@ def getShowByName(name, seasonNo, episodeNo): if (len(completeMatchNames) == 1): showsWithSameName = completeMatchNames - # If the search contains more than one result with the same name, then confirm with user + # If the search contains multiple results, then we need to confirm with the user which show + # the script should use, or access the local database to see if the user has already provided + # a manual selection if len(showsWithSameName) > 1: - # Check if the user has made a selection already + # Query the local database for existing selection userMatchedQuery = Query() queryResult = userMatchedShowsTable.search( userMatchedQuery.ShowName == name) - # If the user has already made a selection for the show, then use the existing selection + # If the local database already contains an entry for a manual selection + # then don't bother prompting the user to select it again! if len(queryResult) == 1: - # Get the first row + # Get the first result from the query firstMatch = queryResult[0] + # Get the value contains the selection index firstMatchSelectedIndex = int(firstMatch.get('UserSelectedIndex')) + # Check if the user previously requested to skip the show skipShow = firstMatch.get('SkipShow') - + # If the user did not skip, but provided an index selection, get the + # matching show if skipShow == False: return showsWithSameName[firstMatchSelectedIndex] + # Otherwise, return None, which will trigger the script to skip + # and move onto the next show else: return None - # Otherwise, ask the user which show they want to match with + # If the user has not provided a manual selection already in the process + # then prompt the user to make a selection else: - # Ask the user to pick print( - f"MESSAGE: The TV Time Show '{name}' (Season {seasonNo}, Episode {episodeNo}) has {len(showsWithSameName)} matching Trakt shows with the same name.") + f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Show '{name}' (Season {seasonNo}, Episode {episodeNo}) has {len(showsWithSameName)} matching Trakt shows with the same name.") + # Output each show for manual selection for idx, item in enumerate(showsWithSameName): + # Display the show's title, broadcast year, amount of seasons and a link to the Trakt page. + # This will provide the user with enough information to make a selection. print( f" ({idx}) {item.title} - {item.year} - {len(item.seasons)} Season(s) - More Info: https://trakt.tv/{item.ext}") while(True): try: + # Get the user's selection, either a numerical input, or a string 'SKIP' value indexSelected = (input( f"Please make a selection from above (or enter SKIP):")) - # If the input was not skip, then validate the selection before ending loop if indexSelected != 'SKIP': + # Since the value isn't 'skip', check that the result is numerical int(indexSelected) + # Exit the selection loop break - # Otherwise, exit with SKIP + # Otherwise, exit the loop else: break + # Still allow the user to provide the exit input, and kill the program except KeyboardInterrupt: sys.exit("Cancel requested...") + # Otherwise, the user has entered an invalid value, warn the user to try again except: print( f"Sorry! Please select a value between 0 to {len(showsWithSameName)}") - # If the user decides to skip the selection, return None + # If the user entered 'SKIP', then exit from the loop with no selection, which + # will trigger the program to move onto the next episode if (indexSelected == 'SKIP'): + # Record that the user has skipped the TV Show for import, so that + # manual input isn't required everytime userMatchedShowsTable.insert( {'ShowName': name, 'UserSelectedIndex': 0, 'SkipShow': True}) return None - # Otherwise, return the selected show + # Otherwise, return the selection which the user made from the list else: selectedShow = showsWithSameName[int(indexSelected)] @@ -200,135 +248,191 @@ def getShowByName(name, seasonNo, episodeNo): return selectedShow else: + # If the search returned only one result, then awesome! + # Return the show, so the import automation can continue. return showsWithSameName[0] -# Confirm if the season has a "special" season starting at 0, if not, then subtract the seasonNo by 1 +# Since the Trakt.Py starts the indexing of seasons in the array from 0 (e.g Season 1 in Index 0), then +# subtract the TV Time numerical value by 1 so it starts from 0 as well. However, when a TV series includes +# a 'special' season, Trakt.Py will place this as the first season in the array - so, don't subtract, since +# this will match TV Time's existing value. def parseSeasonNo(seasonNo, traktShowObj): + # Parse the season number into a numerical value seasonNo = int(seasonNo) - # Get the first season number in the array + # Then get the Season Number from the first item in the array firstSeasonNo = traktShowObj.seasons[0].number - # If the season number is 0, then the show contains a "special" season + # If the season number is 0, then the Trakt show contains a "special" season if firstSeasonNo == 0: - # Return the Season Number, as is + # No need to modify the value, as the TV Time value will match Trakt return seasonNo + # Otherwise, if the Trakt seasons start with no specials, then return the seasonNo, + # but subtracted by one (e.g Season 1 in TV Time, will be 0) else: - # Otherwise, if the seasons start from 0, without any specials, then return the seasonNo, - # but subtracted by one (unless it is a special season in TV Time) + # Only subtract is the TV Time season number is greater than 0. if seasonNo != 0: return seasonNo - 1 + # Otherwise, the TV Time season is a special! Then you don't need to change the starting position else: return seasonNo -def getWatchedShowsPath(): - return getConfiguration().GDPR_WORKSPACE_PATH + "/seen_episode.csv" - - def processWatchedShows(): - # Keep a count of rows processed etc + # Total amount of rows which have been processed in the CSV file rowsCount = 0 + # Total amount of rows in the CSV file rowsTotal = 0 + # Total amount of errors which have occurred in one streak errorStreak = 0 - # Quickly sweep through the file to get the row count + + # Get the total amount of rows in the CSV file, + # which is helpful for keeping track of progress. + # However, if you have a VERY large CSV file (e.g above 100,000 rows) + # then it might be a good idea to remove this due to the performance + # overhead. with open(getWatchedShowsPath()) as f: rowsTotal = sum(1 for line in f) + # Open the CSV file within the GDPR exported data with open(getWatchedShowsPath(), newline='') as csvfile: + # Create the CSV reader, which will break up the fields using the delimiter ',' showsReader = csv.reader(csvfile, delimiter=',') + # Loop through each line/record of the CSV file for row in showsReader: + # Increment the row counter to keep track of progress completing the + # records during the import process. rowsCount += 1 - # Get the values from the CSV record + # Get the name of the TV show tvShowName = row[4] - # Skip first row + # Ignore the header row if tvShowName != "tv_show_name": + # Get the TV Time Episode Id tvShowEpisodeId = row[1] + # Get the TV Time Season Number tvShowSeasonNo = row[7] + # Get the TV Time Episode Number tvShowEpisodeNo = row[8] + # Get the date which the show was marked 'watched' in TV Time tvShowDateWatched = row[5] + # Parse the watched date value into a Python type tvShowDateWatchedConverted = datetime.strptime( tvShowDateWatched, '%Y-%m-%d %H:%M:%S') - # Query the database to check if it's already been processed + # Query the local database for previous entries indicating that + # the episode has already been imported in the past. Which will + # ease pressure on TV Time's API server during a retry of the import + # process, and just save time overall without needing to create network requests episodeCompletedQuery = Query() queryResult = syncedEpisodesTable.search( episodeCompletedQuery.episodeId == tvShowEpisodeId) + # If the query returned no results, then continue to import it into Trakt if len(queryResult) == 0: + # Create a repeating loop, which will break on success, but repeats on failures while True: + # If more than 10 errors occurred in one streak, whilst trying to import the episode + # then give up, and move onto the next episode, but warn the user. if (errorStreak > 10): print( - f"An error occurred 10 times in a row... skipping episode...") + f"WARNING: An error occurred 10 times in a row... skipping episode...") break - try: - # Sleep for a second between each process, before adding the next watched episode, - # this ensures that the program is within the rate limit of 1 per second. + # Sleep for a second between each process, before going onto the next watched episode. + # This is required to remain within the API rate limit, and use the API server fairly. + # Other developers share the service, for free - so be considerate of your usage. time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS) - # Get the Trakt version of the show + # Search Trakt for the TV show matching TV Time's title value traktShowObj = getShowByName( tvShowName, tvShowSeasonNo, tvShowEpisodeNo) - # Skip the episode, if no show was selected + # If the method returned 'None', then this is an indication to skip the episode, and + # move onto the next one if traktShowObj == None: break - # Output to console - print(f"({rowsCount}/{rowsTotal}) Processing Show '" + tvShowName + - "' on Season " + tvShowSeasonNo + " - Episode " + tvShowEpisodeNo) - # Add the show to the user's library + # Show the progress of the import on-screen + print( + f"({rowsCount}/{rowsTotal}) Processing Show {tvShowName} on Season {tvShowSeasonNo} - Episode {tvShowEpisodeNo}") + # Add the show to the user's library for tracking traktShowObj.add_to_library() - # Get the season + # Get the season from the Trakt API season = traktShowObj.seasons[parseSeasonNo( tvShowSeasonNo, traktShowObj)] + # Get the episode from the season episode = season.episodes[int(tvShowEpisodeNo) - 1] - # Mark the episode as watched + # Mark the episode as watched! episode.mark_as_seen(tvShowDateWatchedConverted) - # Add the show to the tracker as completed + # Add the episode to the local database as imported, so it can be skipped, + # if the process is repeated syncedEpisodesTable.insert( {'episodeId': tvShowEpisodeId}) - # Once the episode has been marked watched, then break out of the loop + # Clear the error streak on completing the method without errors errorStreak = 0 break + # Catch errors which occur because of an incorrect array index. This occurs when + # an incorrect Trakt show has been selected, with season/episodes which don't match TV Time. + # It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes. except IndexError: - print("Oops! '" + tvShowName + - "' on Season " + tvShowSeasonNo + " - Episode " + tvShowEpisodeNo + " is not within range of show array!") + print( + f"({rowsCount}/{rowsTotal}) WARNING: {tvShowName} Season {tvShowSeasonNo}, Episode {tvShowEpisodeNo} does not exist (season/episode index) in Trakt!") break + # Catch any errors which are raised because a show could not be found in Trakt except trakt.errors.NotFoundException: - print("Show '" + tvShowName + - "' on Season " + tvShowSeasonNo + " - Episode " + tvShowEpisodeNo + " does not exist!") + print( + f"({rowsCount}/{rowsTotal}) WARNING: {tvShowName} Season {tvShowSeasonNo}, Episode {tvShowEpisodeNo} does not exist (search) in Trakt!") break + # Catch errors because of the program breaching the Trakt API rate limit except trakt.errors.RateLimitException: print( - "Oops! You have hit the rate limit! The program will now pause for 1 minute...") + "WARNING: The program is running too quickly and has hit Trakt's API rate limit! Please increase the delay between " + + "episdoes via the variable 'DELAY_BETWEEN_EPISODES_IN_SECONDS'. The program will now wait 60 seconds before " + + "trying again.") time.sleep(60) + + # Mark the exception in the error streak errorStreak += 1 + # Catch a JSON decode error - this can be raised when the API server is down and produces a HTML page, instead of JSON except json.decoder.JSONDecodeError: print( - f"Oh, oh! A JSON Decode error occurred - maybe a dodgy response from the server? Waiting 60 seconds before resuming") + f"({rowsCount}/{rowsTotal}) WARNING: A JSON decode error occuring whilst processing {tvShowName} " + + f"Season {tvShowSeasonNo}, Episode {tvShowEpisodeNo}! This might occur when the server is down and has produced " + + "a HTML document instead of JSON. The script will wait 60 seconds before trying again.") + + # Wait 60 seconds time.sleep(60) + + # Mark the exception in the error streak errorStreak += 1 + # Catch a CTRL + C keyboard input, and exits the program except KeyboardInterrupt: sys.exit("Cancel requested...") + # Skip the episode else: - print(f"({rowsCount}/{rowsTotal}) Skipping '" + tvShowName + - "' on Season " + tvShowSeasonNo + " - Episode " + tvShowEpisodeNo + ". It's already been imported!") + print( + f"({rowsCount}/{rowsTotal}) Skipping '{tvShowName}' Season {tvShowSeasonNo} Episode {tvShowEpisodeNo}. It's already been imported.") def start(): + # Create the initial authentication with Trakt, before starting the process if initTraktAuth(): - # Start processing the TV shows + # Invoke the method which will import episodes which have been watched + # from TV Time into Trakt processWatchedShows() else: - print("Unable to authenticate with Trakt!") + print("ERROR: Unable to complete authentication to Trakt - please try again.") if __name__ == "__main__": - if os.path.isdir(getConfiguration().GDPR_WORKSPACE_PATH): - start() + # Check that the user has created the config file + if os.path.exists("config.json"): + # Check that the user has provided the GDPR path + if os.path.isdir(config.GDPR_WORKSPACE_PATH): + start() + else: + print("Oops! The TV Time GDPR folder '" + config.GDPR_WORKSPACE_PATH + + "' does not exist on the local system. Please check it, and try again.") else: - print("Oops! The TV Time GDPR folder '" + getConfiguration().GDPR_WORKSPACE_PATH + - "' does not exist on the local system. Please check it, and try again.") + print(f"ERROR: The 'config.json' file cannot be found - have you created it yet?")