Skip to content

control.attitude_mpc

lsy_drone_racing.control.attitude_mpc

This module implements an example MPC using attitude control for a quadrotor.

It utilizes the collective thrust interface for drone control to compute control commands based on current state observations and desired waypoints.

The waypoints are generated using cubic spline interpolation from a set of predefined waypoints. Note that the trajectory uses pre-defined waypoints instead of dynamically generating a good path.

Classes

AttitudeMPC(obs, info, config)

Bases: Controller

Example of a MPC using the collective thrust and attitude interface.

Initialize the attitude controller.

Parameters:

Name Type Description Default
obs dict[str, NDArray[floating]]

The initial observation of the environment's state. See the environment's observation space for details.

required
info dict

Additional environment information from the reset.

required
config dict

The configuration of the environment.

required
Source code in lsy_drone_racing/control/attitude_mpc.py
def __init__(self, obs: dict[str, NDArray[np.floating]], info: dict, config: dict):
    """Initialize the attitude controller.

    Args:
        obs: The initial observation of the environment's state. See the environment's
            observation space for details.
        info: Additional environment information from the reset.
        config: The configuration of the environment.
    """
    super().__init__(obs, info, config)
    self._N = 25
    self._dt = 1 / config.env.freq
    self._T_HORIZON = self._N * self._dt

    # Same waypoints as in the state controller. Determined by trial and error.
    start_pos = obs["pos"]
    waypoints = np.array(
        [
            start_pos,
            [-1.0, 0.75, 0.4],
            [0.3, 0.35, 0.7],
            [1.3, -0.15, 0.9],
            [0.85, 0.85, 1.2],
            [-0.5, -0.05, 0.7],
            [-1.2, -0.2, 0.8],
            [-1.2, -0.2, 1.2],
            [-0.0, -0.7, 1.2],
            [0.5, -0.75, 1.2],
        ]
    )
    self._t_total = 15  # s
    t = np.linspace(0, self._t_total, len(waypoints))
    self._des_pos_spline = CubicSpline(t, waypoints)
    self._des_vel_spline = self._des_pos_spline.derivative()
    self._waypoints_pos = self._des_pos_spline(
        np.linspace(0, self._t_total, int(config.env.freq * self._t_total))
    )
    self._waypoints_vel = self._des_vel_spline(
        np.linspace(0, self._t_total, int(config.env.freq * self._t_total))
    )
    self._waypoints_yaw = self._waypoints_pos[:, 0] * 0

    self.drone_params = load_params("so_rpy", config.sim.drone_model)
    self._acados_ocp_solver, self._ocp = create_ocp_solver(
        self._T_HORIZON, self._N, self.drone_params
    )
    self._nx = self._ocp.model.x.rows()
    self._nu = self._ocp.model.u.rows()
    self._ny = self._nx + self._nu
    self._ny_e = self._nx

    self._tick = 0
    self._tick_max = len(self._waypoints_pos) - 1 - self._N
    self._config = config
    self._finished = False
Methods:
compute_control(obs, info=None)

Compute the next desired collective thrust and roll/pitch/yaw of the drone.

Parameters:

Name Type Description Default
obs dict[str, NDArray[floating]]

The current observation of the environment. See the environment's observation space for details.

required
info dict | None

Optional additional information as a dictionary.

None

Returns:

Type Description
NDArray[floating]

The orientation as roll, pitch, yaw angles, and the collective thrust

NDArray[floating]

[r_des, p_des, y_des, t_des] as a numpy array.

Source code in lsy_drone_racing/control/attitude_mpc.py
def compute_control(
    self, obs: dict[str, NDArray[np.floating]], info: dict | None = None
) -> NDArray[np.floating]:
    """Compute the next desired collective thrust and roll/pitch/yaw of the drone.

    Args:
        obs: The current observation of the environment. See the environment's observation space
            for details.
        info: Optional additional information as a dictionary.

    Returns:
        The orientation as roll, pitch, yaw angles, and the collective thrust
        [r_des, p_des, y_des, t_des] as a numpy array.
    """
    i = min(self._tick, self._tick_max)
    if self._tick >= self._tick_max:
        self._finished = True

    # Setting initial state
    obs["rpy"] = R.from_quat(obs["quat"]).as_euler("xyz")
    obs["drpy"] = ang_vel2rpy_rates(obs["quat"], obs["ang_vel"])
    x0 = np.concatenate((obs["pos"], obs["rpy"], obs["vel"], obs["drpy"]))
    self._acados_ocp_solver.set(0, "lbx", x0)
    self._acados_ocp_solver.set(0, "ubx", x0)

    # Setting state reference
    yref = np.zeros((self._N, self._ny))
    yref[:, 0:3] = self._waypoints_pos[i : i + self._N]  # position
    # zero roll, pitch
    yref[:, 5] = self._waypoints_yaw[i : i + self._N]  # yaw
    yref[:, 6:9] = self._waypoints_vel[i : i + self._N]  # velocity
    # zero drpy

    # Setting input reference (index > self._nx)
    # zero rpy
    # hover thrust
    yref[:, 15] = self.drone_params["mass"] * -self.drone_params["gravity_vec"][-1]
    for j in range(self._N):
        self._acados_ocp_solver.set(j, "yref", yref[j])

    # Setting final state reference
    yref_e = np.zeros((self._ny_e))
    yref_e[0:3] = self._waypoints_pos[i + self._N]  # position
    # zero roll, pitch
    yref_e[5] = self._waypoints_yaw[i + self._N]  # yaw
    yref_e[6:9] = self._waypoints_vel[i + self._N]  # velocity
    # zero drpy
    self._acados_ocp_solver.set(self._N, "y_ref", yref_e)

    # Solving problem and getting first input
    self._acados_ocp_solver.solve()
    u0 = self._acados_ocp_solver.get(0, "u")

    return u0
episode_callback()

Reset the integral error.

Source code in lsy_drone_racing/control/attitude_mpc.py
def episode_callback(self):
    """Reset the integral error."""
    self._tick = 0
episode_reset()

Reset the controller's internal state and models if necessary.

Source code in lsy_drone_racing/control/controller.py
def episode_reset(self):
    """Reset the controller's internal state and models if necessary."""
render_callback(sim)

Callback function called before the environment's rendering.

You can use this function to render additional information on the screen, such as the planned trajectory, the drone's target state, etc.

Source code in lsy_drone_racing/control/controller.py
def render_callback(self, sim: Sim):
    """Callback function called before the environment's rendering.

    You can use this function to render additional information on the screen, such as the
    planned trajectory, the drone's target state, etc.
    """
reset()

Reset internal variables if necessary.

Source code in lsy_drone_racing/control/controller.py
def reset(self):
    """Reset internal variables if necessary."""
step_callback(action, obs, reward, terminated, truncated, info)

Increment the tick counter.

Source code in lsy_drone_racing/control/attitude_mpc.py
def step_callback(
    self,
    action: NDArray[np.floating],
    obs: dict[str, NDArray[np.floating]],
    reward: float,
    terminated: bool,
    truncated: bool,
    info: dict,
) -> bool:
    """Increment the tick counter."""
    self._tick += 1

    return self._finished

Functions:

create_acados_model(parameters)

Creates an acados model from a symbolic drone_model.

Source code in lsy_drone_racing/control/attitude_mpc.py
def create_acados_model(parameters: dict) -> AcadosModel:
    """Creates an acados model from a symbolic drone_model."""
    # For more info on the models, check out https://github.com/learnsyslab/drone-models
    X_dot, X, U, _ = symbolic_dynamics_euler(
        mass=parameters["mass"],
        gravity_vec=parameters["gravity_vec"],
        J=parameters["J"],
        J_inv=parameters["J_inv"],
        acc_coef=parameters["acc_coef"],
        cmd_f_coef=parameters["cmd_f_coef"],
        rpy_coef=parameters["rpy_coef"],
        rpy_rates_coef=parameters["rpy_rates_coef"],
        cmd_rpy_coef=parameters["cmd_rpy_coef"],
    )

    # Initialize the nonlinear model for NMPC formulation
    model = AcadosModel()
    model.name = "basic_example_mpc"
    model.f_expl_expr = X_dot
    model.f_impl_expr = None
    model.x = X
    model.u = U

    return model

create_ocp_solver(Tf, N, parameters, verbose=False)

Creates an acados Optimal Control Problem and Solver.

Source code in lsy_drone_racing/control/attitude_mpc.py
def create_ocp_solver(
    Tf: float, N: int, parameters: dict, verbose: bool = False
) -> tuple[AcadosOcpSolver, AcadosOcp]:
    """Creates an acados Optimal Control Problem and Solver."""
    ocp = AcadosOcp()

    # Set model
    ocp.model = create_acados_model(parameters)

    # Get Dimensions
    nx = ocp.model.x.rows()
    nu = ocp.model.u.rows()
    ny = nx + nu
    ny_e = nx

    # Set dimensions
    ocp.solver_options.N_horizon = N

    ## Set Cost
    # For more Information regarding Cost Function Definition in Acados:
    # https://github.com/acados/acados/blob/main/docs/problem_formulation/problem_formulation_ocp_mex.pdf
    #

    # Cost Type
    ocp.cost.cost_type = "LINEAR_LS"
    ocp.cost.cost_type_e = "LINEAR_LS"

    # Weights
    # State weights
    Q = np.diag(
        [
            50.0,  # pos
            50.0,  # pos
            400.0,  # pos
            1.0,  # rpy
            1.0,  # rpy
            1.0,  # rpy
            10.0,  # vel
            10.0,  # vel
            10.0,  # vel
            5.0,  # drpy
            5.0,  # drpy
            5.0,  # drpy
        ]
    )
    # Input weights (reference is upright orientation and hover thrust)
    R = np.diag(
        [
            1.0,  # rpy
            1.0,  # rpy
            1.0,  # rpy
            50.0,  # thrust
        ]
    )

    Q_e = Q.copy()
    ocp.cost.W = scipy.linalg.block_diag(Q, R)
    ocp.cost.W_e = Q_e

    Vx = np.zeros((ny, nx))
    Vx[0:nx, 0:nx] = np.eye(nx)  # Select all states
    ocp.cost.Vx = Vx

    Vu = np.zeros((ny, nu))
    Vu[nx : nx + nu, :] = np.eye(nu)  # Select all actions
    ocp.cost.Vu = Vu

    Vx_e = np.zeros((ny_e, nx))
    Vx_e[0:nx, 0:nx] = np.eye(nx)  # Select all states
    ocp.cost.Vx_e = Vx_e

    # Set initial references. We will overwrite these later to track the trajectory
    ocp.cost.yref, ocp.cost.yref_e = np.zeros((ny,)), np.zeros((ny_e,))

    # Set State Constraints (rpy < 30°)
    ocp.constraints.lbx = np.array([-0.5, -0.5, -0.5])
    ocp.constraints.ubx = np.array([0.5, 0.5, 0.5])
    ocp.constraints.idxbx = np.array([3, 4, 5])

    # Set Input Constraints (rpy < 30°)
    ocp.constraints.lbu = np.array([-0.5, -0.5, -0.5, parameters["thrust_min"] * 4])
    ocp.constraints.ubu = np.array([0.5, 0.5, 0.5, parameters["thrust_max"] * 4])
    ocp.constraints.idxbu = np.array([0, 1, 2, 3])

    # We have to set x0 even though we will overwrite it later on.
    ocp.constraints.x0 = np.zeros((nx))

    # Solver Options
    ocp.solver_options.qp_solver = "FULL_CONDENSING_HPIPM"  # FULL_, PARTIAL_ ,_HPIPM, _QPOASES
    ocp.solver_options.hessian_approx = "GAUSS_NEWTON"
    ocp.solver_options.integrator_type = "ERK"
    ocp.solver_options.nlp_solver_type = "SQP"  # SQP, SQP_RTI
    ocp.solver_options.tol = 1e-6

    ocp.solver_options.qp_solver_cond_N = N
    ocp.solver_options.qp_solver_warm_start = 1

    ocp.solver_options.qp_solver_iter_max = 20
    ocp.solver_options.nlp_solver_max_iter = 50

    # set prediction horizon
    ocp.solver_options.tf = Tf

    acados_ocp_solver = AcadosOcpSolver(
        ocp,
        json_file="c_generated_code/lsy_example_mpc.json",
        verbose=verbose,
        build=True,
        generate=True,
    )

    return acados_ocp_solver, ocp