Introduction to the Digital Twin Part 4: Predictors and WindFields¶
In the first three notebooks, we have created some simulated scenarios and used the evolve method to step through them, but we haven't looked in any detail at what happens when we call evolve.
Essentially, the task of telling the simulation where an aircraft will move to in a time step, is done by a Predictor. The design of bluebird_dt is such that we have a common interface for Predictors, so users can choose between a selection of implemented classes, or, experienced developers can create their own.
Let's instantiate a simulated scenario, and take a look at the Predictor.
from bluebird_dt.simulator import Simulator
sim = Simulator.from_category("Springfield", "example-scenario")
sim.manager.predictor
<bluebird_dt.predictor.simple_predictor.SimplePredictor at 0x107a969b0>
We can see that we're using a "SimplePredictor". We can look at the code for this in bluebird_dt/predictor/simple_predictor.py or on the auto-generated code docs in the web documentation.
help(sim.manager.predictor)
Help on SimplePredictor in module bluebird_dt.predictor.simple_predictor object: class SimplePredictor(bluebird_dt.predictor.predictor.Predictor) | SimplePredictor(dt: 'float', fix_proximity_threshold: 'float', fixes: 'Fixes | None' = None) | | Evolves Aircraft with constant speed and vertical speed along their heading. | | Method resolution order: | SimplePredictor | bluebird_dt.predictor.predictor.Predictor | abc.ABC | builtins.object | | Methods defined here: | | __init__(self, dt: 'float', fix_proximity_threshold: 'float', fixes: 'Fixes | None' = None) | Construct a new instance. | | Parameters | ---------- | dt: float | The internal step time of the predictor. The time taken between each control point within the returned | Trajectory is dt. | fix_proximity_threshold: float | When an Aircraft is <= fix_proximity_threshold distance from the next Fix on its Route, the Fix is | considered to have been passed. | The next target Fix on Route for the Aircraft is then updated. | fixes: Fixes | None | Fixes from the airspace that aircraft are flying in | | distance_for_fl_change(self, aircraft: 'Aircraft') -> 'float | None' | Calculate the horizontal distance needed to change from the current altitude to the target altitude, using the | current vertical speed and ground speed. | | Parameters | ---------- | aircraft: Aircraft | | Returns | ------- | float | None | The horizontal distance needed for the flight level change, in nautical miles | | update_aircraft_state(self, aircraft: 'Aircraft', time_evolve: 'float', wind_field: 'WindField | None') | Update aircraft state over the period time_evolve. Currently this includes updating the aircraft speeds, | heading, and flight level. | | Parameters | ---------- | aircraft: Aircraft | the aircraft to update | time_evolve: float | Amount of time [sec] to evolve the Aircraft for | wind_field: WindField | The wind field to use to find ground speed and ground track angle from the true airspeed and heading | | update_flight_level(self, aircraft: 'Aircraft', time_evolve: 'float') | Move Aircraft vertically and update its flight level. | | Parameters | ---------- | aircraft: Aircraft | An aircraft | time_evolve: float | Amount of time [sec] to evolve the Aircraft for | | update_speeds(self, aircraft: 'Aircraft', wind_field: 'WindField | None') | Update Aircraft (total and vertical) speeds. | The horizontal TAS is set to the cleared_cas value. | | Parameters | ---------- | aircraft: Aircraft | An Aircraft | wind_field: Wind_Field | Wind field aircraft is flying through | | update_total_speeds_cas_is_tas(self, aircraft: 'Aircraft') | Updates total speeds of a selected aircraft by writing the selected_instructions.cas value directly to the true | airspeed (speed_tas) value. Mach values are ignored. This method is called when the flag "use_cas_as_tas" is set | to True. If the selected_instructions.cas value is None, then an error is raised: this must be set to use this | simplified speed modelling. | | Parameters | ---------- | aircraft: Aircraft | A selected aircraft for which to change the horizontal speeds. | | update_vertical_speeds(self, aircraft: 'Aircraft') | This method updates the aircraft vertical speed, considering the cleared vertical speed of the aircraft. | | Parameters | ---------- | aircraft: Aircraft | A selected aircraft | | ---------------------------------------------------------------------- | Data and other attributes defined here: | | __abstractmethods__ = frozenset() | | ---------------------------------------------------------------------- | Methods inherited from bluebird_dt.predictor.predictor.Predictor: | | distance_and_turn_radius_from_angle(self, aircraft: 'Aircraft', turn_angle: 'float', speed_kts: 'float') -> 'tuple[float, float]' | Calculate the turn radius and distance required to initiate the turn | for a given turn angle at a given ground speed. | | Parameters | ---------- | aircraft: Aircraft | The aircraft executing the turn | turn_angle: float | The change in heading required in degrees | speed_kts: float | The aircraft's horizontal ground speed in knots | | Returns | ------- | float | The distance before the fix to start the turn | float | The radius of the turn in nautical miles | | distance_before_fix_to_start_turn(self, aircraft: 'Aircraft', speed_kts: 'float') -> 'tuple[float, float]' | Calculate the distance before the next fix that an aircraft should start turning to end up on the next route leg | | Parameters | ---------- | aircraft: Aircraft | The aircraft for which to calculate the distance | speed_kts: float | The aircraft's horizontal ground speed in knots | | Returns | ------- | float | The distance before the fix to start turning towards the following fix | float | The radius of the turn in nautical miles | | get_target_pos(self, aircraft: 'Aircraft') -> 'Pos2D' | Get Pos2D of next Fix on Route. | | Parameters | ---------- | aircraft: Aircraft | A selected aircraft | | Returns | ------- | target_pos: Pos2D | Position of the next fix on the Aircraft route | | predict_aircraft(self, aircraft: 'Aircraft', delta_t: 'float', environment_time: 'float' = 0.0, wind_field: 'WindField | None' = None, deepcopy_aircraft: 'bool' = True) -> 'Aircraft' | Evolve an aircraft to a time delta_t in the future. Normally delta_t is the radar period. | | Parameters | ---------- | aircraft: Aircraft | The aircraft to evolve | delta_t: float | The duration [sec] over which to evolve the aircraft | environment_time: float, optional | Current time | wind_field: WindField | None, optional | The current wind field | deepcopy_aircraft: bool, optional | Whether to make a deepcopy of the aircraft so that the aircraft object passed by reference is unchanged, | defaults to True. | | Returns | ------- | Aircraft | The aircraft evolved to a time delta_t in the future | | predict_trajectory(self, aircraft: 'Aircraft', delta_t: 'float', environment_time: 'float' = 0.0, wind_field: 'WindField | None' = None, deepcopy_aircraft: 'bool' = True) -> 'list[Pos4D] | None' | Calculate the trajectory of an aircraft for a time period delta_t. | | Parameters | ---------- | aircraft: Aircraft | The aircraft to calculate a trajectory for | delta_t: float | The duration [sec] for which to calculate the trajectory | environment_time: float, optional | Current time | wind_field: WindField | None, optional | The current wind field | deepcopy_aircraft: bool, optional | Whether to make a deepcopy of the aircraft so that the aircraft object passed by reference is unchanged, | defaults to True. | | Returns | ------- | List[Pos4D] | None | The predicted trajectory, or None if there are fewer than two points in the trajectory | | rate_of_route_turn(self, speed_kts: 'float', turn_angle: 'float') -> 'float' | Calculate the rate of turn in degrees per second for an aircraft at a given speed. | | Parameters | ---------- | speed_kts: float | The aircraft's horizontal ground speed in knots | turn_angle: float | The angle of the turn in degrees | | Returns | ------- | float | The rate of turn in degrees per second | | set_integration_step(self, internal_step_max: 'float') -> 'None' | Change the internal time integration step maximum for the Euler method. | | Parameters | ---------- | internal_step_max: float | The maximum internal integration time step | | update_ground_speed(self, aircraft: 'Aircraft', wind_field: 'WindField | None') | This method updates the aircraft ground speed attributes, based on current speed_tas, | vertical_speed and heading. | | Parameters | ---------- | aircraft: Aircraft | A selected aircraft | wind_field: WindField | The wind field the aircraft is flying through | | update_position(self, aircraft: 'Aircraft', time_evolve: 'float', wind_field: 'WindField | None', use_turn_model: 'bool' = True) | Move aircraft laterally and update its heading if required. | | Parameters | ---------- | aircraft: Aircraft | An aircraft | time_evolve: float | Amount of time [sec] to evolve the Aircraft for | wind_field: WindField | None | The wind field to use to find ground speed and ground track angle from the true airspeed and heading | use_turn_model: bool | Flag indicating whether to use turn model or just instantaneously update heading. Default is True. | | update_position_with_turn_model(self, aircraft: 'Aircraft', time_evolve: 'float', wind_field: 'WindField | None') | Move aircraft and update its heading if required using turn model. | | Parameters | ---------- | aircraft: Aircraft | Selected aircraft to update heading of | time_evolve: float | Amount of time [sec] to evolve the Aircraft for | wind_field: WindField | None | The wind field to use to find ground speed and ground track angle from the true airspeed and heading | | update_position_without_turn_model(self, aircraft: 'Aircraft', time_evolve: 'float', wind_field: 'WindField | None') | Move aircraft laterally and update its heading, without using a turn model. Instead, the aircraft heading is | determined using the bearing to the next fix position, if on route, or from the heading_changing_to attribute | if not on route. | | Parameters | ---------- | aircraft: Aircraft | Selected aircraft to update heading of | time_evolve: float | Amount of time [sec] to evolve the Aircraft for | wind_field: WindField | The wind field to use to find ground speed and ground track angle from the true airspeed and heading | | ---------------------------------------------------------------------- | Data descriptors inherited from bluebird_dt.predictor.predictor.Predictor: | | __dict__ | dictionary for instance variables (if defined) | | __weakref__ | list of weak references to the object (if defined)
One important thing to note with the SimplePredictor is that it doesn't have an intrinsic way to determine tas, the true air speed of an aircraft. Instead, it uses cas, the cleared air speed.
Other predictors are being developed that can use look-up tables for different aircraft types to give a better estimate for the true air speed.
Rolling out a trajectory using a predictor¶
If you are creating an agent, it might be useful to use a Predictor to look at hypothetical trajectories for aircraft (e.g. to see if they will conflict with other aircraft in the future). For this, we can use the predict_trajectory method that is supplied by all Predictors.
from copy import deepcopy
from bluebird_dt.predictor import SimplePredictor
sim = Simulator.from_category("Artificial", "I-Sector Two Aircraft")
pred = SimplePredictor(dt=1, fix_proximity_threshold=0.5, fixes=sim.manager.environment.airspace.fixes)
# make a copy of one of the aircraft in the simulation, so we can see how it might evolve if we give it a target heading and FL
candidate_aircraft = deepcopy(sim.manager.environment.aircraft["AIR0"])
candidate_aircraft.on_route = False
# look at trajectory over the next 60 seconds if we don't change anything
traj = pred.predict_trajectory(candidate_aircraft, 60)
# plotting the trajectory
import matplotlib.pyplot as plt
import os
import shutil
from IPython.display import SVG, Image
%matplotlib inline
from bluebird_dt.core import Pos2D
from bluebird_dt.render import Radar
view_centre = Pos2D(candidate_aircraft.lat, candidate_aircraft.lon)
view_width = 4
aspect_ratio = 1
radar = Radar(view_centre, view_width, aspect_ratio)
radar.draw_trajectory(traj)
# see what happens if we change the aircraft heading to 270 and flight level to 360
candidate_aircraft.heading_changing_to = 270
candidate_aircraft.selected_instructions.fl = 360
traj = pred.predict_trajectory(candidate_aircraft, 60)
radar = Radar(view_centre, view_width, aspect_ratio)
radar.draw_trajectory(traj)
So we can see that the aircraft makes a turn, but it is not instantaneous - there is a relatively small heading change per second in the predictor's model of the aircraft behaviour.
Using a different Predictor¶
We can specify a predictor when we create the Simulator instance by calling from_category. As an example, let's use the RouteFollowPredictor:
from bluebird_dt.predictor import RouteFollowPredictor
pred = RouteFollowPredictor(dt=1, fix_proximity_threshold = 0.5, fixes=None)
sim = Simulator.from_category("Springfield", "example-scenario", predictor = pred)
type(sim.manager.predictor)
bluebird_dt.predictor.route_follow_predictor.RouteFollowPredictor
Making our own Predictor¶
It is not recommended, unless you really know what you are doing, but the modular BluebirdATC code makes it possible for people to implement their own predictors, as long as they inherit from the Predictor base class in bluebird-dt/predictors/predictor.py.
The only method that is mandatory to override is predict, which looks like:
@abstractmethod
def _predict(
self,
aircraft: Aircraft,
time_evolve: float,
environment_time: float = 0.0,
wind_field: WindField | None = None,
) -> tuple[list[Pos4D], Aircraft]:
from typing_extensions import override
from bluebird_dt.core import Aircraft, Pos4D, WindField
from bluebird_dt.predictor import Predictor
class StayStillPredictor(Predictor):
"""
A dummy example of a predictor, that makes all aircraft stay where they are.
"""
@override
def _predict(self, aircraft: Aircraft, time_evolve: float, environment_time: float = 0.0, wind_field: WindField | None = None):
num_steps is time_evolve // self.dt
control_points = []
for i in range(num_steps):
cp = Pos4D(lat=aircraft.lat, lon=aircraft.lon, fl=aircraft.fl, time=environment_time+i*self.dt)
control_points.append(cp)
return control_points, aircraft
pred = StayStillPredictor(dt=1.0, fix_proximity_threshold=0.5, fixes=None)
sim = Simulator.from_category("Springfield", "example-scenario", predictor=pred)
print(type(sim.manager.predictor))
<class '__main__.StayStillPredictor'>
Wind fields¶
The predictors are responsible for updating the positions of aircraft between time steps. In order to do this, they may take account of the wind in the area. In bluebird_dt, wind can be represented by a WindVector, giving the strength and direction of the wind, and a WindField, which is a collection of WindVectors at different points in space.
We can create a simple uniform wind field, with a wind speed of 20m/s, blowing from the East:
from bluebird_dt.core import WindField
wf = WindField.uniform(wind_speed=20, wind_direction=90)
We can visualise the effect of this wind field on a simple Two Aircraft scenario. When we first create the scenario, both aircraft will be "on_route", meaning they will fly directly over the fixes on their specified route. We can set one of the aircraft ("AIR0") to no longer be on route, to see the effect of the wind.
# Start up scenario with the default "SimplePredictor" predictor.
sim = Simulator.from_category("Artificial", "I-Sector Two Aircraft")
# set on_route to False for one aircraft
sim.manager.environment.aircraft["AIR0"].on_route = False
# set the wind field
sim.manager.environment.wind_field = wf
import matplotlib.pyplot as plt
%matplotlib inline
import IPython.display
from bluebird_dt.core import Pos2D
from bluebird_dt.render import Radar
view_centre = Pos2D.from_str("50.75N 3.5W")
view_width = 60.0 # [nmi] about 1 degree of longitude
aspect_ratio = 1
radar = Radar(view_centre, view_width, aspect_ratio) # re-introduce the spines/axes
for _ in range(200):
sim.evolve(6.0)
figure, ax = radar.draw(sim.manager.environment)
IPython.display.display(figure) # or IPython.display.display(plt.gcf())
IPython.display.clear_output(wait=True)
We can see that while "AIR1" continues to follow it's route, "AIR0" is being blown to the west by the wind.
Conclusions¶
In this notebook we have looked at Predictors, which contain the code used by the simulation to update aircraft positions. Key points are:
- The modular design of
bluebird-dtmeans that users can choose which Predictor to use in their scenario, and even create their own (though this is recommended for experts only). - Predictors are used "under the hood" to update aircraft positions when we call
Simulator.evolve, but they can also be used (e.g. by Agents) to roll out hypothetical trajectories. - One of the inputs that Predictors can use in their calculations of aircraft position, is a WindField, defined as a set of WindVectors at different points in space.
- Depending on how it is implemented in the Predictor, wind may blow an aircraft away from it's nominal heading, but if the aircraft's
on_routeattribute is set, it should continue to follow the route.