サーチモジュール#

サーチモジュールの概要#

今回開発するモジュールは、KMeansPPClustering モジュールを用いた情報探索対象決定 (Search) モジュールです。 クラスタリングモジュールによってエージェント間で担当地域の分割をおこない、 担当地域内からランダムに探索対象として選択します。

サーチモジュールの実装の準備#

注釈

以降の作業では、カレントディレクトリがプロジェクトのルートディレクトリであることを前提としています。

まず、サーチモジュールを記述するためのファイルを作成します。

touch src/<your_team_name>/module/complex/k_means_pp_search.py

次に、サーチモジュールの実装を行います。 以下のコードを k_means_pp_search.py に記述してください。 これが今回実装するサーチモジュールの雛形になります。

import random
from typing import Optional, cast

from rcrs_core.entities.building import Building
from rcrs_core.entities.entity import Entity
from rcrs_core.entities.refuge import Refuge
from rcrs_core.worldmodel.entityID import EntityID

from adf_core_python.core.agent.develop.develop_data import DevelopData
from adf_core_python.core.agent.info.agent_info import AgentInfo
from adf_core_python.core.agent.info.scenario_info import ScenarioInfo
from adf_core_python.core.agent.info.world_info import WorldInfo
from adf_core_python.core.agent.module.module_manager import ModuleManager
from adf_core_python.core.component.module.algorithm.clustering import Clustering
from adf_core_python.core.component.module.complex.search import Search
from adf_core_python.core.logger.logger import get_agent_logger


class KMeansPPSearch(Search):
    def __init__(
        self,
        agent_info: AgentInfo,
        world_info: WorldInfo,
        scenario_info: ScenarioInfo,
        module_manager: ModuleManager,
        develop_data: DevelopData,
    ) -> None:
        super().__init__(
            agent_info, world_info, scenario_info, module_manager, develop_data
        )
        self._result: Optional[EntityID] = None
        # ロガーの取得
        self._logger = get_agent_logger(
            f"{self.__class__.__module__}.{self.__class__.__qualname__}",
            self._agent_info,
        )

    def calculate(self) -> Search:
        return self

    def get_target_entity_id(self) -> Optional[EntityID]:
        return self._result

モジュールの登録#

次に、作成したモジュールを登録します。 以下のようにconfig/module.yamlの該当箇所を変更してください

DefaultTacticsAmbulanceTeam:
  Search: src.<your_team_name>.module.complex.k_means_pp_search.KMeansPPSearch

DefaultTacticsFireBrigade:
  Search: src.<your_team_name>.module.complex.k_means_pp_search.KMeansPPSearch

DefaultTacticsPoliceForce:
  Search: src.<your_team_name>.module.complex.k_means_pp_search.KMeansPPSearch

モジュールの実装#

まず、KMeansPPClustering モジュールを呼び出せるようにします。

以下のコードをconfig/module.yamlに追記してください。

KMeansPPSearch:
  Clustering: src.<your_team_name>.module.algorithm.k_means_pp_clustering.KMeansPPClustering

次に、KMeansPPSearch モジュールで KMeansPPClustering モジュールを呼び出せるようにします。

以下のコードを k_means_pp_search.py に追記してください。

class KMeansPPSearch(Search):
    def __init__(
        self,
        agent_info: AgentInfo,
        world_info: WorldInfo,
        scenario_info: ScenarioInfo,
        module_manager: ModuleManager,
        develop_data: DevelopData,
    ) -> None:
        super().__init__(
            agent_info, world_info, scenario_info, module_manager, develop_data
        )
        self._result: Optional[EntityID] = None
        
        # ロガーの取得
        self._logger = get_agent_logger(
            f"{self.__class__.__module__}.{self.__class__.__qualname__}",
            self._agent_info,
        )

        # クラスタリングモジュールの読み込み
        self._clustering: Clustering = cast(
            Clustering,
            module_manager.get_module(
                # config.yamlに登録したkey
                "KMeansPPSearch.Clustering",
                # 上記のkeyが登録されていなかった場合のデフォルトモジュール
                "adf_core_python.implement.module.algorithm.k_means_clustering.KMeansClustering",
            ),
        )

        # クラスタリングモジュールの登録
        self.register_sub_module(self._clustering)

そして、calculate メソッドでクラスタリングモジュールを呼び出し、探索対象を決定するように変更します。

以下のコードを k_means_pp_search.py に追記してください。

    def calculate(self) -> Search:
        # 自エージェントのエンティティIDを取得
        me: EntityID = self._agent_info.get_entity_id()
        # 自エージェントが所属するクラスターのインデックスを取得
        allocated_cluster_index: int = self._clustering.get_cluster_index(me)
        # クラスター内のエンティティIDを取得
        cluster_entity_ids: list[EntityID] = self._clustering.get_cluster_entity_ids(
            allocated_cluster_index
        )
        # 乱数で選択
        if cluster_entity_ids:
            self._result = random.choice(cluster_entity_ids)
        
        # ログ出力
        self._logger.info(f"Target entity ID: {self._result}")
        
        return self

以上で、KMeansPPClustering モジュールを用いた KMeansPPSearch モジュールの実装が完了しました。

ターミナルを2つ起動します。

片方のターミナルを開き、シミュレーションサーバーを以下のコマンドで起動します:

# Terminal A
cd WORKING_DIR/rcrs-server/scripts
./start-comprun.sh -m ../maps/tutorial_ambulance_team_only/map -c ../maps/tutorial_ambulance_team_only/config

その後、別のターミナルを開き、エージェントを起動します:

# Terminal B
cd WORKING_DIR/<your_team_name>
python main.py

モジュールの改善#

KMeansPPSearch モジュールは、クラスタリングモジュールを用いて担当地域内からランダムに探索対象を選択しています。 そのため、以下のような問題があります。

  • 探索対象がステップごとに変わってしまう

    • 目標にたどり着く前に探索対象が変わってしまうため、なかなか目標にたどり着けない

    • 色んなところにランダムに探索対象を選択することで、効率的な探索ができない

  • すでに探索したエンティティを再度探索対象として選択してしまうため、効率的な探索ができない

  • 近くに未探索のエンティティがあるのに、遠くのエンティティを探索対象として選択してしまう

などの問題があります。

課題#

KMeansPPSearch モジュールを改善し、より効率的な探索を行うモジュールを実装して見てください。

警告

ここに上げた問題以外にも、改善すべき点が存在すると思うので、それを改善していただいても構いません。

警告

プログラム例のプログラムにも一部改善点があるので、余裕があったら修正してみてください。

探索対象がステップごとに変わってしまう問題#

すでに探索したエンティティを再度探索対象として選択してしまう問題#

近くに未探索のエンティティがあるのに、遠くのエンティティを探索対象として選択してしまう#