Fabric Madness part 2Image by author and ChatGPT. “Design an illustration, focusing on a basketball player in action, this time the theme is on using pyspark to generate features for machine leaning models in a graphic novel style” prompt. ChatGPT, 4, OpenAI, 4 April. 2024. https://chat.openai.com.A Huge thanks to Martim Chaves who co-authored this post and developed the example scripts.In our previous post we took a high level view of how to train a machine learning model in Microsoft Fabric. In this post we wanted to dive deeper into the process of feature engineering.Feature engineering is a crucial part of the development lifecycle for any Machine Learning (ML) systems. It is a step in the development cycle where raw data is processed to better represent its underlying structure and provide additional information that enhance our ML models. Feature engineering is both an art and a science. Even though there are specific steps that we can take to create good features, sometimes, it is only through experimentation that good results are achieved. Good features are crucial in guaranteeing a good system performance.As datasets grow exponentially, traditional feature engineering may struggle with the size of very large datasets. This is where PySpark can help — as it is a scalable and efficient processing platform for massive datasets. A great thing about Fabric is that it makes using PySpark easy!In this post, we’ll be going over:How does PySpark Work?Basics of PySparkFeature Engineering in ActionBy the end of this post, hopefully you’ll feel comfortable carrying out feature engineering with PySpark in Fabric. Let’s get started!Spark is a distributed computing system that allows for the processing of large datasets with speed and efficiency across a cluster of machines. It is built around the concept of a Resilient Distributed Dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. RDDs are the fundamental data structure of Spark, and they allow for the distribution of data across a cluster of machines.PySpark is the Python API for Spark. It allows for the creation of Spark DataFrames, which are similar to Pandas DataFrames, but with the added benefit of being distributed across a cluster of machines. PySpark DataFrames are the core data structure in PySpark, and they allow for the manipulation of large datasets in a distributed manner.At the core of PySpark is the SparkSession object, which is what fundamentally interacts with Spark. This SparkSession is what allows for the creation of DataFrames, and other functionalities. Note that, when running a Notebook in Fabric, a SparkSession is automatically created for you, so you don’t have to worry about that.Having a rough idea of how PySpark works, let’s get to the basics.Although Spark DataFrames may remind us of Pandas DataFrames due to their similarities, the syntax when using PySpark can be a bit different. In this section, we’ll go over some of the basics of PySpark, such as reading data, combining DataFrames, selecting columns, grouping data, joining DataFrames, and using functions.The DataThe data we are looking at is from the 2024 US college basketball tournaments, which was obtained from the on-going March Machine Learning Mania 2024 Kaggle competition, the details of which can be found here, and is licensed under CC BY 4.0 [1]Reading dataAs mentioned in the previous post of this series, the first step is usually to create a Lakehouse and upload some data. Then, when creating a Notebook, we can attach it to the created Lakehouse, and we’ll have access to the data stored there.PySpark Dataframes can read various data formats, such as CSV, JSON, Parquet, and others. Our data is stored in CSV format, so we’ll be using that, like in the following code snippet:# Read women’s dataw_data = (spark.read.option(“header”, True).option(“inferSchema”, True).csv(f”Files/WNCAATourneyDetailedResults.csv”).cache())In this code snippet, we’re reading the detailed results data set of the final women’s basketball college tournament matches. Note that the “header” option being true means that the names of the columns will be derived from the first row of the CSV file. The inferSchema option tells Spark to guess the data types of the columns – otherwise they would all be read as strings. .cache() is used to keep the DataFrame in memory.If you’re coming from Pandas, you may be wondering what the equivalent of df.head() is for PySpark – it’s df.show(5). The default for .show() is the top 20 rows, hence the need to specifically select 5.Combining DataFramesCombining DataFrames can be done in multiple ways. The first we will look at is a union, where the columns are the same for both DataFrames:# Read women’s data…# Read men’s datam_data = (spark.read.option(“header”, True).option(“inferSchema”, True).csv(f”Files/MNCAATourneyDetailedResults.csv”).cache())# Combine (union) the DataFramescombined_results = m_data.unionByName(w_data)Here, unionByName joins the two DataFrames by matching the names of the columns. Since both the women’s and the men’s detailed match results have the same columns, this is a good approach. Alternatively, there’s also union, which combines two DataFrames, matching column positions.Selecting ColumnsSelecting columns from a DataFrame in PySpark can be done using the .select() method. We just have to indicate the name or names of the columns that are relevant as a parameter.Here’s the output for w_scores.show(5):# Selecting a single columnw_scores = w_data.select(“WScore”)# Selecting multiple columnsteamid_w_scores = w_data.select(“WTeamID”, “WScore”)“`Here’s the output for `w_scores.show(5)`:“`+——+|Season|+——+| 2010|| 2010|| 2010|| 2010|| 2010|+——+only showing top 5 rowsThe columns can also be renamed when being selected using the .alias() method:winners = w_data.select(w_data.WTeamID.alias(“TeamID”),w_data.WScore.alias(“Score”))Grouping DataGrouping allows us to carry out certain operations for the groups that exist within the data and is usually combined with a aggregation functions. We can use .groupBy() for this:# Grouping and aggregatingwinners_average_scores = winners.groupBy(“TeamID”).avg(“Score”)In this example, we are grouping by “TeamID”, meaning we’re considering the groups of rows that have a distinct value for “TeamID”. For each of those groups, we’re calculating the average of the “Score”. This way, we get the average score for each team.Here’s the output of winners_average_scores.show(5), showing the average score of each team:+——+—————–+|TeamID| avg(Score)|+——+—————–+| 3125| 68.5|| 3345| 74.2|| 3346|79.66666666666667|| 3376|73.58333333333333|| 3107| 61.0|+——+—————–+Joining DataJoining two DataFrames can be done using the .join() method. Joining is essentially extending the DataFrame by adding the columns of one DataFrame to another.# Joining on Season and TeamIDfinal_df = matches_df.join(stats_df, on=[‘Season’, ‘TeamID’], how=’left’)In this example, both stats_df and matches_df were using Season and TeamID as unique identifiers for each row. Besides Season and TeamID, stats_df has other columns, such as statistics for each team during each season, whereas matches_df has information about the matches, such as date and location. This operation allows us to add those interesting statistics to the matches information!FunctionsThere are several functions that PySpark provides that help us transform DataFrames. You can find the full list here.Here’s an example of a simple function:from pyspark.sql import functions as Fw_data = w_data.withColumn(“HighScore”, F.when(F.col(“Score”) > 80, “Yes”).otherwise(“No”))In the code snippet above, a “HighScore” column is created when the score is higher than 80. For each row in the “Score” column (indicated by the .col() function), the value “Yes” is chosen for the “HighScore” column if the “Score” value is larger than 80, determined by the .when() function. .otherwise(), the value chosen is “No”.Now that we have a basic understanding of PySpark and how it can be used, let’s go over how the regular season statistics features were created. These features were then used as inputs into our machine learning model to try to predict the outcome of the final tournament games.The starting point was a DataFrame, regular_data, that contained match by match statistics for the regular seasons, which is the United States College Basketball Season that happens from November to March each year.Each row in this DataFrame contained the season, the day the match was held, the ID of team 1, the ID of team 2, and other information such as the location of the match. Importantly, it also contained statistics for each team for that specific match, such as “T1_FGM”, meaning the Field Goals Made (FGM) for team 1, or “T2_OR”, meaning the Offensive Rebounds (OR) of team 2.The first step was selecting which columns would be used. These were columns that strictly contained in-game statistics.# Columns that we’ll want to get statistics fromboxscore_cols = [‘T1_FGM’, ‘T1_FGA’, ‘T1_FGM3’, ‘T1_FGA3’, ‘T1_OR’, ‘T1_DR’, ‘T1_Ast’, ‘T1_Stl’, ‘T1_PF’, ‘T2_FGM’, ‘T2_FGA’, ‘T2_FGM3’, ‘T2_FGA3’, ‘T2_OR’, ‘T2_DR’, ‘T2_Ast’, ‘T2_Stl’, ‘T2_PF’]If you’re interested, here’s what each statistic’s code means:FGM: Field Goals MadeFGA: Field Goals AttemptedFGM3: Field Goals Made from the 3-point-lineFGA3: Field Goals Attempted for 3-point-line goalsOR: Offensive Rebounds. A rebounds is when the ball rebounds from the board when a goal is attempted, not getting in the net. If the team that attempted the goal gets possession of the ball, it’s called an “Offensive” rebound. Otherwise, it’s called a “Defensive” Rebound.DR: Defensive ReboundsAst: Assist, a pass that led directly to a goalStl: Steal, when the possession of the ball is stolenPF: Personal Foul, when a player makes a foulFrom there, a dictionary of aggregation expressions was created. Basically, for each column name in the previous list of columns, a function was stored that would calculate the mean of the column, and rename it, by adding a suffix, “mean”.from pyspark.sql import functions as Ffrom pyspark.sql.functions import col # select a columnagg_exprs = {col: F.mean(col).alias(col + ‘mean’) for col in boxscore_cols}Then, the data was grouped by “Season” and “T1_TeamID”, and the aggregation functions of the previously created dictionary were used as the argument for .agg().season_statistics = regular_data.groupBy([“Season”, “T1_TeamID”]).agg(*agg_exprs.values())Note that the grouping was done by season and the ID of team 1 — this means that “T2_FGAmean”, for example, will actually be the mean of the Field Goals Attempted made by the opponents of T1, not necessarily of a specific team. So, we actually need to rename the columns that are something like “T2_FGAmean” to something like “T1_opponent_FGAmean”.# Rename columns for T1for col in boxscore_cols:season_statistics = season_statistics.withColumnRenamed(col + ‘mean’, ‘T1_’ + col[3:] + ‘mean’) if ‘T1_’ in col \else season_statistics.withColumnRenamed(col + ‘mean’, ‘T1_opponent_’ + col[3:] + ‘mean’)At this point, it’s important to mention that the regular_data DataFrame actually has two rows per each match that occurred. This is so that both teams can be “T1” and “T2”, for each match. This little “trick” is what makes these statistics useful.Note that we “only” have the statistics for “T1”. We “need” the statistics for “T2” as well — “need” in quotations because there are no new statistics being calculated. We just need the same data, but with the columns having different names, so that for a match with “T1” and “T2”, we have statistics for both T1 and T2. So, we created a mirror DataFrame, where, instead of “T1…mean” and “T1_opponent_…mean”, we have “T2…mean” and “T2_opponent_…mean”. This is important because, later on, when we’re joining these regular season statistics to tournament matches, we’ll be able to have statistics for both team 1 and team 2.season_statistics_T2 = season_statistics.select(*[F.col(col).alias(col.replace(‘T1_opponent_’, ‘T2_opponent_’).replace(‘T1_’, ‘T2_’)) if col not in [‘Season’] else F.col(col) for col in season_statistics.columns])Now, there are two DataFrames, with season statistics for “both” T1 and T2. Since the final DataFrame will contain the “Season”, the “T1TeamID” and the “T2TeamID”, we can join these newly created features with a join!tourney_df = tourney_df.join(season_statistics, on=[‘Season’, ‘T1_TeamID’], how=’left’)tourney_df = tourney_df.join(season_statistics_T2, on=[‘Season’, ‘T2_TeamID’], how=’left’)Elo RatingsFirst created by Arpad Elo, Elo is a rating system for zero-sum games (games where one player wins and the other loses), like basketball. With the Elo rating system, each team has an Elo rating, a value that generally conveys the team’s quality. At first, every team has the same Elo, and whenever they win, their Elo increases, and when they lose, their Elo decreases. A key characteristic of this system is that this value increases more with a win against a strong opponent than with a win against a weak opponent. Thus, it can be a very useful feature to have!We wanted to capture the Elo rating of a team at the end of the regular season, and use that as feature for the tournament. To do this, we calculated the Elo for each team on a per match basis. To calculate Elo for this feature, we found it more straightforward to use Pandas.Central to Elo is calculating the expected score for each team. It can be described in code like so:# Function to calculate expected scoredef expected_score(ra, rb):# ra = rating (Elo) team A# rb = rating (Elo) team B# Elo functionreturn 1 / (1 + 10 ** ((rb – ra) / 400))Considering a team A and a team B, this function computes the expected score of team A against team B.For each match, we would update the teams’ Elos. Note that the location of the match also played a part — winning at home was considered less impressive than winning away.# Function to update Elo ratings, keeping T1 and T2 terminologydef update_elo(t1_elo, t2_elo, location, T1_Score, T2_Score):expected_t1 = expected_score(t1_elo, t2_elo)expected_t2 = expected_score(t2_elo, t1_elo)actual_t1 = 1 if T1_Score > T2_Score else 0actual_t2 = 1 – actual_t1# Determine K based on game location# The larger the K, the bigger the impact# team1 winning at home (location=1) less impressive than winning away (location = -1)if actual_t1 == 1: # team1 wonif location == 1:k = 20elif location == 0:k = 30else: # location = -1k = 40else: # team2 wonif location == 1:k = 40elif location == 0:k = 30else: # location = -1k = 20new_t1_elo = t1_elo + k * (actual_t1 – expected_t1)new_t2_elo = t2_elo + k * (actual_t2 – expected_t2)return new_t1_elo, new_t2_eloTo apply the Elo rating system, we iterated through each season’s matches, initializing teams with a base rating and updating their ratings match by match. The final Elo available for each team in each season will, hopefully, be a good descriptor of the team’s quality.def calculate_elo_through_seasons(regular_data):# For this feature, using Pandasregular_data = regular_data.toPandas()# Set value of initial eloinitial_elo = 1500# DataFrame to collect final Elo ratingsfinal_elo_list = []for season in sorted(regular_data[‘Season’].unique()):print(f”Season: {season}”)# Initialize elo ratings dictionaryelo_ratings = {}print(f”Processing Season: {season}”)# Get the teams that played in the seasonseason_teams = set(regular_data[regular_data[‘Season’] == season][‘T1_TeamID’]).union(set(regular_data[regular_data[‘Season’] == season][‘T2_TeamID’]))# Initialize season teams’ Elo ratingsfor team in season_teams:if (season, team) not in elo_ratings:elo_ratings[(season, team)] = initial_elo# Update Elo ratings per gameseason_games = regular_data[regular_data[‘Season’] == season]for _, row in season_games.iterrows():t1_elo = elo_ratings[(season, row[‘T1_TeamID’])]t2_elo = elo_ratings[(season, row[‘T2_TeamID’])]new_t1_elo, new_t2_elo = update_elo(t1_elo, t2_elo, row[‘location’], row[‘T1_Score’], row[‘T2_Score’])# Only keep the last season ratingelo_ratings[(season, row[‘T1_TeamID’])] = new_t1_eloelo_ratings[(season, row[‘T2_TeamID’])] = new_t2_elo# Collect final Elo ratings for the seasonfor team in season_teams:final_elo_list.append({‘Season’: season, ‘TeamID’: team, ‘Elo’: elo_ratings[(season, team)]})# Convert list to DataFramefinal_elo_df = pd.DataFrame(final_elo_list)# Separate DataFrames for T1 and T2final_elo_t1_df = final_elo_df.copy().rename(columns={‘TeamID’: ‘T1_TeamID’, ‘Elo’: ‘T1_Elo’})final_elo_t2_df = final_elo_df.copy().rename(columns={‘TeamID’: ‘T2_TeamID’, ‘Elo’: ‘T2_Elo’})# Convert the pandas DataFrames back to Spark DataFramesfinal_elo_t1_df = spark.createDataFrame(final_elo_t1_df)final_elo_t2_df = spark.createDataFrame(final_elo_t2_df)return final_elo_t1_df, final_elo_t2_dfIdeally, we wouldn’t calculate Elo changes on a match-by-match basis to determine each team’s final Elo for the season. However, we couldn’t come up with a better approach. Do you have any ideas? If so, let us know!Value AddedThe feature engineering steps demonstrated show how we can transform raw data — regular season statistics — into valuable information with predictive power. It is reasonable to assume that a team’s performance during the regular season is indicative of its potential performance in the final tournaments. By calculating the mean of observed match-by-match statistics for both the teams and their opponents, along with each team’s Elo rating in their final match, we were able to create a dataset suitable for modelling. Then, models were trained to predict the outcome of tournament matches using these features, among others developed in a similar way. With these models, we only need the two team IDs to look up the mean of their regular season statistics and their Elos to feed into the model and predict a score!In this post, we looked at some of the theory behind Spark and PySpark, how that can be applied, and a concrete practical example. We explored how feature engineering can be done in the case of sports data, creating regular season statistics to use as features for final tournament games. Hopefully you’ve found this interesting and helpful — happy feature engineering!The full source code for this post and others in the series can be found here.