Learn practical skills, build real-world projects, and advance your career
Updated 3 years ago
Test demo for functions
try:
import rosbag
except ModuleNotFoundError:
!pip3 install rosbag --extra-index-url https://rospypi.github.io/simple/
import rosbag
Failed to load Python extension for LZ4 support. LZ4 compression will not be available.
# Choose the data directory depending on where the notebook is running.
# NOTE(review): get_ipython() is only defined inside an IPython/Jupyter session.
if 'google.colab' in str(get_ipython()):
    # Google Colab: mount Google Drive and read the data from the shared folder.
    from google.colab import drive
    from google.colab import files
    drive.mount('/content/drive')
    path = '/content/drive/MyDrive/SA'
    # src = list(files.upload().values())[0]
    # open('functions.py','wb').write(src)
else:
    # Any other environment: use the data directory next to the notebook folder.
    path = '../data'
#from functions import *
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
functions.py
import rosbag
import random
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import json
import os
import glob
from scipy.spatial import distance
from matplotlib import animation, rc
import PIL
from PIL import Image
from matplotlib.pyplot import cm
from datetime import datetime as dt
from matplotlib.dates import date2num
from scipy import fftpack
from math import pi
import math
import yaml
import warnings
warnings.filterwarnings("ignore")
def read_slam_data(path):
    """
    Read the SLAM output (cone maps and car poses) from a rosbag file
    parameters:
        1. path of the bag file
    return:
        1. DataFrame of the car poses with the columns 'time', 'x' and 'y'
        2. DataFrame of the left cones ('x', 'y'), sorted by x
        3. DataFrame of the right cones ('x', 'y'), sorted by x
    """
    conel_x, conel_y = [], []
    coner_x, coner_y = [], []
    car_x, car_y = [], []
    time = []
    bag = rosbag.Bag(path)
    try:
        for topic, msg, t in bag.read_messages(topics=['/cone_right', '/cone_left', '/pose_stamped']):
            if topic == "/cone_right":
                for pose in msg.poses:
                    coner_x.append(pose.position.x)
                    coner_y.append(pose.position.y)
            elif topic == "/cone_left":
                for pose in msg.poses:
                    conel_x.append(pose.position.x)
                    conel_y.append(pose.position.y)
            elif topic == "/pose_stamped":
                # Record the timestamp only together with a pose; stamping every
                # message (cones included) would shift the 'time' column against
                # the 'x'/'y' columns.
                time.append(t)
                car_x.append(msg.pose.position.x)
                car_y.append(msg.pose.position.y)
    finally:
        # Release the file handle even when reading fails half-way.
        bag.close()
    car_pose = pd.DataFrame({'time': time, 'x': car_x, 'y': car_y})
    cone_l = pd.DataFrame({'x': conel_x, 'y': conel_y})
    cone_r = pd.DataFrame({'x': coner_x, 'y': coner_y})
    return car_pose, cone_l.sort_values(by=['x']), cone_r.sort_values(by=['x'])
def scatter_plot_cones_and_car(car_pose, cone_l, cone_r):
    """
    Scatter plot of the car positions (blue) and of both cone rows (green)
    parameters:
        1. car poses with the columns 'x' and 'y'
        2. left cones with the columns 'x' and 'y'
        3. right cones with the columns 'x' and 'y'
    """
    layers = ((car_pose, 'b'), (cone_l, 'g'), (cone_r, 'g'))
    for points, colour in layers:
        plt.scatter(points['x'], points['y'], color=colour)
    plt.grid(color='lightgray', linestyle='--')
    plt.show()
def smooth_data_fft(arr, span):  # the scaling of "span" is open to suggestions
    """
    Smooth a signal by zeroing its weak Fourier coefficients
    parameters:
        1. the samples to smooth
        2. smoothing strength: the larger, the more coefficients are dropped
    return:
        1. the signal rebuilt from the remaining coefficients
    """
    coefficients = fftpack.rfft(arr)
    squared = coefficients ** 2
    # Share of the largest squared coefficient below which a coefficient is dropped
    drop_share = 1 - np.exp(-span / 2000)
    too_weak = squared < (squared.max() * drop_share)
    coefficients[too_weak] = 0
    return fftpack.irfft(coefficients)
def plot_cones_and_car(car_pose, cone_l, cone_r):
    """
    Line plot of the car positions (blue) and both cone rows (green) with FFT-smoothed y values
    parameters:
        1. car poses with the columns 'x' and 'y'
        2. left cones with the columns 'x' and 'y'
        3. right cones with the columns 'x' and 'y'
    """
    for points, colour in ((car_pose, 'b'), (cone_l, 'g'), (cone_r, 'g')):
        smoothed_y = smooth_data_fft(points['y'].to_numpy(), 10)
        plt.plot(points['x'], smoothed_y, color=colour)
    plt.grid(color='lightgray', linestyle='--')
    plt.show()
def compare_cones_position(cone_l_slam, cone_r_slam, cone_l, cone_r):
    """
    Compare the cone positions computed by the SLAM with the ones of the simulator
    parameters:
        1. left cones of the SLAM, drawn in yellow
        2. right cones of the SLAM, drawn in blue
        3. left cones of the simulator, drawn in yellow
        4. right cones of the simulator, drawn in blue
    """
    plt.plot(cone_l_slam['x'], cone_l_slam['y'], marker='<', color='y', label='yellow cones slam')
    # The right-hand cones are drawn in blue, so the legend must say so
    # (they used to be labelled "yellow", which produced duplicate entries).
    plt.plot(cone_r_slam['x'], cone_r_slam['y'], marker='<', color='b', label='blue cones slam')
    plt.plot(cone_l['x'], cone_l['y'], marker='>', color='y', label='yellow cones simulator ')
    plt.plot(cone_r['x'], cone_r['y'], marker='>', color='b', label='blue cones simulator ')
    plt.grid(color='lightgray', linestyle='--')
    plt.legend()
    plt.show()
def compare_car_position(car_pose_slam, car_pose):
    """
    Compare the car trajectory computed by the SLAM with the one of the simulator
    parameters:
        1. car poses of the SLAM with the columns 'x' and 'y'
        2. car poses of the simulator with the columns 'x' and 'y'
    """
    trajectories = (
        (car_pose_slam, '<', 'orange', 'car position slam'),
        (car_pose, '>', 'gold', 'car position simulator'),
    )
    for poses, marker, colour, label in trajectories:
        plt.plot(poses['x'], poses['y'], marker=marker, color=colour, label=label)
    plt.grid(color='lightgray', linestyle='--')
    plt.legend()
    plt.show()
def build_telemetry(path_to_bag):
    """
    Build the telemetry table from the vehicle status messages of a bag file
    parameters:
        1. path of the bag file
    return:
        1. DataFrame with the columns 'time', 'speed', 'acceleration' (norm of the
           linear acceleration), 'steering angle', 'throttle', 'brake' and 'gear'
    """
    acc_x, acc_y, acc_z = [], [], []
    time, speed, steer, throttle, brake, gear = [], [], [], [], [], []
    b_laps = rosbag.Bag(path_to_bag)
    try:
        for _topic, msg, t in b_laps.read_messages(topics='/carla/ego_vehicle/vehicle_status'):
            time.append(t)
            acc_x.append(msg.acceleration.linear.x)
            acc_y.append(msg.acceleration.linear.y)
            acc_z.append(msg.acceleration.linear.z)
            speed.append(msg.velocity)
            steer.append(msg.control.steer)
            throttle.append(msg.control.throttle)
            brake.append(msg.control.brake)
            gear.append(msg.control.gear)
    finally:
        # The bag used to stay open after reading; always release it.
        b_laps.close()
    # One row per message, one column per axis -> Euclidean norm per message
    acc = np.linalg.norm(np.vstack((acc_x, acc_y, acc_z)).T, axis=1)
    telemetry_dict = {'time': time, 'speed': speed, 'acceleration': acc, 'steering angle': steer,
                      'throttle': throttle, 'brake': brake, 'gear': gear}
    return pd.DataFrame(telemetry_dict)
def find_max_difference(line1: np.ndarray, line2: np.ndarray):
    """
    Locate the sample at which two data streams are furthest apart
    :param line1: stream of data, list containing samples for some timesteps
    :param line2: stream of data, list containing samples for some timesteps
    :return: the position (index) where line1 and line2 differ the most and the difference itself
    """
    # Streams of different length are aligned on their first element;
    # only the overlapping part is compared.
    overlap = min(len(line1), len(line2))
    gaps = [np.linalg.norm(line1[k] - line2[k]) for k in range(overlap)]
    worst_idx, worst_gap = 0, 0
    for k, gap in enumerate(gaps):
        # strict comparison: on ties the earliest index wins
        if gap > worst_gap:
            worst_idx, worst_gap = k, gap
    return worst_idx, worst_gap
def check_position_accuracy(slam_posits, ground_truth, plot=True):
    """
    Compare the car positions computed by the SLAM with the actual positions in the
    simulator (ground truth): optionally plot both streams per axis with the largest
    deviation marked, then print the maximum deviations and the standard deviation
    of the point-wise distance.
    parameters:
        1. positions computed by the SLAM, 2-D array with the columns x and y
        2. positions of the simulator, 2-D array with the columns x and y
        3. whether to draw the plots
    """
    # random data only to test the function, comment the following lines for "real" usage
    # ground_truth = np.array([(random.randint(0, 100), random.randint(0, 100)) for _ in range(10)])
    # slam_posits = np.array([(random.randint(0, 100), random.randint(0, 100)) for _ in range(10)])
    # Both streams are aligned on their first sample; only the overlapping part
    # can be compared (the streams may contain a different number of samples).
    n_common = min(len(ground_truth), len(slam_posits))
    # find max difference on x axis and y axis
    idx_x, max_diff_x = find_max_difference(ground_truth[:, 0], slam_posits[:, 0])
    idx_y, max_diff_y = find_max_difference(ground_truth[:, 1], slam_posits[:, 1])
    # plot the differences over time, first on the x axis and then on the y axis
    if plot:
        fig, ax = plt.subplots(1, 2, figsize=(10, 5))
        for axis, (idx, name) in enumerate(((idx_x, "X"), (idx_y, "Y"))):
            ax[axis].plot(ground_truth[:, axis], label="Ground truth")
            ax[axis].plot(slam_posits[:, axis], label="SLAM")
            # vertical red segment connecting both streams at the worst sample
            ax[axis].plot([idx, idx], [ground_truth[idx][axis], slam_posits[idx][axis]],
                          color="red", label="Max difference")
            ax[axis].set_title(f"Position difference on the {name} axis")
            ax[axis].legend()
        plt.show()
    # max absolute difference (not along a specific axis)
    _, max_abs_diff = find_max_difference(ground_truth, slam_posits)
    # Point-wise distance over the common length only; iterating over
    # len(ground_truth) raised IndexError whenever the SLAM stream was shorter.
    diffs = np.linalg.norm(ground_truth[:n_common] - slam_posits[:n_common], axis=1)
    # print results
    print(f"Max difference on x axis: {max_diff_x:.4f}")
    print(f"Max difference on y axis: {max_diff_y:.4f}")
    print(f"Max absolute difference: {max_abs_diff:.4f}")
    print(f"Standard deviation of the difference: {np.std(diffs):.4f}")
def plot_track_layout(path, json_file):
    """
    Draw the track described by a json file
    parameters:
        1. directory of the json file
        2. name of the json file; it provides the lists 'yellow_cones' and 'blue_cones'
    return:
        1. positions of the yellow cones, dict with the keys 'x' and 'y'
        2. positions of the blue cones, dict with the keys 'x' and 'y'
        3. center points between the i-th yellow and the i-th blue cone
    """
    with open(os.path.join(path, json_file)) as handle:
        track = json.load(handle)
    cone_y = {'x': [], 'y': []}  # yellow cones
    cone_b = {'x': [], 'y': []}  # blue cones
    center = {'x': [], 'y': []}  # center points between yellow and blue cones
    for k in range(len(track['yellow_cones'])):
        yellow = track['yellow_cones'][k]
        blue = track['blue_cones'][k]
        cone_y['x'].append(yellow['x'])
        cone_y['y'].append(yellow['y'])
        cone_b['x'].append(blue['x'])
        cone_b['y'].append(blue['y'])
        center['x'].append((yellow['x'] + blue['x']) / 2)
        center['y'].append((yellow['y'] + blue['y']) / 2)
    outlines = (
        (cone_y, 'y', 'yellow cones'),
        (cone_b, 'b', 'blue cones'),
        (center, 'r', 'center points'),
    )
    for points, colour, label in outlines:
        plt.plot(points['x'], points['y'], color=colour, label=label)
    plt.grid(color='lightgray', linestyle='--')
    plt.legend()
    return cone_y, cone_b, center
def prepare_data(b_laps):
    """
    Extract telemetry and car positions from an opened bag
    parameters:
        1. the opened bag; it is read twice, once per topic
    return:
        1. telemetry DataFrame with the columns 'time', 'speed', 'acceleration',
           'steering angle', 'throttle', 'brake' and 'gear'
        2. car positions, dict with the keys 'x', 'y' and 'z'
    """
    status_topic = '/carla/ego_vehicle/vehicle_status'
    odometry_topic = '/carla/ego_vehicle/odometry'

    stamps, accelerations, velocities, controls = [], [], [], []
    for _, status, stamp in b_laps.read_messages(topics=status_topic):
        stamps.append(stamp)
        linear = status.acceleration.linear
        accelerations.append((linear.x, linear.y, linear.z))
        velocities.append(status.velocity)
        control = status.control
        controls.append((control.steer, control.throttle, control.brake, control.gear))

    coordinates = []
    for _, odometry, _ in b_laps.read_messages(topics=odometry_topic):
        position = odometry.pose.pose.position
        coordinates.append((position.x, position.y, position.z))
    car_positions = {
        'x': [point[0] for point in coordinates],
        'y': [point[1] for point in coordinates],
        'z': [point[2] for point in coordinates],
    }

    # one row per message, one column per axis -> Euclidean norm per message
    acc = np.linalg.norm(np.array(accelerations).reshape(-1, 3), axis=1)
    telemetry = pd.DataFrame({
        'time': stamps,
        'speed': velocities,
        'acceleration': acc,
        'steering angle': [entry[0] for entry in controls],
        'throttle': [entry[1] for entry in controls],
        'brake': [entry[2] for entry in controls],
        'gear': [entry[3] for entry in controls],
    })
    return telemetry, car_positions
def find_lap_indices(car_pose, threshold, skip):
    """
    Split a sequence of car positions into laps
    parameters:
        1. car positions with the keys 'x' and 'y'
        2. index of the position whose distance to the start point is used as
           the radius around the start point
        3. number of leading positions that are not checked
    return:
        a dict mapping each lap id to its (first index, last index)
    """
    xs, ys = car_pose['x'], car_pose['y']
    total = len(xs)
    start_point = [xs[0], ys[0]]
    radius = distance.euclidean(start_point, [xs[threshold], ys[threshold]])
    # A lap ends at a position inside the radius where the car passes the
    # x coordinate of the start point in increasing x direction.
    lap_ends = [
        k for k in range(skip, total)
        if distance.euclidean(start_point, [xs[k], ys[k]]) < radius
        and xs[k] < xs[0] < xs[(k + 1) % total]
    ]
    laps = {}
    first = 0
    for lap_id, last in enumerate(lap_ends):
        laps[lap_id] = (first, last)
        first = last + 1  # the next lap starts right after this one
    return laps
def workaroud(center, car_pose):
    """
    Workaround for testing: shift and scale the car positions with fixed
    constants and plot them next to the original positions and the center line
    parameter:
        1. center line of the track layout, keys 'x' and 'y'
        2. dict of the car position, keys 'x' and 'y'
    return:
        1. Adjusted car position, dict with the keys "x" and "y"
    """
    original_x = list(car_pose['x'])
    original_y = list(car_pose['y'])
    adjusted = {
        "x": [(value - 215) * 7 / 9 for value in car_pose['x']],
        "y": [(value + 235) * 8 / 9 for value in car_pose['y']],
    }
    plt.plot(original_x, original_y, color='b', label="original car positions")
    plt.plot(adjusted["x"], adjusted["y"], color='g', label="modified car positions")
    plt.plot(center["x"], center["y"], color='r', label="center line")
    plt.grid(color='lightgray', linestyle='--')
    plt.legend()
    return adjusted
def project_car_position(laps_indices, center, car_pose, threshold):
    """
    Project the position of the car into center line
    For every lap, each point of the center line is matched with the index of the
    nearest car position of that lap; the matched positions are then plotted per lap
    parameters:
        1. dict of the laps indices, lap id -> (first index, last index)
        2. center line of track layout, keys "x" and "y"
        3. dict of car positions, keys "x" and "y"
        4. number of indices at the end of the lap that are excluded from the
           search for the first 10 center points
    return:
        1. the projected indices for each lap, lap id -> {center index: car position index}
        2. the position of the car for each lap, keys "x" and "y", one list per lap
    """
    project_laps_indices = {}
    positions_car_x, positions_car_y = [], []
    for lap in range(len(laps_indices)):
        postion_indices = {}
        # car position indices that may no longer be matched within this lap
        black_list = set()
        for i in range(len(center["x"])):
            min_distance = float('inf')
            dist = 0
            # The upper bound is exclusive, so the last index of the lap is never a candidate.
            for j in range(laps_indices[lap][0], laps_indices[lap][1] - (threshold if i < 10 else 0)):
                dist = distance.euclidean([center["x"][i], center["y"][i]], [car_pose["x"][j], car_pose["y"][j]])
                if dist < min_distance and j not in black_list:
                    min_distance = dist
                    postion_indices[i] = j
                    # NOTE(review): the indentation of this file was lost; placing this line
                    # inside the `if` blacklists every index that was the best candidate at
                    # some point of the scan, not only the final match - confirm against the
                    # original notebook.
                    black_list.add(j)
        project_laps_indices[lap] = postion_indices
    # NOTE(review): a center point without any remaining candidate has no entry in
    # postion_indices, so the lookups below would raise KeyError - TODO confirm.
    for lap in range(len(laps_indices)):
        positions_car_x.append([car_pose["x"][project_laps_indices[lap][id]] for id in range(len(center["x"]))])
        positions_car_y.append([car_pose["y"][project_laps_indices[lap][id]] for id in range(len(center["y"]))])
    car_positions_laps = {"x": positions_car_x, "y": positions_car_y}
    for lap in range(len(laps_indices)):
        plt.scatter(car_positions_laps['x'][lap], car_positions_laps['y'][lap], label="lap " + str(lap))
    plt.grid(color='lightgray', linestyle='--')
    plt.legend()
    return project_laps_indices, car_positions_laps
def delta_plot(telemetry, metric, project_laps_indices, lap_id):
    """
    Plot, for every projected track position, the difference of a metric
    between a lap and the lap before
    parameters:
        1. the telemetry data of the laps
        2. a certain metric (e.g. speed), must be a column of the telemetry data
        3. projected indices for each lap
        4. index of the lap, must be greater than 0 because lap 0 has no predecessor
    """
    assert lap_id > 0, "lap_id must be greater than 0: lap 0 has no previous lap"
    # Only the selected lap and its predecessor are needed. The column is picked by
    # `metric`; it used to be hard-coded to 'speed', so the argument had no effect
    # on the plotted data.
    current = telemetry.loc[list(project_laps_indices[lap_id].values()), metric].reset_index(drop=True)
    previous = telemetry.loc[list(project_laps_indices[lap_id - 1].values()), metric].reset_index(drop=True)
    positions = range(len(project_laps_indices[lap_id]))
    delta = [current[k] - previous[k] for k in positions]
    plt.plot(positions, delta)
    # The x axis label used to be set through ylabel and was overwritten right away.
    plt.xlabel('Position indices of the track')
    plt.ylabel('Delta ' + str(metric))
    plt.grid(color='lightgray', linestyle='--')
    plt.show()
def plot_telemetry(telemetry):
    """
    Plot every telemetry column except the first one in its own panel,
    stacked vertically over a shared sample axis
    parameters:
        1. telemetry to plot
    """
    n_panels = len(telemetry.columns) - 1
    fig, axs = plt.subplots(n_panels, 1, sharex=True)
    fig.set_size_inches(15, 10)
    samples = range(len(telemetry))
    for panel, column in enumerate(telemetry.columns[1:]):
        ax = axs[panel]
        ax.plot(samples, telemetry.iloc[:, panel + 1])
        ax.set_ylabel(column)
        ax.xaxis.grid(color='gray', linestyle='dashed')
        ax.yaxis.grid(color='gray', linestyle='dashed')
        # Only the outer frame of the stack is drawn: top edge of the first
        # panel, bottom edge of the last one.
        ax.spines['top'].set_visible(panel == 0)
        ax.spines['bottom'].set_visible(panel == n_panels - 1)
    plt.subplots_adjust(hspace=0.01)
    plt.show()
def plot_lap_times(telemetry, laps_indices):
    """
    Plot the duration of every lap and highlight faster/slower laps and the fastest one
    parameters:
        1. telemetry data which include timestamps in the column 'time'
        2. timestamps indices for each lap, lap id -> (first index, last index)
    return:
        1. time used for each lap, lap id -> timedelta
    """
    lap_ids = range(len(laps_indices))
    lap_total_times = {}
    for lap in lap_ids:
        finished = dt.fromtimestamp(telemetry.loc[laps_indices[lap][1], 'time'].to_sec())
        started = dt.fromtimestamp(telemetry.loc[laps_indices[lap][0], 'time'].to_sec())
        lap_total_times[lap] = finished - started
    time_list = list(lap_total_times.values())
    min_value = min(time_list)
    min_index = time_list.index(min_value)
    seconds = [lap_total_times[lap].total_seconds() for lap in lap_ids]
    plt.plot(lap_ids, seconds, 'o-', label="lap time")
    # Every lap but the first is compared with its predecessor;
    # a lap with an equal time counts as slower.
    later_laps = range(1, len(laps_indices))
    faster = [lap for lap in later_laps if time_list[lap] < time_list[lap - 1]]
    slower = [lap for lap in later_laps if not time_list[lap] < time_list[lap - 1]]
    plt.plot(faster, [time_list[lap].total_seconds() for lap in faster], marker='o', color='g',
             label="faster than the lap before")
    plt.plot(slower, [time_list[lap].total_seconds() for lap in slower], marker='o', color='y',
             label="slower than the lap before")
    plt.plot(min_index, min_value.total_seconds(), marker='o', color='purple', label="current fastest lap")
    plt.title('Plot lap times')
    plt.ylabel('Lap time (s)')
    plt.xlabel('Lap number')
    plt.xticks(range(len(laps_indices)))
    plt.grid(color='lightgray', linestyle='--')
    plt.legend()
    plt.show()
    return lap_total_times
def animate_telemetry(telemetry):
    """
    Telemetry data animation: a sliding window of the last 20 samples of every
    telemetry column except the first one, one panel per column
    parameters:
        1. telemetry to animate
    return:
        1. the animation object (one frame per telemetry row, 200 ms apart)
    """
    n_panels = len(telemetry.columns) - 1
    # Create figure for plotting
    fig, axs = plt.subplots(n_panels, 1, sharex=True)
    fig.set_size_inches(10, 8)
    xs = []
    ys = []

    # animation function. This is called sequentially
    def drawframe(n, xs, ys):
        # Add x and y to lists
        xs.append(n)
        ys.append(telemetry.iloc[n, 1:])
        # Limit x and y lists to 20 items. The buffers are shared between the
        # calls, so they are trimmed in place; slicing into new local lists left
        # the shared buffers growing with every frame.
        del xs[:-20]
        del ys[:-20]
        # Build the window once per frame instead of once per panel
        window = pd.DataFrame(ys)
        for i in range(n_panels):
            axs[i].clear()
        for i in range(n_panels):
            axs[i].plot(xs, window.iloc[:, i])
            axs[i].set_ylabel(telemetry.columns[i + 1])
            axs[i].xaxis.grid(color='gray', linestyle='dashed')
            axs[i].yaxis.grid(color='gray', linestyle='dashed')
            if i == 0:
                axs[i].spines['top'].set_visible(True)
                axs[i].spines['bottom'].set_visible(False)
                axs[i].set_ylim(-1, 16)
            elif i == n_panels - 1:
                axs[i].spines['top'].set_visible(False)
                axs[i].spines['bottom'].set_visible(True)
                axs[i].set_ylim(-0.5, 5.5)
            else:
                axs[i].spines['top'].set_visible(False)
                axs[i].spines['bottom'].set_visible(False)
                axs[i].set_ylim(-0.1, 1.1)
        # Fixed ranges of the second and third panel override the generic ones
        axs[1].set_ylim(-1, 230)
        axs[2].set_ylim(-0.8, 0.8)
        axs[0].set_title('Telemetry data')
        # Format plot
        plt.subplots_adjust(hspace=0.1)
        # plt.xticks(rotation=45, ha='right')
        plt.subplots_adjust(bottom=0.30)
        if n == len(telemetry) - 1:
            plt.close()

    # Set up plot to call animate() function periodically
    ani = animation.FuncAnimation(fig, drawframe, fargs=(xs, ys), interval=200, frames=len(telemetry))
    rc('animation', html='html5')
    return ani
def compare_telemetry(list_laps):
    """
    Telemetry lines comparison: every telemetry column except the first one
    gets its own panel in which all laps are drawn on top of each other
    parameters:
        1. a list of telemetry data, one lap for each row
    """
    N_laps = len(list_laps)
    palette = cm.rainbow(np.linspace(0, 1, N_laps))
    n_panels = len(list_laps[0].columns) - 1
    fig, axs = plt.subplots(n_panels, 1, sharex=True)
    fig.set_size_inches(15, 10)
    # column names and column count are taken from the last lap
    last_lap = list_laps[N_laps - 1]
    for panel in range(n_panels):
        ax = axs[panel]
        for lap_no in range(N_laps):
            lap = list_laps[lap_no]
            ax.plot(range(len(lap)), lap.iloc[:, panel + 1], c=palette[lap_no], lw=1, label='lap %s' % lap_no)
        ax.set_ylabel(last_lap.columns[panel + 1])
        ax.xaxis.grid(color='gray', linestyle='dashed')
        ax.yaxis.grid(color='gray', linestyle='dashed')
        is_first = panel == 0
        ax.spines['top'].set_visible(is_first)
        if is_first:
            # one legend for the whole stack is enough
            ax.legend(loc="upper right")
        ax.spines['bottom'].set_visible(panel == len(last_lap.columns) - 2)
    plt.subplots_adjust(hspace=0.01)
    plt.show()
def animate_telemetry_lines_comparison(list_laps):
    """
    Telemetry lines comparison animation: a sliding window of the last 20 samples
    of every telemetry column except the first one, all laps drawn in each panel
    parameters:
        1. a list of telemetry data, one lap for each row
    return:
        1. the animation object (200 ms per frame, not repeated)
    """
    # Create figure for plotting
    N_laps = len(list_laps)
    n_panels = len(list_laps[0].columns) - 1
    # Every frame reads row n of every lap, so only rows present in all laps can
    # be animated; using the length of lap 0 alone raised IndexError as soon as
    # another lap was shorter.
    n_frames = min(len(list_laps[lap]) for lap in range(N_laps))
    color = cm.rainbow(np.linspace(0, 1, N_laps))
    fig, axs = plt.subplots(n_panels, 1, sharex=True)
    fig.set_size_inches(10, 8)
    xs = []
    ys = {lap: [] for lap in range(N_laps)}

    # animation function. This is called sequentially
    def drawframe(n, xs, ys):
        # Add x and y to lists and limit them to 20 items. The buffers are shared
        # between the calls, so they are trimmed in place.
        xs.append(n)
        del xs[:-20]
        for lap in range(N_laps):
            ys[lap].append(list_laps[lap].iloc[n, 1:])
            del ys[lap][:-20]
        # Build one window per lap and frame instead of one per lap and panel
        windows = [pd.DataFrame(ys[lap]) for lap in range(N_laps)]
        for i in range(n_panels):
            axs[i].clear()
        for i in range(n_panels):
            for lap in range(N_laps):
                axs[i].plot(xs, windows[lap].iloc[:, i], c=color[lap], label='lap %s' % lap, lw=1)
            axs[i].set_ylabel(list_laps[0].columns[i + 1])
            axs[i].xaxis.grid(color='gray', linestyle='dashed')
            axs[i].yaxis.grid(color='gray', linestyle='dashed')
            if i == 0:
                axs[i].spines['top'].set_visible(True)
                axs[i].spines['bottom'].set_visible(False)
                axs[i].legend(loc="upper right")
                axs[i].set_ylim(-1, 16)
            elif i == n_panels - 1:
                axs[i].spines['top'].set_visible(False)
                axs[i].spines['bottom'].set_visible(True)
                axs[i].set_ylim(-0.5, 5.5)
            else:
                axs[i].spines['top'].set_visible(False)
                axs[i].spines['bottom'].set_visible(False)
                axs[i].set_ylim(-0.1, 1.1)
        # Fixed ranges of the second and third panel override the generic ones
        axs[1].set_ylim(-1, 26)
        axs[2].set_ylim(-0.8, 0.8)
        axs[0].set_title('Compare Telemetry data from differnt laps over time')
        # Format plot
        plt.subplots_adjust(hspace=0.1)
        # plt.xticks(rotation=45, ha='right')
        plt.subplots_adjust(bottom=0.30)
        if n == n_frames - 1:
            plt.close()

    # Set up plot to call animate() function periodically
    ani = animation.FuncAnimation(fig, drawframe, fargs=(xs, ys), interval=200, frames=n_frames, repeat=False)
    rc('animation', html='html5')
    return ani
def plot_track_layout_depending_on_time_spend(project_laps_indices, telemetry, car_positions_laps, cone_y, cone_b,
                                              lap_id):
    """
    Display the track layout in different colors depending on the time spend for each projected position during the lap
    parameters:
        1. projected indices for each lap
        2. telemetry dataframe
        3. positions of the car for each lap
        4. positions of yellow cones
        5. positions of blue cones
        6. id of the lap
    return:
        1. dict with one telemetry dataframe per lap, restricted to the projected
           positions and extended by the columns 'time_spend', 'min_time_spend'
           and 'max_time_spend'
    """
    n_laps = len(project_laps_indices)
    project_indices = {}
    time_spend_lap = {}
    for lap in range(n_laps):
        project_indices[lap] = list(project_laps_indices[lap].values())
        stamps = telemetry.loc[project_indices[lap], "time"].values
        # time elapsed since the first projected position of the lap
        time_spend_lap[lap] = [elapsed.to_sec() for elapsed in (stamps - stamps[0])]

    # fastest and slowest time over all laps, per projected position
    min_time_spend_lap = {}
    max_time_spend_lap = {}
    for loc in range(len(project_indices[0])):
        times_at_loc = [time_spend_lap[lap][loc] for lap in range(n_laps)]
        min_time_spend_lap[loc] = min(times_at_loc)
        max_time_spend_lap[loc] = max(times_at_loc)

    telemetry_laps = {}
    for lap in range(n_laps):
        frame = telemetry.loc[project_indices[lap], :].reset_index(drop=True)
        frame.insert(len(frame.columns), "time_spend", pd.DataFrame(time_spend_lap[lap]))
        frame.insert(len(frame.columns), "min_time_spend", pd.DataFrame(min_time_spend_lap.values()))
        frame.insert(len(frame.columns), "max_time_spend", pd.DataFrame(max_time_spend_lap.values()))
        telemetry_laps[lap] = frame

    current = telemetry_laps[lap_id]

    def scatter_where(mask, colour, label):
        # draw the car positions of the selected lap at the rows picked by the mask
        rows = current[mask].index.tolist()
        plt.scatter([car_positions_laps['x'][lap_id][j] for j in rows],
                    [car_positions_laps['y'][lap_id][j] for j in rows],
                    color=colour, label=label)

    if lap_id > 0:
        previous = telemetry_laps[lap_id - 1]
        scatter_where(current['time_spend'] < previous['time_spend'], 'green', 'faster than the lap before')
        scatter_where(current['time_spend'] > previous['time_spend'], 'gold', 'slower than the lap before')
    else:
        # for lap_id==0 and the slowest
        scatter_where(current['time_spend'] == current['max_time_spend'], 'olive', 'current slowest')
    # current fastest
    scatter_where(current['time_spend'] == current['min_time_spend'], 'purple', 'current fastest')
    plt.plot(cone_y['x'], cone_y['y'], color='y', label='yellow cones')
    plt.plot(cone_b['x'], cone_b['y'], color='b', label='blue cones')
    plt.legend()
    plt.grid(color='lightgray', linestyle='--')
    plt.show()
    return telemetry_laps
def plot_track_layout_depending_on_speed(project_laps_indices, telemetry, car_positions_laps, cone_y, cone_b, lap_id):
    """
    Display the track layout in different colors depending on the speed
    parameters:
        1. projected indices for each lap
        2. telemetry dataframe
        3. positions of the car for each lap
        4. positions of yellow cones
        5. positions of blue cones
        6. id of the lap
    return:
        1. dict with one telemetry dataframe per lap, restricted to the projected
           positions and extended by the columns 'min_speed' and 'max_speed'
           (lowest/highest speed over all laps at each projected position)
    """
    n_laps = len(project_laps_indices)
    project_indices = {lap: list(project_laps_indices[lap].values()) for lap in range(n_laps)}
    speed_lap = {lap: list(telemetry.loc[project_indices[lap], "speed"].values) for lap in range(n_laps)}

    min_speed_lap = []
    max_speed_lap = []
    for loc in range(len(project_indices[0])):
        speeds_at_loc = [speed_lap[lap][loc] for lap in range(n_laps)]
        min_speed_lap.append(min(speeds_at_loc))
        max_speed_lap.append(max(speeds_at_loc))

    telemetry_laps = {}
    for lap in range(n_laps):
        frame = telemetry.loc[project_indices[lap], :].reset_index(drop=True)
        # 'min_speed' is kept for existing callers; 'max_speed' is what marks the fastest lap
        frame.insert(len(frame.columns), "min_speed", pd.DataFrame(min_speed_lap))
        frame.insert(len(frame.columns), "max_speed", pd.DataFrame(max_speed_lap))
        telemetry_laps[lap] = frame

    current = telemetry_laps[lap_id]

    def scatter_where(mask, colour, label):
        # draw the car positions of the selected lap at the rows picked by the mask
        rows = current[mask].index.tolist()
        plt.scatter([car_positions_laps['x'][lap_id][j] for j in rows],
                    [car_positions_laps['y'][lap_id][j] for j in rows],
                    color=colour, label=label)

    # Unlike elapsed time, a HIGHER speed means faster. The comparisons used to be
    # copied from the time based plot and were therefore inverted.
    if lap_id > 0:
        previous = telemetry_laps[lap_id - 1]
        scatter_where(current['speed'] > previous['speed'], 'green', 'faster than the lap before')
        scatter_where(current['speed'] < previous['speed'], 'red', 'slower than the lap before')
    # current fastest: the lap holds the highest speed of all laps at this position
    scatter_where(current['speed'] == current['max_speed'], 'purple', 'current fastest')
    plt.plot(cone_y['x'], cone_y['y'], color='y', label='yellow cones')
    plt.plot(cone_b['x'], cone_b['y'], color='b', label='blue cones')
    plt.legend()
    plt.grid(color='lightgray', linestyle='--')
    plt.show()
    return telemetry_laps
def display_track_layout_depending_on_gears_used(telemetry, car_pose, cone_y, cone_b):
    """
    Display track layout in different colors depending on the gears used
    (gears 0 to 5, one color each)
    parameters:
        1. telemetry data to plot
        2. dict of car positions
        3. dict of yellow cones position
        4. dict of blue cones position
    """
    color_list = ['cyan', 'aqua', 'aquamarine', 'lightblue', 'blue', 'darkblue']
    for gear, colour in enumerate(color_list):
        # telemetry rows that were driven in this gear
        rows = telemetry[telemetry['gear'] == gear].index.tolist()
        plt.scatter([car_pose['x'][j] for j in rows],
                    [car_pose['y'][j] for j in rows],
                    color=colour, label='gear ' + str(gear))
    plt.plot(cone_y['x'], cone_y['y'], color='y', label='yellow cones')
    plt.plot(cone_b['x'], cone_b['y'], color='b', label='blue cones')
    plt.legend()
    plt.grid(color='lightgray', linestyle='--')
    plt.show()
def display_track_layout_depending_on_brake_pressure(telemetry, car_pose, cone_y, cone_b):
    """
    Display track layout in different colors depending on the brake pressure
    The positions are split into four bands: no braking, up to the mean
    pressure, above the mean pressure and full braking
    parameters:
        1. telemetry data to plot
        2. dict of car positions
        3. dict of yellow cones position
        4. dict of blue cones position
    """
    brake = telemetry['brake']
    mean_brake = brake.mean()
    bands = (
        (brake == 0, 'coral', 'brake pressure == 0'),
        ((brake > 0) & (brake <= mean_brake), 'tomato', 'brake pressure <= mean'),
        ((brake < 1) & (brake > mean_brake), 'red', 'brake pressure >= mean'),
        (brake == 1, 'crimson', 'brake pressure == 1'),
    )
    for mask, colour, label in bands:
        # x and y are taken from the SAME rows; the third band used to take its
        # x values from the rows of the second band.
        rows = telemetry[mask].index.tolist()
        plt.scatter([car_pose['x'][j] for j in rows],
                    [car_pose['y'][j] for j in rows],
                    color=colour, label=label)
    plt.plot(cone_y['x'], cone_y['y'], color='y', label='yellow cones')
    plt.plot(cone_b['x'], cone_b['y'], color='b', label='blue cones')
    plt.legend()
    plt.grid(color='lightgray', linestyle='--')
    plt.show()
def animate_percentage(path, percent):
    """
    Display telemetry data as percentage on a half-circle gauge
    parameter:
        1. directory containing "needle.png" and "gauge.png"
        2. array of percentage of data to display, one value per frame
    return:
        1. the animation object (50 ms per frame)
    """
    fig, ax = plt.subplots()
    # X and Y coordinates of the center bottom of the needle starting from the
    # top left corner of the image
    pivot = (825, 825)
    needle_file = os.path.join(path, "needle.png")
    gauge_file = os.path.join(path, "gauge.png")
    # One list of artists per frame; only one gauge is animated, so every
    # frame consists of a single image.
    frames = []
    for step in range(len(percent)):
        # The gauge is half a circle (180 degrees) and the needle graphic
        # points to 50 (90 degrees).
        angle = 90 - 180 * percent[step]
        needle = Image.open(needle_file).rotate(angle, resample=PIL.Image.BICUBIC, center=pivot)
        gauge = Image.open(gauge_file)
        gauge.paste(needle, mask=needle)  # Paste needle onto gauge
        artist = ax.imshow(gauge, animated=True)
        if step == 0:
            ax.imshow(gauge)  # show an initial one first
        frames.append([artist])
    plt.close()
    ani = animation.ArtistAnimation(fig, frames, interval=50, blit=True)
    rc('animation', html='html5')
    return ani
def get_cones_positions(coords, ds=0.0002):
    """
    Place a yellow and a blue cone next to every point of a circuit and plot the result
    Each cone lies at the distance ds from its point, perpendicular to the
    driving direction: yellow cones on the left, blue cones on the right
    reference:
    https://stackoverflow.com/questions/57065080/draw-perpendicular-line-of-fixed-length-at-a-point-of-another-line
    parameters:
        1. coordinates of circuit, sequence of points whose first two components are x and y
        2. distance between cones and circuit
    return:
        1. positions of yellow cones, list of dicts with the keys 'x' and 'y'
        2. positions of blue cones, list of dicts with the keys 'x' and 'y'
    raises:
        ValueError if coords does not contain at least two distinct points
    """
    points = np.asarray(coords, dtype=float)
    if points.ndim != 2 or points.shape[1] < 2:
        raise ValueError("coords must be a sequence of points with at least x and y")
    points = points[:, :2]
    if len(points) > 1:
        # A point repeated directly after itself has no direction (it produced
        # NaN cones before), so consecutive duplicates are dropped.
        moved = np.any(np.diff(points, axis=0) != 0, axis=1)
        points = points[np.concatenate(([True], moved))]
    if len(points) < 2:
        raise ValueError("coords must contain at least two distinct points")

    # Direction of the segment leaving every point. Rotating it by 90 degrees
    # counter-clockwise and scaling it to ds gives the offset to the left side.
    # This replaces the slope based case distinction, which needed special cases
    # for vertical and horizontal segments.
    segments = np.diff(points, axis=0)
    lengths = np.hypot(segments[:, 0], segments[:, 1])
    left_offsets = ds * np.column_stack((-segments[:, 1], segments[:, 0])) / lengths[:, np.newaxis]
    # The last point has no outgoing segment and reuses the direction of the
    # segment leading to it. It used to be handled by a separate, incomplete
    # side test that swapped the colors after a vertical-up or horizontal-left
    # final segment.
    left_offsets = np.vstack((left_offsets, left_offsets[-1]))

    cones_yellow = [{'x': x, 'y': y} for x, y in points + left_offsets]
    cones_blue = [{'x': x, 'y': y} for x, y in points - left_offsets]

    plt.figure(figsize=(15,10))
    plt.plot(points[:, 0], points[:, 1], '-o', c='coral', label='track layout')
    plt.plot([item["x"] for item in cones_yellow], [item["y"] for item in cones_yellow], '-o', c='gold', label='yellow cones')
    plt.plot([item["x"] for item in cones_blue], [item["y"] for item in cones_blue], '-o', c='turquoise', label='blue cones')
    plt.plot([cones_yellow[0]["x"], cones_blue[0]["x"]], [cones_yellow[0]["y"], cones_blue[0]["y"]], '--s', c='orange', label='start line')
    plt.grid(color='lightgray',linestyle='--')
    plt.legend()
    plt.show()
    return cones_yellow, cones_blue
def generate_ellipse_shaped_track(x0, y0, rx, ry, path, filename, distance=3):
    """
    Generate an ellipse-shaped track, place cones along it and store it as a yaml file
    parameters:
        1. x0 : x-position of the center
        2. y0 : y-position of the center
        3. rx : radius on the x-axis of track layout
        4. ry : radius on the y-axis of track layout
        5. file path
        6. file name
        7. distance between the cones and track layout
    return:
        1. information of the track, the same dict that is written to the file
    """
    angles = np.linspace(0, 2 * pi, 100)
    outline = list(zip(x0 - rx * np.cos(angles), y0 - ry * np.sin(angles)))
    yellow_cones, blue_cones = get_cones_positions(outline, distance)

    def cone_entry(cone, kind):
        # yaml needs plain floats; every cone stands on the ground (z = 0)
        return {"position": [float(cone['x']), float(cone['y']), 0], "type": kind}

    # The first cone of each side marks the start line, the others bound the track.
    cones = [cone_entry(yellow_cones[0], "big orange"), cone_entry(blue_cones[0], "big orange")]
    cones.extend(cone_entry(cone, "small yellow") for cone in yellow_cones[1:])
    cones.extend(cone_entry(cone, "small blue") for cone in blue_cones[1:])

    data = {
        "name": "ellipse",
        "description": "ellipse-shaped track layout",
        "init_offset": 2,  # move vehicle 2 meters back to align with start line
        "cones": cones
    }
    target = os.path.join(path, filename)
    with open(target, 'w') as outfile:
        yaml.dump(data, outfile, default_flow_style=False)
    return data
def get_circuits_and_cones_postions_from_geojson(path, distance=0.0002):
    """
    Read every geojson circuit of a directory, place cones along it and store
    each circuit as a yaml file (named like the geojson file) in the same directory
    parameters:
        1. directory containing the geojson files
        2. distance between cones and circuit
    return:
        1. coordinates of circuits, index -> dict with the keys "x" and "y"
        2. positions of cones, index -> list of cone entries of that circuit
    """
    circuits = {}
    postions_cones = {}
    # sorted: glob returns the files in arbitrary order, sorting keeps the indices stable
    for idx, geojson in enumerate(sorted(glob.glob(os.path.join(path, '*.geojson')))):
        with open(geojson, 'r') as j:
            contents = json.load(j)
        coords = contents['features'][0]['geometry']["coordinates"]
        yellow_cones, blue_cones = get_cones_positions(coords, distance)
        # A fresh list per circuit: a single shared list made every circuit
        # (and every yaml file) also contain the cones of all circuits before it.
        cones = [
            {"position": [float(yellow_cones[0]['x']), float(yellow_cones[0]['y']), 0], "type": "big orange"},
            {"position": [float(blue_cones[0]['x']), float(blue_cones[0]['y']), 0], "type": "big orange"},
        ]
        cones.extend({"position": [float(cone['x']), float(cone['y']), 0], "type": "small yellow"}
                     for cone in yellow_cones[1:])
        cones.extend({"position": [float(cone['x']), float(cone['y']), 0], "type": "small blue"}
                     for cone in blue_cones[1:])
        # File name without directory and extension; the former fixed slice
        # [-15:-8] was only right for names of exactly seven characters.
        name = os.path.splitext(os.path.basename(geojson))[0]
        data = {
            "name": name,
            "description": "F1 circuits " + name,
            "init_offset": 2,  # move vehicle 2 meters back to align with start line
            "cones": cones
        }
        circuits[idx] = {"x": [item[0] for item in coords], "y": [item[1] for item in coords]}
        postions_cones[idx] = cones
        with open(os.path.join(path, name + '.yaml'), 'w') as outfile:
            yaml.dump(data, outfile, default_flow_style=False)
    return circuits, postions_cones