APIs: simulator

simulator.environments

class simulator.environments.CustomizedEnvInterface

Customized interface for extending gym for DRLTT.

__weakref__

list of weak references to the object (if defined)

abstract export_environment_data() Environment

Export environment data.

Returns:

Environment data in proto structure.

Return type:

Environment

abstract get_state() ndarray

Get the underlying state.

Returns:

Vectorized underlying state.

Return type:

np.ndarray

class simulator.environments.ExtendedGymEnv

The type object for the extended gym environmnet.

class simulator.environments.TrajectoryTrackingEnv(env_info: Environment | None = None, dynamics_model_configs: Iterable[Dict[str, Any]] | None = None, **kwargs)

Environment for Trajectory Tracking.

observation_space

State space, utilized by RL framework such as Stable Baselines3.

Type:

gym.spaces.space.Space

action_space

State space, utilized by RL framework such as Stable Baselines3.

Type:

gym.spaces.space.Space

state_space

State space.

Type:

gym.spaces.space.Space

init_state_space

Initial state space.

Type:

gym.spaces.space.Space

env_info

Serialized data structure that contains hyper-parameter and episode data.

__init__(env_info: Environment | None = None, dynamics_model_configs: Iterable[Dict[str, Any]] | None = None, **kwargs)
Parameters:

dynamics_model_configs – Configurations of all dynamics models.

_build_spaces()

Build spaces (observation/action/state/init_state/etc.).

Dependency (class attributes needed to be set before): - self.env_info - self.dynamics_model_manager - self.observation_manager

get_dynamics_model_info() str

Get the infromation about configuration of dynamics models.

Returns:

Pretty string of information for dynamics models.

Return type:

str

get_reference_line() ReferenceLine

Get the current reference line.

Returns:

The current reference line.

Return type:

ReferenceLine

get_state() ndarray

Get the underlying state.

Returns:

Vectorized underlying state.

Return type:

np.ndarray

classmethod parse_dynamics_model_hyper_parameter(hyper_parameter: TrajectoryTrackingHyperParameter, dynamics_model_manager: DynamicsModelManager)

Parse the hyper-parameters of dynamics models from the manager.

Parameters:
  • hyper_parameter (TrajectoryTrackingHyperParameter) – Hyper-parameter to store parsed info.

  • dynamics_model_manager (DynamicsModelManager) – Dynamics model manager.

classmethod parse_hyper_parameter(hyper_parameter: TrajectoryTrackingHyperParameter, step_interval: float, tracking_length_lb: int, tracking_length_ub: int, reference_line_pad_mode: str, init_state_lb: List[float | None], init_state_ub: List[float | None], n_observation_steps: int, max_n_episodes: int = 1000)

Parse hyper-parameter.

Parameters:
  • hyper_parameter – Destination of the parsed hyper-parameters.

  • step_interval – Step interval by which each step moves forward temporally.

  • tracking_length_lb – Lower bound of tracking length.

  • tracking_length_ub – Upper bound of tracking length.

  • reference_line_pad_mode – Mode for padding reference line.

  • init_state_lb – lower Bound of state space.

  • init_state_ub – upper Bound of state space.

  • n_observation_steps – Number of the steps within the observation.

reset(init_state: ndarray | None = None, dynamics_model_name: str = '', reference_line: ReferenceLine | None = None) ndarray

Reset environment for Trajectory Tracking.

  • Clear and replace episode data.

  • Randomly sample a dynamics model.

  • Setup reference line and initial state.

Parameters:

init_state – Specified initial state of dynamics model. Default to None which denotes random sampling from pre-defined initial state space.

Returns:

First observation from the environment.

Return type:

np.ndarray

step(action: ndarray)
Parameters:

action – Action given to the environment for stepping the state.

simulator.dynamics_models

class simulator.dynamics_models.BaseDynamicsModel(init_state: ndarray | None = None, **kwargs)

Base class for dynamics models defining state/action space, dynamics/transition functions, and more.

hyper_parameter

Hyper-parameter of the dynamics model.

Type:

drltt_proto.dynamics_model.hyper_parameter_pb2.HyperParameter

state

Vectorized state of the dynamics model.

Type:

drltt_proto.dynamics_model.state_pb2.State

__init__(init_state: ndarray | None = None, **kwargs)
Parameters:

init_state – Initial state to be set.

__weakref__

list of weak references to the object (if defined)

abstract compute_next_state(action: ndarray, delta_t: float) State

Proceed a step forward by a specified time interval without update of internal state.

Parameters:
  • action – Applied vectorized action.

  • delta_t – Time interval.

abstract classmethod deserialize_action(action: Action) ndarray

Deserialize action to np.ndarray.

Parameters:

action – Serialized action.

Returns:

Vectorized action.

Return type:

np.ndarray

abstract classmethod deserialize_state(state: State) ndarray

Deserialize state to np.ndarray.

Parameters:

state – Serialized state.

Returns:

Vectorized state.

Return type:

np.ndarray

abstract get_action_space() Space

Get action space.

Returns:

Action space.

Return type:

Space

abstract get_body_state_proto() BodyState

Return agent body’s state in proto (serialized form).

Returns:

Body state of the dynamics model.

Return type:

BodyState

abstract get_dynamics_model_observation() ndarray

Get dynamics model observationm usually containing hyper-parameter of the model.

Returns:

Vectorized dynamics model observation.

Return type:

np.ndarray

get_dynamics_model_observation_space() Space

Get dynamics model observation space where the dynamics model obervation lies in.

Returns:

Dynamics model observation space.

Return type:

Space

get_name() str

Get the dynamics model’s name.

Returns:

The dynamics model’s name.

Return type:

str

get_state() ndarray

Get the state in np.ndarray (deserialized form).

Returns:

Returned state.

Return type:

np.ndarray

abstract get_state_observation() ndarray

Return state observation of the dynamics model, which is usually body state-independent/ego centric.

Returns:

Vectorised State observation.

Return type:

np.ndarray

get_state_observation_space() Space

Get state observation space where the state observation lies in.

Returns:

State obervstion space.

Return type:

Space

get_state_proto() State

Get the state in proto (serialized form).

Returns:

State in proto.

Return type:

State

abstract get_state_space() Space

Get state space.

Returns:

State space.

Return type:

Space

abstract jacobian(state: ndarray, action: ndarray) ndarray

Compute jacobian by performing linearization at a given point (state-action pair).

Parameters:
  • state – State at the linearization point, shape=(n_dims_state,).

  • action – Action at the linearization point, shape(n_dims_action,).

Returns:

Jacobian matrix, shape=(n_dims_state, n_dims_state + n_dims_action).

Return type:

np.ndarray

abstract classmethod serialize_action(action: ndarray) Action

Serialize action to proto.

Parameters:

action – Vectorized action.

Returns:

Serialized action.

Return type:

Action

abstract classmethod serialize_state(state: ndarray) State

Serialize state to proto.

Parameters:

state – Vectorized state.

Returns:

Serialized state.

Return type:

State

step(action: ndarray, delta_t: float)

Step the model’s state forward by a specified time interval.

Parameters:
  • action – Applied action.

  • delta_t – Time interval.

class simulator.dynamics_models.BicycleModel(hyper_parameter: HyperParameter | None = None, **kwargs)

Bicycle model, suitable for modeling vehicle/bicycle kinematics.

__init__(hyper_parameter: HyperParameter | None = None, **kwargs)
Parameters:

hyper_parameters – hyper parameter of bicylce model if protobuf-typed hyper-parameters is not provided, then parse arguments from kwargs. if protobuf-typed hyper-parameters are provided, then assign them to the underlying hyper_hyperparameters.

Compute variables related to the rotation of the Center of Gravity (CoG)

Parameters:
  • steering_angle – Current steering_angle of the vehicle.

  • hyper_parameter – Vehicle’s hyper parameter.

Returns:

The angle between the heading of vehicle and the speed direction of the CoG. rotation_radius_inv: The inverse of the radius of the rotation of the CoG. Ensuring numerical stability.

Return type:

omega

property cog_relative_position_between_axles: float

Relative position of Center of Gravity (CoG) between axles

compute_next_state(action: ndarray, delta_t: float) Any

Proceed a step forward by a specified time interval without update of internal state.

Parameters:
  • action – Applied vectorized action.

  • delta_t – Time interval.

classmethod deserialize_action(action: Action) ndarray

Deserialize action to np.ndarray.

Parameters:

action – Serialized action.

Returns:

Vectorized action.

Return type:

np.ndarray

classmethod deserialize_state(state: State) ndarray

Deserialize state to np.ndarray.

Parameters:

state – Serialized state.

Returns:

Vectorized state.

Return type:

np.ndarray

get_action_space() Space

Get action space

get_body_state_proto()

Return agent body’s state in proto (serialized form).

Returns:

Body state of the dynamics model.

Return type:

BodyState

get_dynamics_model_observation() ndarray

Get dynamics model observationm usually containing hyper-parameter of the model.

Returns:

Vectorized dynamics model observation.

Return type:

np.ndarray

get_state_observation() ndarray

Return state observation of the dynamics model, which is usually body state-independent/ego centric.

Returns:

Vectorised State observation.

Return type:

np.ndarray

get_state_space() Space

Get state space.

Returns:

State space.

Return type:

Space

jacobian(state: ndarray, action: ndarray) ndarray

Compute jacobian by performing linearization at a given point (state-action pair).

Parameters:
  • state – State at the linearization point, shape=(n_dims_state,).

  • action – Action at the linearization point, shape(n_dims_action,).

Returns:

Jacobian matrix, shape=(n_dims_state, n_dims_state + n_dims_action).

Return type:

np.ndarray

property max_steer: float

Maximum steering angle brought by the limit on lateral acceleration.

Returns:

Maximum steering angle.

classmethod parse_hyper_parameter(hyper_parameter: HyperParameter = None, name: str = 'bicycle model', front_overhang: float = 0.0, wheelbase: float = 0.0, rear_overhang: float = 0.0, width: float = 0.0, action_space_lb: Iterable[float] = (-inf, -inf), action_space_ub: Iterable[float] = (inf, inf), max_lat_acc: float = 4.0, **kwargs)

TODO: refer this part to protobuf and so for remove redundant doc. :param hyper_parameter: Hyper-parameter as parsing result. :param front_overhang: Distance from front axle to vehicle front. :param wheelbase: Distance between front axle and rear axle. :param rear_overhang: Distance from rear axle to vehicle rear. :param width: Width of vehicle. :param action_space_lb: Lower bound of action space. :param action_space_ub: Upper bound of action space. :param max_lat_acc: Maximum lateral acceleration.

classmethod serialize_action(action: ndarray) Action

Serialize action to proto.

Parameters:

action – Vectorized action.

Returns:

Serialized action.

Return type:

Action

classmethod serialize_state(state: ndarray) State

Serialize state to proto.

Parameters:

state – Vectorized state.

Returns:

Serialized state.

Return type:

State

class simulator.dynamics_models.DynamicsModelManager(hyper_parameters: Iterable[HyperParameter] = (), dynamics_model_configs: Iterable = ())

Manager for Dynamics Models, supporting random sampling/operating on sampled dynamics model/etc.

dynamics_models

Collection of dynamics models

Type:

List[simulator.dynamics_models.base_dynamics_model.BaseDynamicsModel]

sampled_dynamics_model

The currently sampled dynamics model.

Type:

simulator.dynamics_models.base_dynamics_model.BaseDynamicsModel | None

sampled_dynamics_model_index

THe index of currently sampled dynamics model.

Type:

int

__init__(hyper_parameters: Iterable[HyperParameter] = (), dynamics_model_configs: Iterable = ())
Parameters:
  • hyper_parameters – Hyper-parameters of dynamics models to be managed by the manager.

  • dynamics_model_configs – configurations of dynamics models.

__weakref__

list of weak references to the object (if defined)

get_all_hyper_parameters() List[HyperParameter]

Get the collection of hyper-parameters of all dynamics models.

Returns:

The list of dynamics models.

Return type:

List[HyperParameter]

get_dynamics_model_info() str

Get the string about of information of dynamics models.

Returns:

The string of information of dynamics models.

Return type:

str

get_dynamics_model_observation_space() Space

Get the dynamics model observation space. Assuming this space is identical across models within the collection

Returns:

Dynamics model observation space.

Return type:

Space

get_sampled_dynamics_model() BaseDynamicsModel

Return the sampled dynamics model.

Returns:

Sampled dynamics model.

Return type:

BaseDynamicsModel

get_state_observation_space() Space

Get the state observation space. Assuming this space is identical across models within the collection.

Returns:

State observation space

Return type:

Space

sample_dynamics_model() Tuple[int, BaseDynamicsModel]

Randomly sample a dynamics model.

Returns:

sampled dynamics model

Return type:

BaseDynamicsModel

select_dynamics_model_by_name(name: str) Tuple[int, BaseDynamicsModel]

Select a dynamics model by name.

Parameters:

name (str) – The name of dynamics model.

Returns:

The index and object of the selected dynamics model.

Return type:

Tuple[int, BaseDynamicsModel]

simulator.rl_learning

simulator.rl_learning.compute_bicycle_model_metrics(episode: TrajectoryTrackingEpisode, environment: ExtendedGymEnv) Dict[str, Any]

Compute metrics for the bicycle model for an episode.

Parameters:
  • episode – Data of the episode.

  • environment – Associated environment.

Returns:

Computed metrics.

  • l2_distance_median: median L2 distance

  • scaled_action_norm_median

  • reward_median

Return type:

Dict[str, Any]

simulator.rl_learning.eval_with_sb3(environment: Env, algorithm: BaseAlgorithm, report_dir: str, n_episodes: int, compute_metrics_name: str, viz_interval: int = 10)

RL Evaluation with Stable Baselines3.

Parameters:
  • environment – Evaluation environment.

  • algorithm – The algorithm with models to be evaluated.

  • report_dir – Directory to export report JSON.

  • n_episodes – Number of episodes.

  • compute_metrics_name – Name of compute_metrics.

  • viz_interval – Interval of episodes that this function performs visualization. TODO: set it with argument passed through Shell script.

simulator.rl_learning.export_sb3_jit_module(algorithm: BasePolicy, environment: Env, export_dir: str, test_case_save_format: str = 'protobuf', device: device | str = 'cpu', n_test_cases: int = 1) ScriptModule

Export jit module from Stable Baselines 3 algorithm.

Reference: https://stable-baselines3.readthedocs.io/en/master/guide/export.html#trace-export-to-c

Parameters:
  • algorithm – SB3 algorithm to be traced.

  • input_size – Input tensor size.

  • device – Desired device where the tranced module is to be inferred.

  • export_dir – Directory to export traced module.

  • test_case_save_format – (‘protobuf’|’numpy’)

  • n_test_cases – Number of test cases.

simulator.rl_learning.roll_out_one_episode(environment: ExtendedGymEnv, policy_func: Callable, **kwargs) Tuple[List[ndarray], List[ndarray], List[ndarray]]

Roll out one episode and return a trajectory.

Parameters:
  • environment – The associated environment.

  • policy_func – The policy function, observation -> action.

Returns:

States. List[np.ndarray]: Actions. List[np.ndarray]: Observations.

Return type:

List[np.ndarray]

simulator.rl_learning.test_sb3_jit_module(jit_module_path: str, test_cases_path: str, test_case_save_format: str = 'protobuf', device: device | str = 'cpu', rtol: float = 0.001, atol: float = 0.0001) bool

Test numeric accuracy of JIT module exported from Stable Baselines 3.

Parameters:
  • jit_module_path – Path to the JIT module.

  • test_cases_path – Path to protobuf file of test cases.

  • test_case_save_format – Save format (protobuf|numpy)

  • device – Device to test module.

  • rtol – Relative tolerance, same definition as numpy.allclose and torch.allclose.

  • atol – Absolute tolerance, same definition as numpy.allclose and torch.allclose.

Returns:

Flag of allclose between original module and traced module.

Return type:

bool

simulator.rl_learning.train_with_sb3(environment: ExtendedGymEnv, algorithm_config: Dict, learning_config: Dict, checkpoint_file_prefix: str = '') BaseAlgorithm | None

RL Training with Stable Baselines3.

Parameters:
  • environment – Training environment.

  • algorithm_config – Configuration of the algorithm.

  • learning_config – Configuration of the learning.

  • checkpoint_file_prefix – File prefix (i.e. path without extension) to save checkpoint file.

Returns:

The algorithm object with trained models.

Return type:

Union[BaseAlgorithm, None]

simulator.observation

class simulator.observation.ObservationManager(reference_line_manager: ReferenceLineManager, dynamics_model_manager: DynamicsModelManager)

Manager for observation.

reference_line_manager

handler of underlying reference line manager

Type:

simulator.trajectory.reference_line.ReferenceLineManager

dynamics_model_manager

handler of underlying Dynamics model manager

Type:

simulator.dynamics_models.dynamics_model_manager.DynamicsModelManager

__init__(reference_line_manager: ReferenceLineManager, dynamics_model_manager: DynamicsModelManager)
Parameters:
  • reference_line_manager – Underlying reference line manager

  • dynamics_model_manager – underlying Dynamics model manager

__weakref__

list of weak references to the object (if defined)

get_observation(episode_data, body_state: BodyState) ndarray

Return the vectorized observation, which is usually ego-centric.

Parameters:
  • episode_data – Episode data.

  • body_state – (Serialized) body state of agent/dynamics model.

Returns:

Vectorized observation.

Return type:

np.ndarray

get_observation_space() Space

Return observation space, usually consisting of multiple sub-observation spaces.

Returns:

observation space.

Return type:

Space

simulator.trajectory

class simulator.trajectory.ReferenceLineManager(n_observation_steps: int, pad_mode: str = 'none', dtype: ~numpy.dtype = <class 'numpy.float32'>)

Manager for Reference Line.

reference_line

Handler of underlying reference line manager.

Type:

drltt_proto.trajectory.trajectory_pb2.ReferenceLine

pad_mode

Mode used for reference line padding.

Type:

str

dtype

Data type for reference line representation and observation

Type:

numpy.dtype

__init__(n_observation_steps: int, pad_mode: str = 'none', dtype: ~numpy.dtype = <class 'numpy.float32'>)
Parameters:
  • n_observation_steps – Number of observation steps on the forward part of the reference line.

  • pad_mode – Desired mode used for reference line padding.

  • dtype – Desired data type.

__weakref__

list of weak references to the object (if defined)

get_observation_by_index(episode_data, body_state: BodyState) ndarray

Get vectorized observation of reference line given waypoint index.

Parameters:
  • episode_data – Episode data.

  • body_state – Body state for ego-centric observation.

Returns:

Vectorized reference line observation. Format: (x, y) x length.

Return type:

np.ndarray

get_observation_by_state(body_state: BodyState, reference_line: ReferenceLine = None) ndarray

TODO: to implement

get_observation_space() Space

Get observation space.

Returns:

Observation space.

Return type:

Space

get_projected_waypoint_index(body_state: BodyState) ndarray

TODO: to implement

get_reference_line() ReferenceLine

Return the underlying reference line.

Returns:

Returned reference line

Return type:

ReferenceLine

get_reference_line_waypoint(index: int) ndarray

Get a waypoint on the reference line.

Parameters:

index – Step index of the desired waypoint.

Returns:

Reference line waypoint (vectorized form).

Return type:

np.ndarray

set_reference_line(reference_line: ReferenceLine, tracking_length: int = 0)

Set reference line.

Parameters:
  • reference_line – Reference line to be set.

  • tracking_length – Desired tracking length of reference line.

simulator.trajectory.random_walk(dynamics_model: BaseDynamicsModel, step_interval: float, walk_length: int) Tuple[ReferenceLine, Trajectory]

Perform random walk to generate reference line.

Parameters:
  • dynamics_model – Dynamics model for random walk.

  • step_interval – Time interval of a step.

  • walk_length – Length of the generated trajectory.

Returns:

Random walk results:

  • ReferenceLine: Generated reference line.

  • Trajectory: Generated trajectory.

Return type:

Tuple[ReferenceLine, Trajectory]