AWS Lambda で LINE Bot を作成する
本エントリでは、公式 sdk のサンプルコード を AWS Lambda で利用する方法を紹介します。
LINE Bot のサンプル
from flask import Flask, request, abort
from linebot.v3 import (
WebhookHandler
)
from linebot.v3.exceptions import (
InvalidSignatureError
)
from linebot.v3.messaging import (
Configuration,
ApiClient,
MessagingApi,
ReplyMessageRequest,
TextMessage
)
from linebot.v3.webhooks import (
MessageEvent,
TextMessageContent
)
app = Flask(__name__)
configuration = Configuration(access_token='YOUR_CHANNEL_ACCESS_TOKEN')
handler = WebhookHandler('YOUR_CHANNEL_SECRET')
@app.route("/callback", methods=['POST'])
def callback():
# get X-Line-Signature header value
signature = request.headers['X-Line-Signature']
# get request body as text
body = request.get_data(as_text=True)
app.logger.info("Request body: " + body)
# handle webhook body
try:
handler.handle(body, signature)
except InvalidSignatureError:
app.logger.info("Invalid signature. Please check your channel access token/channel secret.")
abort(400)
return 'OK'
@handler.add(MessageEvent, message=TextMessageContent)
def handle_message(event):
with ApiClient(configuration) as api_client:
line_bot_api = MessagingApi(api_client)
line_bot_api.reply_message_with_http_info(
ReplyMessageRequest(
reply_token=event.reply_token,
messages=[TextMessage(text=event.message.text)]
)
)
if __name__ == "__main__":
app.run()
Webhook のエントリポイントとなる callback()
関数は、リクエストのヘッダから X-Line-Signature
とボディをテキストとして取得しています。
その後は WebhookHandler
に処理を移譲しているだけなので、callback()
関数を Lambda のハンドラ関数に書き換えることで、Lambda で LINE Bot を動かすことができます。
- from flask import Flask, request, abort
from linebot.v3 import (
WebhookHandler
)
from linebot.v3.exceptions import (
InvalidSignatureError
)
from linebot.v3.messaging import (
Configuration,
ApiClient,
MessagingApi,
ReplyMessageRequest,
TextMessage
)
from linebot.v3.webhooks import (
MessageEvent,
TextMessageContent
)
- app = Flask(__name__)
configuration = Configuration(access_token='YOUR_CHANNEL_ACCESS_TOKEN')
handler = WebhookHandler('YOUR_CHANNEL_SECRET')
- @app.route("/callback", methods=['POST'])
- def callback():
+ def lambda_function(event, context):
# get X-Line-Signature header value
- signature = request.headers['X-Line-Signature']
+ signature = event['headers']['x-line-signature']
# get request body as text
- body = request.get_data(as_text=True)
+ body = event['body']
- app.logger.info("Request body: " + body)
# handle webhook body
try:
handler.handle(body, signature)
except InvalidSignatureError:
- app.logger.info("Invalid signature. Please check your channel access token/channel secret.")
- abort(400)
+ return {'statusCode': 400, 'Invalid signature. Please check your channel access token/channel secret.'}
- return 'OK'
+ return {'statusCode': 200, 'OK'}
@handler.add(MessageEvent, message=TextMessageContent)
def handle_message(event):
with ApiClient(configuration) as api_client:
line_bot_api = MessagingApi(api_client)
line_bot_api.reply_message_with_http_info(
ReplyMessageRequest(
reply_token=event.reply_token,
messages=[TextMessage(text=event.message.text)]
)
)
- if __name__ == "__main__":
- app.run()
全体コード
from linebot.v3 import (
WebhookHandler
)
from linebot.v3.exceptions import (
InvalidSignatureError
)
from linebot.v3.messaging import (
Configuration,
ApiClient,
MessagingApi,
ReplyMessageRequest,
TextMessage
)
from linebot.v3.webhooks import (
MessageEvent,
TextMessageContent
)
configuration = Configuration(access_token='YOUR_CHANNEL_ACCESS_TOKEN')
handler = WebhookHandler('YOUR_CHANNEL_SECRET')
def lambda_function(event, context):
# get X-Line-Signature header value
signature = event['headers']['x-line-signature']
# get request body as text
body = event['body']
# handle webhook body
try:
handler.handle(body, signature)
except InvalidSignatureError:
return {'statusCode': 400, 'Invalid signature. Please check your channel access token/channel secret.'}
return {'statusCode': 200, 'OK'}
@handler.add(MessageEvent, message=TextMessageContent)
def handle_message(event):
with ApiClient(configuration) as api_client:
line_bot_api = MessagingApi(api_client)
line_bot_api.reply_message_with_http_info(
ReplyMessageRequest(
reply_token=event.reply_token,
messages=[TextMessage(text=event.message.text)]
)
)