APIs: simulator¶
simulator.environments¶
- class simulator.environments.CustomizedEnvInterface¶
Customized interface for extending gym for DRLTT.
- __weakref__¶
list of weak references to the object (if defined)
- abstract export_environment_data() Environment¶
Export environment data.
- Returns:
Environment data in proto structure.
- Return type:
Environment
- abstract get_state() ndarray¶
Get the underlying state.
- Returns:
Vectorized underlying state.
- Return type:
np.ndarray
- class simulator.environments.ExtendedGymEnv¶
The type object for the extended gym environmnet.
- class simulator.environments.TrajectoryTrackingEnv(env_info: Environment | None = None, dynamics_model_configs: Iterable[Dict[str, Any]] | None = None, **kwargs)¶
Environment for Trajectory Tracking.
- observation_space¶
State space, utilized by RL framework such as Stable Baselines3.
- Type:
gym.spaces.space.Space
- action_space¶
State space, utilized by RL framework such as Stable Baselines3.
- Type:
gym.spaces.space.Space
- state_space¶
State space.
- Type:
gym.spaces.space.Space
- init_state_space¶
Initial state space.
- Type:
gym.spaces.space.Space
- env_info¶
Serialized data structure that contains hyper-parameter and episode data.
- __init__(env_info: Environment | None = None, dynamics_model_configs: Iterable[Dict[str, Any]] | None = None, **kwargs)¶
- Parameters:
dynamics_model_configs – Configurations of all dynamics models.
- _build_spaces()¶
Build spaces (observation/action/state/init_state/etc.).
Dependency (class attributes needed to be set before): - self.env_info - self.dynamics_model_manager - self.observation_manager
- get_dynamics_model_info() str¶
Get the infromation about configuration of dynamics models.
- Returns:
Pretty string of information for dynamics models.
- Return type:
str
- get_reference_line() ReferenceLine¶
Get the current reference line.
- Returns:
The current reference line.
- Return type:
ReferenceLine
- get_state() ndarray¶
Get the underlying state.
- Returns:
Vectorized underlying state.
- Return type:
np.ndarray
- classmethod parse_dynamics_model_hyper_parameter(hyper_parameter: TrajectoryTrackingHyperParameter, dynamics_model_manager: DynamicsModelManager)¶
Parse the hyper-parameters of dynamics models from the manager.
- Parameters:
hyper_parameter (TrajectoryTrackingHyperParameter) – Hyper-parameter to store parsed info.
dynamics_model_manager (DynamicsModelManager) – Dynamics model manager.
- classmethod parse_hyper_parameter(hyper_parameter: TrajectoryTrackingHyperParameter, step_interval: float, tracking_length_lb: int, tracking_length_ub: int, reference_line_pad_mode: str, init_state_lb: List[float | None], init_state_ub: List[float | None], n_observation_steps: int, max_n_episodes: int = 1000)¶
Parse hyper-parameter.
- Parameters:
hyper_parameter – Destination of the parsed hyper-parameters.
step_interval – Step interval by which each step moves forward temporally.
tracking_length_lb – Lower bound of tracking length.
tracking_length_ub – Upper bound of tracking length.
reference_line_pad_mode – Mode for padding reference line.
init_state_lb – lower Bound of state space.
init_state_ub – upper Bound of state space.
n_observation_steps – Number of the steps within the observation.
- reset(init_state: ndarray | None = None, dynamics_model_name: str = '', reference_line: ReferenceLine | None = None) ndarray¶
Reset environment for Trajectory Tracking.
Clear and replace episode data.
Randomly sample a dynamics model.
Setup reference line and initial state.
- Parameters:
init_state – Specified initial state of dynamics model. Default to None which denotes random sampling from pre-defined initial state space.
- Returns:
First observation from the environment.
- Return type:
np.ndarray
- step(action: ndarray)¶
- Parameters:
action – Action given to the environment for stepping the state.
simulator.dynamics_models¶
- class simulator.dynamics_models.BaseDynamicsModel(init_state: ndarray | None = None, **kwargs)¶
Base class for dynamics models defining state/action space, dynamics/transition functions, and more.
- hyper_parameter¶
Hyper-parameter of the dynamics model.
- Type:
drltt_proto.dynamics_model.hyper_parameter_pb2.HyperParameter
- state¶
Vectorized state of the dynamics model.
- Type:
drltt_proto.dynamics_model.state_pb2.State
- __init__(init_state: ndarray | None = None, **kwargs)¶
- Parameters:
init_state – Initial state to be set.
- __weakref__¶
list of weak references to the object (if defined)
- abstract compute_next_state(action: ndarray, delta_t: float) State¶
Proceed a step forward by a specified time interval without update of internal state.
- Parameters:
action – Applied vectorized action.
delta_t – Time interval.
- abstract classmethod deserialize_action(action: Action) ndarray¶
Deserialize action to np.ndarray.
- Parameters:
action – Serialized action.
- Returns:
Vectorized action.
- Return type:
np.ndarray
- abstract classmethod deserialize_state(state: State) ndarray¶
Deserialize state to np.ndarray.
- Parameters:
state – Serialized state.
- Returns:
Vectorized state.
- Return type:
np.ndarray
- abstract get_action_space() Space¶
Get action space.
- Returns:
Action space.
- Return type:
Space
- abstract get_body_state_proto() BodyState¶
Return agent body’s state in proto (serialized form).
- Returns:
Body state of the dynamics model.
- Return type:
BodyState
- abstract get_dynamics_model_observation() ndarray¶
Get dynamics model observationm usually containing hyper-parameter of the model.
- Returns:
Vectorized dynamics model observation.
- Return type:
np.ndarray
- get_dynamics_model_observation_space() Space¶
Get dynamics model observation space where the dynamics model obervation lies in.
- Returns:
Dynamics model observation space.
- Return type:
Space
- get_name() str¶
Get the dynamics model’s name.
- Returns:
The dynamics model’s name.
- Return type:
str
- get_state() ndarray¶
Get the state in np.ndarray (deserialized form).
- Returns:
Returned state.
- Return type:
np.ndarray
- abstract get_state_observation() ndarray¶
Return state observation of the dynamics model, which is usually body state-independent/ego centric.
- Returns:
Vectorised State observation.
- Return type:
np.ndarray
- get_state_observation_space() Space¶
Get state observation space where the state observation lies in.
- Returns:
State obervstion space.
- Return type:
Space
- get_state_proto() State¶
Get the state in proto (serialized form).
- Returns:
State in proto.
- Return type:
State
- abstract get_state_space() Space¶
Get state space.
- Returns:
State space.
- Return type:
Space
- abstract jacobian(state: ndarray, action: ndarray) ndarray¶
Compute jacobian by performing linearization at a given point (state-action pair).
- Parameters:
state – State at the linearization point, shape=(n_dims_state,).
action – Action at the linearization point, shape(n_dims_action,).
- Returns:
Jacobian matrix, shape=(n_dims_state, n_dims_state + n_dims_action).
- Return type:
np.ndarray
- abstract classmethod serialize_action(action: ndarray) Action¶
Serialize action to proto.
- Parameters:
action – Vectorized action.
- Returns:
Serialized action.
- Return type:
Action
- abstract classmethod serialize_state(state: ndarray) State¶
Serialize state to proto.
- Parameters:
state – Vectorized state.
- Returns:
Serialized state.
- Return type:
State
- step(action: ndarray, delta_t: float)¶
Step the model’s state forward by a specified time interval.
- Parameters:
action – Applied action.
delta_t – Time interval.
- class simulator.dynamics_models.BicycleModel(hyper_parameter: HyperParameter | None = None, **kwargs)¶
Bicycle model, suitable for modeling vehicle/bicycle kinematics.
- __init__(hyper_parameter: HyperParameter | None = None, **kwargs)¶
- Parameters:
hyper_parameters – hyper parameter of bicylce model if protobuf-typed hyper-parameters is not provided, then parse arguments from kwargs. if protobuf-typed hyper-parameters are provided, then assign them to the underlying hyper_hyperparameters.
Compute variables related to the rotation of the Center of Gravity (CoG)
- Parameters:
steering_angle – Current steering_angle of the vehicle.
hyper_parameter – Vehicle’s hyper parameter.
- Returns:
The angle between the heading of vehicle and the speed direction of the CoG. rotation_radius_inv: The inverse of the radius of the rotation of the CoG. Ensuring numerical stability.
- Return type:
omega
- property cog_relative_position_between_axles: float¶
Relative position of Center of Gravity (CoG) between axles
- compute_next_state(action: ndarray, delta_t: float) Any¶
Proceed a step forward by a specified time interval without update of internal state.
- Parameters:
action – Applied vectorized action.
delta_t – Time interval.
- classmethod deserialize_action(action: Action) ndarray¶
Deserialize action to np.ndarray.
- Parameters:
action – Serialized action.
- Returns:
Vectorized action.
- Return type:
np.ndarray
- classmethod deserialize_state(state: State) ndarray¶
Deserialize state to np.ndarray.
- Parameters:
state – Serialized state.
- Returns:
Vectorized state.
- Return type:
np.ndarray
- get_action_space() Space¶
Get action space
- get_body_state_proto()¶
Return agent body’s state in proto (serialized form).
- Returns:
Body state of the dynamics model.
- Return type:
BodyState
- get_dynamics_model_observation() ndarray¶
Get dynamics model observationm usually containing hyper-parameter of the model.
- Returns:
Vectorized dynamics model observation.
- Return type:
np.ndarray
- get_state_observation() ndarray¶
Return state observation of the dynamics model, which is usually body state-independent/ego centric.
- Returns:
Vectorised State observation.
- Return type:
np.ndarray
- get_state_space() Space¶
Get state space.
- Returns:
State space.
- Return type:
Space
- jacobian(state: ndarray, action: ndarray) ndarray¶
Compute jacobian by performing linearization at a given point (state-action pair).
- Parameters:
state – State at the linearization point, shape=(n_dims_state,).
action – Action at the linearization point, shape(n_dims_action,).
- Returns:
Jacobian matrix, shape=(n_dims_state, n_dims_state + n_dims_action).
- Return type:
np.ndarray
- property max_steer: float¶
Maximum steering angle brought by the limit on lateral acceleration.
- Returns:
Maximum steering angle.
- classmethod parse_hyper_parameter(hyper_parameter: HyperParameter = None, name: str = 'bicycle model', front_overhang: float = 0.0, wheelbase: float = 0.0, rear_overhang: float = 0.0, width: float = 0.0, action_space_lb: Iterable[float] = (-inf, -inf), action_space_ub: Iterable[float] = (inf, inf), max_lat_acc: float = 4.0, **kwargs)¶
TODO: refer this part to protobuf and so for remove redundant doc. :param hyper_parameter: Hyper-parameter as parsing result. :param front_overhang: Distance from front axle to vehicle front. :param wheelbase: Distance between front axle and rear axle. :param rear_overhang: Distance from rear axle to vehicle rear. :param width: Width of vehicle. :param action_space_lb: Lower bound of action space. :param action_space_ub: Upper bound of action space. :param max_lat_acc: Maximum lateral acceleration.
- classmethod serialize_action(action: ndarray) Action¶
Serialize action to proto.
- Parameters:
action – Vectorized action.
- Returns:
Serialized action.
- Return type:
Action
- classmethod serialize_state(state: ndarray) State¶
Serialize state to proto.
- Parameters:
state – Vectorized state.
- Returns:
Serialized state.
- Return type:
State
- class simulator.dynamics_models.DynamicsModelManager(hyper_parameters: Iterable[HyperParameter] = (), dynamics_model_configs: Iterable = ())¶
Manager for Dynamics Models, supporting random sampling/operating on sampled dynamics model/etc.
- dynamics_models¶
Collection of dynamics models
- sampled_dynamics_model¶
The currently sampled dynamics model.
- sampled_dynamics_model_index¶
THe index of currently sampled dynamics model.
- Type:
int
- __init__(hyper_parameters: Iterable[HyperParameter] = (), dynamics_model_configs: Iterable = ())¶
- Parameters:
hyper_parameters – Hyper-parameters of dynamics models to be managed by the manager.
dynamics_model_configs – configurations of dynamics models.
- __weakref__¶
list of weak references to the object (if defined)
- get_all_hyper_parameters() List[HyperParameter]¶
Get the collection of hyper-parameters of all dynamics models.
- Returns:
The list of dynamics models.
- Return type:
List[HyperParameter]
- get_dynamics_model_info() str¶
Get the string about of information of dynamics models.
- Returns:
The string of information of dynamics models.
- Return type:
str
- get_dynamics_model_observation_space() Space¶
Get the dynamics model observation space. Assuming this space is identical across models within the collection
- Returns:
Dynamics model observation space.
- Return type:
Space
- get_sampled_dynamics_model() BaseDynamicsModel¶
Return the sampled dynamics model.
- Returns:
Sampled dynamics model.
- Return type:
- get_state_observation_space() Space¶
Get the state observation space. Assuming this space is identical across models within the collection.
- Returns:
State observation space
- Return type:
Space
- sample_dynamics_model() Tuple[int, BaseDynamicsModel]¶
Randomly sample a dynamics model.
- Returns:
sampled dynamics model
- Return type:
- select_dynamics_model_by_name(name: str) Tuple[int, BaseDynamicsModel]¶
Select a dynamics model by name.
- Parameters:
name (str) – The name of dynamics model.
- Returns:
The index and object of the selected dynamics model.
- Return type:
Tuple[int, BaseDynamicsModel]
simulator.rl_learning¶
- simulator.rl_learning.compute_bicycle_model_metrics(env_data: Environment, environment: ExtendedGymEnv) Dict[str, Any]¶
Compute metrics for the bicycle model for an episode.
- Parameters:
episode – Data of the episode.
environment – Associated environment.
- Returns:
Computed metrics.
l2_distance_median: median L2 distance
scaled_action_norm_median
reward_median
- Return type:
Dict[str, Any]
- simulator.rl_learning.eval_with_sb3(environment: Env, algorithm: BaseAlgorithm, report_dir: str, n_episodes: int, compute_metrics_name: str, visualization_function_name: str, viz_interval: int = 10)¶
RL Evaluation with Stable Baselines3.
- Parameters:
environment – Evaluation environment.
algorithm – The algorithm with models to be evaluated.
report_dir – Directory to export report JSON.
n_episodes – Number of episodes.
compute_metrics_name – Name of compute_metrics.
visualization_function_name – Name of visualization_function.
viz_interval – Interval of episodes that this function performs visualization. TODO: set it with argument passed through Shell script.
- simulator.rl_learning.export_sb3_jit_module(algorithm: BasePolicy, environment: Env, export_dir: str, test_case_save_format: str = 'protobuf', device: device | str = 'cpu', n_test_cases: int = 1) ScriptModule¶
Export jit module from Stable Baselines 3 algorithm.
Reference: https://stable-baselines3.readthedocs.io/en/master/guide/export.html#trace-export-to-c
- Parameters:
algorithm – SB3 algorithm to be traced.
input_size – Input tensor size.
device – Desired device where the tranced module is to be inferred.
export_dir – Directory to export traced module.
test_case_save_format – (‘protobuf’|’numpy’)
n_test_cases – Number of test cases.
- simulator.rl_learning.roll_out_one_episode(environment: ExtendedGymEnv, policy_func: Callable, **kwargs) Tuple[List[ndarray], List[ndarray], List[ndarray]]¶
Roll out one episode and return a trajectory.
- Parameters:
environment – The associated environment.
policy_func – The policy function, observation -> action.
- Returns:
States. List[np.ndarray]: Actions. List[np.ndarray]: Observations.
- Return type:
List[np.ndarray]
- simulator.rl_learning.test_sb3_jit_module(jit_module_path: str, test_cases_path: str, test_case_save_format: str = 'protobuf', device: device | str = 'cpu', rtol: float = 0.001, atol: float = 0.0001) bool¶
Test numeric accuracy of JIT module exported from Stable Baselines 3.
- Parameters:
jit_module_path – Path to the JIT module.
test_cases_path – Path to protobuf file of test cases.
test_case_save_format – Save format (protobuf|numpy)
device – Device to test module.
rtol – Relative tolerance, same definition as numpy.allclose and torch.allclose.
atol – Absolute tolerance, same definition as numpy.allclose and torch.allclose.
- Returns:
Flag of allclose between original module and traced module.
- Return type:
bool
- simulator.rl_learning.train_with_sb3(environment: ExtendedGymEnv, algorithm_config: Dict, learning_config: Dict, checkpoint_file_prefix: str = '') BaseAlgorithm | None¶
RL Training with Stable Baselines3.
- Parameters:
environment – Training environment.
algorithm_config – Configuration of the algorithm.
learning_config – Configuration of the learning.
checkpoint_file_prefix – File prefix (i.e. path without extension) to save checkpoint file.
- Returns:
The algorithm object with trained models.
- Return type:
Union[BaseAlgorithm, None]
simulator.observation¶
- class simulator.observation.ObservationManager(reference_line_manager: ReferenceLineManager, dynamics_model_manager: DynamicsModelManager)¶
Manager for observation.
- reference_line_manager¶
handler of underlying reference line manager
- dynamics_model_manager¶
handler of underlying Dynamics model manager
- __init__(reference_line_manager: ReferenceLineManager, dynamics_model_manager: DynamicsModelManager)¶
- Parameters:
reference_line_manager – Underlying reference line manager
dynamics_model_manager – underlying Dynamics model manager
- __weakref__¶
list of weak references to the object (if defined)
- get_observation(episode_data, body_state: BodyState) ndarray¶
Return the vectorized observation, which is usually ego-centric.
- Parameters:
episode_data – Episode data.
body_state – (Serialized) body state of agent/dynamics model.
- Returns:
Vectorized observation.
- Return type:
np.ndarray
- get_observation_space() Space¶
Return observation space, usually consisting of multiple sub-observation spaces.
- Returns:
observation space.
- Return type:
Space
simulator.trajectory¶
- class simulator.trajectory.ReferenceLineManager(n_observation_steps: int, pad_mode: str = 'none', dtype: ~numpy.dtype = <class 'numpy.float32'>)¶
Manager for Reference Line.
- reference_line¶
Handler of underlying reference line manager.
- Type:
drltt_proto.trajectory.trajectory_pb2.ReferenceLine
- pad_mode¶
Mode used for reference line padding.
- Type:
str
- dtype¶
Data type for reference line representation and observation
- Type:
numpy.dtype
- __init__(n_observation_steps: int, pad_mode: str = 'none', dtype: ~numpy.dtype = <class 'numpy.float32'>)¶
- Parameters:
n_observation_steps – Number of observation steps on the forward part of the reference line.
pad_mode – Desired mode used for reference line padding.
dtype – Desired data type.
- __weakref__¶
list of weak references to the object (if defined)
- get_observation_by_index(episode_data, body_state: BodyState) ndarray¶
Get vectorized observation of reference line given waypoint index.
- Parameters:
episode_data – Episode data.
body_state – Body state for ego-centric observation.
- Returns:
Vectorized reference line observation. Format: (x, y) x length.
- Return type:
np.ndarray
- get_observation_by_state(body_state: BodyState, reference_line: ReferenceLine = None) ndarray¶
TODO: to implement
- get_observation_space() Space¶
Get observation space.
- Returns:
Observation space.
- Return type:
Space
- get_projected_waypoint_index(body_state: BodyState) ndarray¶
TODO: to implement
- get_reference_line() ReferenceLine¶
Return the underlying reference line.
- Returns:
Returned reference line
- Return type:
ReferenceLine
- get_reference_line_waypoint(index: int) ndarray¶
Get a waypoint on the reference line.
- Parameters:
index – Step index of the desired waypoint.
- Returns:
Reference line waypoint (vectorized form).
- Return type:
np.ndarray
- set_reference_line(reference_line: ReferenceLine, tracking_length: int = 0)¶
Set reference line.
- Parameters:
reference_line – Reference line to be set.
tracking_length – Desired tracking length of reference line.
- simulator.trajectory.random_walk(dynamics_model: BaseDynamicsModel, step_interval: float, walk_length: int) Tuple[ReferenceLine, Trajectory]¶
Perform random walk to generate reference line.
- Parameters:
dynamics_model – Dynamics model for random walk.
step_interval – Time interval of a step.
walk_length – Length of the generated trajectory.
- Returns:
Random walk results:
ReferenceLine: Generated reference line.
Trajectory: Generated trajectory.
- Return type:
Tuple[ReferenceLine, Trajectory]