From ac36eb30fdaf6a13493357300cea7a1eb9a6e247 Mon Sep 17 00:00:00 2001 From: Jeremy Morgan Date: Sun, 3 Nov 2024 21:31:53 -0500 Subject: [PATCH 1/2] first commit --- .gitignore | 3 ++- pyproject.toml | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index ce49701..1357ca0 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,5 @@ scripts/*.pdf scripts/warp_test.py .pytest_cache/ venv-test/ -thirdparty/rerun-loader-python-example-urdf/ \ No newline at end of file +thirdparty/rerun-loader-python-example-urdf/ +thirdparty/curobo/ \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 61179d5..d57f98e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ more-itertools = "^9.0.0" tqdm = "^4.64.1" meshcat = "0.3.2" matplotlib = "3.5.1" +curobo = { path = "thirdparty/curobo", develop = true } [tool.poetry.group.dev.dependencies] qpth = {git = "https://github.com/locuslab/qpth"} From fbd082ae5cf66c8b004e58821833332927b8c083 Mon Sep 17 00:00:00 2001 From: Jeremy Morgan Date: Sun, 24 Nov 2024 15:55:18 -0500 Subject: [PATCH 2/2] 1/4 of first pass --- jrl/deprecated.py | 397 ++++++++++++++ jrl/math_utils.py | 139 ----- jrl/obstacles.py | 40 ++ jrl/robot.py | 568 +-------------------- jrl/utils.py | 19 + jrl/world_model.py | 13 + scripts/batch_ik_covergence_analysis.ipynb | 3 +- scripts/benchmark_ik.py | 7 +- 8 files changed, 483 insertions(+), 703 deletions(-) create mode 100644 jrl/obstacles.py create mode 100644 jrl/world_model.py diff --git a/jrl/deprecated.py b/jrl/deprecated.py index 2300543..defe884 100644 --- a/jrl/deprecated.py +++ b/jrl/deprecated.py @@ -27,3 +27,400 @@ def geodesic_distance_between_quaternions_warp(q1: torch.Tensor, q2: torch.Tenso dist_wp = wp.zeros(q1.shape[0], dtype=float, device=str(q1.device)) wp.launch(kernel=_geodesic_distance_quaternions, dim=len(q1), inputs=[q1_wp, q2_wp, dist_wp], device=str(q1.device)) return wp.to_torch(dist_wp).to(q1.device) + + + +def inverse_kinematics_autodiff_single_step_batch_pt( + self, + target_poses: torch.Tensor, + xs_current: torch.Tensor, + alpha: float = 0.10, + dtype: torch.dtype = DEFAULT_TORCH_DTYPE, +) -> torch.Tensor: + """Perform a single inverse kinematics step on a batch of joint angle vectors using pytorch. + + Notes: + 1. `target_poses` and `xs_current` need to be on the same device. + 2. the returned tensor will be on the same device as `target_poses` and `xs_current` + + Args: + target_poses (torch.Tensor): [batch x 7] poses to optimize the joint angles towards. + xs_current (torch.Tensor): [batch x ndofs] joint angles to start the optimization from. + alpha (float, optional): Step size for the optimization step. Defaults to 0.25. + + Returns: + torch.Tensor: Updated joint angles + """ + assert self._batch_fk_enabled, "_batch_fk_enabled is required for batch_ik, but is disabled for this robot" + _assert_is_pose_matrix(target_poses) + _assert_is_joint_angle_matrix(xs_current, self.ndof) + assert xs_current.shape[0] == target_poses.shape[0] + assert ( + xs_current.device == target_poses.device + ), f"xs_current and target_poses must be on the same device (got {xs_current.device} and {target_poses.device})" + + # New graph + xs_current = xs_current.detach() + xs_current.requires_grad = True + + # Run the xs_current through FK to get their realized poses + current_poses = self.forward_kinematics(xs_current, out_device=xs_current.device, dtype=dtype) + assert ( + current_poses.shape == target_poses.shape + ), f"current_poses.shape != target_poses.shape ({current_poses.shape} != {target_poses.shape})" + + t_err = target_poses[:, 0:3] - current_poses[:, 0:3] + R_err = geodesic_distance_between_quaternions(target_poses[:, 3:7], current_poses[:, 3:7]) + loss = torch.sum(t_err**2) + torch.sum(R_err**2) + loss.backward() + + xs_updated = xs_current - alpha * xs_current.grad + + assert torch.isnan(xs_updated).sum() == 0, "xs_updated contains NaNs" + assert xs_current.device == xs_updated.device + xs_updated = self.clamp_to_joint_limits(xs_updated) + return xs_updated.detach() + + + + + + + + + + + +######################################################################################################################## +######################################################################################################################## +# QP Collision Checking + +class Robot: + def __init__(self): + # ... + self._collision_capsules = None + self._capsule_idx_to_link_idx = None + self._collision_idx0 = None + self._collision_idx1 = None + if self._collision_capsules_by_link is not None: + ( + self._collision_capsules, + self._capsule_idx_to_link_idx, + self._collision_idx0, + self._collision_idx1, + ) = _generate_self_collision_pairs( + self._collision_capsules_by_link, + self._end_effector_kinematic_chain, + ignored_collision_pairs, + additional_link=self._additional_link_name, + additional_link_lca_joint=( + self._additional_link_lca_joint if self._additional_link_name is not None else None + ), + ) + + self._ignored_collision_pairs = ignored_collision_pairs + [(l2, l1) for l1, l2 in ignored_collision_pairs] + + + def self_collision_distances(self, x: torch.Tensor, use_qpth: bool = False) -> torch.Tensor: + """Returns the distance between all valid collision pairs of the robot + for each joint angle vector in x + + Args: + x (torch.Tensor): [n x ndofs] joint angle vectors + + Returns: + torch.Tensor: [n x n_pairs] distances + """ + # Capsule and joint indices are offset by 1 to make room for the base + # link. + batch_size = x.shape[0] + base_T_links = self.forward_kinematics(x, return_full_link_fk=True, out_device=x.device, dtype=x.dtype) + T1s = base_T_links[:, self._collision_idx0, :, :].reshape(-1, 4, 4) + T2s = base_T_links[:, self._collision_idx1, :, :].reshape(-1, 4, 4) + c1s = self._collision_capsules[self._collision_idx0, :].expand(batch_size, -1, -1).reshape(-1, 7) + c2s = self._collision_capsules[self._collision_idx1, :].expand(batch_size, -1, -1).reshape(-1, 7) + + dists = capsule_capsule_distance_batch(c1s, T1s, c2s, T2s, use_qpth).reshape(batch_size, -1) + + return dists + + def self_collision_distances_jacobian(self, x: torch.Tensor) -> torch.Tensor: + """Returns the jacobian of the self collision distance function with respect to the joint angles. + + Args: + x (torch.Tensor): [n x ndofs] joint angle vectors + + Returns: + torch.Tensor: [n x n_pairs x ndofs] jacobian + """ + nbatch = x.shape[0] + ndofs = x.shape[1] + + with torch.autograd.forward_ad.dual_level(): + dual_input = torch.autograd.forward_ad.make_dual( + x.unsqueeze(1).expand(nbatch, ndofs, ndofs).reshape(nbatch * ndofs, ndofs).clone(), + torch.eye(ndofs, device=x.device).expand(nbatch, ndofs, ndofs).reshape(-1, ndofs).clone(), + ) + dual_output = self.self_collision_distances(dual_input) + J = torch.autograd.forward_ad.unpack_dual(dual_output).tangent + ndists = J.shape[1] + J = J.reshape(nbatch, ndofs, ndists).permute(0, 2, 1) + + return J + + def env_collision_distances(self, x: torch.Tensor, cuboid: torch.Tensor, Tcuboid: torch.Tensor) -> torch.Tensor: + """Returns the distance between the robot collision capsules and the environment cuboid obstacle for each joint + angle vector in x. + + Args: + x (torch.Tensor): [n x ndofs] joint angle vectors + cuboid (torch.Tensor): [6] cuboid xyz min and xyz max + Tcuboid (torch.Tensor): [4 x 4] cuboid poses + + Returns: + torch.Tensor: [n x n_capsules] distances + """ + + batch_size = x.shape[0] + base_T_links = self.forward_kinematics(x, return_full_link_fk=True, out_device=x.device, dtype=x.dtype) + Tcapsules = base_T_links.reshape(-1, 4, 4) + big_batch_size = Tcapsules.shape[0] + capsules = self._collision_capsules.expand(batch_size, -1, -1).reshape(-1, 7) + Tcuboid = Tcuboid.expand(big_batch_size, 4, 4) + cuboid = cuboid.expand(big_batch_size, 6) + + dists = capsule_cuboid_distance_batch(capsules, Tcapsules, cuboid, Tcuboid).reshape(batch_size, -1) + + return dists + + def env_collision_distances_jacobian( + self, x: torch.Tensor, cuboid: torch.Tensor, Tcuboid: torch.Tensor + ) -> torch.Tensor: + nbatch = x.shape[0] + ndofs = x.shape[1] + + with torch.autograd.forward_ad.dual_level(): + dual_input = torch.autograd.forward_ad.make_dual( + x.unsqueeze(1).expand(nbatch, ndofs, ndofs).reshape(nbatch * ndofs, ndofs).clone(), + torch.eye(ndofs, device=x.device).expand(nbatch, ndofs, ndofs).reshape(-1, ndofs).clone(), + ) + dual_output = self.env_collision_distances(dual_input, cuboid, Tcuboid) + J = torch.autograd.forward_ad.unpack_dual(dual_output).tangent + ndists = J.shape[1] + J = J.reshape(nbatch, ndofs, ndists).permute(0, 2, 1) + + return J + +def _generate_self_collision_pairs( + collision_capsules_by_link: Dict[str, torch.Tensor], + joint_chain: List[Joint], + ignored_collision_pairs: List[Tuple[str, str]], + additional_link: Optional[str] = None, + additional_link_lca_joint: Optional[Joint] = None, +): + """ + Generate collision pairs from collision capsules and joint chain. Adjacent links in the joint chain are allowed to + collide. + + Returns capsules and idx0, idx1, such that capsules[idx0[i]] and capsules[idx1[i]] must be checked for collision. + + Args: + joint_chain (List[Joint]): Joint chain of the robot. + ignored_collision_pairs (List[Tuple[str, str]], optional): List of collision pairs to ignore. Defaults to []. + additional_link (Optional[str]): An optional additional link to add to the collision pairs. Defaults to None. + additional_link_lca_joint (Optional[Joint]): The artificial joint that connects the additional link to the + joint_chain + + Returns: + Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: capsules, idx0, idx1 + """ + ignored_collision_set = set(tuple(sorted(pair)) for pair in ignored_collision_pairs) + for link_name, capsule in collision_capsules_by_link.items(): + if capsule is None: + for other_link_name in collision_capsules_by_link.keys(): + ignored_collision_set.add(tuple(sorted((link_name, other_link_name)))) + collision_capsules_by_link[link_name] = torch.tensor( + [0, 0, 0, 0, 0, 0.01, 0.001], device=DEVICE, dtype=DEFAULT_TORCH_DTYPE + ) + + link_name_to_idx = {} + capsule_idx_to_joint_idx = [] + capsules = [] + for i, joint in enumerate(joint_chain): + if i == 0 and joint.parent in collision_capsules_by_link: + capsules.append(collision_capsules_by_link[joint.parent]) + link_name_to_idx[joint.parent] = 0 + capsule_idx_to_joint_idx.append(i) + + if joint.child in collision_capsules_by_link: + capsules.append(collision_capsules_by_link[joint.child]) + link_name_to_idx[joint.child] = i + 1 + capsule_idx_to_joint_idx.append(i + 1) + + ignored_collision_set.add(tuple(sorted((joint.parent, joint.child)))) + + # Add in the additional link + if additional_link is not None: + capsules.append(collision_capsules_by_link[additional_link]) + idx = capsule_idx_to_joint_idx[-1] + 1 + link_name_to_idx[additional_link] = idx + capsule_idx_to_joint_idx.append(idx) + ignored_collision_set.add(tuple(sorted((additional_link_lca_joint.parent, additional_link_lca_joint.child)))) + + idx0, idx1 = [], [] + link_names = list(collision_capsules_by_link.keys()) + for i in range(len(link_names)): + for j in range(i + 1, len(link_names)): + if (link_names[i], link_names[j]) in ignored_collision_set: + continue + if (link_names[j], link_names[i]) in ignored_collision_set: + continue + idx0.append(link_name_to_idx[link_names[i]]) + idx1.append(link_name_to_idx[link_names[j]]) + + return ( + torch.stack(capsules, dim=0), + torch.tensor(capsule_idx_to_joint_idx, dtype=torch.long, device=DEVICE), + torch.tensor(idx0, dtype=torch.long, device=DEVICE), + torch.tensor(idx1, dtype=torch.long, device=DEVICE), + ) + + +class QP: + def __init__(self, Q, p, G, h, A=None, b=None): + """Solve a quadratic program of the form + min 1/2 x^T Q x + p^T x + s.t. Gx <= h, Ax = b + + Args: + Q (torch.Tensor): [nbatch x dim x dim] positive semidefinite matrix + p (torch.Tensor): [nbatch x dim] vector + G (torch.Tensor): [nbatch x nc x dim] matrix + h (torch.Tensor): [nbatch x nc] vector + A (torch.Tensor, optional): [nbatch x neq x dim] matrix. Defaults to + None. + b (torch.Tensor, optional): [nbatch x neq] vector. Defaults to None. + + Raises: + NotImplementedError: Equality constraints not implemented + + Returns: + torch.Tensor: [nbatch x dim] solution + """ + self.nbatch = Q.shape[0] + self.dim = Q.shape[1] + self.nc = G.shape[1] + self.Q = Q + assert len(p.shape) == 2, f"p.shape: {p.shape}" + self.p = p.unsqueeze(2) + self.G = G + assert len(h.shape) == 2, f"h.shape: {h.shape}" + self.h = h.unsqueeze(2) + + assert ( + self.nbatch == self.p.shape[0] == self.G.shape[0] == self.h.shape[0] == self.Q.shape[0] + ), f"{self.nbatch}, {self.p.shape[0]}, {self.G.shape[0]}, {self.h.shape[0]}, {self.Q.shape[0]}" + assert ( + self.dim == self.p.shape[1] == self.G.shape[2] == self.Q.shape[1] == self.Q.shape[2] + ), f"{self.dim}, {self.p.shape[1]}, {self.G.shape[2]}, {self.Q.shape[1]}, {self.Q.shape[2]}" + assert self.nc == self.G.shape[1] == self.h.shape[1] + + if A is not None or b is not None: + raise NotImplementedError("Equality constraints not implemented") + self.A = A + self.b = b + + def solve(self, trace=False, iterlimit=None): + x = torch.zeros((self.nbatch, self.dim, 1), dtype=torch.float32, device=self.Q.device) + if trace: + trace = [x] + working_set = torch.zeros((self.nbatch, self.nc, 1), dtype=torch.bool, device=x.device) + converged = torch.zeros((self.nbatch, 1, 1), dtype=torch.bool, device=x.device) + + if iterlimit is None: + iterlimit = 10 * self.nc + + iterations = 0 + while not torch.all(converged): + A = self.G * working_set + b = self.h * working_set + + g = self.Q.bmm(x) + self.p + h = A.bmm(x) - b + + AT = A.transpose(1, 2) + AQinvAT = A.bmm(torch.linalg.solve(self.Q, AT)) + rhs = h - A.bmm(torch.linalg.solve(self.Q, g)) + diag = torch.diag_embed(~working_set.squeeze(dim=2)) + lmbd = torch.linalg.solve(AQinvAT + diag, rhs) + p = torch.linalg.solve(self.Q, -AT.bmm(lmbd) - g) + + at_ws_sol = torch.linalg.norm(p, dim=1, keepdim=True) < 1e-3 + + lmbdmin, lmbdmin_idx = torch.min( + torch.where( + at_ws_sol & working_set, + lmbd, + torch.inf, + ), + dim=1, + keepdim=True, + ) + + converged = converged | (at_ws_sol & (lmbdmin >= 0)) + + # remove inverted constraints from working set + working_set = working_set.scatter( + 1, + lmbdmin_idx, + (~at_ws_sol | (lmbdmin >= 0)) & working_set.gather(1, lmbdmin_idx), + ) + + # check constraint violations + mask = ~at_ws_sol & ~working_set & (self.G.bmm(p) > 0) + alpha, alpha_idx = torch.min( + torch.where( + mask, + (self.h - self.G.bmm(x)) / (self.G.bmm(p)), + torch.inf, + ), + dim=1, + keepdim=True, + ) + + # add violated constraints to working set + working_set = working_set.scatter( + 1, + alpha_idx, + working_set.gather(1, alpha_idx) | (~at_ws_sol & (alpha < 1)), + ) + alpha = torch.clamp(alpha, max=1) + + # update solution + x = x + alpha * p * (~at_ws_sol & ~converged) + if trace: + trace.append(x.squeeze(2)) + + iterations += 1 + if iterations > iterlimit: + # Qs = self.Q[~converged.flatten()] + # ps = self.p[~converged.flatten()] + # Gs = self.G[~converged.flatten()] + # hs = self.h[~converged.flatten()] + # xs = x[~converged.flatten()] + # for i in range(torch.sum(~converged)): + # print(f"Qs[{i}]:\n{Qs[i]}") + # print(f"ps[{i}]:\n{ps[i]}") + # print(f"Gs[{i}]:\n{Gs[i]}") + # print(f"hs[{i}]:\n{hs[i]}") + # print(f"xs[{i}]:\n{xs[i]}") + raise RuntimeError( + f"Failed to converge in {iterlimit} iterations\n\n{torch.sum(~converged).item()} out of" + f" {self.nbatch} not converged" + ) + + if trace: + return x.squeeze(2), trace + + return x.squeeze(2) + diff --git a/jrl/math_utils.py b/jrl/math_utils.py index 274b665..155c2cf 100644 --- a/jrl/math_utils.py +++ b/jrl/math_utils.py @@ -25,145 +25,6 @@ ) -class QP: - def __init__(self, Q, p, G, h, A=None, b=None): - """Solve a quadratic program of the form - min 1/2 x^T Q x + p^T x - s.t. Gx <= h, Ax = b - - Args: - Q (torch.Tensor): [nbatch x dim x dim] positive semidefinite matrix - p (torch.Tensor): [nbatch x dim] vector - G (torch.Tensor): [nbatch x nc x dim] matrix - h (torch.Tensor): [nbatch x nc] vector - A (torch.Tensor, optional): [nbatch x neq x dim] matrix. Defaults to - None. - b (torch.Tensor, optional): [nbatch x neq] vector. Defaults to None. - - Raises: - NotImplementedError: Equality constraints not implemented - - Returns: - torch.Tensor: [nbatch x dim] solution - """ - self.nbatch = Q.shape[0] - self.dim = Q.shape[1] - self.nc = G.shape[1] - self.Q = Q - assert len(p.shape) == 2, f"p.shape: {p.shape}" - self.p = p.unsqueeze(2) - self.G = G - assert len(h.shape) == 2, f"h.shape: {h.shape}" - self.h = h.unsqueeze(2) - - assert ( - self.nbatch == self.p.shape[0] == self.G.shape[0] == self.h.shape[0] == self.Q.shape[0] - ), f"{self.nbatch}, {self.p.shape[0]}, {self.G.shape[0]}, {self.h.shape[0]}, {self.Q.shape[0]}" - assert ( - self.dim == self.p.shape[1] == self.G.shape[2] == self.Q.shape[1] == self.Q.shape[2] - ), f"{self.dim}, {self.p.shape[1]}, {self.G.shape[2]}, {self.Q.shape[1]}, {self.Q.shape[2]}" - assert self.nc == self.G.shape[1] == self.h.shape[1] - - if A is not None or b is not None: - raise NotImplementedError("Equality constraints not implemented") - self.A = A - self.b = b - - def solve(self, trace=False, iterlimit=None): - x = torch.zeros((self.nbatch, self.dim, 1), dtype=torch.float32, device=self.Q.device) - if trace: - trace = [x] - working_set = torch.zeros((self.nbatch, self.nc, 1), dtype=torch.bool, device=x.device) - converged = torch.zeros((self.nbatch, 1, 1), dtype=torch.bool, device=x.device) - - if iterlimit is None: - iterlimit = 10 * self.nc - - iterations = 0 - while not torch.all(converged): - A = self.G * working_set - b = self.h * working_set - - g = self.Q.bmm(x) + self.p - h = A.bmm(x) - b - - AT = A.transpose(1, 2) - AQinvAT = A.bmm(torch.linalg.solve(self.Q, AT)) - rhs = h - A.bmm(torch.linalg.solve(self.Q, g)) - diag = torch.diag_embed(~working_set.squeeze(dim=2)) - lmbd = torch.linalg.solve(AQinvAT + diag, rhs) - p = torch.linalg.solve(self.Q, -AT.bmm(lmbd) - g) - - at_ws_sol = torch.linalg.norm(p, dim=1, keepdim=True) < 1e-3 - - lmbdmin, lmbdmin_idx = torch.min( - torch.where( - at_ws_sol & working_set, - lmbd, - torch.inf, - ), - dim=1, - keepdim=True, - ) - - converged = converged | (at_ws_sol & (lmbdmin >= 0)) - - # remove inverted constraints from working set - working_set = working_set.scatter( - 1, - lmbdmin_idx, - (~at_ws_sol | (lmbdmin >= 0)) & working_set.gather(1, lmbdmin_idx), - ) - - # check constraint violations - mask = ~at_ws_sol & ~working_set & (self.G.bmm(p) > 0) - alpha, alpha_idx = torch.min( - torch.where( - mask, - (self.h - self.G.bmm(x)) / (self.G.bmm(p)), - torch.inf, - ), - dim=1, - keepdim=True, - ) - - # add violated constraints to working set - working_set = working_set.scatter( - 1, - alpha_idx, - working_set.gather(1, alpha_idx) | (~at_ws_sol & (alpha < 1)), - ) - alpha = torch.clamp(alpha, max=1) - - # update solution - x = x + alpha * p * (~at_ws_sol & ~converged) - if trace: - trace.append(x.squeeze(2)) - - iterations += 1 - if iterations > iterlimit: - # Qs = self.Q[~converged.flatten()] - # ps = self.p[~converged.flatten()] - # Gs = self.G[~converged.flatten()] - # hs = self.h[~converged.flatten()] - # xs = x[~converged.flatten()] - # for i in range(torch.sum(~converged)): - # print(f"Qs[{i}]:\n{Qs[i]}") - # print(f"ps[{i}]:\n{ps[i]}") - # print(f"Gs[{i}]:\n{Gs[i]}") - # print(f"hs[{i}]:\n{hs[i]}") - # print(f"xs[{i}]:\n{xs[i]}") - raise RuntimeError( - f"Failed to converge in {iterlimit} iterations\n\n{torch.sum(~converged).item()} out of" - f" {self.nbatch} not converged" - ) - - if trace: - return x.squeeze(2), trace - - return x.squeeze(2) - - # batch*n def normalize_vector(v: torch.Tensor, return_mag: bool = False): batch = v.shape[0] diff --git a/jrl/obstacles.py b/jrl/obstacles.py new file mode 100644 index 0000000..8a7d730 --- /dev/null +++ b/jrl/obstacles.py @@ -0,0 +1,40 @@ + + + +@dataclass +class CuboidObstacle: + TODO + + +class BatchedCuboidObstacle: + TODO + + + + def env_collision_distances(self, x: torch.Tensor, cuboid: torch.Tensor, Tcuboid: torch.Tensor) -> torch.Tensor: + """Returns the distance between the robot collision capsules and the environment cuboid obstacle for each joint + angle vector in x. + + Args: + x (torch.Tensor): [n x ndofs] joint angle vectors + cuboid (torch.Tensor): [6] cuboid xyz min and xyz max + Tcuboid (torch.Tensor): [4 x 4] cuboid poses + + Returns: + torch.Tensor: [n x n_capsules] distances + """ + TODO + raise NotImplementedError("Implement self_collision_distances_jacobian") + + def env_collision_distances_jacobian( + self, x: torch.Tensor, cuboid: torch.Tensor, Tcuboid: torch.Tensor + ) -> torch.Tensor: + TODO + raise NotImplementedError("Implement self_collision_distances_jacobian") + + + + + + + diff --git a/jrl/robot.py b/jrl/robot.py index b72c4d5..827d91e 100644 --- a/jrl/robot.py +++ b/jrl/robot.py @@ -1,15 +1,8 @@ from typing import List, Tuple, Optional, Union, Dict from functools import cached_property -from more_itertools import locate import torch import numpy as np -import klampt -from klampt import IKSolver, WorldModel -from klampt.model import ik -from klampt.math import so3 -from klampt import robotsim -from klampt.model.collide import WorldCollider import tqdm from jrl.math_utils import ( @@ -20,7 +13,6 @@ quaternion_to_rpy, rotation_matrix_to_quaternion, quaternion_norm, - geodesic_distance_between_quaternions, DEFAULT_TORCH_DTYPE, ) from jrl.config import DEVICE, PT_NP_TYPE @@ -32,105 +24,10 @@ UNHANDLED_JOINT_TYPES, merge_fixed_joints_to_one, ) -from jrl.utils import to_torch +from jrl.utils import to_torch, _assert_is_2d, _assert_is_pose_matrix, _assert_is_joint_angle_matrix from jrl.geometry import capsule_capsule_distance_batch, capsule_cuboid_distance_batch -def _assert_is_2d(x: Union[torch.Tensor, np.ndarray]): - assert len(x.shape) == 2, f"Expected x to be a 2D array but got {x.shape}" - assert isinstance(x, (torch.Tensor, np.ndarray)), f"Expected x to be a torch.Tensor or np.ndarray but got {type(x)}" - - -def _assert_is_pose_matrix(poses: Union[torch.Tensor, np.ndarray]): - _assert_is_2d(poses) - assert poses.shape[1] == 7, f"Expected poses matrix to be [n x 7] but got {poses.shape}" - norms = quaternion_norm(poses[:, 3:7]) - assert max(norms) < 1.01 and min(norms) > 0.99, "quaternion(s) are not unit quaternion(s)" - - -def _assert_is_joint_angle_matrix(xs: Union[torch.Tensor, np.ndarray], ndof: int): - _assert_is_2d(xs) - assert xs.shape[1] == ndof, f"Expected matrix to be [n x ndof] ([{xs.shape[0]} x {ndof}]) but got {xs.shape}" - - -def _assert_is_np(x: np.ndarray, variable_name: str = "input"): - assert isinstance(x, np.ndarray), f"Expected {variable_name} to be a numpy array but got {type(x)}" - - -def _generate_self_collision_pairs( - collision_capsules_by_link: Dict[str, torch.Tensor], - joint_chain: List[Joint], - ignored_collision_pairs: List[Tuple[str, str]], - additional_link: Optional[str] = None, - additional_link_lca_joint: Optional[Joint] = None, -): - """ - Generate collision pairs from collision capsules and joint chain. Adjacent links in the joint chain are allowed to - collide. - - Returns capsules and idx0, idx1, such that capsules[idx0[i]] and capsules[idx1[i]] must be checked for collision. - - Args: - joint_chain (List[Joint]): Joint chain of the robot. - ignored_collision_pairs (List[Tuple[str, str]], optional): List of collision pairs to ignore. Defaults to []. - additional_link (Optional[str]): An optional additional link to add to the collision pairs. Defaults to None. - additional_link_lca_joint (Optional[Joint]): The artificial joint that connects the additional link to the - joint_chain - - Returns: - Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: capsules, idx0, idx1 - """ - ignored_collision_set = set(tuple(sorted(pair)) for pair in ignored_collision_pairs) - for link_name, capsule in collision_capsules_by_link.items(): - if capsule is None: - for other_link_name in collision_capsules_by_link.keys(): - ignored_collision_set.add(tuple(sorted((link_name, other_link_name)))) - collision_capsules_by_link[link_name] = torch.tensor( - [0, 0, 0, 0, 0, 0.01, 0.001], device=DEVICE, dtype=DEFAULT_TORCH_DTYPE - ) - - link_name_to_idx = {} - capsule_idx_to_joint_idx = [] - capsules = [] - for i, joint in enumerate(joint_chain): - if i == 0 and joint.parent in collision_capsules_by_link: - capsules.append(collision_capsules_by_link[joint.parent]) - link_name_to_idx[joint.parent] = 0 - capsule_idx_to_joint_idx.append(i) - - if joint.child in collision_capsules_by_link: - capsules.append(collision_capsules_by_link[joint.child]) - link_name_to_idx[joint.child] = i + 1 - capsule_idx_to_joint_idx.append(i + 1) - - ignored_collision_set.add(tuple(sorted((joint.parent, joint.child)))) - - # Add in the additional link - if additional_link is not None: - capsules.append(collision_capsules_by_link[additional_link]) - idx = capsule_idx_to_joint_idx[-1] + 1 - link_name_to_idx[additional_link] = idx - capsule_idx_to_joint_idx.append(idx) - ignored_collision_set.add(tuple(sorted((additional_link_lca_joint.parent, additional_link_lca_joint.child)))) - - idx0, idx1 = [], [] - link_names = list(collision_capsules_by_link.keys()) - for i in range(len(link_names)): - for j in range(i + 1, len(link_names)): - if (link_names[i], link_names[j]) in ignored_collision_set: - continue - if (link_names[j], link_names[i]) in ignored_collision_set: - continue - idx0.append(link_name_to_idx[link_names[i]]) - idx1.append(link_name_to_idx[link_names[j]]) - - return ( - torch.stack(capsules, dim=0), - torch.tensor(capsule_idx_to_joint_idx, dtype=torch.long, device=DEVICE), - torch.tensor(idx0, dtype=torch.long, device=DEVICE), - torch.tensor(idx1, dtype=torch.long, device=DEVICE), - ) - class Robot: def __init__( @@ -210,26 +107,6 @@ def __init__( f" ({self.ndof})." ) - self._collision_capsules = None - self._capsule_idx_to_link_idx = None - self._collision_idx0 = None - self._collision_idx1 = None - if self._collision_capsules_by_link is not None: - ( - self._collision_capsules, - self._capsule_idx_to_link_idx, - self._collision_idx0, - self._collision_idx1, - ) = _generate_self_collision_pairs( - self._collision_capsules_by_link, - self._end_effector_kinematic_chain, - ignored_collision_pairs, - additional_link=self._additional_link_name, - additional_link_lca_joint=( - self._additional_link_lca_joint if self._additional_link_name is not None else None - ), - ) - # Create and fill cache of fixed rotations between links. self._parent_T_joint_cache = {} if self._batch_fk_enabled: @@ -241,35 +118,12 @@ def __init__( ) ) - # Initialize klampt - # Note: Need to save `_klampt_world_model` as a member variable otherwise you'll be doomed to get a segfault - self._klampt_world_model = WorldModel() - # TODO: Consider finding a better way to fix the mesh filepath issue. self._urdf_filepath_absolute = get_urdf_filepath_w_filenames_updated(self._urdf_filepath) - self._klampt_world_model.loadRobot(self._urdf_filepath_absolute) # TODO: suppress output of loadRobot call - assert ( - self._klampt_world_model.numRobots() - ), f"There should be one robot loaded (found {self._klampt_world_model.numRobots()}). Is the urdf well formed?" - self._klampt_robot: robotsim.RobotModel = self._klampt_world_model.robot(0) - self._ignored_collision_pairs_formatted = [ - (self._klampt_robot.link(link1_name), self._klampt_robot.link(link2_name)) - for link1_name, link2_name in ignored_collision_pairs - ] - self._klampt_collision_checker = WorldCollider( - self._klampt_world_model, ignore=self._ignored_collision_pairs_formatted - ) - self._ignored_collision_pairs = ignored_collision_pairs + [(l2, l1) for l1, l2 in ignored_collision_pairs] - self._klampt_ee_link: robotsim.RobotModelLink = self._klampt_robot.link(self._end_effector_link_name) - self._klampt_config_dim = len(self._klampt_robot.getConfig()) - self._klampt_driver_vec_dim = self._klampt_robot.numDrivers() - self._klampt_active_dofs = self._get_klampt_active_dofs() - self._klampt_active_driver_idxs = self._get_klampt_active_driver_idxs() + if verbose: print("\n----------- Robot specs") print(f"name: {self.name}") - print("klampt config size: ", self._klampt_config_dim) - print("klampt drivers size:", self._klampt_driver_vec_dim) print("joints:") sjld = 0 for i, (joint_name, (l, u)) in enumerate(zip(self.actuated_joint_names, self.actuated_joints_limits)): @@ -278,10 +132,6 @@ def __init__( print(f"sum joint range: {round(sjld, 4)} rads") print("-----------\n") - def make_new_collision_checker(self): - self._klampt_collision_checker = WorldCollider( - self._klampt_world_model, ignore=self._ignored_collision_pairs_formatted - ) # ------------------------------------------------------------------------------------------------------------------ # --- --- @@ -358,9 +208,6 @@ def actuated_joints_velocity_limits_deg(self) -> List[float]: assert len(vals) == self.ndof, f"Error, only {len(vals)} in vals, but {self.ndof} degrees of freedom" return vals - @property - def klampt_world_model(self) -> WorldModel: - return self._klampt_world_model @property def revolute_joint_idxs(self) -> List[int]: @@ -418,7 +265,7 @@ def sample_joint_angles_and_poses( with tqdm.tqdm(total=n, disable=not tqdm_enabled) as pbar: while True: - samples_i = self.sample_joint_angles(internal_batch_size, joint_limit_eps=joint_limit_eps) + samples_i = to_torch(self.sample_joint_angles(internal_batch_size, joint_limit_eps=joint_limit_eps)) counter0_i = counter for i in range(samples_i.shape[0]): @@ -426,7 +273,7 @@ def sample_joint_angles_and_poses( if only_non_self_colliding and self.config_self_collides(sample): continue - pose = self.forward_kinematics_klampt(sample[None, :]) + pose = self.forward_kinematics(sample[None, :]) samples[counter] = sample poses[counter] = pose counter += 1 @@ -444,12 +291,6 @@ def sample_joint_angles_and_poses( " correctly for this robot?" ) - def set_klampt_robot_config(self, x: np.ndarray): - """Set the internal klampt robots config with the given joint angle vector""" - _assert_is_np(x) - assert x.shape == (self.ndof,), f"Expected x to be of shape ({self.ndof},) but got {x.shape}" - q = self._x_to_qs(np.resize(x, (1, self.ndof)))[0] - self._klampt_robot.setConfig(q) def clamp_to_joint_limits(self, x: PT_NP_TYPE): """Clamp the given joint angle vectors to the joint limits @@ -464,43 +305,7 @@ def clamp_to_joint_limits(self, x: PT_NP_TYPE): x[:, i] = clamp_fn(x[:, i], l, u) return x - def config_self_collides(self, x: PT_NP_TYPE) -> bool: - """Returns True if the given joint angle vector causes the robot to self collide - Args: - x (PT_NP_TYPE): [ndofs] tensor of joint angles - """ - assert isinstance(x, (np.ndarray, torch.Tensor)) - if isinstance(x, torch.Tensor): - x = x.detach().cpu().numpy() - self.set_klampt_robot_config(x) - collisions = self._klampt_collision_checker.robotSelfCollisions(self._klampt_robot) - # for link1, link2 in collisions: - for _ in collisions: - return True - return False - - def config_collides_with_env(self, x: PT_NP_TYPE, box: klampt.Geometry3D, return_detailed: bool = False) -> bool: - """Returns True if the given joint angle vector causes the robot to self collide - - Args: - x (PT_NP_TYPE): [ndofs] tensor of joint angles - """ - assert isinstance(x, (np.ndarray, torch.Tensor)) - if isinstance(x, torch.Tensor): - x = x.detach().cpu().numpy() - self.set_klampt_robot_config(x) - collisions = self._klampt_collision_checker.robotObjectCollisions(self._klampt_robot, box) - - if return_detailed: - collisions_list = list(collisions) - return collisions_list - - # Not sure if collisions lazily evaluated or not, so we need to iterate over instead of calling list(collisions) - # (RobotModelLink, RigidObjectModel) (for link, rigid_object in collisions:) - for _ in collisions: - return True - return False # ------------------------------------------------------------------------------------------------------------------ # --- --- @@ -518,107 +323,6 @@ def get_child_name_from_joint_name(joint_name: str) -> str: return [get_child_name_from_joint_name(joint_name) for joint_name in self.actuated_joint_names] - def _get_klampt_active_dofs(self) -> List[int]: - """Hack: We need to know which indexes of the klampt q vector are from active joints. - - Returns: - List[int]: The indexes of the klampt configuration vector which correspond to the user specified active - joints - """ - all_drivers = [self._klampt_robot.driver(i) for i in range(self._klampt_robot.numDrivers())] - actuated_joint_child_names = self._get_actuated_joint_child_names() - driver_vec_tester = [1000 if (driver.getName() in actuated_joint_child_names) else -1 for driver in all_drivers] - q_test_result = self._klampt_robot.configFromDrivers(driver_vec_tester) - q_active_joint_idxs = list(locate(q_test_result, lambda x: x == 1000)) - assert len(q_active_joint_idxs) == self.ndof, ( - f"Error - the number of active drivers in the klampt config != ndof ({len(q_active_joint_idxs)} !=" - f" {self.ndof})" - ) - return q_active_joint_idxs - - def _get_klampt_active_driver_idxs(self) -> List[int]: - """We need to know which indexes of the klampt driver vector are from user specified active joints. - - Returns: - List[int]: The indexes of the klampt driver vector which correspond to the(user specified) active joints - """ - - # Get the names of all the child links for each active joint. - # Note: driver.getName() returns the joints' child link for some god awful reason. See L1161 in Robot.cpp - # (https://github.com/krishauser/Klampt/blob/master/Cpp/Modeling/Robot.cpp#L1161) - actuated_joint_child_names = self._get_actuated_joint_child_names() - all_drivers = [self._klampt_robot.driver(i) for i in range(self._klampt_robot.numDrivers())] - driver_vec_tester = [1 if (driver.getName() in actuated_joint_child_names) else -1 for driver in all_drivers] - active_driver_idxs = list(locate(driver_vec_tester, lambda x: x == 1)) - - assert ( - len(active_driver_idxs) == self.ndof - ), f"Error - the number of active drivers != ndof ({len(active_driver_idxs)} != {self.ndof})" - return active_driver_idxs - - # TODO(@jstm): Consider changing this to take (batch x ndofs) - def _driver_vec_from_x(self, x: np.ndarray) -> List[float]: - """Format a joint angle vector into a klampt driver vector. Non user specified joints will have a value of 0. - - Args: - x (np.ndarray): (self.ndof,) joint angle vector - - Returns: - List[float]: A list with the joint angle vector formatted in the klampt driver format. Note that klampt - needs a list of floats when recieving a driver vector. - """ - assert x.size == self.ndof, f"x doesn't have {self.ndof} (ndof) elements ({self.ndof} != {x.size})" - assert x.shape == (self.ndof,), f"x.shape must be (ndof,) - ({(self.ndof,)}) != {x.shape}" - x = x.tolist() - - # return x as a list if there are no additional active joints in the urdf - if len(x) == self._klampt_driver_vec_dim: - return x - - # TODO(@jstm): Consider a non iterative implementation for this - driver_vec = [0.0] * self._klampt_driver_vec_dim - - j = 0 - for i in self._klampt_active_driver_idxs: - driver_vec[i] = x[j] - j += 1 - - return driver_vec - - # TODO(@jstm): Consider changing this to take (batch x driver_vec_dim) - def _x_from_driver_vec(self, driver_vec: List[float]) -> List[float]: - """Remove the non relevant joints from the klampt driver vector.""" - if len(driver_vec) == self.ndof: - return driver_vec - # TODO(@jstm): Consider a non iterative implementation for this - return [driver_vec[i] for i in self._klampt_active_driver_idxs] - - def _x_to_qs(self, x: np.ndarray) -> List[List[float]]: - """Return a list of klampt configurations (qs) from an array of joint angles (x) - - Args: - x: (n x ndof) array of joint angle settings - - Returns: - A list of configurations representing the robots state in klampt - """ - _assert_is_np(x) - _assert_is_joint_angle_matrix(x, self.ndof) - - n = x.shape[0] - qs = [] - for i in range(n): - driver_vec = self._driver_vec_from_x(x[i]) - qs.append(self._klampt_robot.configFromDrivers(driver_vec)) - return qs - - def _qs_to_x(self, qs: List[List[float]]) -> np.array: - """Calculate joint angle values (x) from klampt configurations (qs)""" - res = np.zeros((len(qs), self.ndof)) - for idx, q in enumerate(qs): - driver_vec = self._klampt_robot.configToDrivers(q) - res[idx, :] = self._x_from_driver_vec(driver_vec) - return res def __str__(self) -> str: s = "".format( @@ -633,25 +337,6 @@ def __str__(self) -> str: # --- Forward Kinematics --- # --- --- - def forward_kinematics_klampt(self, x: np.array, link_name: Optional[str] = None) -> np.array: - """Forward kinematics using the klampt library""" - robot_configs = self._x_to_qs(x) - dim_y = 7 - n = len(robot_configs) - y = np.zeros((n, dim_y)) - - if link_name is None: - link = self._klampt_ee_link - else: - link = self._klampt_robot.link(link_name) - - for i in range(n): - q = robot_configs[i] - self._klampt_robot.setConfig(q) - R, t = link.getTransform() - y[i, 0:3] = np.array(t) - y[i, 3:] = np.array(so3.quaternion(R)) - return y def _ensure_forward_kinematics_cache(self, device: torch.device, dtype: torch.dtype = DEFAULT_TORCH_DTYPE): if device not in self._parent_T_joint_cache: @@ -833,40 +518,6 @@ def forward_kinematics( # --- Inverse Kinematics --- # --- --- - def jacobian_np(self, x: np.ndarray) -> np.ndarray: - """Return the jacobian of the end effector link with respect to the joint angles x. - - Note: per Klamp't, the format for the returned orientation: - > The x axis is "roll", y is "pitch", and z is "yaw" - - Args: - x (np.ndarray): The joint angle to the jacobian with respect to - - Returns: - np.ndarray: a [6 x ndof] matrix, where the top 3 rows are the orientation derivatives, and the bottom three - rows are the positional derivatives. - """ - self.set_klampt_robot_config(x) - J_full = self._klampt_ee_link.getJacobian([0, 0, 0]) # [6 x klampt_config_dimension] - J = J_full[:, self._klampt_active_dofs] # see https://stackoverflow.com/a/8386737/5191069 - return J - - def jacobian_batch_np(self, xs: np.ndarray) -> np.ndarray: - """Return a batch of jacobian matrices for the given joint angle vectors. See 'jacobian_np()' for details on - the jacobian - - Args: - xs (np.ndarray): [batch x ndof] matrix of joint angle vectors - - Returns: - np.ndarray: _description_ - """ - _assert_is_joint_angle_matrix(xs, self.ndof) - n = xs.shape[0] - Js = np.zeros((n, 6, self.ndof)) - for i in range(n): - Js[i] = self.jacobian_np(xs[i]) - return Js def jacobian( self, @@ -979,7 +630,6 @@ def inverse_kinematics_step_levenburg_marquardt( return self.clamp_to_joint_limits(xs_updated) return xs_updated - # TODO: Enforce joint limits def inverse_kinematics_step_jacobian_pinv( self, target_poses: torch.Tensor, @@ -1055,143 +705,6 @@ def inverse_kinematics_step_jacobian_pinv( xs_updated = self.clamp_to_joint_limits(xs_updated) return xs_updated - def inverse_kinematics_autodiff_single_step_batch_pt( - self, - target_poses: torch.Tensor, - xs_current: torch.Tensor, - alpha: float = 0.10, - dtype: torch.dtype = DEFAULT_TORCH_DTYPE, - ) -> torch.Tensor: - """Perform a single inverse kinematics step on a batch of joint angle vectors using pytorch. - - Notes: - 1. `target_poses` and `xs_current` need to be on the same device. - 2. the returned tensor will be on the same device as `target_poses` and `xs_current` - - Args: - target_poses (torch.Tensor): [batch x 7] poses to optimize the joint angles towards. - xs_current (torch.Tensor): [batch x ndofs] joint angles to start the optimization from. - alpha (float, optional): Step size for the optimization step. Defaults to 0.25. - - Returns: - torch.Tensor: Updated joint angles - """ - assert self._batch_fk_enabled, "_batch_fk_enabled is required for batch_ik, but is disabled for this robot" - _assert_is_pose_matrix(target_poses) - _assert_is_joint_angle_matrix(xs_current, self.ndof) - assert xs_current.shape[0] == target_poses.shape[0] - assert ( - xs_current.device == target_poses.device - ), f"xs_current and target_poses must be on the same device (got {xs_current.device} and {target_poses.device})" - - # New graph - xs_current = xs_current.detach() - xs_current.requires_grad = True - - # Run the xs_current through FK to get their realized poses - current_poses = self.forward_kinematics(xs_current, out_device=xs_current.device, dtype=dtype) - assert ( - current_poses.shape == target_poses.shape - ), f"current_poses.shape != target_poses.shape ({current_poses.shape} != {target_poses.shape})" - - t_err = target_poses[:, 0:3] - current_poses[:, 0:3] - R_err = geodesic_distance_between_quaternions(target_poses[:, 3:7], current_poses[:, 3:7]) - loss = torch.sum(t_err**2) + torch.sum(R_err**2) - loss.backward() - - xs_updated = xs_current - alpha * xs_current.grad - - assert torch.isnan(xs_updated).sum() == 0, "xs_updated contains NaNs" - assert xs_current.device == xs_updated.device - xs_updated = self.clamp_to_joint_limits(xs_updated) - return xs_updated.detach() - - def inverse_kinematics_klampt( - self, - pose: np.array, - seed: Optional[np.ndarray] = None, - positional_tolerance: float = 1e-3, - n_tries: int = 50, - verbosity: int = 0, - ) -> Optional[np.array]: - """Run klampts inverse kinematics solver with the given pose - - Note: If the solver fails to find a solution with the provided seed, it will rerun with a random seed - - Per http://motion.cs.illinois.edu/software/klampt/latest/pyklampt_docs/Manual-IK.html#ik-solver: - 'To use the solver properly, you must understand how the solver uses the RobotModel: - First, the current configuration of the robot is the seed configuration to the solver. - Second, the robot's joint limits are used as the defaults. - Third, the solved configuration is stored in the RobotModel's current configuration.' - - Args: - pose (np.array): The target pose to solve for - seed (Optional[np.ndarray], optional): A seed to initialize the optimization with. Defaults to None. - verbosity (int): Set the verbosity of the function. 0: only fatal errors are printed. Defaults to 0. - """ - assert len(pose.shape) == 1 - assert pose.size == 7, f"Error, pose is {pose.shape}, should be [{7}]" - if seed is not None: - _assert_is_np(seed, variable_name="seed") - assert len(seed.shape) == 1, f"Seed must be a 1D array (currently: {seed.shape})" - assert seed.size == self.ndof - seed_q = self._x_to_qs(seed.reshape((1, self.ndof)))[0] - - max_iterations = 150 - R = so3.from_quaternion(pose[3 : 3 + 4]) - obj = ik.objective(self._klampt_ee_link, t=pose[0:3].tolist(), R=R) - - for _ in range(n_tries): - solver = IKSolver(self._klampt_robot) - solver.add(obj) - solver.setActiveDofs(self._klampt_active_dofs) - solver.setMaxIters(max_iterations) - # TODO(@jstmn): What does 'tolarance' mean for klampt? Positional error? Positional error + orientation - # error? - solver.setTolerance(positional_tolerance) - - # `sampleInitial()` needs to come after `setActiveDofs()`, otherwise x,y,z,r,p,y of the robot will - # be randomly set aswell <(*<_*)> - if seed is None: - solver.sampleInitial() - else: - # solver.setBiasConfig(seed_q) - self._klampt_robot.setConfig(seed_q) - - res = solver.solve() - if not res: - if verbosity > 0: - print( - " inverse_kinematics_klampt() IK failed after", - solver.lastSolveIters(), - "optimization steps, retrying (non fatal)", - ) - - # Rerun the solver with a random seed - if seed is not None: - return self.inverse_kinematics_klampt( - pose, - seed=None, - positional_tolerance=positional_tolerance, - verbosity=verbosity, - ) - - continue - - if verbosity > 1: - print("Solved in", solver.lastSolveIters(), "iterations") - residual = solver.getResidual() - print("Residual:", residual, " - L2 error:", np.linalg.norm(residual[0:3])) - - return self._qs_to_x([self._klampt_robot.getConfig()]) - - if verbosity > 0: - print( - "inverse_kinematics_klampt() - Failed to find IK solution after", - n_tries, - "optimization attempts", - ) - return None # ------------------------------------------------------------------------------------------------------------------ # --- --- @@ -1210,20 +723,12 @@ def self_collision_distances(self, x: torch.Tensor, use_qpth: bool = False) -> t """ # Capsule and joint indices are offset by 1 to make room for the base # link. - batch_size = x.shape[0] - base_T_links = self.forward_kinematics(x, return_full_link_fk=True, out_device=x.device, dtype=x.dtype) - T1s = base_T_links[:, self._collision_idx0, :, :].reshape(-1, 4, 4) - T2s = base_T_links[:, self._collision_idx1, :, :].reshape(-1, 4, 4) - c1s = self._collision_capsules[self._collision_idx0, :].expand(batch_size, -1, -1).reshape(-1, 7) - c2s = self._collision_capsules[self._collision_idx1, :].expand(batch_size, -1, -1).reshape(-1, 7) - - dists = capsule_capsule_distance_batch(c1s, T1s, c2s, T2s, use_qpth).reshape(batch_size, -1) + TODO + raise NotImplementedError("Implement self_collision_distances_jacobian") - return dists def self_collision_distances_jacobian(self, x: torch.Tensor) -> torch.Tensor: - """Returns the jacobian of the self collision distance function with - respect to the joint angles. + """Returns the jacobian of the self collision distance function with respect to the joint angles. Args: x (torch.Tensor): [n x ndofs] joint angle vectors @@ -1231,63 +736,8 @@ def self_collision_distances_jacobian(self, x: torch.Tensor) -> torch.Tensor: Returns: torch.Tensor: [n x n_pairs x ndofs] jacobian """ - nbatch = x.shape[0] - ndofs = x.shape[1] - - with torch.autograd.forward_ad.dual_level(): - dual_input = torch.autograd.forward_ad.make_dual( - x.unsqueeze(1).expand(nbatch, ndofs, ndofs).reshape(nbatch * ndofs, ndofs).clone(), - torch.eye(ndofs, device=x.device).expand(nbatch, ndofs, ndofs).reshape(-1, ndofs).clone(), - ) - dual_output = self.self_collision_distances(dual_input) - J = torch.autograd.forward_ad.unpack_dual(dual_output).tangent - ndists = J.shape[1] - J = J.reshape(nbatch, ndofs, ndists).permute(0, 2, 1) - - return J - - def env_collision_distances(self, x: torch.Tensor, cuboid: torch.Tensor, Tcuboid: torch.Tensor) -> torch.Tensor: - """Returns the distance between the robot collision capsules and the - environment cuboid obstacle for each joint angle vector in x. - - Args: - x (torch.Tensor): [n x ndofs] joint angle vectors - cuboid (torch.Tensor): [6] cuboid xyz min and xyz max - Tcuboid (torch.Tensor): [4 x 4] cuboid poses - - Returns: - torch.Tensor: [n x n_capsules] distances - """ - - batch_size = x.shape[0] - base_T_links = self.forward_kinematics(x, return_full_link_fk=True, out_device=x.device, dtype=x.dtype) - Tcapsules = base_T_links.reshape(-1, 4, 4) - big_batch_size = Tcapsules.shape[0] - capsules = self._collision_capsules.expand(batch_size, -1, -1).reshape(-1, 7) - Tcuboid = Tcuboid.expand(big_batch_size, 4, 4) - cuboid = cuboid.expand(big_batch_size, 6) - - dists = capsule_cuboid_distance_batch(capsules, Tcapsules, cuboid, Tcuboid).reshape(batch_size, -1) - - return dists - - def env_collision_distances_jacobian( - self, x: torch.Tensor, cuboid: torch.Tensor, Tcuboid: torch.Tensor - ) -> torch.Tensor: - nbatch = x.shape[0] - ndofs = x.shape[1] - - with torch.autograd.forward_ad.dual_level(): - dual_input = torch.autograd.forward_ad.make_dual( - x.unsqueeze(1).expand(nbatch, ndofs, ndofs).reshape(nbatch * ndofs, ndofs).clone(), - torch.eye(ndofs, device=x.device).expand(nbatch, ndofs, ndofs).reshape(-1, ndofs).clone(), - ) - dual_output = self.env_collision_distances(dual_input, cuboid, Tcuboid) - J = torch.autograd.forward_ad.unpack_dual(dual_output).tangent - ndists = J.shape[1] - J = J.reshape(nbatch, ndofs, ndists).permute(0, 2, 1) - - return J + TODO + raise NotImplementedError("Implement self_collision_distances_jacobian") def forward_kinematics_kinpy(robot: Robot, x: np.array) -> np.array: diff --git a/jrl/utils.py b/jrl/utils.py index 2adeda6..9fb005b 100644 --- a/jrl/utils.py +++ b/jrl/utils.py @@ -2,6 +2,7 @@ import random import pathlib import colorsys +from typing import List, Tuple, Optional, Union, Dict # TODO: pkg_resources is deprecated. Use importlib.resources instead. import pkg_resources @@ -11,6 +12,24 @@ from jrl.config import DEVICE, DEFAULT_TORCH_DTYPE, PT_NP_TYPE +def _assert_is_2d(x: Union[torch.Tensor, np.ndarray]): + assert len(x.shape) == 2, f"Expected x to be a 2D array but got {x.shape}" + assert isinstance(x, (torch.Tensor, np.ndarray)), f"Expected x to be a torch.Tensor or np.ndarray but got {type(x)}" + + +def _assert_is_pose_matrix(poses: Union[torch.Tensor, np.ndarray]): + _assert_is_2d(poses) + assert poses.shape[1] == 7, f"Expected poses matrix to be [n x 7] but got {poses.shape}" + norms = quaternion_norm(poses[:, 3:7]) + assert max(norms) < 1.01 and min(norms) > 0.99, "quaternion(s) are not unit quaternion(s)" + + +def _assert_is_joint_angle_matrix(xs: Union[torch.Tensor, np.ndarray], ndof: int): + _assert_is_2d(xs) + assert xs.shape[1] == ndof, f"Expected matrix to be [n x ndof] ([{xs.shape[0]} x {ndof}]) but got {xs.shape}" + + + def cm_to_m(x: float): return x / 100.0 diff --git a/jrl/world_model.py b/jrl/world_model.py new file mode 100644 index 0000000..5a7247f --- /dev/null +++ b/jrl/world_model.py @@ -0,0 +1,13 @@ +from jrl.robot import Robot + + + + + + + +class WorldModel: + + def __init__(self, robot: Robot, obstacles: BatchedCuboidObstacles): + self._robot = robot + self._obstacles = obstacles \ No newline at end of file diff --git a/scripts/batch_ik_covergence_analysis.ipynb b/scripts/batch_ik_covergence_analysis.ipynb index cc8dd1a..94cfa17 100644 --- a/scripts/batch_ik_covergence_analysis.ipynb +++ b/scripts/batch_ik_covergence_analysis.ipynb @@ -108,8 +108,7 @@ "outputs": [], "source": [ "methods = {\n", - " \"j pinv\": lambda x, target, alpha: robot.inverse_kinematics_step_jacobian_pinv(target, x, alpha=alpha),\n", - " \"AutoDiff\": lambda x, target, alpha: robot.inverse_kinematics_autodiff_single_step_batch_pt(target, x, alpha=alpha),\n", + " \"Jac pinv\": lambda x, target, alpha: robot.inverse_kinematics_step_jacobian_pinv(target, x, alpha=alpha),\n", " \"levenburg marquardt\": lambda x, target, alpha: robot.inverse_kinematics_step_levenburg_marquardt(target, x, alpha=alpha),\n", "}\n", "\n", diff --git a/scripts/benchmark_ik.py b/scripts/benchmark_ik.py index 4a3c672..1ef277b 100644 --- a/scripts/benchmark_ik.py +++ b/scripts/benchmark_ik.py @@ -51,9 +51,10 @@ def fn_mean_std(fn: Callable, k: int): x_pt_cuda = to_torch(x.copy()).cuda() methods = { - # "Klampt invJac cpu": lambda: robot.inverse_kinematics_step_jacobian_pinv(goalposes_cpu, x_pt_cpu), - "Klampt invJac cuda": lambda: robot.inverse_kinematics_step_jacobian_pinv(goalposes_cuda, x_pt_cuda), - "AutoDiff cuda": lambda: robot.inverse_kinematics_autodiff_single_step_batch_pt(goalposes_cuda, x_pt_cuda), + "Jac_pinv cpu": lambda: robot.inverse_kinematics_step_jacobian_pinv(goalposes_cpu, x_pt_cpu), + "LMA cpu": lambda: robot.inverse_kinematics_step_jacobian_pinv(goalposes_cpu, x_pt_cpu), + "LMA cuda": lambda: robot.inverse_kinematics_step_jacobian_pinv(goalposes_cuda, x_pt_cuda), + "Jac_pinv cuda": lambda: robot.inverse_kinematics_step_jacobian_pinv(goalposes_cuda, x_pt_cuda), } for name, method in methods.items(): mean_runtime_ms, std_runtime = fn_mean_std(method, k)