Agent Control#

This section explains how to create a program to control agents.

About Agent Control#

RRS の災害救助エージェントは 3 種類あり、種類毎にそれぞれ異なるプログラムを書く必要があります。しかし、初めから全てのプログラムを書くことは困難です。ここではまず初めに、消防隊エージェントを操作するプログラムの一部を書いてみましょう。

Note

The program that controls the agents is the same for each type of agent. The program is distributed to each agent and is responsible for controlling only that agent. If you write a program to control the fire brigade agent, it will run on all fire brigade agents.

Fire Brigade Agent

Agent Operation Flow#

エージェントの動作を決めているのは Tactics というプログラムです。

消防隊の思考ルーチンは下の図の通りにおおよそおこなわれます。 消防隊の動作としては、まず救助対象の市民を捜索し(Human Detector)、見つかった市民を救助します(Action Rescue)。 救助対象の市民が見つからない場合は、探索場所を変更して市民を捜索するため、次の捜索場所を決定して(Search)移動します(Action Ext move)。 なお、エージェントは 1 ステップ内で、移動と救助活動を同時におこなうことが出来ません。つまり、ステップごとに更新される自身の周辺情報を確認して、動作の対象と動作内容を決定していくということになります。 これらそれぞれの機能が、モジュールと呼ばれるプログラムとして分割して表現されています。

This time, we will develop the Human Detector module that determines the rescue (digging) target.

Fire Brigade Agent Operation Flow

Preparing to Implement the Human Detector Module#

First, create a file to describe the Human Detector module.

cd WORKING_DIR/<your_team_name>
touch src/<your_team_name>/module/complex/fire_brigade_human_detector.py

Next, implement the template for the Human Detector module. Please write the following code in fire_brigade_human_detector.py.

from typing import Optional

from adf_core_python.core.agent.develop.develop_data import DevelopData
from adf_core_python.core.agent.info.agent_info import AgentInfo
from adf_core_python.core.agent.info.scenario_info import ScenarioInfo
from adf_core_python.core.agent.info.world_info import WorldInfo
from adf_core_python.core.agent.module.module_manager import ModuleManager
from adf_core_python.core.component.module.complex.human_detector import HumanDetector
from adf_core_python.core.logger.logger import get_agent_logger
from rcrscore.entities import EntityID


class FireBrigadeHumanDetector(HumanDetector):
    def __init__(
        self,
        agent_info: AgentInfo,
        world_info: WorldInfo,
        scenario_info: ScenarioInfo,
        module_manager: ModuleManager,
        develop_data: DevelopData,
    ) -> None:
        super().__init__(
            agent_info, world_info, scenario_info, module_manager, develop_data
        )
        # 計算結果を格納する変数
        self._result: Optional[EntityID] = None
        # ロガーの取得
        self._logger = get_agent_logger(
            f"{self.__class__.__module__}.{self.__class__.__qualname__}",
            self._agent_info,
        )

    def calculate(self) -> HumanDetector:
        """
        行動対象を決定する

        Returns
        -------
            HumanDetector: 自身のインスタンス
        """
        self._logger.info("Calculate FireBrigadeHumanDetector")
        return self

    def get_target_entity_id(self) -> Optional[EntityID]:
        """
        行動対象のEntityIDを取得する

        Returns
        -------
            Optional[EntityID]: 行動対象のEntityID
        """
        return self._result

Registering the Module#

Next, register the created module.

Open the WORKING_DIR/<your_team_name>/config/module.yaml file and

DefaultTacticsFireBrigade:
  HumanDetector: src.<your_team_name>.module.complex.sample_human_detector.SampleHumanDetector

change the following part as shown below.

DefaultTacticsFireBrigade:
  HumanDetector: src.<your_team_name>.module.complex.fire_brigade_human_detector.FireBrigadeHumanDetector

ターミナルを 2 つ起動します。

Open one terminal and start the simulation server with the following command:

# Terminal A
cd WORKING_DIR/rcrs-server/scripts
./start-comprun.sh -m ../maps/tutorial_fire_brigade_only/map -c ../maps/tutorial_fire_brigade_only/config

Then, open another terminal and start the agent:

# Terminal B
cd WORKING_DIR/<your_team_name>
python main.py

If Calculate FireBrigadeHumanDetector is displayed in the standard output, it is successful.

Designing the Human Detector Module#

Write the content of the rescue target selection program and modify it so that the fire brigade agent can perform rescue activities.

The simplest way for a fire brigade agent to select a rescue target is to choose the closest buried citizen. This time, let’s write a module to determine the action target of the fire brigade agent using this method.

The method of selecting the closest buried citizen as the rescue target is shown in the flowchart below.

Rescue Target Selection Flowchart

By replacing each process in this flowchart with the classes and methods introduced in the next subsection, and writing them in fire_brigade_human_detector.py, you will complete the rescue target selection program.

Classes and Methods Used in the Implementation of the Human Detector Module#

Entity#

The Entity class is the base class for entities. This class holds the basic information of an entity.

RRS 上のエンティティは下図のように Entity を継承したクラスで表現されています。 赤枠で囲まれたクラスは、クラスの意味がそのまま RRS の直接的な構成要素を表しています。

For example, among the instances of the Road class, there are those that represent ordinary roads that do not inherit from the Hydrant class.

Entity Inheritance Relationship

EntityID#

EntityID クラスは、全てのエージェント/オブジェクトを一意に識別するための ID(識別子)を表すクラスです。 RRS ではエージェントとオブジェクトをまとめて、エンティティと呼んでいます。

Civilian#

The Civilian class represents a citizen. From this class, you can obtain the position of the agent and the progress of injuries.

  • Determine if entity is a civilian

is_civilian: bool = isinstance(entity, Civilian)
  • エンティティ ID を取得する

entity_id: EntityID = entity.get_id()
  • Determine if the civilian is alive

hp: Optional[int] = entity.get_hp()
if hp is None or hp <= 0:
    return False
  • Determine if the civilian is buried

buriedness: Optional[int] = entity.get_buriedness()
if buriedness is None or buriedness <= 0:
    return False

WorldInfo#

The WorldInfo class is a class that has methods for the information that the agent knows and the operations related to it. Through an instance of this class, the agent checks the status of other agents and objects.

In the module, an instance of the WorldInfo class is held as self._world_info.

  • エンティティ ID からエンティティを取得する

entity: Entity = self._world_info.get_entity(entity_id)
  • Get all entities of the specified class

entities: list[Entity] = self._world_info.get_entities_by_type([Building, Road])
  • Get the distance from the agent’s position to the specified entity

distance: float = self._world_info.get_distance(me, civilian.get_id())

Details here

AgentInfo#

The AgentInfo class is a class that has methods for the agent’s own information and operations related to it. Through an instance of this class, the agent obtains its own status.

In the module, an instance of the AgentInfo class is held as self._agent_info.

  • 自分自身のエンティティ ID を取得する

my_entity_id: EntityID = self._agent_info.get_entity_id()

Details here

Implementation of the Human Detector Module#

Implement the Human Detector module. Please write the following code in fire_brigade_human_detector.py.

from typing import Optional

from adf_core_python.core.agent.develop.develop_data import DevelopData
from adf_core_python.core.agent.info.agent_info import AgentInfo
from adf_core_python.core.agent.info.scenario_info import ScenarioInfo
from adf_core_python.core.agent.info.world_info import WorldInfo
from adf_core_python.core.agent.module.module_manager import ModuleManager
from adf_core_python.core.component.module.complex.human_detector import HumanDetector
from adf_core_python.core.logger.logger import get_agent_logger
from rcrscore.entities import Civilian, Entity, EntityID


class FireBrigadeHumanDetector(HumanDetector):
    def __init__(
        self,
        agent_info: AgentInfo,
        world_info: WorldInfo,
        scenario_info: ScenarioInfo,
        module_manager: ModuleManager,
        develop_data: DevelopData,
    ) -> None:
        super().__init__(
            agent_info, world_info, scenario_info, module_manager, develop_data
        )
        # 計算結果を格納する変数
        self._result: Optional[EntityID] = None
        # ロガーの取得
        self._logger = get_agent_logger(
            f"{self.__class__.__module__}.{self.__class__.__qualname__}",
            self._agent_info,
        )

    def calculate(self) -> HumanDetector:
        """
        行動対象を決定する

        Returns
        -------
            HumanDetector: 自身のインスタンス
        """
        # 自分自身のEntityIDを取得
        me: EntityID = self._agent_info.get_entity_id()
        # すべてのCivilianを取得
        civilians: list[Entity] = self._world_info.get_entities_of_types(
            [
                Civilian,
            ]
        )

        # 最も近いCivilianを探す
        nearest_civilian: Optional[EntityID] = None
        nearest_distance: Optional[float] = None
        for civilian in civilians:
            # civilianがCivilianクラスのインスタンスでない場合はスキップ
            if not isinstance(civilian, Civilian):
                continue

            # civilianのHPが0以下の場合はすでに死んでしまっているのでスキップ
            if civilian.get_hp() <= 0:
                continue

            # civilianの埋没度が0以下の場合は掘り起こす必要がないのでスキップ
            if civilian.get_buriedness() <= 0:
                continue

            # 自分自身との距離を計算
            distance: float = self._world_info.get_distance(me, civilian.get_id())

            # 最も近いCivilianを更新
            if nearest_distance is None or distance < nearest_distance:
                nearest_civilian = civilian.get_id()
                nearest_distance = distance

        # 計算結果を格納
        self._result = nearest_civilian

        # ロガーに出力
        self._logger.info(f"Target: {self._result}")

        return self

    def get_target_entity_id(self) -> Optional[EntityID]:
        """
        行動対象のEntityIDを取得する

        Returns
        -------
            Optional[EntityID]: 行動対象のEntityID
        """
        return self._result

ターミナルを 2 つ起動します。

Open one terminal and start the simulation server with the following command:

# Terminal A
cd WORKING_DIR/rcrs-server/scripts
./start-comprun.sh -m ../maps/tutorial_fire_brigade_only/map -c ../maps/tutorial_fire_brigade_only/config

Then, open another terminal and start the agent:

# Terminal B
cd WORKING_DIR/<your_team_name>
python main.py

It is successful if the fire brigade agent can rescue the buried citizens.