iimon TECH BLOG

iimonエンジニアが得られた経験や知識を共有して世の中をイイモンにしていくためのブログです

お手軽にBasic認証のパスワードを自動更新する方法

はじめに

はじめまして、SREチームのhogeです。

検証環境にBasic認証をかけて簡易的なアクセス制限をかけることがあるかと思います。

Basic認証とはいえ、以下のような理由からパスワードを定期的に更新することが望ましいです。

  • パスワードが長期間使い回されると、漏洩や不正アクセスのリスクがある
  • チームメンバーの変更(退職、プロジェクト移動など)があった場合、古いメンバーは開発環境にアクセスできないようにした方が良い

しかし、手動で更新するのも手間なため、今回はAWSのCloudfront + Lambda@Edge + SecretsManagerを使用してでBasic認証をかけつつ、パスワードを自動的に定期更新する仕組みを作ってみたいと思います。

構成

  1. クライアントがCloudfrontディストリビューションにアクセス
  2. CloudfrontビューアリクエストがLambda@edgeを呼び出す
  3. Lambda@edgeがSecretsManagerから認証情報を取得し、Basic認証を行う

SecretsManagerに保管されている認証情報はローテーション設定によりLambdaで月に一度更新されます。

実装

全体のコード

github.com

デプロイできる状態にはなっていますが、terraformのリソース名とLambda@edge上のSecretsManager IDは適宜書き換える必要があります。

providers.tf

terraform {
  required_version = "~>1.9.0"
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~>5.76.0"
    }
  }
}

provider "aws" {
  region = "ap-northeast-1"
}

provider "aws" {
  alias  = "virginia"
  region = "us-east-1"
}

terraform {
  backend "local" {}
}

lambda@edge

  • Lambda@edgeはバージニア北部に作成する必要があります。
  • SecretsManagerの権限を付与しています。

main.tf

module "lambda_at_edge" {
  source = "terraform-aws-modules/lambda/aws"
  providers = {
    aws = aws.virginia
  }

  lambda_at_edge = true

  function_name = "basicAuth"
  handler       = "lambda_handler.lambda_handler"
  runtime       = "python3.12"
  source_path = "src/basic_auth_lambda_at_edge/lambda_handler.py"
  attach_policy_json = true
  policy_json = jsonencode(
    {
      Version = "2012-10-17"
      Statement = [
        {
          Effect   = "Allow"
          Action   = "secretsmanager:GetSecretValue"
          Resource = aws_secretsmanager_secret.basic_auth.arn
        }
      ]
    }
  )
}

lambda@edgeのコード

  • SecretsManagerを呼び出してBasic認証の処理を行います。
  • SDKがデフォルトで入っているPythonで実装しています(Node.jsはデフォルトで入っていない認識)

lambda_hander.py

import boto3
import base64
import json

cached_credentials = None

def initialize_secrets(secret_id):
    global cached_credentials
    if cached_credentials:
        return cached_credentials

    client = boto3.client("secretsmanager", region_name="ap-northeast-1")
    try:
        response = client.get_secret_value(SecretId=secret_id)
        if "SecretString" in response:
            cached_credentials = json.loads(response["SecretString"])
        else:
            cached_credentials = json.loads(base64.b64decode(response["SecretBinary"]))
        print("Secrets successfully loaded and cached")
        return cached_credentials
    except Exception as e:
        print(f"Error retrieving secrets: {e}")
        raise

def lambda_handler(event, context):
    # 必要に応じて書き換える
    secret_id = "your-secrets"
    credentials = initialize_secrets(secret_id)

    auth_user = credentials["username"]
    auth_pass = credentials["password"]

    # Authorizationヘッダーの確認
    request = event["Records"][0]["cf"]["request"]
    headers = request["headers"]

    auth_string = f"Basic {base64.b64encode(f'{auth_user}:{auth_pass}'.encode()).decode()}"

    if (
        "authorization" not in headers
        or headers["authorization"][0]["value"] != auth_string
    ):
        return {
            "status": "401",
            "statusDescription": "Unauthorized",
            "headers": {
                "www-authenticate": [{"key": "WWW-Authenticate", "value": "Basic"}],
            },
            "body": "Unauthorized",
        }

    return request

ポイントは以下の部分です。

SecretsManagerから取得した認証情報をグローバル変数(handler外)に保持することで、Lambdaのコールドスタート時にのみSecretsManagerに問い合わせるようにしています。

repost.aws

cached_credentials = None

def initialize_secrets(secret_id):
    global cached_credentials
    if cached_credentials:
        return cached_credentials

    client = boto3.client("secretsmanager", region_name="ap-northeast-1")
    try:
        response = client.get_secret_value(SecretId=secret_id)
        if "SecretString" in response:
            cached_credentials = json.loads(response["SecretString"])
        else:
            cached_credentials = json.loads(base64.b64decode(response["SecretBinary"]))
        print("Secrets successfully loaded and cached")
        return cached_credentials
    except Exception as e:
        print(f"Error retrieving secrets: {e}")
        raise

10,000 回の API コールあたり 10,000 件の API コールあたりUSD 0.05。

料金 - AWS Secrets Manager | AWS

SecretsManagerのAPI呼び出しは上記のように課金されます。検証環境で使うぶんにはそこまでコストはかからなそうですが、(要件によると思うが)大規模な負荷テストを行う際にはコストが膨らむ可能性があります。 そのため、Lambdaのグローバル変数にキャッシュして、SecretsManagerへのアクセス回数を減らしています。

SecretsManager

  • 30日に一度Lambda関数でローテーションするように設定します。
  • ローテーション用のLambda関数をSecretsManagerから呼び出せるように権限を設定し、SecretsManagerのアクションをiamに付与します。

main.tf

resource "aws_secretsmanager_secret" "basic_auth" {
  name = "/dev/basic-auth"
}


resource "aws_secretsmanager_secret_rotation" "basic_auth_rotation" {
  secret_id           = aws_secretsmanager_secret.basic_auth.id
  rotation_lambda_arn = module.secrets_manager_rotation.lambda_function_arn

  rotation_rules {
    automatically_after_days = 30
  }
}

module "secrets_manager_rotation" {
  source = "terraform-aws-modules/lambda/aws"
  function_name = "basic_auth_rotation"
  runtime       = "python3.12"
  handler       = "lambda_handler.lambda_handler"
  source_path = "src/basic_auth_rotation/lambda_handler.py"
  allowed_triggers = {
    SecretsManager ={
      "service": "secretsmanager"
      "source_arn": aws_secretsmanager_secret.basic_auth.arn
    }
  }
  create_current_version_allowed_triggers = false
  attach_policy_json = true
  policy_json = jsonencode(
    {
      Version = "2012-10-17"
      Statement = [
        {
          Effect   = "Allow"
          Action   = [
            "secretsmanager:DescribeSecret",
            "secretsmanager:GetSecretValue",
            "secretsmanager:PutSecretValue",
            "secretsmanager:UpdateSecretVersionStage"
          ]
          Resource = aws_secretsmanager_secret.basic_auth.arn
        },
        {
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetRandomPassword"
            ],
            "Resource": "*"
        },
      ]
    }
  )

SecretsManagerローテーション用のLambda関数のコード

  • ランダムな文字列を生成してパスワードを更新しています。
  • 例えば共有のためにスプシの更新や通知などがあればこのLambda内に記述します
  • 以下のリポジトリのサンプルコードを参考にして作成しました

github.com

lambda_handler.py

import boto3
import logging
import os
import json

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """Secrets Manager Rotation Template

    This is a template for creating an AWS Secrets Manager rotation lambda

    Args:
        event (dict): Lambda dictionary of event parameters. These keys must include the following:
            - SecretId: The secret ARN or identifier
            - ClientRequestToken: The ClientRequestToken of the secret version
            - Step: The rotation step (one of createSecret, setSecret, testSecret, or finishSecret)

        context (LambdaContext): The Lambda runtime information

    Raises:
        ResourceNotFoundException: If the secret with the specified arn and stage does not exist

        ValueError: If the secret is not properly configured for rotation

        KeyError: If the event parameters do not contain the expected keys

    """
    arn = event['SecretId']
    token = event['ClientRequestToken']
    step = event['Step']
    logger.info(f"Received Step: {step}")

    service_client = boto3.client('secretsmanager', endpoint_url=os.getenv('SECRETS_MANAGER_ENDPOINT', None))

    metadata = service_client.describe_secret(SecretId=arn)
    if not metadata['RotationEnabled']:
        logger.error(f"Secret {arn} is not enabled for rotation.")
        raise ValueError(f"Secret {arn} is not enabled for rotation.")

    versions = metadata['VersionIdsToStages']
    if token not in versions:
        logger.error(f"Secret version {token} has no stage for rotation of secret {arn}.")
        raise ValueError(f"Secret version {token} has no stage for rotation of secret {arn}.")
    if "AWSCURRENT" in versions[token]:
        logger.info(f"Secret version {token} already set as AWSCURRENT for secret {arn}.")
        return
    elif "AWSPENDING" not in versions[token]:
        logger.error(f"Secret version {token} not set as AWSPENDING for rotation of secret {arn}.")
        raise ValueError(f"Secret version {token} not set as AWSPENDING for rotation of secret {arn}.")

    if step == "createSecret":
        create_secret(service_client, arn, token)
    elif step == "setSecret":
        pass
    elif step == "testSecret":
        pass
    elif step == "finishSecret":
        finish_secret(service_client, arn, token)
    else:
        raise ValueError("Invalid step parameter")


def create_secret(service_client, arn, token):
    """ランダムなBasic認証用文字列を生成してSecrets Managerに保存"""
    try:
        service_client.get_secret_value(SecretId=arn, VersionId=token, VersionStage="AWSPENDING")
        logger.info(f"createSecret: Successfully retrieved secret for {arn}.")
    except service_client.exceptions.ResourceNotFoundException:
        # ランダムなBasic認証用文字列を生成
        username = "admin"
        exclude_characters = os.environ['EXCLUDE_CHARACTERS'] if 'EXCLUDE_CHARACTERS' in os.environ else '/@"\'\\'
        password = service_client.get_random_password(ExcludeCharacters=exclude_characters, PasswordLength=16)


        basic_auth_credentials = {
            "username": username,
            "password": password['RandomPassword']
        }

        # Secrets Managerに保存
        service_client.put_secret_value(
            SecretId=arn,
            ClientRequestToken=token,
            SecretString=json.dumps(basic_auth_credentials),
            VersionStages=['AWSPENDING']
        )

        logger.info(f"createSecret: Successfully put secret for ARN {arn} and version {token}.")


def finish_secret(service_client, arn, token):
    """Finish the secret

    This method finalizes the rotation process by marking the secret version passed in as the AWSCURRENT secret.

    Args:
        service_client (client): The secrets manager service client

        arn (string): The secret ARN or other identifier

        token (string): The ClientRequestToken associated with the secret version

    Raises:
        ResourceNotFoundException: If the secret with the specified arn does not exist

    """
    # First describe the secret to get the current version
    metadata = service_client.describe_secret(SecretId=arn)
    current_version = None
    for version in metadata["VersionIdsToStages"]:
        if "AWSCURRENT" in metadata["VersionIdsToStages"][version]:
            # The correct version is already marked as current, return
            if version == token:
                logger.info(f"finishSecret: Version {version} already marked as AWSCURRENT for {arn}.")
                return
            current_version = version
            break

    # Finalize by staging the secret version current
    service_client.update_secret_version_stage(
        SecretId=arn,
        VersionStage="AWSCURRENT",
        MoveToVersionId=token,
        RemoveFromVersionId=current_version
    )
    logger.info(f"finishSecret: Successfully set AWSCURRENT stage to version {token} for secret {arn}.")

CloudfrontにLambda@edgeの関連付け

最後に、上記で作成したLambda@edgeをCloudfrontに関連付けます。

data "aws_lambda_function" "basic_auth" {
  function_name = "basicAuth"
}

resource "aws_cloudfront_distribution" "asset" {
  // ...
    lambda_function_association {
      event_type   = "viewer-request"
      lambda_arn   = "${data.aws_lambda_function.basic_auth.arn}:${data.aws_lambda_function.basic_auth.version}"
      include_body = false
    }
  }

動作確認

CloudfrontにLambda@edgeを関連付けたサイトにアクセスしてBasic認証がかかっていることを確認します。

SecretsManagerのコンソールで手動でローテーションを実行します。

数分後くらいにアクセス(コールドスタートが古いSecretsを保持しているため)して、再度Basic認証がかかっています。 ローテーション後のパスワードで認証できることが確認できました!(スクショでは分かりづらいですね)

最後に

AWS SecretsManagerのローテーション機能を使ってBasic認証のパスワードを自動更新する方法をご紹介いたしました。Lambda@edgeでSecretsManagerから認証情報を取得しても、グローバル変数を適切に使えば、低コスト・低レイテンシーBasic認証を行うことができます。 最近チームの名称がインフラチームからSREチームに変わったので、SREらしくトイルを削減できたのではないかと思います。

この記事を読んで興味を持って下さった方がいらっしゃれば、カジュアルにお話させていただきたいです!是非ご応募をお願いいたします!

Wantedly / Green