これまでシリーズの記事で書いたように、お届けチームの扱っているシステムはイベントを扱って非同期処理をしています。
非同期処理でイベントを扱うということは、イベントをモデルとして扱うのとセットです。 イベントは書き込み系で作成しか発生しないモデルかつ、参照系でイベントそのものをクエリしないものとしています。
この記事では書き込み系・参照系でそれぞれイベントに関わる実装がどうなっているのか紹介します。
書き込み系
イベントはアプリケーションでも書き込み系の側面の経路で発生します。 イベントは発生元としてライフサイクル上ステートフルなモデルがいて、そのモデルで発生した出来ごととして扱われています。 またイベントはシステム寄りなネーミングや意味づけより、実際の業務に近づけたり業務から取り出して定義する努力をしています。
たとえばPickingというモデルは、「ある1注文のある1つの商品をN個売り場から探してピックしたり、見つからなかったら品切れとする。お客様の要求数分ピックまたは品切れがあることで業務が完了する」ような一連の業務から成り立っています。
「ピックした」、「品切れにした」ような一連の業務中の出来ごとをイベントとしています。 たとえばPickingモデルでピックするようなメソッド(Picking#pickItem)は次のようになっています。
- 合計のピック済み数がお客様の要求数未満である。
- itemPickedイベントをPickingの状態とpickItemの入力から生成する。
- Pickingの状態をItemPickedイベント元に次の状態へ遷移させる。
class Picking { PickingItemPicked pick( PickedItemAmount pickedAmount, { required bool withImmediatelyPack, }) { // 合計のピック済み数がお客様の要求数未満である。 if (pickingItems .progressAmount(requestedAmount) .isPickingItemsOperationFinished()) { throw Error( 'pickingItemsOperation is already finished'); } // itemPickedイベントをPickingの状態とpickItemの入力から生成する。 // apply -> Pickingの状態をItemPickedイベント元に次の状態へ遷移させる。 return apply(PickingItemPicked( attribute: _nextAttribute, beforeChangeAmount: pickingFinishCondition().progressAmount(), pickingItems: pickingItems.pick( pickedAmount: pickedAmount, withoutScan: withoutScan, ), changedAmount: changedItems.progressAmount(requestedAmount), withImmediatelyPack: withImmediatelyPack, )); } }
これはイベントを発生させるモデルが行う段取りであり、次の3段構成と表せます。
- 入力の検証
- イベントの生成、
- 自分へのイベントの適用
この3段構成中、「自分へのイベントの適用」によって状態遷移が起きるように仕向けているので、状態遷移にはイベントが不可欠になります。 「状態遷移あるところにはイベントあり」を実現することで、段取りに則るとデータ分析時に「この操作について分析したいのにデータないじゃん」が起きないようになります。
分析用データについては、「GAでイベントを取ればいいのでは?」という意見もあるかもしれません。しかし、状態遷移とイベントの発生を一体化することで、「送り忘れ」や「アプリケーションデータベースとGAに送ったイベントの不整合」を防ぐことができます。 「分析のためにイベントが役立つ」というお話は別の記事で取り上げる予定です。
モデルの永続化
StailerのサーバサイドアプリケーションではFirestoreを中心にあるDBとして扱っています。 なのでFirestoreへイベントと、Pickingモデル自体の状態の書き込みをしています。 イベントソーシングらへんの言葉でいうスナップショットを常に取っているわけです。
class PickingRepositoryOnFirestore implements PickingRepository { @override Future<void> store(FirestoreTransaction tx, Picking picking) async { final pickingDto = PickingFirestoreDto.fromPicking(picking); // pickingDtoはその時のPickingの状態をFirestore用に表していてupsertで永続化される。 tx.update( pickingDto, createIfNotExist: true, ); for (final event in picking.occurredEvents) { // Eventはcreateしか起きない。 // pickpack外の領域がイベントをトリガーにしたいケースがあり、 // 永続化の時点で構造が決まっていると永続化以降のプロセスで扱いやすいので、 // protobufで構造が宣言されたJSONをFirestoreに渡している。 tx.create(PickingEventProtoTranslate().toProtoFormat(event)); } } }
スナップショットを使わない場合、Pickingモデルをデータストアから復元する際には、過去に発生したすべてのイベントをFirestoreから読み出し、それを適用して状態を復元します。
スナップショットとは、ある時点でのデータの状態を保存したものです。これで過去のイベントをすべて再生する必要がなくなります。
Pickingが扱うピックの業務はスマホアプリで商品をカメラにかざすと、「商品をピックした」イベントが発生したりデータの動かします。 ピックは高い頻度でwriteが発生するので、スナップショットを参照すれば常に最新の状態を取れるとFirestoreとのやりとりが最小になります。
他のモデルも同様の理由で常にスナップショットを取る場合がありますが、モデルによってはスナップショットを取らない場合もあります。
Stailerの実装とは多くの部分で一致しませんが、実装について気になった方はコチラの記事を読むと、より具体的な内容や方法が分かると思います。
アクターシステムに頼らずEvent Sourcingする方法について
現状の書き込み頻度からは激しい問題が起きてないので検討もしていませんが、 Pickingようなモデルはakka cluster sharding上のActorで実現できるとまさにそのメリットが受けやすいと個人的に考えています。
Akka Platform GuideのMemory Image Pattern
参照系
まず1つがイベントをトリガーに作ったデータを参照する系統です。CQRS+ESに馴染みがあるとイメージしやすいかと思います。 Stailerでは厳密にはCQRS+ESに沿ってないので書き込み系で言及した最新のスナップショットを参照系でも使っています。
class PickingRepositoryOnFirestore implements PickingRepository { @override Future<List<Picking>> listOriginLabelPrintReadyPickingsByPickingOrderId( PickingOrderId pickingOrderId, ) { // storeで永続化していたDtoをTransactionを不要とする書き込み以外の用途で読み出している。 final pickingDtoStream = _firestore.listAll( PickingFirestoreDto.reference, where: [ PickingFirestoreDto.filterOfSamePickingOrderId(pickingOrderId), PickingFirestoreDto.filterOfProductOriginLabelPrintReady(), PickingFirestoreDto.filterOfNonCancelledPicking(), ], ); // Dtoのまま返さずPickingのモデルにしている return pickingDtoStream .map((dto) => dto.toPicking()) .toList(); } } // 読み込み系のユースケース(アプリケーションサービス) class ListPickingsForOriginLabelPrintingUseCase { final PickingRepository _pickingRepository; @override Future<ListPickingsForOriginLabelPrintingResponse> execute(ListPickingsForOriginLabelPrintingRequest input) async { // Repositoryを使って必要なデータを読み出す final pickings = await _pickingRepository .listOriginLabelPrintReadyPickingsByPickingOrderId( PickingOrderId(input.pickingOrderId), ); // 例に出しているユースケースは非常にシンプルですが、Picking以外にもいろんなデータを跨ぐ取得系のユースケースは // FirestoreがRDBほど複雑な結合をできないのでかなりロジックがモリモリです。 // データベースがRDBのように複雑な結合を実行できるならRepositoryに読み込み系用のメソッドを実装せず、読み取り用のクエリ実行に特化させたDAOとデータモデルを定義していたと思います。 // presenterに渡す return pb.ListPickingsForOriginLabelPrintingResponse( pickings: pickings.map(pickingPresenter), ); } }
スナップショットが読み込み用に使われていると読み込み系の都合が徐々に書き込み系へ出てくるリスクはありますが、今のところそれが問題になっていないのでそのままというところです。
アプリケーションコード的にはリポジトリを呼び出しています。 一方でパフォーマンスを考慮してPickingをそのまま取得せず一部分をfirestoreから取り出すような際はPickingとして扱わず参照用のモデルを定義して取り回しています。
// PickingProgressByProductは読み込み系用のためにPickingDtoからフィールドを絞った読み込み用モデル class PickingRepositoryOnFirestore implements PickingRepository { @override Future<List<PickingProgressByProduct>> listPickingProgressesByProduct( List<PickingOrderId> pickingOrderIds) { final dtoStream = _firestore.listAllWithContainedIn( PickingProgressByProductDto.reference, containedIn: ( 'pickingOrderId', pickingOrderIds.map((id) => id.toString()).toList() ), where: [ PickingFirestoreDto.filterOfNonCancelledPicking(), ], // PickingDtoとして永続化した際のすべてのフィールドを取得せず、PickingProgressByProductに必要なものだけ取得を命じている。 select: PickingProgressByProductDto.requireFields, ); return dtoStream.map((dto) => dto.toModel()).toList(); } }
集計データの参照系
ある1日にあったピッキングの進捗 のような集計データに関してはイベントを取り込み集計データを作るようになっています。
これはコチラの記事にもあるように、Firestoreと付き合う上で集計データは事前に作った方の扱いが容易ゆえに採用しています。
集計データを作る際にもイベントを元に作り出すことは都合がいいことで、たまにあります。
たとえばある集計データは ある商品においてその日の最後にあった品切れあった日時を持っていたい場合を考えてみます。
集計データがPickingモデルを取り込む場合はPicking自身も最後にあった品切れの日時をもつ必要があります。
最後にあった品切れの日時がPicking自身の振る舞いにも不可欠なことであれば受け入れるのは容易ですが、そうでなければPickingモデルに集計のために持ち物を増やすことになってきます。
こういったケースが増えていくとモデル側に振る舞いには影響しない持ち物がずらっと並ぶ状況になります。
// Picking自身はあまり興味がないが、集計のために持足せてしまうケース class Picking { final ProductId productId; // 品切れ両方とも作業が始まる前は値がないのでnullableとして扱うことにした final DateTime? lastStockoutAt; } class DailySummary { // 商品ごとに品切れの時刻を持つ Map<ProductId, DateTime> _lastStockoutTimes; void addSummaryWhenStockout(Picking picking) { final alreadyRecordedTime = _lastStockoutTimes.get(stockout.productId); // 品切れ後にはlastStockoutAtに値があることを信じる if (picking.lastStockoutAt!.isAfter(alreadyRecordedTime)) { _lastStockoutTimes.add(picking.lastStockoutAt); } } }
一方でイベントを取り込むような実装をすると品切れにした、ピックしたというイベントが発生日時とともに流れてくるので、
集計データを作る側では流れてきたモノをシンプルに採用するか判断するようになります。
// Pickingは品切れを決定するためのメソッドでPickingItemStockoutを生成するが、lastStockoutAtのようなフィールドは持たない class PickingItemStockout { final PickingId pickingId; final ProductId productId; final DateTime occurrenceTime; } class DailySummary { // 商品ごとに品切れの時刻を持つ Map<ProductId, DateTime> _lastStockoutTimes; void addSummaryWhenStockout(PickingItemStockout stockout) { final alreadyRecordedTime = _lastStockoutTimes.get(stockout.productId); if (occurrenceTime.isAfter(alreadyRecordedTime)) { _lastStockoutTimes.add(stockout.occurrenceTime); } } }
今回挙げた例1つでは体感しずらいところですが、Pickingのモデルの持ち物が読み取り用の集計値の計算のためにnullableのフィールドを10個も持ち出すと、 いつどのときはフィールドがあるんだ?だったり、意図しないときにフィールドを参照するロジックが書かれたり認知負荷をグッと上げるコードが増える要因になりやすいです。
全体通じて
イベントがアプリケーションの中心!になるように考えて作ってきました。するとCQRS+ESで出てくるような書き込み系でイベント発生、読み込み系はイベントから取り込んだデータを提供する構造に近づいてきました。 CとQの分離を徹底して目指さなくても現状問題はないので、その定義に沿ったCQRS+ESを目指す展望は現状ありませんが悩んだときには参考になると考えています。
Command側にQuery側の関心があまりに影響してくる一線を超えると、あっという間に辛くることを自分は経験からこのことを知っているので「ここは混ぜたら危険だな〜」には敏感になりながら開発をしています。
今回書いた内容はオブジェクト設計スタイルガイドの内容とかなり近いです。 本の中では「イベントを使う、それでもCQRSまでやらなくてもメリットはある」ようなことも書いてあり、CQRS+ES全部実現するのは難しいけど、できるところから始めてみたいようなケースには参考になるのではないでしょうか。 「オブジェクト設計」とありますが、「オブジェクト指向」を問わず参考になることは多いオススメブックです。
次回に続く
次回も引き続きお届けチームによるイベント駆動設計への取り組みを紹介していきます。
お届けチームでは絶賛エンジニアを募集中です。カジュアル面談もwelcomeです。 ご応募お待ちしております。