production.log

株式会社リブセンスでエンジニアをやっている星直史のブログです。

【ServerlessFramework】DynamoDB Streamsでデータの更新をトリガーにLambdaを動かす方法。

概要

表題の通り、DynamoDB Streamsでデータの更新をトリガーにLambdaを動かす方法について説明します。

AWSマネジメントコンソール上でのDynamoDB Streamsの有効化

まず、AWSマネジメントコンソールでStreamsを有効化する必要があります。 DynamoDB > Tables > テーブル選択 > Overview > Stream details > ManageStreamを選択

これを選択すると、ストリームを経由して渡されるデータ構造を選択できるラジオボタンが表示されます。 それぞれの意味は下記の通りです。

  • [Keys only] — 変更された項目のキー属性のみ取得。
  • [New image] — 変更後のデータのみ取得。
  • [Old image] — 変更前のデータのみ取得。
  • [New and old images] — 変更後、変更前のデータの両方取得。

あとは作成すれば、ARNが表示されます。

serverless.ymlの設定

次にserverless.ymlに設定を書き、Lambdaと紐づける必要があります。

provider:
  name: aws
  runtime: nodejs6.10
  stage: dev
  region: us-west-2
  iamRoleStatements:
    - Effect: "Allow"
      Action:
        - dynamodb:DescribeStream
        - dynamodb:GetRecords
        - dynamodb:GetShardIterator
        - dynamodb:ListStreams
      Resource:
        - "${AWSマネジメントコンソール上に表示されているARN}"
functions:
  dynamo_stream:
    handler: handler.dynamo_stream
    events:
      - stream:
          arn: "${AWSマネジメントコンソール上に表示されているARN}"
          batchSize: 1
          startingPosition: TRIM_HORIZON
          enabled: true

functions.dynamo_stream.events.stream内の各項目は

batchSize: 一度に処理するストリームの数を表します。1~10000まで指定できます。1以上の値を指定した場合、Lambdaの処理の中でループで処理をする必要があります。 startingPosition: 処理開始するストリームの位置を指定します。最新から取得か最古から取得かのどちらかを指定できます。LATEST | TRIM_HORIZON enabled true: 有効 / 無効を指定 true | false

また、eventの設定以外には、IAMの設定も必要となります。 上記の設定項目は最低限の設定です。これを設定しないとsls deploy -vをした時に下記エラーに見舞われます。

  Serverless Error ---------------------------------------
 
     An error occurred while provisioning your stack: DynamoUnderscorestreamUnderscoreproxyEventSourceMappingDynamodbDevitems
     - Cannot access stream AWSマネジメントコンソール上に表示されているARN
     Please ensure the role can perform the GetRecords, GetShardIterator,
     DescribeStream, and ListStreams Actions on your stream
     in IAM..
 

取得できるJSON

{
  "Records": [
    {
      "eventID": "78h9r97gawegaj7ddnga6e6w",
      "eventName": "MODIFY",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-west-2",
      "dynamodb": {
        "ApproximateCreationDateTime": 1492654321,
        "Keys": {
          "id": {
            "N": "123456789"
          }
        },
        "NewImage": {
          "column_a": {
            "N": "2"
          }
        },
        "OldImage": {
          "column_a": {
            "N": "1"
          }
        },
        "SequenceNumber": "405760000000000123456789",
        "SizeBytes": 123,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "AWSマネジメントコンソール上に表示されているARN"
    }
  ]
}

しっかり新旧両方のデータが取得できていますね。

解決できなかった点

  1. ServerlessFrameworkだけで完結できない。 説明の通り、AWS マネジメントコンソールでストリームを追加し、それを元にserverless.ymlを設定しなければなりません。 deploy時に失敗するから気づけるとは思うのですが、漏れが発生する気しかしないです。

  2. 設定できるストリームが1テーブルにつき1つ 当初、レコード追加時のストリーム、Aカラムを更新したときのストリーム….といったように、細かく設定できると思っていたのですが、 試して見た結果、「何かしらの変更が加えられたらJSONに新旧両方のデータを詰めて渡すからよしなにやってね」って感じでした。 DynamoDBにカラムが新規作成されたらLambdaAを、更新の時はLambdaBを実行しようとした場合、テンプレートメソッドパターンや、プロキシパターンなり使わないと収集つかなくなりそうな印象を受けました。

まとめ

  1. AWSマネジメントコンソールでStreamsを有効化
  2. serverless.ymlに設定

この2ステップだけでDynamoDBストリームを受け取りLambdaに渡せるのはやはり便利です。 ただ、マネコンとserverless.ymlの両方に設定を加えなければならないのは漏れが発生しそうなので、もう一声といったところでしょうか。