はじめに
はじめまして、SREチームのhogeです。
検証環境にBasic認証をかけて簡易的なアクセス制限をかけることがあるかと思います。
Basic認証とはいえ、以下のような理由からパスワードを定期的に更新することが望ましいです。
- パスワードが長期間使い回されると、漏洩や不正アクセスのリスクがある
- チームメンバーの変更(退職、プロジェクト移動など)があった場合、古いメンバーは開発環境にアクセスできないようにした方が良い
しかし、手動で更新するのも手間なため、今回はAWSのCloudfront + Lambda@Edge + SecretsManagerを使用してでBasic認証をかけつつ、パスワードを自動的に定期更新する仕組みを作ってみたいと思います。
構成
- クライアントがCloudfrontディストリビューションにアクセス
- CloudfrontビューアリクエストがLambda@edgeを呼び出す
- Lambda@edgeがSecretsManagerから認証情報を取得し、Basic認証を行う
SecretsManagerに保管されている認証情報はローテーション設定によりLambdaで月に一度更新されます。
実装
全体のコード
デプロイできる状態にはなっていますが、terraformのリソース名とLambda@edge上のSecretsManager IDは適宜書き換える必要があります。
providers.tf
terraform { required_version = "~>1.9.0" required_providers { aws = { source = "hashicorp/aws" version = "~>5.76.0" } } } provider "aws" { region = "ap-northeast-1" } provider "aws" { alias = "virginia" region = "us-east-1" } terraform { backend "local" {} }
lambda@edge
- Lambda@edgeはバージニア北部に作成する必要があります。
- SecretsManagerの権限を付与しています。
main.tf
module "lambda_at_edge" { source = "terraform-aws-modules/lambda/aws" providers = { aws = aws.virginia } lambda_at_edge = true function_name = "basicAuth" handler = "lambda_handler.lambda_handler" runtime = "python3.12" source_path = "src/basic_auth_lambda_at_edge/lambda_handler.py" attach_policy_json = true policy_json = jsonencode( { Version = "2012-10-17" Statement = [ { Effect = "Allow" Action = "secretsmanager:GetSecretValue" Resource = aws_secretsmanager_secret.basic_auth.arn } ] } ) }
lambda@edgeのコード
lambda_hander.py
import boto3 import base64 import json cached_credentials = None def initialize_secrets(secret_id): global cached_credentials if cached_credentials: return cached_credentials client = boto3.client("secretsmanager", region_name="ap-northeast-1") try: response = client.get_secret_value(SecretId=secret_id) if "SecretString" in response: cached_credentials = json.loads(response["SecretString"]) else: cached_credentials = json.loads(base64.b64decode(response["SecretBinary"])) print("Secrets successfully loaded and cached") return cached_credentials except Exception as e: print(f"Error retrieving secrets: {e}") raise def lambda_handler(event, context): # 必要に応じて書き換える secret_id = "your-secrets" credentials = initialize_secrets(secret_id) auth_user = credentials["username"] auth_pass = credentials["password"] # Authorizationヘッダーの確認 request = event["Records"][0]["cf"]["request"] headers = request["headers"] auth_string = f"Basic {base64.b64encode(f'{auth_user}:{auth_pass}'.encode()).decode()}" if ( "authorization" not in headers or headers["authorization"][0]["value"] != auth_string ): return { "status": "401", "statusDescription": "Unauthorized", "headers": { "www-authenticate": [{"key": "WWW-Authenticate", "value": "Basic"}], }, "body": "Unauthorized", } return request
ポイントは以下の部分です。
SecretsManagerから取得した認証情報をグローバル変数(handler外)に保持することで、Lambdaのコールドスタート時にのみSecretsManagerに問い合わせるようにしています。
cached_credentials = None def initialize_secrets(secret_id): global cached_credentials if cached_credentials: return cached_credentials client = boto3.client("secretsmanager", region_name="ap-northeast-1") try: response = client.get_secret_value(SecretId=secret_id) if "SecretString" in response: cached_credentials = json.loads(response["SecretString"]) else: cached_credentials = json.loads(base64.b64decode(response["SecretBinary"])) print("Secrets successfully loaded and cached") return cached_credentials except Exception as e: print(f"Error retrieving secrets: {e}") raise
料金 - AWS Secrets Manager | AWS
SecretsManagerのAPI呼び出しは上記のように課金されます。検証環境で使うぶんにはそこまでコストはかからなそうですが、(要件によると思うが)大規模な負荷テストを行う際にはコストが膨らむ可能性があります。 そのため、Lambdaのグローバル変数にキャッシュして、SecretsManagerへのアクセス回数を減らしています。
SecretsManager
- 30日に一度Lambda関数でローテーションするように設定します。
- ローテーション用のLambda関数をSecretsManagerから呼び出せるように権限を設定し、SecretsManagerのアクションをiamに付与します。
main.tf
resource "aws_secretsmanager_secret" "basic_auth" { name = "/dev/basic-auth" } resource "aws_secretsmanager_secret_rotation" "basic_auth_rotation" { secret_id = aws_secretsmanager_secret.basic_auth.id rotation_lambda_arn = module.secrets_manager_rotation.lambda_function_arn rotation_rules { automatically_after_days = 30 } } module "secrets_manager_rotation" { source = "terraform-aws-modules/lambda/aws" function_name = "basic_auth_rotation" runtime = "python3.12" handler = "lambda_handler.lambda_handler" source_path = "src/basic_auth_rotation/lambda_handler.py" allowed_triggers = { SecretsManager ={ "service": "secretsmanager" "source_arn": aws_secretsmanager_secret.basic_auth.arn } } create_current_version_allowed_triggers = false attach_policy_json = true policy_json = jsonencode( { Version = "2012-10-17" Statement = [ { Effect = "Allow" Action = [ "secretsmanager:DescribeSecret", "secretsmanager:GetSecretValue", "secretsmanager:PutSecretValue", "secretsmanager:UpdateSecretVersionStage" ] Resource = aws_secretsmanager_secret.basic_auth.arn }, { "Effect": "Allow", "Action": [ "secretsmanager:GetRandomPassword" ], "Resource": "*" }, ] } )
SecretsManagerローテーション用のLambda関数のコード
- ランダムな文字列を生成してパスワードを更新しています。
- 例えば共有のためにスプシの更新や通知などがあればこのLambda内に記述します
- 以下のリポジトリのサンプルコードを参考にして作成しました
lambda_handler.py
import boto3 import logging import os import json logger = logging.getLogger() logger.setLevel(logging.INFO) def lambda_handler(event, context): """Secrets Manager Rotation Template This is a template for creating an AWS Secrets Manager rotation lambda Args: event (dict): Lambda dictionary of event parameters. These keys must include the following: - SecretId: The secret ARN or identifier - ClientRequestToken: The ClientRequestToken of the secret version - Step: The rotation step (one of createSecret, setSecret, testSecret, or finishSecret) context (LambdaContext): The Lambda runtime information Raises: ResourceNotFoundException: If the secret with the specified arn and stage does not exist ValueError: If the secret is not properly configured for rotation KeyError: If the event parameters do not contain the expected keys """ arn = event['SecretId'] token = event['ClientRequestToken'] step = event['Step'] logger.info(f"Received Step: {step}") service_client = boto3.client('secretsmanager', endpoint_url=os.getenv('SECRETS_MANAGER_ENDPOINT', None)) metadata = service_client.describe_secret(SecretId=arn) if not metadata['RotationEnabled']: logger.error(f"Secret {arn} is not enabled for rotation.") raise ValueError(f"Secret {arn} is not enabled for rotation.") versions = metadata['VersionIdsToStages'] if token not in versions: logger.error(f"Secret version {token} has no stage for rotation of secret {arn}.") raise ValueError(f"Secret version {token} has no stage for rotation of secret {arn}.") if "AWSCURRENT" in versions[token]: logger.info(f"Secret version {token} already set as AWSCURRENT for secret {arn}.") return elif "AWSPENDING" not in versions[token]: logger.error(f"Secret version {token} not set as AWSPENDING for rotation of secret {arn}.") raise ValueError(f"Secret version {token} not set as AWSPENDING for rotation of secret {arn}.") if step == "createSecret": create_secret(service_client, arn, token) elif step == "setSecret": pass elif step == "testSecret": pass elif step == "finishSecret": finish_secret(service_client, arn, token) else: raise ValueError("Invalid step parameter") def create_secret(service_client, arn, token): """ランダムなBasic認証用文字列を生成してSecrets Managerに保存""" try: service_client.get_secret_value(SecretId=arn, VersionId=token, VersionStage="AWSPENDING") logger.info(f"createSecret: Successfully retrieved secret for {arn}.") except service_client.exceptions.ResourceNotFoundException: # ランダムなBasic認証用文字列を生成 username = "admin" exclude_characters = os.environ['EXCLUDE_CHARACTERS'] if 'EXCLUDE_CHARACTERS' in os.environ else '/@"\'\\' password = service_client.get_random_password(ExcludeCharacters=exclude_characters, PasswordLength=16) basic_auth_credentials = { "username": username, "password": password['RandomPassword'] } # Secrets Managerに保存 service_client.put_secret_value( SecretId=arn, ClientRequestToken=token, SecretString=json.dumps(basic_auth_credentials), VersionStages=['AWSPENDING'] ) logger.info(f"createSecret: Successfully put secret for ARN {arn} and version {token}.") def finish_secret(service_client, arn, token): """Finish the secret This method finalizes the rotation process by marking the secret version passed in as the AWSCURRENT secret. Args: service_client (client): The secrets manager service client arn (string): The secret ARN or other identifier token (string): The ClientRequestToken associated with the secret version Raises: ResourceNotFoundException: If the secret with the specified arn does not exist """ # First describe the secret to get the current version metadata = service_client.describe_secret(SecretId=arn) current_version = None for version in metadata["VersionIdsToStages"]: if "AWSCURRENT" in metadata["VersionIdsToStages"][version]: # The correct version is already marked as current, return if version == token: logger.info(f"finishSecret: Version {version} already marked as AWSCURRENT for {arn}.") return current_version = version break # Finalize by staging the secret version current service_client.update_secret_version_stage( SecretId=arn, VersionStage="AWSCURRENT", MoveToVersionId=token, RemoveFromVersionId=current_version ) logger.info(f"finishSecret: Successfully set AWSCURRENT stage to version {token} for secret {arn}.")
CloudfrontにLambda@edgeの関連付け
最後に、上記で作成したLambda@edgeをCloudfrontに関連付けます。
data "aws_lambda_function" "basic_auth" { function_name = "basicAuth" } resource "aws_cloudfront_distribution" "asset" { // ... lambda_function_association { event_type = "viewer-request" lambda_arn = "${data.aws_lambda_function.basic_auth.arn}:${data.aws_lambda_function.basic_auth.version}" include_body = false } }
動作確認
CloudfrontにLambda@edgeを関連付けたサイトにアクセスしてBasic認証がかかっていることを確認します。
SecretsManagerのコンソールで手動でローテーションを実行します。
数分後くらいにアクセス(コールドスタートが古いSecretsを保持しているため)して、再度Basic認証がかかっています。 ローテーション後のパスワードで認証できることが確認できました!(スクショでは分かりづらいですね)
最後に
AWS SecretsManagerのローテーション機能を使ってBasic認証のパスワードを自動更新する方法をご紹介いたしました。Lambda@edgeでSecretsManagerから認証情報を取得しても、グローバル変数を適切に使えば、低コスト・低レイテンシーでBasic認証を行うことができます。 最近チームの名称がインフラチームからSREチームに変わったので、SREらしくトイルを削減できたのではないかと思います。
この記事を読んで興味を持って下さった方がいらっしゃれば、カジュアルにお話させていただきたいです!是非ご応募をお願いいたします!
Wantedly / Green