
From raw telematics data to optimized fleet performance: The PySpark approach

As urban mobility evolves, electric scooters and bikes are transforming how people navigate cities. However, this transformation brings pressing operational challenges to fleet operators, especially when it comes to understanding and reducing vehicle downtime. While traditional analytics often fall short in pinpointing these inefficiencies, the fusion of telematics data with advanced big data tools like Apache Spark and Python's Pandas library offers a breakthrough.

In this article, we follow a real-world case study built on Navixy’s Raw Data API and continue to showcase a complete data-driven strategy — from cleaning and validating the data all the way to segmenting it spatially and visualizing inefficiencies in Looker Studio.

By applying this method, system integrators and tech-savvy operators can gain granular insights into vehicle idle patterns, redistribute the fleet more intelligently, and significantly boost overall fleet efficiency. This data-centric approach is a blueprint for transforming raw signals into strategic actions. It is universal and can be applied to any other scenario.

Case study: A big business from a small ride

According to Cognitive Market Research, the global electric scooter sharing market size will reach $2,142.5 million in 2025. It will expand at a compound annual growth rate (CAGR) of 17.00% from 2025 to 2033.

Kicksharing services are revolutionizing urban transportation by providing a more sustainable option compared to traditional methods. For example, in the first two days after the launch of the electric scooter sharing service in Mexico City, two thousand trips were made. Another example is Berlin, where electric bicycles have seen a surge in popularity, with over 2 million rides recorded in 2021. However, this rapid expansion comes with its own set of challenges for operators, such as efficiently managing fleet distribution, ensuring scooters are available where demand is high, and minimizing operational costs.

One of the major hurdles involves managing periods when electric bicycles are not in use. Extended non-usage times can lead to inefficiencies in operations, reduced revenues, and increased maintenance needs. To tackle these issues, the industry is increasingly turning to big data analytics. By effectively analyzing raw data, companies can gain valuable insights that improve strategic decision-making. These examples show how much value can be extracted from raw data, and how strongly that value depends on the depth of the analysis performed.

While big data holds great promise for transforming industries and fostering innovation, it also brings several notable challenges. A primary issue is the enormous volume and complexity of the data, which necessitates a robust infrastructure and advanced technologies for efficient storage and processing. The article ‘Small steps to big data: How to retrieve telematics data at Navixy for further analysis’ covers the subject of data retrieval.

Another significant concern is data quality; it must be accurate, complete, and consistent to draw reliable insights. Additionally, integrating data from diverse sources can be quite complex and presents its own set of difficulties.

We will demonstrate how to ensure data quality through cleaning and validation, and how to use big data tools such as Python's Pandas library and Apache Spark for data aggregation and transformation.

How to clean and validate data for accurate analysis

Once you have retrieved the raw data for your mobility units, it is crucial to validate and clean the data to ensure its accuracy and reliability. Raw data can often contain discrepancies, such as spikes in locations or attributes. For example, the speed transmitted by a tracker could be abnormally high, or the latitude and longitude could have discrepancies due to a weak GPS signal. These issues can significantly impact the accuracy of your analysis. The steps listed below can be applied based on your specific needs to address these discrepancies and ensure high data quality.

Step 1: Identify and remove outliers

Outliers in the data can skew your analysis and lead to incorrect conclusions. To identify and remove them, you can use statistical methods (a sketch follows the code below) or set thresholds based on domain knowledge. For instance, if the speed of a mobility unit exceeds a reasonable limit (e.g., 30 km/h for an electric scooter), it is likely an outlier and should be removed or corrected.

Use the code

import pandas as pd

# Load the raw data into a DataFrame
df = pd.read_csv('all_raw_data.csv')

# Define threshold 
speed_threshold = 30  # km/h

# Remove outliers based on speed
df = df[df['speed'] <= speed_threshold]
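
As an alternative to a fixed domain limit, a statistical cutoff can flag extreme values without hard-coding a number. A minimal sketch, assuming the speed column is numeric and a 99th-percentile cutoff suits your fleet:

# Treat speeds above the 99th percentile as outliers (statistical cutoff instead of a fixed limit)
speed_cutoff = df['speed'].quantile(0.99)
df = df[df['speed'] <= speed_cutoff]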

Step 2: Handle missing or incorrect data

Missing or incorrect data points can also affect the accuracy of your analysis and must be handled appropriately. Depending on your needs, you can remove rows with missing data, fill them with default values, or use interpolation methods to estimate the missing values (a sketch follows the code below).

Use the code

# Remove rows with missing data
df = df.dropna()

# Optionally, fill missing values with a default value instead (e.g., 0 for speed)
df['speed'] = df['speed'].fillna(0)
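
If you prefer to estimate gaps rather than drop or zero-fill them, pandas can interpolate between neighboring readings. A minimal sketch, assuming the frame covers one mobility unit and has a timestamp column to sort by:

# Sort by time so interpolation follows the ride chronology
df = df.sort_values('timestamp')

# Linearly interpolate missing speed values between neighboring readings
df['speed'] = df['speed'].interpolate(method='linear')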

Step 3: Validate GPS data

GPS data can be particularly prone to discrepancies due to weak signals or interference. To validate it, you can check for unrealistic jumps in latitude and longitude: if the distance between consecutive data points is implausibly large, it likely indicates a discrepancy. Other methods can also be applied at this step, such as the three-sigma rule (a sketch follows the code below).

Use the code

import numpy as np

# Assumed helper: haversine distance in meters between two GPS points
def calculate_distance(lat1, lng1, lat2, lng2):
    lat1, lng1, lat2, lng2 = map(np.radians, [lat1, lng1, lat2, lng2])
    a = np.sin((lat2 - lat1) / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin((lng2 - lng1) / 2) ** 2
    return 2 * 6371000 * np.arcsin(np.sqrt(a))

# Previous GPS point for each row (assumes the frame is ordered by unit and time)
df['prev_latitude'] = df['latitude'].shift()
df['prev_longitude'] = df['longitude'].shift()

# Calculate the distance between consecutive GPS points
df['distance'] = df.apply(lambda row: calculate_distance(row['latitude'], row['longitude'], row['prev_latitude'], row['prev_longitude']), axis=1)

# Define a threshold for abnormal GPS data and remove rows exceeding it
distance_threshold = 20  # meters
df = df[df['distance'] <= distance_threshold]
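
The three-sigma rule mentioned above is one such alternative: instead of a fixed 20-meter limit, keep only jumps that fall within three standard deviations of the mean distance. A minimal sketch, assuming the distance column has already been computed:

# Keep rows whose jump lies within mean ± 3 standard deviations of the distance
mean_dist = df['distance'].mean()
std_dist = df['distance'].std()
df = df[(df['distance'] - mean_dist).abs() <= 3 * std_dist]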

Step 4: Ensure data consistency

Ensure that the data is consistent across all records. For example, timestamps should be stored as datetime objects, and all units should use the same measurement system (e.g., metric or imperial).

Use the code

# Convert timestamps to a standard format
df['timestamp'] = pd.to_datetime(df['timestamp'])

# Ensure all units use the same measurement system (e.g., km/h for speed)
df['speed'] = df['speed'].apply(lambda x: x * 1.60934 if x > 100 else x)  # Convert mph to km/h if necessary

By following these steps, you can validate and clean up the raw data, ensuring that it is accurate and reliable for further analysis.

Using Spark for data aggregation and transformation

Let’s proceed to the next stage of our data transformation process by leveraging Apache Spark.

  1. First, initialize a Spark session, which lets us harness distributed computing for efficient handling of large datasets.
  2. Read the concatenated raw data CSV file into a Spark DataFrame, ensuring that the msg_time column is converted to a timestamp format for accurate time-based calculations. Note that Spark can read CSV directly, but on large datasets this may reduce computation performance (a Parquet-based alternative is sketched after the full code listing below).
  3. Define a window specification to partition the data by source_id and order it by msg_time. This step is essential for calculating the time difference between consecutive rows, which helps us identify idle periods.
  4. Filter the rows where the speed is 0 and the time difference is 5 minutes or more, indicating that the mobility unit is not in use. The time difference is then converted to minutes to calculate the idle time.
  5. Group the data by source_id, date, latitude, and longitude, and sum the idle time for each group.
  6. To gain deeper insights, define the boundaries and size of each sub-sector by splitting the predefined analysis area into a 100 x 100 grid of sectors. This step allows us to analyze the data at a granular level.
  7. Calculate the sub-sector for each row based on these boundaries, then group the data by lat_sector and lng_sector, summing the idle time for each sub-sector.
  8. To facilitate visualization, calculate the coordinates of the middle of each sector.

Finally, write the results to a new CSV file for further analysis.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, lead, expr, sum as spark_sum
from pyspark.sql.window import Window

# Initialize Spark session
spark = SparkSession.builder.appName("TelematicsAnalysis").getOrCreate()

# Read prepared CSV file into a Spark DataFrame
df = spark.read.csv("concatenated_raw_data.csv", header=True, inferSchema=True)

# Convert msg_time to timestamp
df = df.withColumn("msg_time", col("msg_time").cast("timestamp"))

# Calculate non usage time for each object per day
window_spec = Window.partitionBy("source_id").orderBy("msg_time")

# Calculate the time difference between consecutive rows
df = df.withColumn("next_msg_time", lead("msg_time").over(window_spec))
df = df.withColumn("time_diff", col("next_msg_time").cast("long") - col("msg_time").cast("long"))

# Filter rows where speed is 0 and time difference is 5 minutes or more
idle_df = df.filter((col("speed") == 0) & (col("time_diff") >= 300))

# Calculate non usage in minutes
idle_df = idle_df.withColumn("idle_time", col("time_diff") / 60)

# Sum non usage per source_id per day, including lat and lng
idle_time_per_day = idle_df.groupBy("source_id", expr("date_format(msg_time, 'yyyy-MM-dd')").alias("date"), "lat", "lng").agg(spark_sum("idle_time").alias("total_idle_time"))

# Define the sector boundaries
lat_min, lat_max = 19.28416103, 19.5256355
lng_min, lng_max = -99.239600, -99.0087829

# Define the size of each sub-sector
lat_step = (lat_max - lat_min) / 100
lng_step = (lng_max - lng_min) / 100

# Calculate the sub-sector for each row
idle_time_per_day = idle_time_per_day.withColumn("lat_sector", ((col("lat") - lat_min) / lat_step).cast("int"))
idle_time_per_day = idle_time_per_day.withColumn("lng_sector", ((col("lng") - lng_min) / lng_step).cast("int"))

# Calculate the total non usage time for each sub-sector
sector_idle_time = idle_time_per_day.groupBy("lat_sector", "lng_sector").agg(spark_sum("total_idle_time").alias("total_idle_time"))

# Calculate the coordinates of the middle of each sector
sector_idle_time = sector_idle_time.withColumn("lat_middle", lat_min + (col("lat_sector") + 0.5) * lat_step)
sector_idle_time = sector_idle_time.withColumn("lng_middle", lng_min + (col("lng_sector") + 0.5) * lng_step)

# Write the results to a new CSV file
sector_idle_time.toPandas().to_csv("sector_non_usage_time.csv", index=False)
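
As noted in the step list above, reading and writing CSV works but can slow Spark down on large datasets. If your storage allows it, a columnar format such as Parquet is a straightforward alternative; a minimal sketch with illustrative file names:

# Read the prepared raw data from Parquet instead of CSV (path is illustrative)
df = spark.read.parquet("concatenated_raw_data.parquet")

# ... apply the same transformations as above ...

# Write the aggregated sectors as Parquet for faster downstream reads
sector_idle_time.write.mode("overwrite").parquet("sector_non_usage_time.parquet")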

Visualizing data with Looker Studio for future decisions

Once the data has been aggregated and transformed into a CSV file, our next step is to visualize it to gain valuable insights. We are going to use Google Looker Studio (formerly known as Google Data Studio) — a powerful tool that allows you to create interactive and visually appealing reports and dashboards.

  1. To get started, upload the CSV file containing the aggregated data to Google Drive. This is the easiest way to store the CSV for further analysis. Depending on your performance and security requirements, you can also use other storage options, such as Google Cloud Storage (GCS) or Amazon S3, as data sources.
  2. Open Looker Studio and create a new report.
  3. In the report, add a new data source and select the uploaded CSV file from Google Drive. Looker Studio will automatically detect the columns and data types, making it easy to start building your visualizations.

You can use a map visual (Bubble Map or Heat Map) to present the data geographically for better understanding. For example, you can create a map visualization that displays the idle time for each sub-sector using the latitude and longitude coordinates calculated earlier. You can also create bar charts or line graphs to show the total idle time per source_id over different days. Looker Studio offers a wide range of customization options, allowing you to tailor the visualizations to your specific needs.

Big data visualization: a heatmap reveals how non-usage time varies across urban areas.
Visualization insights: larger circles highlight zones of high idle time, while smaller bubbles indicate shorter non-usage time.

At this point, we come back to our case study of an electric scooter kicksharing service. The heatmap above reveals how non-usage time varies across urban areas. Larger circles highlight zones of high idle time, signaling areas where redistribution can significantly enhance efficiency, while smaller bubbles indicate shorter non-usage time.

This visualization helps identify areas where mobility units are underutilized, suggesting that these units could be redistributed to areas with higher demand. By analyzing this data, operators can make informed decisions to optimize the distribution of their fleet, ensuring that mobility units are available where they are needed most.

Paving the way for data-driven solutions in the telematics industry

In this case study, we have explored how to transform historical telematics big data into actionable insights using Navixy APIs, Python, and Apache Spark. By retrieving, cleaning, validating, and aggregating raw data, we demonstrated a structured approach to optimizing fleet operations and reducing downtime for mobility units. This workflow showcases the power of retrospective data analysis in improving service availability, efficiency, and strategic decision-making.

While this article focuses on leveraging historical data, companies can achieve similar goals using real-time telematics data with Navixy's IoT Logic product. It provides a powerful framework for real-time event processing, automated workflows, and instant response mechanisms — perfect for dynamic fleet operations.

Discover how you can unlock the full potential of telematics with Navixy. Schedule a consultation with our experts.
