(AI)r Traffic Controller Challenge
Welcome to the (AI)r Traffic Controller Challenge! This is your starting point for the competition. It walks you through the simulation environment, shows you how to build a working agent, and suggests where to start with development.
What is Air Traffic Control?
The general concepts of air traffic control are defined in the ATC concepts section of the documentation. In this competition, your agent plays the role of the ATCO. At each simulation step it receives the current state of the airspace and outputs a set of clearances (instructions) for each aircraft.
The Competition
The competition runs in three phases:
| Phase | Benchmark | Airspace | Release |
|---|---|---|---|
| Warm-up (4 weeks) | Flight School | X-Plus sector | 6 July 2026 |
| Development (12 weeks) | Flight School + (AI)r Traffic Control | X-Plus & Springfield | 3 August 2026 |
| Assessment | Final evaluation by ATCOs | Springfield | Deadline 26 October 2026 |
Flight School uses the simpler X-Plus sector. Aircraft spawn continuously and you must keep them separated for as long as possible. Your score is the number of seconds before the first violation (a loss of separation or sector excursion).
(AI)r Traffic Control (Springfield) is released later and uses a realistic sector with a fixed scenario. Your score is a hierarchical ranking across seven safety and efficiency metrics.
Installing the digital twin gymnasium
The getting started documentation describes several ways to use the digital twin.
This notebook guides you through using the digital twin gymnasium for the competition. It is available on PyPI and can be installed with:
%pip install bluebird-dt bluebird-gymnasium
Section 1: The X-Plus Sector and the Flight School Benchmark
1.1 Generating the X-Plus airspace
The X-Plus sector is an artificial, cross-shaped airspace designed for the competition warm-up phase. It has four arms converging on a central hub fix called EGL. Aircraft enter from the tip of one arm and must be guided to the correct exit at the opposite or adjacent arm tip.
Because it is artificial and procedurally generated it is simpler than a real-world sector — which makes it the right place to start.
ArtificialAirspace("xplus").generate_airspace() returns two things:
- airspace: an Airspace object holding sector geometry (the physical shape of the controlled volume) and the locations of all navigation fixes.
- routes: a list of Route objects. Each route is a legal path through the sector defined as an ordered list of fixes (e.g. SWIFT → EGL → QUAIL).
import math
import matplotlib.pyplot as plt
import numpy as np
# Tell matplotlib to draw figures inline in the notebook (rather than opening a window)
%matplotlib inline
# ── Bluebird DT: airspace and scenario setup ──────────────────────────────────
from bluebird_dt.airspace_generator.artificial_airspace import ArtificialAirspace
from bluebird_dt.simulator.common import list_sim_scenarios
from bluebird_dt.core import Pos2D
from bluebird_dt.render import Radar
# ── Bluebird Gymnasium: the reinforcement-learning / agent interface ───────────
from bluebird_gymnasium.envs import BaseEnv, InfiniteEnv
# ──────────────────────────────────────────────────────────────────────────────
# Generate the X-Plus airspace.
# This creates a synthetic cross-shaped sector from scratch.
airspace, routes = ArtificialAirspace("xplus").generate_airspace()
print("Airspace and routes loaded successfully.")
1.2 Exploring routes
Every Route object has a filed attribute — this is the list of fix names the aircraft is planned to pass through (the "filed flight plan"). Printing all filed routes gives you a complete picture of how aircraft move through the sector.
# Each Route object has a `.filed` attribute — a list of fix names in order.
# For example ["SWIFT", "EGL", "QUAIL"] means: enter at SWIFT, pass EGL, exit at QUAIL.
print("All routes through X-Plus:")
for route in routes:
    print(" → ".join(route.filed))
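Once the filed routes are available as lists of fix names, simple lookups become easy. The sketch below groups routes by entry fix and finds every route through a given fix. It uses plain lists as stand-ins for route.filed; SWIFT, EGL and QUAIL come from the text above, while HERON and FINCH are made-up names used only for illustration.

```python
from collections import defaultdict

# Stand-in data: each inner list plays the role of one `route.filed`.
# HERON and FINCH are hypothetical fix names, not real X-Plus fixes.
filed_routes = [
    ["SWIFT", "EGL", "QUAIL"],
    ["SWIFT", "EGL", "HERON"],
    ["FINCH", "EGL", "SWIFT"],
]


def group_by_entry(filed):
    """Map each entry fix (first element) to the exit fixes reachable from it."""
    exits = defaultdict(list)
    for fixes in filed:
        exits[fixes[0]].append(fixes[-1])
    return dict(exits)


def routes_via(filed, fix_name):
    """Return every filed route that passes through `fix_name`."""
    return [fixes for fixes in filed if fix_name in fixes]


entry_to_exits = group_by_entry(filed_routes)
via_hub = routes_via(filed_routes, "EGL")
print(entry_to_exits)
print(len(via_hub), "routes pass through EGL")
```

With the real data, the same functions should work on [route.filed for route in routes].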
1.3 Sector boundary and fix positions
Your agent can use the sector geometry directly — for example to check whether an aircraft is close to the boundary, or to compute distances to fixes. Below we access two useful data structures:
- airspace.fixes.places: a dictionary mapping fix name → Pos2D (latitude/longitude). Use this to look up where each waypoint is on the map.
- airspace.sectors: a dictionary mapping sector name → Sector. Each Sector has a .boundary() method that returns its polygon, and .boundary().boundary_vertices for the raw lat/lon corner points.
These are the same objects the Radar display and the simulator use internally — there is no hidden information.
# ── Fix positions ─────────────────────────────────────────────────────────────
# airspace.fixes.places is a dict: { fix_name: Pos2D(lat, lon) }
# Pos2D is BluebirdDT's lat/lon coordinate type.
print("Fix name | Latitude | Longitude")
print("-" * 40)
for fix_name, pos in airspace.fixes.places.items():
    print(f" {fix_name:<10} {pos.lat:.4f}°N {pos.lon:.4f}°")
print()
# ── Sector boundary ───────────────────────────────────────────────────────────
# airspace.sectors is a dict: { sector_name: Sector }
# Each Sector's boundary() returns a polygon; boundary_vertices gives the corners.
print("Sector boundary vertices (lat, lon):")
for sector_name, sector in airspace.sectors.items():
    print(f"\n Sector: {sector_name}")
    for v in sector.boundary().boundary_vertices:
        print(f" ({v.lat:.4f}, {v.lon:.4f})")
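One use of the boundary vertices is a containment check, for example to warn before a sector excursion. The following is a minimal sketch of a ray-casting point-in-polygon test on plain (lat, lon) tuples. It treats latitude and longitude as planar coordinates, which is an approximation, and the square used here is a hypothetical shape rather than the real X-Plus boundary.

```python
def point_in_polygon(lat, lon, vertices):
    """Ray-casting test: True if (lat, lon) lies inside the polygon.

    `vertices` is a list of (lat, lon) tuples in order around the boundary.
    Lat/lon are treated as planar coordinates (an approximation).
    """
    inside = False
    n = len(vertices)
    for i in range(n):
        lat1, lon1 = vertices[i]
        lat2, lon2 = vertices[(i + 1) % n]
        # Does this edge straddle the line of constant latitude through the point?
        if (lat1 > lat) != (lat2 > lat):
            # Longitude at which the edge crosses that line.
            lon_cross = lon1 + (lat - lat1) * (lon2 - lon1) / (lat2 - lat1)
            if lon < lon_cross:
                inside = not inside
    return inside


# Hypothetical square "sector" for illustration only.
square = [(50.0, -4.0), (50.0, -3.0), (51.0, -3.0), (51.0, -4.0)]
centre_inside = point_in_polygon(50.5, -3.5, square)
north_inside = point_in_polygon(52.0, -3.5, square)
print(centre_inside, north_inside)
```

For a real sector, the vertex list could be built as [(v.lat, v.lon) for v in sector.boundary().boundary_vertices]. The polygon object returned by boundary() may already offer its own containment test, so check the library before relying on this sketch.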
1.4 Setting up the Flight School benchmark (InfiniteEnv)
The actual competition benchmark uses an Infinite scenario — aircraft keep spawning continuously and you must keep them separated for as long as possible. The spawn rate increases over time, making it progressively harder.
InfiniteEnv wraps this inside a standard Gymnasium environment. Gymnasium is a widely used Python library for reinforcement-learning environments (although you can use it to interface with any type of agent). It gives you a clean reset() / step() interface:
- reset(seed=...): starts a new episode, places aircraft at their initial positions, and returns the first observation for each aircraft
- step(actions): advances the simulation by one time step given a dict of actions (one per aircraft); returns new observations, rewards, done flags, truncation flags, and extra info
The config dictionary controls everything about how the environment behaves:
- state_repr_config: which observation features to compute per aircraft (the "encoder")
- action_config: which clearance types the agent can issue (left/right turns, climb/descend, etc.)
- reward_config: which reward function terms to use and their weights
The extra_minimal encoder produces a compact vector per aircraft containing:
- The angular difference between the aircraft's heading and the bearing to its next fix (how far off-track it is)
- The distance from the route centreline
- For each tracked neighbour: the relative heading angle and distance to that neighbour
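To make the first feature concrete, here is a small sketch of a signed angular difference between a heading and a bearing. It only illustrates the quantity described above; the encoder's actual implementation, sign convention and scaling are not shown in this notebook, so treat the function name and the convention (positive means the target is clockwise of the heading) as assumptions.

```python
def signed_angle_diff(heading_deg, bearing_deg):
    """Smallest signed angle from heading to bearing, in [-180, 180).

    Positive: the bearing is clockwise (to the right) of the heading.
    Negative: the bearing is anticlockwise (to the left).
    """
    return (bearing_deg - heading_deg + 180.0) % 360.0 - 180.0


right_of_track = signed_angle_diff(350.0, 10.0)
left_of_track = signed_angle_diff(10.0, 350.0)
print(right_of_track, left_of_track)
```

The modulo step handles the wrap-around at North, so a heading of 350° and a bearing of 10° differ by 20° rather than 340°.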
The reward in this example combines:
- position_status_const (weight 1.0): penalty if an aircraft exits incorrectly
- lateral_centreline_distance_shaped (weight 1.0): reward for staying on the route centreline
- safety_simple_avoidance_exp (weight 1.2): exponentially increasing reward as separation grows
The reward is a useful training signal for RL agents, but your Flight School score is the number of seconds before the first violation — not the gym reward.
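The weighted combination can be sketched in a few lines. This is only an illustration of a weighted sum with the coefficients used in this notebook; the term values are made up, and the helper name combine_rewards is hypothetical rather than part of bluebird-gymnasium.

```python
def combine_rewards(term_values, coeffs):
    """Weighted sum of per-term reward values for one aircraft at one step."""
    if len(term_values) != len(coeffs):
        raise ValueError("need exactly one coefficient per reward term")
    return sum(c * v for c, v in zip(coeffs, term_values))


coeffs = [1.0, 1.0, 1.2]         # same weights as reward_config["coeffs"]
term_values = [0.0, 0.5, -0.25]  # made-up term outputs, for illustration only
step_reward = combine_rewards(term_values, coeffs)
print(step_reward)
```

Changing a coefficient rescales one term's influence on training without affecting the Flight School score, which depends only on survival time.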
We also set up the Radar display object here and draw it after reset() so the first image actually contains aircraft. Radar takes a centre point, a width in nautical miles, and an aspect ratio; radar.draw(environment) returns a matplotlib figure together with its axes.
Want to explore the available encoders, reward functions, or more config options?
- Observation encoders: bluebird-gymnasium/bluebird_gymnasium/state_repr/
- Reward functions: bluebird-gymnasium/bluebird_gymnasium/rewards/
from bluebird_gymnasium.envs.infinite import ScenarioName
# ── Build the environment config ──────────────────────────────────────────────
# get_default_env_config() returns a config object populated with sensible defaults.
# We then modify individual fields to suit the competition setup.
cfg_infinite = InfiniteEnv.get_default_env_config()
# State representation: use the "extra_minimal" encoder.
# This produces a short feature vector per aircraft — small enough to train on quickly.
cfg_infinite.state_repr_config = {
"encoder_cls": "extra_minimal",
"k_nearest_aircraft": 1, # include info about the 1 closest neighbour
}
# Action space: which clearances the agent can give.
# Here we only allow lateral (heading) commands — left and right turns of 10°.
# You can also enable vertical commands: "simple_fl_climb", "simple_fl_descent".
cfg_infinite.action_config = {
"simple_heading_left": [10], # turn left 10 degrees
"simple_heading_right": [10], # turn right 10 degrees
}
# Reward function: a weighted combination of named reward terms.
# These are summed each step to give a single scalar reward per aircraft.
cfg_infinite.reward_config = {
"fns": [
"position_status_const",
"lateral_centreline_distance_shaped",
"safety_simple_avoidance_exp",
],
"coeffs": [1.0, 1.0, 1.2],
}
# Which sector to use — sector_xplus is the Flight School benchmark sector.
cfg_infinite.scenario_config["scenario_name"] = ScenarioName.sector_xplus
# "decentralized" view: each aircraft gets its own observation vector.
# This is the natural setup for multi-agent RL — each agent only sees its local view.
# Switch to "centralized" if you want a single flat observation for all aircraft combined.
cfg_infinite.view_config["type"] = "decentralized"
cfg_infinite.view_config["decentralized_params"] = {}
# How many simulated seconds to run before automatically ending the episode.
# The benchmark ends earlier if there is a loss of separation or sector excursion.
cfg_infinite.scenario_duration = 600 # 10 minutes
# ── Set up Radar renderer ─────────────────────────────────────────────────────
# X-Plus is centred roughly at (50.75°N, 3.5°W) and is ~100 nm across.
# We create the Radar object now but draw it after reset() so aircraft are visible.
view_centre_xplus = Pos2D.from_str("50.75N 3.5W")
view_width_xplus = 100.0 # nautical miles
aspect_ratio = 1.0
radar_xplus = Radar(
view_centre_xplus, view_width_xplus, aspect_ratio, sector_name="sector_xplus"
)
1.5 The action space and the Actions enum
Every clearance your agent can issue is represented as an integer. env.step() expects a dict of the form {callsign: int}. Those integers map to actions defined in action_config — but raw integers are error-prone to read and write.
actions_to_enum reads the action space directly from the environment and builds a Python Enum so you can write Actions.LEFT_10.value instead of 2 (or whatever the index happens to be). The enum is built once here and reused throughout the notebook.
The available actions depend entirely on what you put in action_config when you built the environment. If you add climbs and descents later, the enum will automatically include CLIMB_* and DESCEND_* members.
from enum import Enum


def actions_to_enum(env: BaseEnv) -> type[Enum]:
    """
    Read the action space from a bluebird-gymnasium environment and return a
    Python Enum that maps human-readable names to integer action indices.

    Each action the environment supports is assigned an integer index internally
    (0, 1, 2, …). The enum maps descriptive names to those indices so your agent
    code can write Actions.LEFT_10.value instead of a raw integer.

    The name already encodes the full action:
        LEFT_10  → turn left 10 degrees (from "simple_heading_left": [10] in action_config)
        RIGHT_10 → turn right 10 degrees
        NOOP     → do nothing this step

    So Actions.LEFT_10.value is the correct integer to pass to env.step() for
    a 10-degree left turn; no further combination is needed.
    """
    actions_map = env.get_action_parser().action_formatter_map
    new_actions_map = {}
    for action_int, action_name in actions_map.items():
        # Action names look like "<base_name>__<magnitude>"; NOOP has no magnitude.
        parts = action_name.split("__")
        base_name = parts[0]
        magnitude = parts[1] if len(parts) == 2 else None
        if base_name == "action_noop":
            enum_name = "NOOP"
        elif base_name == "simple_heading_left":
            enum_name = "LEFT"
        elif base_name == "simple_heading_right":
            enum_name = "RIGHT"
        elif base_name == "simple_fl_descent":
            enum_name = "DESCEND"
        elif base_name == "simple_fl_climb":
            enum_name = "CLIMB"
        elif base_name == "simple_heading_route_parallel":
            enum_name = "ROUTE_PARALLEL"
        else:
            enum_name = base_name.upper()
        if magnitude is not None:
            enum_name = f"{enum_name}_{magnitude}"
        new_actions_map[enum_name] = action_int
    return Enum("Actions", list(new_actions_map.items()))


# Build the Actions enum for our infinite environment so we can reference
# actions by name (Actions.NOOP, Actions.LEFT_10, etc.) throughout the notebook.
env_infinite = InfiniteEnv(config=cfg_infinite)
Actions = actions_to_enum(env_infinite)
print("Available actions:")
for a in Actions:
    print(f" {a.name:20s} = integer index {a.value}")
print()
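The enum also works in the other direction, which helps when logging what an agent did. The sketch below uses a stand-in enum with assumed indices (the real ones come from the environment's action parser) and translates an action dict back into readable names. The helper describe is hypothetical.

```python
from enum import Enum

# Stand-in enum with assumed indices; build the real one with actions_to_enum(env).
Actions = Enum("Actions", [("NOOP", 0), ("LEFT_10", 1), ("RIGHT_10", 2)])


def describe(actions):
    """Translate {callsign: int} into {callsign: action name} for logging."""
    return {callsign: Actions(idx).name for callsign, idx in actions.items()}


step_actions = {"AIR01": Actions.LEFT_10.value, "AIR02": Actions.NOOP.value}
readable = describe(step_actions)
print(readable)
```

Calling the enum with an integer, as in Actions(1), looks a member up by value and raises ValueError for an index the environment does not define, which catches malformed action dicts early.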
1.6 Watching the simulation
Before writing an agent it is useful to watch the simulation run with no interventions — to see natural aircraft behaviour, where routes cross, and where conflicts tend to emerge.
Each call to env.step() advances the simulation by one 6-second tick. We pass NOOP for every active aircraft (no clearances) and animate the radar after each step. The loop below runs 50 steps — 5 minutes of simulated time.
import IPython.display
# Create the environment, then reset with a seed for a reproducible spawn pattern.
# reset() spawns the first batch of aircraft and returns their initial observations.
env_infinite = InfiniteEnv(config=cfg_infinite)
seed = 42
obs_inf, info_inf = env_infinite.reset(seed=seed)
print("Aircraft in initial state:", list(obs_inf.keys()))
print("Observation vector length per aircraft:", next(iter(obs_inf.values())).shape)
# 6-second steps × 50 iterations = 5 minutes of simulated time.
# 6 s matches the standard UK radar refresh rate.
# obs_inf keys change each step as aircraft enter and exit the sector —
# always rebuild the action dict from the latest obs keys.
for _ in range(50):
    noop_actions = {cs: Actions.NOOP.value for cs in obs_inf}
    obs_inf, reward_inf, done_inf, trunc_inf, info_inf = env_infinite.step(noop_actions)
    figure, ax = radar_xplus.draw(info_inf["simulator_environment"])
    IPython.display.display(figure)
    IPython.display.clear_output(wait=True)
1.7 Inspecting simulation state programmatically
At any point in a simulation you can access the full state of every aircraft directly from the environment. This is useful for:
- Debugging your agent's decisions
- Writing rules-based logic (e.g. "if an aircraft is within 10 nm of another, turn it away")
- Building custom features on top of the raw state
The raw state lives in info_dict["simulator_environment"], which is a BluebirdDT Environment object. Its .aircraft attribute is a dict mapping callsign → Aircraft.
Each Aircraft object has these key attributes:
| Attribute | Type | Description |
|---|---|---|
| callsign | str | Unique ID for the aircraft (e.g. "AIR01") |
| lat, lon | float | Current position in decimal degrees |
| fl | float | Current flight level (in hundreds of feet) |
| heading | float | Current heading in degrees (0–360, clockwise from North) |
| flight_plan | FlightPlan | Full route and co-ordination conditions |
| on_route | bool | Whether the aircraft is currently following its filed route. In the competition setup this is False, so your agent must issue lateral clearances to guide aircraft towards their co-ordination conditions |
Explore the full Aircraft class at bluebird_dt/core/aircraft.py
# Advance the simulation a few steps so aircraft have spawned and moved.
# env.step() requires an action dict {callsign: action_int} for every active aircraft.
# We'll use NOOP (do nothing) here just to get the simulation running.
for _ in range(5):
    noop_actions = {ac_id: Actions.NOOP.value for ac_id in obs_inf}
    obs_inf, reward_inf, done_inf, trunc_inf, info_inf = env_infinite.step(noop_actions)
# info_inf["simulator_environment"] is the live Environment object.
# .aircraft is a dict of callsign -> Aircraft
sim_environment = info_inf["simulator_environment"]
print("Aircraft state after 5 steps:\n")
print(
f" {'Callsign':<10} {'Lat':>8} {'Lon':>9} {'FL':>5} {'Heading':>8} Next Fix"
)
print(" " + "-" * 68)
for callsign, ac in sim_environment.aircraft.items():
    # flight_plan contains the route and co-ordination conditions.
    # .route.current is the remaining waypoints (the ones not yet passed).
    next_fix = (
        ac.flight_plan.route.current[0]
        if ac.flight_plan and ac.flight_plan.route.current
        else "-"
    )
    print(
        f" {callsign:<10} {ac.lat:>8.4f} {ac.lon:>9.4f} {ac.fl:>5.0f} {ac.heading:>8.1f} {next_fix}"
    )
Co-ordination conditions
Each aircraft has exit co-ordination conditions — the fix it must pass near when it leaves, and the flight level it must be at. Meeting co-ordinations is exactly what the ATCO (and your agent) must achieve for each aircraft.
In the data, co-ordinations are stored on each aircraft's flight_plan, and the simulator's dynamic_data() snapshot also exposes them. The code below reads them from the environment's CoordinationsManager instead.
# env_infinite.simulator is the underlying BluebirdDT Simulator object.
# env_infinite.simulator_env is the live Environment (aircraft, airspace, coordinations).
sim_internal = env_infinite.simulator
# env_infinite.simulator_env.coordinations is a CoordinationsManager.
# Its .coords attribute is a dict indexed by (callsign, from_sector) → Coordination.
# Each Coordination has:
# .callsign — the aircraft it applies to
# .fix — the named waypoint the aircraft must pass near when it exits
# .fl — the flight level it must be at when it exits
# .from_sector — the sector it is flying out of
# .to_sector — the sector it is flying into (None if leaving controlled airspace)
coordinations_manager = env_infinite.simulator_env.coordinations
print("Co-ordination conditions per aircraft:\n")
if not coordinations_manager.coords:
    print(" (no active co-ordinations yet; aircraft may not have entered the sector)")
else:
    for (callsign, from_sector), coord in coordinations_manager.coords.items():
        if from_sector != "sector_xplus":
            continue  # skip any co-ordinations not related to our sector
        print(f" Aircraft : {coord.callsign}")
        print(f" Exit fix : {coord.fix} at FL{coord.fl:.0f}")
        print(f" Sector : {coord.from_sector} → {coord.to_sector}")
        print()
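An agent that wants to respect co-ordinations needs some way to test whether an aircraft currently satisfies one. The sketch below shows one possible check using a stand-in dataclass. The class ExitCondition, the function name and both tolerances are hypothetical; the notebook does not state the tolerances the competition applies, and the distance uses a flat-earth approximation rather than a great-circle formula.

```python
import math
from dataclasses import dataclass


@dataclass
class ExitCondition:
    """Stand-in for the exit fix position and flight level of a co-ordination."""
    fix_lat: float
    fix_lon: float
    fl: float


def meets_exit_condition(lat, lon, fl, cond, max_dist_nm=5.0, max_fl_diff=0.0):
    """True if the aircraft is within tolerance of the exit fix and level.

    Both tolerances are placeholders, not competition values.
    """
    # Equirectangular approximation: 1 degree of latitude is about 60 nm.
    d_north = (lat - cond.fix_lat) * 60.0
    d_east = (lon - cond.fix_lon) * 60.0 * math.cos(math.radians(cond.fix_lat))
    dist_nm = math.hypot(d_north, d_east)
    return dist_nm <= max_dist_nm and abs(fl - cond.fl) <= max_fl_diff


cond = ExitCondition(fix_lat=51.0, fix_lon=-3.5, fl=300.0)
near_and_level = meets_exit_condition(51.02, -3.5, 300.0, cond)
near_wrong_level = meets_exit_condition(51.02, -3.5, 280.0, cond)
print(near_and_level, near_wrong_level)
```

With real data, the fix position would come from airspace.fixes.places[coord.fix] and the target level from coord.fl.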
Section 2: Building Agents
All agents — rules-based, search-based, or neural-network-based — follow the same pattern:
class MyAgent:
    def generate_action(self, env, observation_dict, info_dict) -> dict[str, int]:
        ...
At each simulation step, generate_action receives:
- env: the live gym environment (use this to access the action parser, for example)
- observation_dict: {callsign: np.ndarray}, the pre-computed feature vector for each aircraft
- info_dict: extra data; info_dict["simulator_environment"] gives the full raw Environment object with every aircraft's position, heading, flight level, and route
The method must return {callsign: int} — one integer action index per aircraft. We already defined actions_to_enum and the Actions enum in Section 1.5 above — they are reused here.
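The interface can be written down as a typing.Protocol, which lets a type checker confirm that any agent class fits the pattern. The following sketch pairs it with a trivial do-nothing agent. The names Agent, NoopAgent and run_one_step are hypothetical, and the NOOP index is passed in explicitly because the real value depends on the environment's action parser.

```python
from typing import Any, Protocol


class Agent(Protocol):
    """Structural type for anything exposing generate_action()."""

    def generate_action(
        self, env: Any, observation_dict: dict, info_dict: dict
    ) -> dict[str, int]: ...


class NoopAgent:
    """Issues the same 'do nothing' action index for every aircraft."""

    def __init__(self, noop_index: int):
        self.noop_index = noop_index

    def generate_action(self, env, observation_dict, info_dict):
        return {callsign: self.noop_index for callsign in observation_dict}


def run_one_step(agent: Agent, env, obs, info):
    """Ask any conforming agent for its actions."""
    return agent.generate_action(env, obs, info)


fake_obs = {"AIR01": [0.0, 0.0], "AIR02": [0.1, 0.2]}  # stand-in observations
noop_actions = run_one_step(NoopAgent(noop_index=0), None, fake_obs, {})
print(noop_actions)
```

Because Protocol uses structural typing, agents do not need to inherit from Agent; having a matching generate_action method is enough.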
2.1 Rules-Based Agent
A rules-based agent encodes explicit, hand-written logic. It does not learn — it looks at the current state and applies a fixed set of if-then rules to decide what to do.
For conflict avoidance, a natural heuristic is:
- For each aircraft, compute the lateral distance to every other aircraft.
- If a neighbour is closer than some alert threshold, work out which side it is on.
- Turn away from the threat (standard ATC practice: if the threat is to your right, turn left).
- If no threat is detected, issue NOOP.
This approach is simple, transparent, and a useful baseline — but it has clear limitations: purely reactive (no look-ahead), ignores co-ordinations, and only reacts to the single closest threat. Your job is to do better.
We need two geometric helpers: haversine_nm (great-circle distance in nm) and bearing_to (compass bearing between two points).
def haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """
    Compute the great-circle distance in nautical miles between two points
    given as (latitude, longitude) pairs in decimal degrees.

    The 'haversine formula' accounts for the curvature of the Earth. A simple
    Euclidean distance would give the wrong answer at aviation scales.
    """
    R_NM = 3440.065  # Earth's mean radius in nautical miles
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    # The haversine formula:
    a = (
        math.sin(dphi / 2) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    )
    return R_NM * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))


def bearing_to(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """
    Compute the initial compass bearing (0–360°, clockwise from North) from
    point 1 to point 2, both in decimal degrees.

    This tells us the direction we would need to point to fly directly from
    point 1 towards point 2 along the shortest path (great circle).
    """
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dlam = math.radians(lon2 - lon1)
    x = math.sin(dlam) * math.cos(phi2)
    y = math.cos(phi1) * math.sin(phi2) - math.sin(phi1) * math.cos(phi2) * math.cos(
        dlam
    )
    return (math.degrees(math.atan2(x, y)) + 360) % 360


class RulesBasedAgent:
    """
    A simple conflict-avoidance agent for the X-Plus sector.

    At each time step, for every aircraft:
    1. Read its position and heading from the raw simulator state.
    2. Scan all other aircraft and compute lateral distance.
    3. If any aircraft is within ALERT_THRESHOLD_NM, work out which side it
       is on relative to our current heading (left or right).
    4. Turn away: LEFT if the threat is to our right, RIGHT if to our left.
       This mirrors standard ATC practice.
    5. If no threat is detected, issue NOOP (do nothing).

    Limitations to consider when improving:
    - Only reacts to the single closest threat and ignores multiple threats.
    - Purely reactive, with no look-ahead or prediction of future conflicts.
    - Ignores co-ordination conditions (exit fix and flight level).
    - Does not recover back onto route after turning away.
    """

    # Distance threshold in nautical miles at which we start reacting.
    # The actual LoS threshold is 5 nm, so 10 nm gives us a reaction window.
    ALERT_THRESHOLD_NM = 10.0

    def generate_action(
        self,
        gym_env: BaseEnv,
        observation_dict: dict,
        info_dict: dict,
    ) -> dict:
        # Access the raw simulator environment for full aircraft state.
        # observation_dict contains the pre-encoded feature vectors, but we
        # need raw lat/lon/heading here for geometric calculations.
        simulator = info_dict["simulator_environment"]
        all_aircraft = simulator.aircraft  # dict: callsign -> Aircraft
        A = actions_to_enum(gym_env)  # build Actions enum for this env
        actions = {}
        for ac_id in observation_dict:
            action = A.NOOP.value  # default: do nothing
            ac = all_aircraft.get(ac_id)
            if ac is None:
                # Aircraft may have just left the sector; skip it.
                actions[ac_id] = action
                continue
            # Find the closest aircraft within the alert threshold.
            min_dist = self.ALERT_THRESHOLD_NM
            threat_bearing = None
            for other_id, other_ac in all_aircraft.items():
                if other_id == ac_id:
                    continue  # don't compare an aircraft with itself
                dist = haversine_nm(ac.lat, ac.lon, other_ac.lat, other_ac.lon)
                if dist < min_dist:
                    # This is the closest threat so far; record its bearing.
                    min_dist = dist
                    threat_bearing = bearing_to(
                        ac.lat, ac.lon, other_ac.lat, other_ac.lon
                    )
            if threat_bearing is not None:
                # Convert absolute bearing to relative bearing
                # (how many degrees from our current heading to the threat).
                rel = (threat_bearing - ac.heading) % 360
                # If rel is in [0, 180), the threat is to our RIGHT → turn LEFT.
                # If rel is in [180, 360), the threat is to our LEFT → turn RIGHT.
                if rel < 180:
                    action = A.LEFT_10.value
                else:
                    action = A.RIGHT_10.value
            actions[ac_id] = action
        return actions
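One way to address the "purely reactive" limitation is to predict the closest point of approach (CPA) between two aircraft, assuming both hold their current heading and speed. The sketch below is an illustration of that standard calculation, not part of the notebook's agent. It works in a local flat frame measured in nautical miles, so converting lat/lon into that frame is left to the reader, and it assumes a ground speed in knots is available for each aircraft, which this notebook's Aircraft attribute table does not list.

```python
import math


def velocity_nm_per_s(heading_deg, speed_kt):
    """East and north velocity components in nm per second."""
    speed = speed_kt / 3600.0
    h = math.radians(heading_deg)
    return speed * math.sin(h), speed * math.cos(h)


def closest_approach(pos_a, vel_a, pos_b, vel_b, horizon_s=300.0):
    """Return (time_s, distance_nm) of closest approach within the horizon.

    Positions are (east_nm, north_nm) in a local flat frame; velocities are
    in nm per second. Assumes both aircraft hold heading and speed.
    """
    rx, ry = pos_b[0] - pos_a[0], pos_b[1] - pos_a[1]  # relative position
    vx, vy = vel_b[0] - vel_a[0], vel_b[1] - vel_a[1]  # relative velocity
    v2 = vx * vx + vy * vy
    # Time that minimises |r + v t|, clamped to [0, horizon_s].
    t = 0.0 if v2 == 0.0 else -(rx * vx + ry * vy) / v2
    t = max(0.0, min(horizon_s, t))
    return t, math.hypot(rx + vx * t, ry + vy * t)


# Two aircraft 20 nm apart, flying head-on at 360 kt each, offset 2 nm laterally.
pos_a, vel_a = (0.0, 0.0), velocity_nm_per_s(90.0, 360.0)    # eastbound
pos_b, vel_b = (20.0, 2.0), velocity_nm_per_s(270.0, 360.0)  # westbound
t_cpa, d_cpa = closest_approach(pos_a, vel_a, pos_b, vel_b)
print(f"CPA in {t_cpa:.0f} s at {d_cpa:.1f} nm")
```

An agent could compare the predicted distance with the 5 nm separation threshold and act only on pairs whose CPA falls below it, instead of reacting to every neighbour inside a fixed radius.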
Running the rules-based agent on the Flight School benchmark
The loop below runs a full episode with the rules-based agent:
- Create a fresh InfiniteEnv and reset it (same config and seed as above, so the same aircraft spawn pattern).
- Call agent.generate_action() to get an action for each aircraft.
- Call env.step(actions) to advance one timestep and receive the next observations and rewards.
- Repeat until done is True (LoS, sector excursion, or time limit).
IPython.display animates the radar screen — each step replaces the previous frame.
agent = RulesBasedAgent()
# Re-create and reset the environment so we start fresh.
env_rules = InfiniteEnv(config=cfg_infinite)
env_rules.set_render_mode("human") # enable the radar animation
seed = 42
obs, info = env_rules.reset(seed=seed)
done = False
rewards_rules = []
while not done:
    # Ask the agent which action to take for each aircraft this step.
    actions = agent.generate_action(env_rules, obs, info)
    # Advance the simulation by one step.
    # env.step() returns five values:
    #
    # obs               dict[callsign, np.ndarray]
    #                   The new encoded observation vector for each aircraft,
    #                   computed by the state encoder after this step.
    #
    # reward_dict       dict[callsign, float]
    #                   A scalar reward for each aircraft this step, calculated
    #                   as the weighted sum of the reward function terms in reward_config.
    #                   Positive = good (on-route, separated); negative = bad (close to LoS).
    #
    # done_dict         dict[callsign, bool]
    #                   True for each aircraft that has *naturally* finished:
    #                   left the sector at its exit fix, or caused a loss of separation.
    #
    # truncated_dict    dict[callsign, bool]
    #                   True if the episode was cut short by the time limit
    #                   (scenario_duration) rather than by a terminal event.
    #                   Important for RL: truncation ≠ termination.
    #                   A truncated episode may still have future value.
    #
    # info              dict
    #                   Extra data. Key entries:
    #                   info["simulator_environment"] → raw Environment object
    #                                                   (all aircraft, sector geometry)
    #                   info["sim_time"]              → elapsed simulated seconds
    #                   info["metrics"]               → per-aircraft metric contributions
    obs, reward_dict, done_dict, truncated_dict, info = env_rules.step(actions)
    rewards_rules.append(reward_dict)
    # The episode ends when every aircraft is done, or when the time limit
    # truncates it (the same condition used in the later sections).
    done = all(done_dict.values()) or all(truncated_dict.values())
    # Display the current radar frame, replacing the previous one.
    IPython.display.display(env_rules.radar.figure)
    IPython.display.clear_output(wait=True)
print(f"Episode ended after {len(rewards_rules)} steps.")
# Plot the mean reward per step over the episode.
# A higher reward indicates the agent kept aircraft on-track and separated.
mean_rewards = [np.mean(list(r.values())) for r in rewards_rules]
plt.figure(figsize=(8, 3))
plt.plot(mean_rewards)
plt.xlabel("Time step")
plt.ylabel("Mean reward across aircraft")
plt.title("Rules-Based Agent — Flight School Benchmark")
plt.tight_layout()
plt.show()
2.2 Reinforcement Learning Agent (scaffold)
Reinforcement learning (RL) is a machine learning approach where an agent learns a policy by trial and error.
The gymnasium interface makes it straightforward to plug in any RL library (e.g. Stable Baselines 3, RLlib).
How to use this scaffold:
- Design your neural network architecture inside __init__.
- Implement the train() method: run episodes of InfiniteEnv with varying random seeds, calling env.reset(seed=...) and env.step() and updating your weights after each episode. You can set a short scenario_duration (e.g. 120 s) to keep early training episodes manageable.
- Save trained weights to a file.
- In generate_action(), load the weights and run your network forward to select an action.
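The last step, turning network outputs into an action per aircraft, might look like the sketch below. It uses epsilon-greedy selection over Q-values, which is one common choice and not something the competition prescribes. The function name, the array shape and the callsigns are assumptions for illustration, and the Q-values are hard-coded in place of a real forward pass.

```python
import numpy as np


def select_actions(q_values, callsigns, epsilon=0.1, rng=None):
    """Epsilon-greedy selection: one action index per aircraft.

    q_values has shape (n_aircraft, n_actions), e.g. a network's output
    for a batch of stacked observations.
    """
    if rng is None:
        rng = np.random.default_rng()
    n_aircraft, n_actions = q_values.shape
    greedy = q_values.argmax(axis=1)
    random_actions = rng.integers(0, n_actions, size=n_aircraft)
    # With probability epsilon, explore with a uniformly random action.
    explore = rng.random(n_aircraft) < epsilon
    chosen = np.where(explore, random_actions, greedy)
    return {cs: int(a) for cs, a in zip(callsigns, chosen)}


q = np.array([[0.1, 0.9, 0.0], [0.5, 0.2, 0.3]])  # made-up Q-values
greedy_actions = select_actions(q, ["AIR01", "AIR02"], epsilon=0.0)
print(greedy_actions)
```

Setting epsilon to 0 gives purely greedy behaviour, which is what you would typically want at evaluation time; during training a larger value encourages exploration.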
class RLAgent:
    """
    Scaffold for a reinforcement learning agent.

    Fill in the three methods below to build your RL agent:
    - __init__: define and (optionally) load your neural network
    - train: implement a training loop using InfiniteEnv
    - generate_action: run the network forward and return actions

    Until you implement training, generate_action returns NOOP for every aircraft,
    so it behaves like a do-nothing agent. It is a starting point, not a working agent.
    """

    def __init__(self):
        # ── Define your model here ────────────────────────────────────────────
        # Example: a simple MLP using NumPy (no deep learning library needed to start)
        # self.model = MyMLP(obs_dim=4, hidden=64, action_dim=3)
        # Or, using PyTorch:
        # self.model = torch.nn.Sequential(...)
        # Or, using Stable Baselines 3:
        # self.model = PPO("MlpPolicy", env)
        pass

    def train(self, env: BaseEnv, num_episodes: int = 100):
        """
        Implement your training loop here.

        A straightforward approach is to train on many episodes of InfiniteEnv,
        varying the random seed each time so the agent sees different spawn
        patterns. Setting a short scenario_duration (e.g. 120 s) in the config
        keeps early episodes manageable; gradually increase it as the agent improves.

        Example training loop:

            for episode in range(num_episodes):
                obs, info = env.reset(seed=episode)  # new seed = new spawn pattern
                done = False
                while not done:
                    action = self.generate_action(env, obs, info)  # or random during exploration
                    obs, reward, done_dict, truncated_dict, info = env.step(action)
                    done = all(done_dict.values()) or all(truncated_dict.values())
                    # store (obs, action, reward, next_obs, done) in replay buffer
                    # update weights every N steps or at episode end

        Note: truncated_dict values become True when the time limit is hit.
        Truncation is *not* the same as a terminal failure. A truncated episode
        may still have future value, so handle it separately in your loss function
        (most RL libraries do this automatically if you pass `truncated` correctly).
        """
        raise NotImplementedError("Implement your RL training loop here.")

    def generate_action(
        self,
        gym_env: BaseEnv,
        observation_dict: dict,
        info_dict: dict,
    ) -> dict:
        """
        Select an action for each aircraft using the trained network.

        Replace the NOOP stub below with:
        1. Stack observations into a batch tensor.
        2. Run a forward pass through your network.
        3. Select the action with the highest Q-value (or sample from the policy distribution).
        4. Return the selected actions as a dict {callsign: int}.
        """
        A = actions_to_enum(gym_env)
        # ── Stub: do nothing until the model is trained ───────────────────────
        return {ac_id: A.NOOP.value for ac_id in observation_dict}


# Instantiate the RL agent (weights not yet trained — will behave like a lazy agent)
rl_agent = RLAgent()
print(
"RLAgent instantiated. Implement .train() and .generate_action() to make it work."
)
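The difference between truncation and termination matters most when computing value targets. The sketch below shows one-step temporal-difference targets in which only a terminal event removes the bootstrap term, while a time-limit truncation keeps it. This is a generic illustration under the usual TD-learning assumptions, not code from bluebird-gymnasium, and the numbers are made up.

```python
import numpy as np


def td_targets(rewards, next_values, terminated, gamma=0.99):
    """One-step TD targets: r + gamma * V(s') unless the step was terminal.

    Only `terminated` zeroes the bootstrap term. A transition that was merely
    truncated by the time limit still bootstraps from V(s').
    """
    rewards = np.asarray(rewards, dtype=float)
    next_values = np.asarray(next_values, dtype=float)
    not_terminal = 1.0 - np.asarray(terminated, dtype=float)
    return rewards + gamma * not_terminal * next_values


targets = td_targets(
    rewards=[1.0, 1.0],
    next_values=[10.0, 10.0],
    terminated=[True, False],  # second transition was truncated, not terminal
    gamma=0.5,
)
print(targets)
```

Treating the truncated transition as terminal would give it a target of 1.0 instead of 6.0, which teaches the agent that reaching the time limit is as bad as a loss of separation.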
Section 3: Getting started with Springfield
Springfield is the sector used in the (AI)r Traffic Control benchmark, which will be released on 3 August 2026. It is a realistic (though fictional) sector with a more complex route structure than X-Plus.
Unlike the Flight School benchmark (which runs indefinitely), Springfield uses finite scenarios — a fixed set of aircraft with known entry times and routes. The episode ends when all aircraft have left the sector.
The gym interface is identical to InfiniteEnv — you just swap the class and pick a scenario name instead of a seed. The same RulesBasedAgent (and any other agent) works without modification.
3.1 Listing Available Scenarios
First, let's see which scenarios are available. A large training set of scenarios will be provided to participants at the start of the (AI)r Traffic Control benchmark. Scenarios must be placed in bluebird-dt/scenario_data/Springfield/scenarios to be accessible to the simulator.
from bluebird_gymnasium.envs import SpringfieldEnv
# List all Springfield scenarios available on this machine.
scenarios = list_sim_scenarios("Springfield")
print(f"Available Springfield scenarios ({len(scenarios)} total):\n")
for s in scenarios:
    print(f" • {s}")
3.2 Running an agent on a scenario
# ── Build the config ──────────────────────────────────────────────────────────
# get_default_env_config sets sensible defaults for Springfield.
# We switch to "decentralized" view so each aircraft gets its own obs vector —
# the same structure as our InfiniteEnv setup above.
cfg_spf = SpringfieldEnv.get_default_env_config(view_type="decentralized")
# Pick the first available scenario (swap for any name from the list above).
cfg_spf.scenario_config["scenario"] = scenarios[0]
# ── Create and reset the environment ─────────────────────────────────────────
# SpringfieldEnv accepts the exact same reset()/step() interface as InfiniteEnv.
# Note: the constructor calls reset() once internally to set up state;
# we call it again here to get obs and info in the standard gym way.
env_spf = SpringfieldEnv(config=cfg_spf)
env_spf.set_render_mode("human") # enables env_spf.radar.figure for animation
obs_spf, info_spf = env_spf.reset()
print(f"\nRunning scenario: {scenarios[0]}")
print(f"Aircraft at episode start: {list(obs_spf.keys())}")
# ── Run the same RulesBasedAgent ──────────────────────────────────────────────
# The agent interface is identical — generate_action() only needs the env,
# obs dict, and info dict; it does not care which environment type it is running on.
done = False
step_count = 0
while not done:
    actions_spf = agent.generate_action(env_spf, obs_spf, info_spf)
    obs_spf, _, done_dict_spf, trunc_dict_spf, info_spf = env_spf.step(actions_spf)
    # Episode ends when every aircraft has exited or the time limit is hit.
    done = all(done_dict_spf.values()) or all(trunc_dict_spf.values())
    step_count += 1
    IPython.display.display(env_spf.radar.figure)
    IPython.display.clear_output(wait=True)
print(f"Episode complete after {step_count} steps ({step_count * 6} s simulated).")
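The stopping rule in the loop combines two per-aircraft dictionaries, and its behaviour on edge cases is easy to overlook. The sketch below isolates it in a hypothetical helper, `episode_finished`, using plain dicts with made-up callsigns rather than a real environment.

```python
def episode_finished(done_dict, trunc_dict):
    """Mirror the loop's rule: every aircraft terminated, or every aircraft truncated."""
    return all(done_dict.values()) or all(trunc_dict.values())


# One aircraft still active: keep stepping.
print(episode_finished({"AC1": True, "AC2": False}, {"AC1": False, "AC2": False}))  # False
# Every aircraft has terminated: stop.
print(episode_finished({"AC1": True, "AC2": True}, {"AC1": False, "AC2": False}))  # True
# all() of an empty iterable is True, so empty dicts also end the episode.
print(episode_finished({}, {}))  # True
```

Because `all()` returns True for an empty iterable, a step that reports no aircraft at all would end the loop under this rule. Whether the environment can return empty dictionaries mid-episode is not stated here, so treat that case as something to verify.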
Section 4 — The HMI (Browser-Based Radar Display)¶
The HMI (Human–Machine Interface) is a browser-based radar display that lets you load scenarios, watch them play out in real time, and interact with the sector — all without writing any Python. It uses the same BluebirdATC simulator under the hood.
Starting the HMI¶
From the BluebirdATC repo root, run:
uv run uvicorn bluebird_api:app --port 8000
Then open http://localhost:8000/hmi in your browser.
Using the HMI¶
- Click Load new scenario in the left panel.
- Select Springfield from the airspace dropdown.
- Choose a scenario from the list — the aircraft will appear on the radar display.
- Use the controls in the left panel to:
  - Change the simulation speed (real-time, 2×, 5×, …)
  - Toggle aircraft velocity vectors, range rings and aircraft routes
Section 5 — Next Steps and Further Reading¶
What to work on next¶
| Goal | Where to start |
|---|---|
| Improve the rules-based agent | Add co-ordination awareness: check each aircraft's exit FL and issue vertical clearances; handle multiple simultaneous threats |
| Train an RL agent | Use InfiniteEnv with varying random seeds; start with a short scenario_duration (e.g. 120 s); plug in any RL library (Stable Baselines 3, RLlib, CleanRL) |
| Search-based methods (MCTS, beam search) | Use sim.evolve() directly to roll out future states without the gym wrapper |
| Learn about Springfield before 3 August | Replay the scenarios in the HMI; look at the sector boundary and routes using the code in Section 3 above |
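For the search-based row, the general shape of a beam search over action sequences can be sketched without the simulator. Everything below is illustrative: `beam_search`, `step_fn` and `score_fn` are hypothetical names, and the toy problem (steering an integer towards a target) merely stands in for a simulator rollout. In practice `step_fn` would wrap a call such as sim.evolve(), whose signature is not shown here.

```python
def beam_search(initial_state, actions, step_fn, score_fn, depth, beam_width):
    """Keep the `beam_width` best action sequences at each depth.

    step_fn(state, action) -> next state   (placeholder for a rollout)
    score_fn(state) -> float               (higher is better)
    Returns (best_plan, best_score).
    """
    beam = [(score_fn(initial_state), [], initial_state)]
    for _ in range(depth):
        candidates = []
        for _, plan, state in beam:
            for action in actions:
                nxt = step_fn(state, action)
                candidates.append((score_fn(nxt), plan + [action], nxt))
        # Sort by score only, best first, and prune to the beam width.
        candidates.sort(key=lambda c: c[0], reverse=True)
        beam = candidates[:beam_width]
    best_score, best_plan, _ = beam[0]
    return best_plan, best_score


# Toy problem: the state is an integer, actions nudge it, the target is 3.
TARGET = 3
plan, score = beam_search(
    initial_state=0,
    actions=[-1, 0, 1],
    step_fn=lambda state, action: state + action,
    score_fn=lambda state: -abs(state - TARGET),
    depth=4,
    beam_width=2,
)
print(plan, score)
```

A narrow beam keeps the number of rollouts per depth at `beam_width * len(actions)`, which matters when each rollout is a physics simulation rather than an integer addition.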
Competition Submissions¶
Full submission instructions will be released before the Codabench leaderboards open.
- Codabench leaderboards: Flight School, (AI)r Traffic Control
- Flight School score: number of seconds before the first violation (a loss of separation, LoS, or a sector excursion)
- (AI)r Traffic Control score: hierarchical ranking across 7 safety + efficiency metrics (details released 3 August 2026)
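To track progress locally before the leaderboards open, a rough Flight School-style score can be computed from a per-step violation log. This is a sketch under stated assumptions, not the official scoring code: the 6-second step length is inferred from the `step_count * 6` print in Section 3, `violations` is a hypothetical list you would record yourself, and whether the violating step itself counts towards the score is a guess.

```python
STEP_SECONDS = 6  # assumed step length, inferred from the Section 3 print statement


def seconds_before_first_violation(violations, step_seconds=STEP_SECONDS):
    """Seconds elapsed before the first violating step.

    `violations` is a hypothetical list of per-step booleans
    (True = loss of separation or sector excursion on that step).
    If no step violates, the full episode length is returned.
    """
    for step_index, violated in enumerate(violations):
        if violated:
            return step_index * step_seconds
    return len(violations) * step_seconds


print(seconds_before_first_violation([False, False, False, True, False]))  # 18
```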
Source code reference¶
| Topic | File |
|---|---|
| Aircraft class (attributes, flight plan, coordinations) | bluebird_dt/core/aircraft.py |
| Airspace and Sector classes | bluebird_dt/core/sector.py |
| Observation encoders (extra_minimal, minimal, full, …) | bluebird_gymnasium/state_repr/ |
| Reward functions | bluebird_gymnasium/rewards/ |
| InfiniteEnv and SectorXPlusEnv | bluebird_gymnasium/envs/ |
| Aircraft performance data (JSON) | bluebird_dt/aircraft_data/ |
| Predictor (physics model) | bluebird_dt/predictor/ |
Other notebooks to help you get started¶
| Notebook | Contents |
|---|---|
| bluebird-dt/examples/Intro-Part-1.ipynb | Gentle introduction to the simulator from scratch |
| bluebird-dt/examples/Intro-Part-2.ipynb | Programmatic state access and scenario configuration |
| bluebird-dt/examples/Intro-Part-3.ipynb | Predictor/physics model deep dive |
| bluebird-dt/examples/Intro-Part-4.ipynb | Custom scenarios and advanced configuration |
| bluebird-dt/examples/NonPythonAgents.ipynb | How to connect agents written in any language |
| bluebird-gymnasium/examples/agent_demo.ipynb | Agent types and training examples |