DDDにCQRSをどう組み込むか~バックエンドアーキテクチャ設計時の考え方

はじめに

こんにちは。テックドクターでバックエンドエンジニアをしている筧と申します。

新規プロダクトのバックエンドで、DDD (Domain-Driven Design) と CQRS (Command Query Responsibility Segregation) を組み合わせたアーキテクチャを採用しました。

DDDの本や記事は、Eric Evans著『Domain-Driven Design』や『実践ドメイン駆動設計』など様々あります。CQRSについてもMartin Fowler氏のCQRSの解説記事などがあります。しかし、DDDにCQRSをどう組み込んでいったかという話はあまり見かけません。

この点について以前より情報収集や試行錯誤を重ねていましたが、今回のプロダクトでようやく納得のいく形で実装ができました。この記事ではそのポイントをご紹介します。特にCQRSを具体的に実装していくApplication層を中心に、他の層とのデータのやりとりや責務分担について詳しく説明したいと思います。

この記事の想定読者とゴール

この記事は、以下のような方を想定しています:

  • DDDの基本(Entity、Value Object、Repository等の概念)は理解している
  • 実際のコードでCQRSをどう導入すればよいかわからない

記事を読み終わったときに、Application層のCommand/QueryHandlerの実装や、他の層とのデータのやりとりを具体的にイメージできるようになることがゴールです。

CQRSとは何か

まず、この記事でのCQRSの定義を明確にしておきます。

CQRSは、データを変更する操作(Command)と、データを読み取る操作(Query)を分離するパターンです。通常のCRUDではデータモデルが読み書き共通ですが、CQRSでは以下のように分けます:

  • Command側(書き込み): ビジネスルールの検証を重視。Entityを経由してデータを更新
  • Query側(読み込み): パフォーマンスと利便性を重視。最適化されたクエリで直接DTOにマッピング

この分離により、それぞれの操作に最適な実装を選択できるようになります。

全体像:レイヤー構成の概略と責務の割り当て

今回のプロダクトは一種のダッシュボードシステムで、データの集計・可視化を行うほか、組織やユーザー情報等の登録も行います。
レイヤー構成は以下の5層としました。

層名 説明
Presentation層 ← API エンドポイント(薄い層)
UseCase層 ← ビジネスフロー + 認可制御
Application層 ← Command/Query Handler(純粋なCRUD)
Domain層 ← Entity、Value Object、Repository Interface
Infrastructure層 ← Repository実装、Query Service

各層の責務を簡単に整理します。

Presentation層はHTTPリクエスト/レスポンス変換のみを担当します。ビジネスロジックは持たせません。

UseCase層は認可チェックや、複数のCommand/Queryを組み合わせたビジネスフローを扱います。

Application層は純粋なCRUD操作のみで、認可処理は持ちません。

Domain層はビジネスルールの中核を担い、他の層に依存しません。

Infrastructure層はデータベースや外部サービスとの実際のやりとりを担当します。

Core層もありますが、DIコンテナや共通設定を担う補助的な層なので本稿では詳しく扱いません。

特に重要なのはUseCase層とApplication層の境界です。Application層のCommand/QueryHandlerは認可処理を持たず、純粋にドメインロジックに集中します。一方、認可や監査ログといった横断的な関心事はUseCase層で扱います。

この設計にした理由は、別プロダクトでUseCase層とその下のService層(今回のApplication層に相当)を分けて成功した経験があったからです。UseCase層にCQRSのCommand/Queryを直接配置すると、認可処理とデータ操作のロジックが混在してしまうため、Application層として分離しました。

Application層におけるCQRS実装

Application層では、Command(書き込み)とQuery(読み込み)を明確に分離しています。

Commandの実装例:組織を作成する

例として、組織を作成するCommandHandlerを見てみましょう。以下は実際のプロダクトのコードを簡略化したサンプルです。

class CreateOrganizationCommand(BaseModel):
    """組織作成コマンド"""
    organization_id: OrganizationId
    name: OrganizationName
    # ... その他のフィールド

class CreateOrganizationCommandResult(BaseModel):
    """組織作成結果"""
    organization: Organization
    created: bool

class CreateOrganizationCommandHandler(
    ICommandHandler[CreateOrganizationCommand, CreateOrganizationCommandResult]
):
    """組織作成コマンドハンドラー"""

    def __init__(self, organization_repository: IOrganizationRepository) -> None:
        self._organization_repository = organization_repository

    async def handle(
        self, command: CreateOrganizationCommand
    ) -> CreateOrganizationCommandResult:
        # 1. 既存チェック
        if await self._organization_repository.exists(command.organization_id):
            raise EntityAlreadyExistsException(...)

        # 2. ドメインモデルでビジネスルール検証
        organization = Organization.create(
            organization_id=command.organization_id,
            name=command.name,
            # ...
        )

        # 3. 永続化
        await self._organization_repository.save(organization)

        return CreateOrganizationCommandResult(
            organization=organization,
            created=True,
        )

このCommandHandlerで最も重要なのは、認可処理を一切持たない点です。「この組織を作成できる権限があるか?」といったチェックはUseCase層の仕事で、Application層は純粋に「組織の作成」というビジネスロジックに集中しています。

具体的な処理の流れは、入力としてCommand(必要なデータのみ)を受け取り、出力としてCommandResult(処理結果)を返します。内部では、ドメインモデルのOrganization.create()を使ってビジネスルール検証を行い(例えば、OrganizationNameというValue Objectで組織名の長さや形式をチェックしています)、最後にRepositoryで永続化します。

こうすることでHandlerがシンプルになり、テストも書きやすくなります。認可を気にする必要がなく、Infrastructure層のDBセッションにも依存しないため、モックのRepositoryを渡すだけで単体テストができました。

Queryの実装例:組織を取得する

次に、組織を取得するQueryHandlerです。

class GetOrganizationQuery(BaseModel):
    """組織取得クエリ"""
    organization_id: OrganizationId

class GetOrganizationQueryResult(BaseModel):
    """組織取得結果"""
    organization: OrganizationDTO | None

class GetOrganizationQueryHandler(
    IQueryHandler[GetOrganizationQuery, GetOrganizationQueryResult]
):
    """組織取得クエリハンドラー"""

    def __init__(self, organization_repository: IOrganizationRepository) -> None:
        self._organization_repository = organization_repository

    async def handle(self, query: GetOrganizationQuery) -> GetOrganizationQueryResult:
        organization = await self._organization_repository.find_by_id(
            query.organization_id
        )

        if organization is None:
            return GetOrganizationQueryResult(organization=None)

        # Entity → DTOへの変換
        organization_dto = OrganizationDTO(
            organization_id=str(organization.organization_id.value),
            name=str(organization.name.value),
            created_at=organization.created_at,
            # ...
        )

        return GetOrganizationQueryResult(organization=organization_dto)

QueryHandlerで重要なのは、Entity→DTOの変換です。入力としてQuery(検索条件)を受け取り、出力としてQueryResult(DTO形式のデータ)を返します。責務はデータ取得とDTO変換のみで、Command側のようなビジネスルール検証は行いません。

DTOに変換する理由は、Presentation層で使いやすい形にするためです。Entityはビジネスロジックを持つ重いオブジェクトですが、DTOは単なるデータ転送用の軽いオブジェクトです。この変換をApplication層で行うことで、Presentation層はシンプルに保てます。

データモデルの使い分け

実装していて最も悩んだのが、「どのデータモデルをどこで使うか」でした。

当初、各データモデルの役割は ”なんとなく” 決まっていたものの、具体的なルールがありませんでした。例えば、DTOの使い回しや、概念の分離ができていなかったりしました。

最終的に、各層の境界を明確にするため、以下のようにデータモデルを整理しました:

データモデル 役割 配置場所 命名の由来
Command/Query リクエストデータ(入力) Application層 CQRSの概念からそのまま
CommandResult
/QueryResult
レスポンスデータ(出力) Application層 Commandの結果、Queryの結果という明確な名前
Projection Infrastructure
→Application層のデータ
Infrastructure層 CQRSの文献で使われている用語(後述)
DTO Application
→Presentation層のデータ
Application層 特に他の名前が思いつかなかった

Projectionという名前について補足します。当初はXXResultという名前も検討しましたが、CommandResult/QueryResultと名前が被ってしまうこと、そしてRepositoryの結果とQuery Serviceの結果の両方があり「どちらの名前を取るか」という論争が起きそうだったため不採用にしました。

CQRSの文献を調べたところ、読み取り側のデータモデルとして「Projection」という用語が使われていること(参考1、参考2)がわかり、この名前を採用しました。

これらを明確に分けることで、各層の関心事が混ざらないようにできています。特に名前付けには苦労しましたが、役割が明確になってからはコードの見通しが格段に良くなりました。

UseCase+Infrastructure:認可とデータアクセスをどう接続したか

Application層のCommand/QueryHandlerは純粋なCRUD操作だけを扱うので、認可処理はUseCase層で行います。

UseCaseでの認可統合:Command実行前に権限チェック

UseCase層では、Application層のHandlerをラップして認可処理を追加します。実装例を見てみましょう。

class CreateOrganizationUseCase:
    """組織作成UseCase(認可付き)"""

    def __init__(
        self,
        permission_checker: PermissionChecker,
        create_organization_handler: CreateOrganizationCommandHandler,
    ):
        self._permission_checker = permission_checker
        self._create_organization_handler = create_organization_handler

    async def execute(
        self,
        request: CreateOrganizationRequest,
        user_claims: JWTClaims
    ) -> Organization:
        # 1. 認可チェック(UseCase層の責務)
        await self._permission_checker.verify_role(user_claims, required_roles=["org-admin"])

        # 2. RequestからCommandへの変換
        command = CreateOrganizationCommand(
            organization_id=OrganizationId.generate(),
            name=OrganizationName(request.name),
            # ...
        )

        # 3. Application層のHandlerを実行
        result = await self._create_organization_handler.handle(command)

        return result.organization

このように、認可チェックとCommand実行を分離することで、いくつかのメリットがあります。

まず、Application層はビジネスロジックに集中できます。「組織を作成する」というドメインロジックに認可処理が混ざらないので、コードが読みやすくなります。

次に、認可ロジックを一箇所に集約できます。権限チェックの方法を変更したいとき、UseCase層だけ修正すれば済みます。

そして何より、テストが書きやすくなります。Application層はビジネスロジックの単体テストに集中でき、UseCase層は認可のテストとして分離できました。

認可処理の設計

認可処理の実装について、今回のプロダクトでは細かいロール設定をバックエンド側で担当することにしました。これにより、より柔軟な権限管理を実現しています。

Query Serviceでのデータ取得最適化

読み取り側において、複雑なクエリはInfrastructure層のQuery Serviceで最適化します。

Query Serviceとは、最適化されたクエリを実行するための読み取り専用のサービスです。Repositoryパターンとは異なり、複数のテーブルをJOINして1回のクエリで必要なデータを取得することに特化しています。

今回のプロダクトは、ダッシュボードシステムであり、複雑なデータの集計・可視化を行います。Repositoryパターンだけだと大量のクエリが発生し、パフォーマンスが低下する可能性があるため、CQRSを採用し、読み取り側ではQuery Serviceを使って1回のクエリで効率的にデータを取得しています。

具体例を見てみましょう(実際のプロダクトのコードを簡略化したものです):

class OrganizationQueryService:
    """組織クエリサービス(Infrastructure層)"""

    async def get_organization_with_stats(
        self, organization_id: str
    ) -> OrganizationProjection | None:
        # 最適化されたJOINクエリで一度に取得
        query = select(
            OrganizationModel.id,
            OrganizationModel.name,
            func.count(MemberModel.id).label('member_count'),
            func.count(ItemModel.id).label('item_count'),
        ).select_from(
            OrganizationModel
        ).outerjoin(
            MemberModel
        ).outerjoin(
            ItemModel
        ).where(
            OrganizationModel.id == organization_id
        ).group_by(OrganizationModel.id)

        result = await self._session.execute(query)
        row = result.first()

        if not row:
            return None

        # Projectionとして返す(DTO変換はQueryHandlerで)
        return OrganizationProjection(
            id=row.id,
            name=row.name,
            member_count=row.member_count or 0,
            item_count=row.item_count or 0,
        )

ここでのポイントは、Query ServiceがProjectionを返す点です。データフローは次のようになります:

QueryService(Projection) → QueryHandler(DTO) → UseCase → API(Response)

ProjectionはInfrastructure層からApplication層へのデータモデル、DTOはApplication層からPresentation層へのデータモデルと、役割が分かれています。

この設計にしたのは、Infrastructure層にドメイン知識を漏らさないためです。もしQuery ServiceがDTOを直接返すと、Infrastructure層がPresentation層の都合(どんな形式でデータを返すか)を知る必要があり、依存方向が逆転してしまいます。Projectionという中間データモデルを挟むことで、各層の責務を明確に保てています。

Query ServiceとRepositoryの使い分け

具体的な判断基準として、複数回JOINが必要かどうかを見ています。

単純な取得であればRepositoryを使いますが、組織に紐づく複数の関連データを同時に取得する場合など、複数のテーブルをJOINする必要がある場合はQuery Serviceを使います。これにより、クエリ実行回数を抑え、パフォーマンスを向上させています。

Presentation層とCore層の扱い

Presentation層:UseCaseに委譲するだけの薄い層

APIエンドポイントは本当に薄く、UseCaseに処理を委譲するだけです。

@router.post("", response_model=CreateOrganizationResponse, status_code=201)
@require_roles(["org-admin"])
async def create_organization(
    request: CreateOrganizationRequest,
    create_organization_use_case: Annotated[
        CreateOrganizationUseCase, Depends(get_create_organization_use_case)
    ],
    user_claims: Annotated[JWTClaims, Depends(get_current_user)],
) -> CreateOrganizationResponse:
    """組織作成エンドポイント"""
    try:
        # UseCaseに処理を委譲
        organization = await create_organization_use_case.execute(request, user_claims)

        # レスポンスに変換して返すだけ
        return CreateOrganizationResponse(
            organization_id=str(organization.organization_id.value),
            name=str(organization.name.value),
            # ...
        )
    except ValidationException:
        # エラーハンドリング(詳細は省略)
        raise

エンドポイントではビジネスロジックを持たず、HTTPの世界とアプリケーションの世界を繋ぐ責務だけに徹しています。実際のコードを見ても、UseCaseの実行結果をResponseに変換し、例外をHTTPステータスコードに変換しているだけです。

この設計にしてよかったのは、エンドポイントのテストがシンプルになったことです。HTTPリクエストの形式が正しいかだけをテストすればよく、ビジネスロジックのテストはUseCase層で完結します。

Core層

Core層はDI ContainerやJWT検証といった共通機能を提供していますが、アーキテクチャの中心ではないので本稿では詳しく触れません。重要なのは、各層の責務を明確に分けることです。

まとめ:実際に導入してみて感じたメリット

DDD+CQRSを実際のプロダクトに導入してみて、以下のメリットを感じました。

まず、認可ロジックの明確化です。UseCase層に認可処理を集約することで、「誰が何にアクセスできるか」が一目でわかるようになりました。

次に、テストの容易性です。Application層のHandlerがシンプルなので、単体テストが書きやすくなりました。認可処理と分離されているため、ビジネスロジックのテストに集中できます。また、Infrastructure層に依存しないため、DBセッションを用意する必要もありません。

そして、保守性の向上です。各層の責務が明確なので、変更の影響範囲が予測しやすく、コードの見通しが良くなりました。新しいメンバーがジョインしたときも、「この機能はどこを見ればいい?」という質問に明確に答えられるようになっています。

特にApplication層とUseCase層を分けたことで、ビジネスロジックと認可処理が混ざらず、それぞれに集中できるようになりました。当初は「層が多すぎて複雑では?」と心配したのですが、実装を進めるうちに明確なルールが整備され、責務が明確な分、むしろシンプルに感じています。

DDDもCQRSも学習コストは高いですが、実際に手を動かして設計してみると、その価値が実感できます。この記事が、これからDDD+CQRSを導入しようとしている方の参考になれば幸いです。

似顔絵
書いた人:筧