🚧 This website is under construction. 🚧
TIL
AWS
Lambda
Linebot

AWS Lambda で LINE Bot を作成する

本エントリでは、公式 sdk のサンプルコード (opens in a new tab) を AWS Lambda で利用する方法を紹介します。

LINE Bot のサンプル

from flask import Flask, request, abort
 
from linebot.v3 import (
    WebhookHandler
)
from linebot.v3.exceptions import (
    InvalidSignatureError
)
from linebot.v3.messaging import (
    Configuration,
    ApiClient,
    MessagingApi,
    ReplyMessageRequest,
    TextMessage
)
from linebot.v3.webhooks import (
    MessageEvent,
    TextMessageContent
)
 
app = Flask(__name__)
 
configuration = Configuration(access_token='YOUR_CHANNEL_ACCESS_TOKEN')
handler = WebhookHandler('YOUR_CHANNEL_SECRET')
 
 
@app.route("/callback", methods=['POST'])
def callback():
    # get X-Line-Signature header value
    signature = request.headers['X-Line-Signature']
 
    # get request body as text
    body = request.get_data(as_text=True)
    app.logger.info("Request body: " + body)
 
    # handle webhook body
    try:
        handler.handle(body, signature)
    except InvalidSignatureError:
        app.logger.info("Invalid signature. Please check your channel access token/channel secret.")
        abort(400)
 
    return 'OK'
 
 
@handler.add(MessageEvent, message=TextMessageContent)
def handle_message(event):
    with ApiClient(configuration) as api_client:
        line_bot_api = MessagingApi(api_client)
        line_bot_api.reply_message_with_http_info(
            ReplyMessageRequest(
                reply_token=event.reply_token,
                messages=[TextMessage(text=event.message.text)]
            )
        )
 
if __name__ == "__main__":
    app.run()
 

Webhook のエントリポイントとなる callback() 関数は、リクエストのヘッダから X-Line-Signature とボディをテキストとして取得しています。 その後は WebhookHandler に処理を移譲しているだけなので、callback() 関数を Lambda のハンドラ関数に書き換えることで、Lambda で LINE Bot を動かすことができます。

- from flask import Flask, request, abort
 
  from linebot.v3 import (
      WebhookHandler
  )
  from linebot.v3.exceptions import (
      InvalidSignatureError
  )
  from linebot.v3.messaging import (
      Configuration,
      ApiClient,
      MessagingApi,
      ReplyMessageRequest,
      TextMessage
  )
  from linebot.v3.webhooks import (
      MessageEvent,
      TextMessageContent
  )
 
- app = Flask(__name__)
 
  configuration = Configuration(access_token='YOUR_CHANNEL_ACCESS_TOKEN')
  handler = WebhookHandler('YOUR_CHANNEL_SECRET')
 
 
- @app.route("/callback", methods=['POST'])
- def callback():
+ def lambda_function(event, context):
      # get X-Line-Signature header value
-     signature = request.headers['X-Line-Signature']
+     signature = event['headers']['x-line-signature']
 
      # get request body as text
-     body = request.get_data(as_text=True)
+     body = event['body']
-     app.logger.info("Request body: " + body)
 
      # handle webhook body
      try:
          handler.handle(body, signature)
      except InvalidSignatureError:
-         app.logger.info("Invalid signature. Please check your channel access token/channel secret.")
-         abort(400)
+         return {'statusCode': 400, 'Invalid signature. Please check your channel access token/channel secret.'}
 
-     return 'OK'
+     return {'statusCode': 200, 'OK'}
 
  @handler.add(MessageEvent, message=TextMessageContent)
  def handle_message(event):
      with ApiClient(configuration) as api_client:
          line_bot_api = MessagingApi(api_client)
          line_bot_api.reply_message_with_http_info(
              ReplyMessageRequest(
                  reply_token=event.reply_token,
                  messages=[TextMessage(text=event.message.text)]
              )
          )
 
- if __name__ == "__main__":
-     app.run()
 

全体コード

from linebot.v3 import (
    WebhookHandler
)
from linebot.v3.exceptions import (
    InvalidSignatureError
)
from linebot.v3.messaging import (
    Configuration,
    ApiClient,
    MessagingApi,
    ReplyMessageRequest,
    TextMessage
)
from linebot.v3.webhooks import (
    MessageEvent,
    TextMessageContent
)
 
 
configuration = Configuration(access_token='YOUR_CHANNEL_ACCESS_TOKEN')
handler = WebhookHandler('YOUR_CHANNEL_SECRET')
 
 
def lambda_function(event, context):
    # get X-Line-Signature header value
    signature = event['headers']['x-line-signature']
 
    # get request body as text
    body = event['body']
 
    # handle webhook body
    try:
        handler.handle(body, signature)
    except InvalidSignatureError:
        return {'statusCode': 400, 'Invalid signature. Please check your channel access token/channel secret.'}
 
    return {'statusCode': 200, 'OK'}
 
@handler.add(MessageEvent, message=TextMessageContent)
def handle_message(event):
    with ApiClient(configuration) as api_client:
        line_bot_api = MessagingApi(api_client)
        line_bot_api.reply_message_with_http_info(
            ReplyMessageRequest(
                reply_token=event.reply_token,
                messages=[TextMessage(text=event.message.text)]
            )
        )