
Source code for minedojo.sim.sim

import uuid
from copy import deepcopy
from typing import Union, Optional, List, Dict, Tuple, Literal, Any

import cv2
import gym
import numpy as np
from lxml import etree

from .mc_meta import mc
from . import handlers
from .bridge import BridgeEnv
from .cmd_executor import CMDExecutor
from .config_sim_spec import SimSpec
from .inventory import InventoryItem, parse_inventory_item

[docs]class MineDojoSim(gym.Env): """An environment wrapper for MineDojo simulation. Args: allow_mob_spawn: If ``True``, allow mobs (animals and hostiles) to spawn. Default: ``True``. allow_time_passage: Time flows if ``True``. Default: ``True``. break_speed_multiplier: Controls the speed of breaking blocks. A value larger than 1.0 accelerates the breaking. Default: ``1.0``. drawing_str: Draws shapes (e.g. spheres, cuboids) in the minecraft world. Default: ``None``. event_level_control: If ``True``, the agent is able to perform high-level controls including place and equip. If ``False``, then is keyboard-mouse level control. Default: ``True``. flat_world_seed_string: A string that specifies how we want the world layers to be created if ``generate_world_type`` is "flat". One can use the `tool <>`_ to generate. Default: ``None``. generate_world_type: A string that specifies the type of the minecraft world. One of ``"default"``, ``"flat"``, ``"from_file"``, ``"specified_biome"``. Default: ``"default"``. image_size: The size of image observations. initial_inventory: If not ``None``, specifies initial items in the agent's inventory. Use class ``InventoryItem`` to specify items. Default: ``None``. initial_weather: If not ``None``, specifies the initial weather. Can be one of ``"clear"``, ``"normal"``, ``"rain"``, ``"thunder"``. Default: ``None``. lidar_rays: Defines the directions and maximum distances of the lidar rays if ``use_lidar`` is ``True``. If supplied, should be a list of tuple(pitch, yaw, distance). Pitch and yaw are in radians and relative to agent looking vector. Default: ``None``. raise_error_on_invalid_cmds: If ``True``, the cmd executor will raise error when a command is invalid. If ``False``, the executor will just skip instead. Default: ``False``. regenerate_world_after_reset: If ``True``, the minecraft world will be re-generated when resetting. Default: ``False``. seed: The seed for an instance's internal random number generator. Default: ``None``. sim_name: Name of a simulation instance. Default: ``"MineDojoSim"``. spawn_in_village: If ``True``, the agent will spawn in a village. Default: ``False``. specified_biome: If not ``None``, specifies the biome of the minecraft world by a string or an integer. Default: ``None``. start_food: If not ``None``, specifies initial food of the agent. Default: ``None``. start_health: If not ``None``, specifies initial health of the agent. Default: ``None``. start_position: If not ``None``, specifies the agent's initial location and orientation. If provided, should be a dict with keys ``x``, ``y``, ``z``, ``yaw``, ``pitch``. ``yaw`` and ``pitch`` are in degrees. Default: ``None``. start_time: If not ``None``, specifies the time when the agent spawns. If supplied, should be an int between 0 and 24000. See `here <>`_ for more information. Default: ``None``. use_depth: If ``True``, includes depth map in observations. Default: ``False``. use_lidar: If ``True``, includes lidar in observations. Default: ``False``. use_voxel: If ``True``, includes voxel in observations. Default: ``False``. voxel_size: Defines the voxel's range in each axis if ``use_voxel`` is ``True``. If supplied, should be a dict with keys ``xmin``, ``xmax``, ``ymin``, ``ymax``, ``zmin``, ``zmax``. Each value specifies the voxel size relative to the agent. Default: ``None``. world_file_path: The path to the world file if ``generate_world_type`` is ``"from_file"``. Default: ``None``. world_seed: Seed for deterministic world generation if ``generate_world_type`` is ``"default"`` or ``"specified_biome"``. See `here <>`_ for more details. Default: ``None``. """ def __init__( self, *, # ------ initial conditions ------ initial_inventory: Optional[List[InventoryItem]] = None, start_position: Optional[Dict[str, Union[float, int]]] = None, start_health: Optional[float] = None, start_food: Optional[int] = None, start_time: Optional[int] = None, initial_weather: Optional[Literal["normal", "clear", "rain", "thunder"]] = None, spawn_in_village: bool = False, drawing_str: Optional[str] = None, # ------ global conditions ------ break_speed_multiplier: float = 1.0, allow_time_passage: bool = True, allow_mob_spawn: bool = True, # ------ world generation ------ generate_world_type: Literal[ "default", "from_file", "flat", "specified_biome" ] = "default", regenerate_world_after_reset: bool = False, world_seed: Optional[Union[str, int]] = None, world_file_path: Optional[str] = None, flat_world_seed_string: Optional[str] = None, specified_biome: Optional[Union[int, str]] = None, # ------ observation ------ image_size: Union[int, Tuple[int, int]], use_voxel: bool = False, voxel_size: Optional[Dict[str, int]] = None, use_lidar: bool = False, lidar_rays: Optional[List[Tuple[float, float, float]]] = None, use_depth: bool = False, # ------ control ------ event_level_control: bool = True, # ------ randomness ------ seed: Optional[int] = None, # ------ misc ------ sim_name: str = "MineDojoSim", raise_error_on_invalid_cmds: bool = False, ): self._sim_name = sim_name self._rng = np.random.default_rng(seed) if isinstance(image_size, int): image_size = (image_size, image_size) if use_voxel: voxel_size = voxel_size or dict( xmin=-1, ymin=-1, zmin=-1, xmax=1, ymax=1, zmax=1 ) if use_lidar: lidar_rays = lidar_rays or [(0.0, 0.0, 10.0)] assert not use_depth, "TODO: fix depth obs bug" start_health = start_health or 20.0 start_food = start_food or 20 assert generate_world_type in { "default", "from_file", "flat", "specified_biome", }, f"invalid generate world type {generate_world_type}" if generate_world_type == "default": world_seed = world_seed or "" elif generate_world_type == "from_file": assert ( world_file_path is not None ), f'A world file path must be provided when `generate_world_type = "from_file"`' elif generate_world_type == "flat": flat_world_seed_string = flat_world_seed_string or "" elif generate_world_type == "specified_biome": assert ( specified_biome is not None ), f"must provide a biome when generate_world_type = 'specified_biome'!" world_seed = world_seed or "" else: raise ValueError(f"Unknown world generation type {generate_world_type}") if specified_biome is not None: if isinstance(specified_biome, str): assert ( specified_biome in mc.BIOMES_MAP ), f"Unknown biome name {specified_biome}" specified_biome = mc.BIOMES_MAP[specified_biome] elif isinstance(specified_biome, int): assert ( specified_biome in mc.BIOMES_MAP.values() ), f"Invalid biome id {specified_biome}" else: raise ValueError(f"invalid biome type {specified_biome}") # configure obs handlers obs_handlers = [ handlers.POVObservation(image_size, False), handlers.TrueFlatInventoryObservation(), handlers.EquipmentObservation(), handlers.ObservationFromLifeStats(), handlers.ObservationFromCurrentLocation(), handlers.ObserveFromFullStats(), handlers.NearbyToolsObservation(), handlers.ObservationFromDamageSource(), ] if use_voxel: voxel_size = ( (voxel_size["xmin"], voxel_size["xmax"]), (voxel_size["ymin"], voxel_size["ymax"]), (voxel_size["zmin"], voxel_size["zmax"]), ) obs_handlers.append(handlers.VoxelObservation(voxel_size)) if use_lidar: obs_handlers.append(handlers.RichLidarObservation(lidar_rays)) # configure action handlers common_actions = [ "forward", "back", "left", "right", "jump", "sneak", "sprint", "use", "attack", "drop", ] action_handlers = [ handlers.CameraAction(), handlers.SmeltAction( ["none"] + mc.ALL_SMELTING_ITEMS, _other="none", _default="none" ), handlers.CraftAction( ["none"] + mc.ALL_PERSONAL_CRAFTING_ITEMS, _other="none", _default="none", ), handlers.CraftWithTableAction( ["none"] + mc.ALL_CRAFTING_TABLE_ITEMS, _other="none", _default="none" ), ] action_handlers.extend( [ handlers.KeybasedCommandAction(k, mc.INVERSE_KEYMAP[k]) for k in common_actions ] ) if event_level_control: action_handlers.extend( [ handlers.EquipAction( ["none"] + mc.ALL_ITEMS, _other="none", _default="none" ), handlers.PlaceBlock( ["none"] + mc.ALL_ITEMS, _other="none", _default="none" ), ] ) else: action_handlers.append(handlers.SwapSlotAction()) action_handlers.append( handlers.KeybasedCommandAction( "pickItem", mc.INVERSE_KEYMAP["pickItem"] ) ) action_handlers.extend( [ handlers.KeybasedCommandAction( f"hotbar.{i}", mc.INVERSE_KEYMAP[str(i)] ) for i in range(1, 10) ] ) # configure agent handlers agent_handlers = [] # configure agent start handlers, e.g., initial inventory self.start_health, self.start_food = start_health, start_food agent_start_handlers = [ handlers.LowLevelInputsAgentStart(), handlers.AgentStartBreakSpeedMultiplier(break_speed_multiplier), handlers.StartingHealthAgentStart(health=start_health), handlers.StartingFoodAgentStart(food=start_food), ] self.initial_inventory = initial_inventory if initial_inventory is not None: initial_inventory = [ parse_inventory_item(item) for item in initial_inventory ] agent_start_handlers.append( handlers.InventoryAgentStart( { inventory_item[0]: inventory_item[1] for inventory_item in initial_inventory } ) ) self.start_position = start_position if start_position is not None: agent_start_handlers.append( handlers.AgentStartPlacement( x=start_position["x"], y=start_position["y"], z=start_position["z"], yaw=start_position["yaw"], pitch=start_position["pitch"], ) ) # configure server initial conditions handlers self.start_time = start_time server_initial_conditions_handlers = [ handlers.TimeInitialCondition( allow_passage_of_time=allow_time_passage, start_time=start_time ), handlers.SpawningInitialCondition(allow_mob_spawn), ] self.initial_weather = initial_weather if initial_weather is not None: server_initial_conditions_handlers.append( handlers.WeatherInitialCondition(initial_weather) ) # configure world generator handlers world_generator_handlers = [] if generate_world_type == "default": world_generator_handlers.append( handlers.DefaultWorldGenerator(regenerate_world_after_reset, world_seed) ) elif generate_world_type == "from_file": world_generator_handlers.append( handlers.FileWorldGenerator( world_file_path, destroy_after_use=False, ) ) elif generate_world_type == "flat": world_generator_handlers.append( handlers.FlatWorldGenerator( force_reset=True, generatorString=flat_world_seed_string ) ) elif generate_world_type == "specified_biome": world_generator_handlers.append( handlers.BiomeGenerator( specified_biome, force_reset=True, world_seed=world_seed ) ) else: raise ValueError() if drawing_str is not None: world_generator_handlers.append(handlers.DrawingDecorator(drawing_str)) # configure server decorator handlers server_decorator_handlers = [] if spawn_in_village: server_decorator_handlers.append(handlers.VillageSpawnDecorator()) # configure server quit handlers server_quit_handlers = [] self._sim_spec = SimSpec( sim_name=sim_name, agent_count=1, obs_handlers=obs_handlers, action_handlers=action_handlers, agent_handlers=agent_handlers, agent_start_handlers=agent_start_handlers, server_initial_conditions_handlers=server_initial_conditions_handlers, world_generator_handlers=world_generator_handlers, server_decorator_handlers=server_decorator_handlers, server_quit_handlers=server_quit_handlers, seed=self.new_seed, ) self._bridge_env = BridgeEnv(is_fault_tolerant=True, seed=self.new_seed) self._prev_obs = None self._prev_action = None self._prev_info = None self._cmd_executor = CMDExecutor(self, raise_error_on_invalid_cmds) @property def observation_space(self): return self._sim_spec.observation_space @property def action_space(self): return self._sim_spec.action_space @property def new_seed(self): return self._rng.integers(low=0, high=2**31 - 1).item()
[docs] def seed(self, seed: int = None): """Sets the seed for this env’s random number generator. Args: seed: The seed for the number generator """ self._rng = np.random.default_rng(seed)
[docs] def reset(self): """Resets the environment to an initial state and returns an initial observation. Return: Agent’s initial observation. """ episode_id = str(uuid.uuid4()) xml = etree.fromstring(self._sim_spec.to_xml(episode_id)) raw_obs = self._bridge_env.reset(episode_id, [xml])[0] obs, info = self._process_raw_obs(raw_obs) self._prev_obs, self._prev_info = deepcopy(obs), deepcopy(info) return obs
[docs] def step(self, action: dict): """Run one timestep of the environment’s dynamics. Accepts an action and returns next_obs, reward, done, info. Args: action: The action of the agent in current step. Return: A tuple (obs, reward, done, info) - ``dict`` - Agent’s next observation. - ``float`` - Amount of reward returned after executing previous action. - ``bool`` - Whether the episode has ended. - ``dict`` - Contains auxiliary diagnostic information (helpful for debugging, and sometimes learning). """ self._prev_action = deepcopy(action) action_xml = self._action_obj_to_xml(action) step_tuple = self._bridge_env.step([action_xml]) step_success, raw_obs = step_tuple.step_success, step_tuple.raw_obs if not step_success: # when step failed, return prev obs return self._prev_obs, 0, True, self._prev_info else: obs, info = self._process_raw_obs(raw_obs[0]) self._prev_obs, self._prev_info = deepcopy(obs), deepcopy(info) return obs, 0, self.is_terminated, info
[docs] def execute_cmd(self, cmd: str, action: Optional[dict] = None): """Execute a given string command. Args: cmd: The string command accepted by the Minecraft client. action: An action that will be simultaneously executed with the command. Return: A tuple (obs, reward, done, info) - ``dict`` - Agent’s observation of the current environment. - ``float`` - Amount of reward returned after previous action. - ``bool`` - Whether the episode has ended. - ``dict`` - Contains auxiliary diagnostic information (helpful for debugging, and sometimes learning). """ return self._cmd_executor.execute_cmd(cmd, action)
[docs] def spawn_mobs( self, mobs: Union[str, List[str]], rel_positions: Union[np.ndarray, list], action: Optional[dict] = None, ): """Spawn mobs in the world. Args: mobs: The names of the mobs to spawn rel_positions: The mobs' positions relative to the agent action: An action that will be simultaneously executed with the spawning Return: A tuple (obs, reward, done, info) - ``dict`` - Agent’s observation of the current environment. - ``float`` - Amount of reward returned after previous action. - ``bool`` - Whether the episode has ended. - ``dict`` - Contains auxiliary diagnostic information (helpful for debugging, and sometimes learning). """ return self._cmd_executor.spawn_mobs(mobs, rel_positions, action)
[docs] def set_block( self, blocks: Union[str, List[str]], rel_positions: Union[np.ndarray, list], action: Optional[dict] = None, ): """Set blocks in the world. Args: blocks: The names of the blocks to set rel_positions: The blocks' positions relative to the agent action: An action that will be simultaneously executed with the setting Return: A tuple (obs, reward, done, info) - ``dict`` - Agent’s observation of the current environment. - ``float`` - Amount of reward returned after previous action. - ``bool`` - Whether the episode has ended. - ``dict`` - Contains auxiliary diagnostic information (helpful for debugging, and sometimes learning). """ return self._cmd_executor.set_block(blocks, rel_positions, action)
[docs] def clear_inventory(self, action: Optional[dict] = None): """Remove all items in the agent's inventory. Args: action: An action that will be simultaneously executed Return: A tuple (obs, reward, done, info) - ``dict`` - Agent’s observation of the current environment. - ``float`` - Amount of reward returned after previous action. - ``bool`` - Whether the episode has ended. - ``dict`` - Contains auxiliary diagnostic information (helpful for debugging, and sometimes learning). """ return self._cmd_executor.clear_inventory(action)
[docs] def set_inventory( self, inventory_list: List[InventoryItem], action: Optional[dict] = None ): """Set items to the agent's inventory. Args: inventory_list: List of ``InventoryItem`` to change the inventory status action: An action that will be simultaneously executed Return: A tuple (obs, reward, done, info) - ``dict`` - Agent’s observation of the current environment. - ``float`` - Amount of reward returned after previous action. - ``bool`` - Whether the episode has ended. - ``dict`` - Contains auxiliary diagnostic information (helpful for debugging, and sometimes learning). """ return self._cmd_executor.set_inventory(inventory_list, action)
[docs] def teleport_agent(self, x, y, z, yaw, pitch, action: Optional[dict] = None): """Teleport the agent to a given position. Args: x: x coordinate of the destination y: y coordinate of the destination z: z coordinate of the destination yaw: yaw of the targeted orientation pitch: pitch of the targeted orientation action: An action that will be simultaneously executed with the teleporting Return: A tuple (obs, reward, done, info) - ``dict`` - Agent’s observation of the current environment. - ``float`` - Amount of reward returned after previous action. - ``bool`` - Whether the episode has ended. - ``dict`` - Contains auxiliary diagnostic information (helpful for debugging, and sometimes learning). """ return self._cmd_executor.teleport_agent(x, y, z, yaw, pitch, action)
[docs] def kill_agent(self, action: Optional[dict] = None): """Kill the agent. Args: action: An action that will be simultaneously executed Return: A tuple (obs, reward, done, info) - ``dict`` - Agent’s observation of the current environment. - ``float`` - Amount of reward returned after previous action. - ``bool`` - Whether the episode has ended. - ``dict`` - Contains auxiliary diagnostic information (helpful for debugging, and sometimes learning). """ return self._cmd_executor.kill_agent(action)
[docs] def set_time(self, time: int, action: Optional[dict] = None): """Set the world with the given time. Args: time: The target time action: An action that will be simultaneously executed Return: A tuple (obs, reward, done, info) - ``dict`` - Agent’s observation of the current environment. - ``float`` - Amount of reward returned after previous action. - ``bool`` - Whether the episode has ended. - ``dict`` - Contains auxiliary diagnostic information (helpful for debugging, and sometimes learning). """ return self._cmd_executor.set_time(time, action)
[docs] def set_weather(self, weather: str, action: Optional[dict] = None): """Set the world with the given weather. Args: weather: The target weather action: An action that will be simultaneously executed Return: A tuple (obs, reward, done, info) - ``dict`` - Agent’s observation of the current environment. - ``float`` - Amount of reward returned after previous action. - ``bool`` - Whether the episode has ended. - ``dict`` - Contains auxiliary diagnostic information (helpful for debugging, and sometimes learning). """ return self._cmd_executor.set_weather(weather, action)
[docs] def random_teleport(self, max_range: int, action: Optional[dict] = None): """Teleport the agent randomly. Args: max_range: The maximum distance on each horizontal axis from the center of the area to spread targets (thus, the area is square, not circular) action: An action that will be simultaneously executed Return: A tuple (obs, reward, done, info) - ``dict`` - Agent’s observation of the current environment. - ``float`` - Amount of reward returned after previous action. - ``bool`` - Whether the episode has ended. - ``dict`` - Contains auxiliary diagnostic information (helpful for debugging, and sometimes learning). """ return self._cmd_executor.random_teleport(max_range, action)
[docs] def close(self): """Environments will automatically close() themselves when garbage collected or when the program exits.""" self._bridge_env.close()
[docs] def render(self, mode: str = "human"): """Renders the environment. Args: mode: The mode to render with. """ img = self._prev_obs["rgb"] img = img.transpose((1, 2, 0)) img = img[:, :, ::-1] cv2.imshow(f"{self._sim_name}", img) cv2.waitKey(1)
@property def prev_obs(self): return self._prev_obs @property def prev_info(self): return self._prev_info @property def prev_action(self): return self._prev_action @property def is_terminated(self): return self._bridge_env.is_terminated def _process_raw_obs(self, raw_obs: dict): info = deepcopy(raw_obs) if "pov" in info: info.pop("pov") obs_dict = { h.to_string(): h.from_hero(raw_obs) for h in self._sim_spec.observables } return obs_dict, info def _action_obj_to_xml(self, action): parsed_action = [f'chat {action["chat"]}'] if "chat" in action else [] parsed_action.extend( [h.to_hero(action[h.to_string()]) for h in self._sim_spec.actionables] ) return "\n".join(parsed_action)