iimon TECH BLOG

iimonエンジニアが得られた経験や知識を共有して世の中をイイモンにしていくためのブログです

OpenSearch(ElasticSearch) + Lambdaで物件検索APIを作る

はじめに

こんにちはiimonでエンジニアをしているhogeです。

最近ElasticSearchを勉強する機会がありました。アウトプットのためにElasticSearchに関する基礎的な知識を学びながら、OpenSearchとLambdaで物件検索APIを作ろうと思います。

AWSリソースは検証用のために簡易的に作ったため、本番環境では推奨されない設定が含まれますが、ご了承ください。

ElasticSearchとは

ElasticSearchはElastic社が開発するApache Luceneを基盤とした検索エンジンです。ElasticSearchは以下の特徴を持ちます。

  • 高速な全文検索や分析機能を持つ
    • 全てのデータをインデックス化して格納する。インデックス化された各フィールドには専用の最適化されたデータ構造がある。例えばテキストデータは転置インデックスに格納され、数値や地理フィールドBKDツリーに格納される。このようにフィールドごとにインデックスにデータを格納し、データ構造を使用して検索結果を組み立てるため、ElasticSearchの検索は非常に高速になる。
  • クラスター構成により複数のノードで分散して動作し、検索性能をスケールさせることができる。
  • REST APIが提供されており、開発者フレンドリー

ユースケース

  • Webアプリケーションの検索機能
  • ログやイベントデータの保存分析
  • ベクターデータベース
  • RAG
  • 機械学習

OpenSearchとは

OpenSearchはElasticSearchのフォーク版としてAWSが立ち上げたプロジェクトです。

背景をざっくり説明すると、Elastic社のライセンスが変更され、商業利用に関する制限が強化されました。このライセンス変更により、AWSはElasticSearchをサービスとして提供できなくなるため、最終のOSSバージョンES7.10をフォークし、オープンソースとしてOpenSearchを立ち上げ、OpenSearch Serviceとして提供しました。

本記事で扱う範囲ではOpenSearchとElasticSearchで機能の違いはなさそうです。

https://aws.amazon.com/jp/what-is/opensearch/

システム構成

Lambda function UrlsでAPIを作成し、LambdaからOpenSearchに対して検索を行ないます。

APIの仕様

以下のデータを検索対象とします。物件の属性値(物件名、住所、家賃など)で絞り込みを行い、結果をJSONで返却します。

name(物件名) address(住所) nearest_station(最寄り駅) rent(家賃) has_parking(駐車場の有無)
testマンション 大阪府堺市堺区賑町2 堺東 80000 0
パークコートtestザ・タワー 東京都港区南青山6-10-1 南青山 150000 1
testヒルズレジデンス 東京都港区六本木6-12-2 六本木 240000 1

エンドポイント

GET /properties/search

リクエストパラメータ

パラメータ名 必須 説明
name string Optional 物件名に対して全文検索
address string Optional 住所に対して全文検索
nearest_station string Optional 最寄り駅の完全一致検索。
min_rent number Optional 最小家賃のフィルタリング。
max_rent number Optional 最大家賃のフィルタリング。
has_parking boolean Optional 駐車場の有無 (true or false) でフィルタリング。

リクエスト例

GET /properties/search?name=Skyline&address=test&nearest_station=新宿&min_rent=100000&max_rent=300000&has_parking=true

レスポンス例

{
  "results": [
    {
      "name": "testマンション",
      "address": "大阪府堺市堺区賑町2",
      "nearest_station": "堺東",
      "rent": 800000,
      "has_parking": false
    },
    {
      "name": "パークコートtestザ・タワー",
      "address": "東京都港区南青山6-10-1",
      "nearest_station": "南青山",
      "rent": 1500000,
      "has_parking": true
    }
  ]
}

インデックス設計

物件名と住所は全文検索をしたいため、text型を使用します。Indexはフィールドに対して複数作成することができます。住所は将来的に完全一致もサポートする可能性があることを考慮してkeyword型も用意しておきます。

最寄り駅は完全一致検索を使用するため、keyword型を使用します。

フィールド名 備考
name text bi-gramで転置インデックスを作成
address text,keyword bi-gramで転置インデックスを作成。将来的に完全一致もサポートする可能性がるため、keyword型も用意する
nearest_station keyword
rent number
has_parking bool

転置インデックストークナイズの選択肢として、形態素解析n-gramがあります。

形態素解析

形態素解析でTokenを分割するとこのようになります。

| 明日 | は | クリスマス |

N-gram

  • 検索対象のテキストをN文字単位の文字列片に分解する
  • 分解した文字列片を見出し語として転置インデックスを作成する
  • 検索語をN文字単位の文字列片に分け検索を行う
  • 文字列の出現位置情報を利用すれば、漏れのない完全一致の検索が可能

bi-gramでTokenを分割するとこのようになります。

明日 | 日は | はク | クリ | リス | スマ | マス

物件名と住所に固有名詞や珍しい住所が含まれるため、形態素解析でTokenizeすると検索に引っかからなくなるケースがありそうなため、ngramを採用します。

プロジェクト構成

terraformで構築します。後述のコードでデプロイすれば同じ環境を作ることができると思います。

├── base.tf
├── main.tf
├── src
│   └── property-search-api
│       ├── lambda_handler.py
│       ├── layer
│       └── requirements.txt
├── terraform.tfstate
└── terraform.tfvars

インフラ構築

base.tf

ここにprovider情報を記載します。

terraform {
  required_version = ">= 1.9.5"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~>5.65"
    }
    opensearch = {
      source  = "opensearch-project/opensearch"
      version = "2.3.0"
    }
  }
}

terraform {
  backend "local" {
  }
}

provider "aws" {
  region = "ap-northeast-1"
}

variable "opensearch_url" {}
variable "opensearch_username" {}
variable "opensearch_password" {}

provider "opensearch" {
  url      = var.opensearch_url
  username = var.opensearch_username
  password = var.opensearch_password
}

main.tf

opensearch周り

  • 検証用のためネットワークアクセスがVPC内ではなく、public設定になっていたり、セキュリティ設定を甘くしています
  • きめ細やかなアクセス制御を設定(これを使用することで、インデックスレベルでのリソース・アクションのアクセス制御も可能になる)
############################################
# OpenSearch関連
############################################

resource "aws_opensearch_domain" "test" {
  access_policies = jsonencode(
    {
      Statement = [
        {
          Action = "es:ESHttp*"
          Effect = "Allow"
          Principal = {
            AWS = "*"
          }
          Resource = "arn:aws:es:ap-northeast-1:xxxxxxxxx:domain/property-search-test/*"
        },
      ]
      Version = "2012-10-17"
    }
  )
  advanced_options = {
    "indices.fielddata.cache.size"        = "20"
    "indices.query.bool.max_clause_count" = "1024"
  }

  domain_name     = "property-search-test"
  engine_version  = "OpenSearch_2.13"
  ip_address_type = "dualstack"

  domain_endpoint_options {
    enforce_https = true
  }

  encrypt_at_rest {
    enabled = true
  }

  cluster_config {
    instance_type                 = "t3.small.search"
    multi_az_with_standby_enabled = false
    zone_awareness_enabled        = false
  }
  node_to_node_encryption {
    enabled = true
  }
  advanced_security_options {
    enabled                        = true
    anonymous_auth_enabled         = false
    internal_user_database_enabled = true

    master_user_options {
      master_user_name     = var.opensearch_username
      master_user_password = var.opensearch_password
    }
  }
  ebs_options {
    ebs_enabled = true
    volume_size = 10
  }

  snapshot_options {
    automated_snapshot_start_hour = 0
  }
}

lambda関連

  • pythonでlambda関数を作成
  • lambda layerの作成
  • lambda function urlsを設定
############################################
# Lambda関連
############################################
resource "null_resource" "create-property-search-api" {
    triggers = {
      always_run = "${timestamp()}"
    }
  provisioner "local-exec" {
    command = <<-EOF
      rm -rf src/property-search-api/layer &&
      pip3 install -r ./src/property-search-api/requirements.txt -t ./src/property-search-api/layer/python --no-cache-dir
    EOF
  }
}

data "archive_file" "property-search-api-lambda" {
  type        = "zip"
  source_file = "./src/property-search-api/lambda_handler.py"
  output_path = "./archive/propery-search-api.zip"
}

data "archive_file" "property-search-api-lambda-layer" {
  type             = "zip"
  source_dir       = "./src/property-search-api/layer"
  output_path      = "./archive/property-search-api-layer.zip"
  output_file_mode = "0644"
  depends_on = [
    null_resource.create-property-search-api
  ]
}

resource "aws_lambda_layer_version" "property-search-api-lambda-layer" {
  layer_name          = "property-search-api-lambda-layer"
  filename            = data.archive_file.property-search-api-lambda-layer.output_path
  compatible_runtimes = ["python3.11"]
  source_code_hash    = data.archive_file.property-search-api-lambda-layer.output_base64sha256
  depends_on = [
    data.archive_file.property-search-api-lambda-layer
  ]
}

module "property-search-api" {
  source                     = "terraform-aws-modules/lambda/aws"
  version                    = "7.5.0"
  timeout                    = 30
  memory_size                = 128
  function_name              = "property-search-api"
  handler                    = "lambda_handler.lambda_handler"
  architectures              = ["x86_64"]
  runtime                    = "python3.11"
  create_package             = false
  local_existing_package     = data.archive_file.property-search-api-lambda.output_path
  ignore_source_code_hash    = false
  create_lambda_function_url = true
  layers                     = [aws_lambda_layer_version.property-search-api-lambda-layer.arn]
  environment_variables = {
    OPENSEARCH_HOST       = aws_opensearch_domain.test.endpoint
    OPENSEARCH_PORT       = 443
    OPENSEARCH_INDEX_NAME = "properties"
  }
}
  • きめ細やかなアクセス制御を設定しているため、OpenSearch内部でアクセス制御を設定
  • backend_roleにLambdaにアタッチされているロールを設定
  • opensearchドメインが作成されるまで一旦コメントアウトしておきます。
#resource "opensearch_roles_mapping" "mapper" {
#  role_name = "all_access"
#  users     = ["admin", "adsmin"]
#  backend_roles = [
#    module.property-search-api.lambda_role_arn,
#  ]
#}

APIのコード

requirements.txtに以下のライブラリを記載しておきます。

aws-lambda-powertools
requests_aws4auth
opensearch-py

全体

from aws_lambda_powertools.event_handler import LambdaFunctionUrlResolver
from aws_lambda_powertools import Logger
from typing import List,Dict
from aws_lambda_powertools.utilities.typing import LambdaContext
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import os
import boto3
import traceback
import json


logger = Logger()
app = LambdaFunctionUrlResolver()

host = os.environ["OPENSEARCH_HOST"]
port = os.environ["OPENSEARCH_PORT"]
region = os.environ["AWS_REGION"]
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
index_name = os.environ["OPENSEARCH_INDEX_NAME"]

def buildQuery(search_condition:Dict):
    # デフォルトのクエリ
    query = {
      "from" : 0,
      "size": 50,
      "query" : {
        "bool" : { 
          "must" : []
        }
      }
    }
    if search_condition:
        for key in search_condition.keys():
            # min_rent/max_rentは範囲検索
            if key == "min_rent" or key == "max_rent":
                range_query = {}
                if "min_rent" in search_condition:
                    range_query["gte"] = int(search_condition["min_rent"])
                if "max_rent" in search_condition:
                    range_query["lte"] = int(search_condition["max_rent"])
                
                query["query"]["bool"]["must"].append(
                    {
                        "range": {
                            "rent": range_query
                        }
                    }
                )
            # has_parkingはbool検索
            elif key == "has_parking":
                query["query"]["bool"]["must"].append(
                    {
                        "term" : {
                            "has_parking" : search_condition["has_parking"]
                        }
                    }
                )
            # その他はmatch検索、ただし1文字の場合はワイルドカード検索
            else:
                search_value = search_condition[key]
                if len(search_value) == 1:
                    query["query"]["bool"]["must"].append(
                        {
                            "wildcard": {
                                key: f"*{search_value}*"
                            }
                        }
                    )
                else:
                    query["query"]["bool"]["must"].append(
                        {
                            "match": {
                                key: {
                                    "query": search_value,
                                    "operator": "AND"
                                }
                            }
                        }
                    )
    return query

def execute_query(query):
    try:
        client = OpenSearch(
            hosts=[{'host': host, 'port': port}],
            http_auth=awsauth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection
        )
        res = client.search(index=index_name, body=query)
        return res
    except Exception as e:
        print(traceback.format_exc())
        raise e
    
def format_response(resp):
    hits = resp.get("hits", {}).get("hits", [])
    formatted_results = []
    for hit in hits:
        source = hit["_source"]
        formatted_results.append({
            "name": source.get("name"),
            "address": source.get("address"),
            "nearest_station": source.get("nearest_station"),
            "rent": source.get("rent"),
            "has_parking": source.get("has_parking")
        })
    return {"results": formatted_results}

@app.get("/properties/search")
def search_properties():
    query_params = ["name", "address", "nearest_station", "min_rent", "max_rent", "has_parking"]
    search_condition =  {key: app.current_event.get_query_string_value(name=key, default_value=None) for key in query_params if app.current_event.get_query_string_value(name=key, default_value=None) is not None}
    query = buildQuery(search_condition)
    resp = execute_query(query)
    formatted_resp = format_response(resp)
    return json.dumps(formatted_resp, ensure_ascii=False)

@logger.inject_lambda_context
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    return app.resolve(event, context)

個人的にLambda Powertoolsが好きなので、Lambda Function UrlsとLambda powertoolsでAPIを作成しています。

全体の流れとしては、クエリパラメータから検索条件を受け取り、OpenSearchにリクエストするクエリを生成して実行して、整形してレスポンスを返却しています。

クエリ作成部分について簡単に解説します。

検索条件が存在する場合は、must句(AND)に追加しています。

def buildQuery(search_condition:Dict):
    # デフォルトのクエリ
    query = {
      "from" : 0,
      "size": 50,
      "query" : {
        "bool" : { 
          "must" : []
        }
      }
    }
    if search_condition:
        for key in search_condition.keys():
            # min_rent/max_rentは範囲検索
            if key == "min_rent" or key == "max_rent":
                range_query = {}
                if "min_rent" in search_condition:
                    range_query["gte"] = int(search_condition["min_rent"])
                if "max_rent" in search_condition:
                    range_query["lte"] = int(search_condition["max_rent"])
                
                query["query"]["bool"]["must"].append(
                    {
                        "range": {
                            "rent": range_query
                        }
                    }
                )
            # has_parkingはbool検索
            elif key == "has_parking":
                query["query"]["bool"]["must"].append(
                    {
                        "term" : {
                            "has_parking" : search_condition["has_parking"]
                        }
                    }
                

bi-gramでTokenizeしているため、例えば”堺”のように1文字で検索しようとすると、Hitしなくなってしまいます。

// "堺"でマッチクエリをした場合、以下のドキュメントはHitしない
大阪府堺市堺区賑町2

そのため、1文字の場合はワイルドカードクエリで検索するようにしました。MySQLのようにドキュメントをフルスキャンにはならないため、検索パフォーマンスは悪くないはずです。

            # その他はmatch検索、ただし1文字の場合はワイルドカード検索
            else:
                search_value = search_condition[key]
                if len(search_value) == 1:
                    query["query"]["bool"]["must"].append(
                        {
                            "wildcard": {
                                key: f"*{search_value}*"
                            }
                        }
                    )
                else:
                    query["query"]["bool"]["must"].append(
                        {
                            "match": {
                                key: {
                                    "query": search_value,
                                    "operator": "AND"
                                }
                            }
                        }
                    )
 

デプロイ

terraform.tfvarsに以下の変数を設定する。opensearch_urlはまだ分からないので入れない

opensearch_url="まだ入れない"
opensearch_username="ユーザ名"
opensearch_password="パスワード"

適用する。適用が完了したらaws関連のリソースが作成される。

terraform apply

opensearch内部の設定も行なうため、awsマネコン等でopensearchドメインを確認し、terraform.tfvarsにopensearch_urlを設定する。

opensearch_url="https://search-....."

mapping設定はコメントインする

resource "opensearch_roles_mapping" "mapper" {
  role_name = "all_access"
  users     = ["admin", "adsmin"]
  backend_roles = [
    module.property-search-api.lambda_role_arn,
  ]
}

適用し、opensearch関連のリソースも作成する

terraform apply

インデックス作成

インデックスの設計に基づき、以下のリクエストでインデックスを作成します。

curl -X PUT -u "ユーザ名:パスワード" "https://opensearchのドメイン/properties" \
-H "Content-Type: application/json" \
-d '{
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "analyzer": "ngram_analyzer"
      },
      "address": {
        "type": "text",
        "analyzer": "ngram_analyzer",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "nearest_station": {
        "type": "keyword"
      },
      "rent": {
        "type": "integer"
      },
      "has_parking": {
        "type": "boolean"
      }
    }
  },
  "settings": {
    "analysis": {
      "analyzer": {
        "ngram_analyzer": {
          "type": "custom",
          "tokenizer": "ngram_tokenizer"
        }
      },
      "tokenizer": {
        "ngram_tokenizer": {
          "type": "ngram",
          "min_gram": 2,
          "max_gram": 2
        }
      }
    }
  }
}'

データの投入

bulk apiでデータを投入します。

curl -X POST -u "ユーザ名:パスワード" "https://opensearchのドメイン/properties/_bulk" \
-H "Content-Type: application/json" \
-d '
{ "index": { "_index": "properties" } }
{ "name": "testマンション", "address": "大阪府堺市堺区賑町2", "nearest_station": "堺東", "rent": 80000, "has_parking": false }
{ "index": { "_index": "properties" } }
{ "name": "パークコートtestザ・タワー", "address": "東京都港区南青山6-10-1", "nearest_station": "南青山", "rent": 150000, "has_parking": true }
{ "index": { "_index": "properties" } }
{ "name": "testヒルズレジデンス", "address": "東京都港区六本木6-12-2", "nearest_station": "六本木", "rent": 240000, "has_parking": true }
{ "index": { "_index": "properties" } }
{ "name": "testマンション2", "address": "大阪府堺市堺区賑町2", "nearest_station": "堺東", "rent": 100000, "has_parking": false }
{ "index": { "_index": "properties" } }
{ "name": "パークコートtestザ・タワー2", "address": "東京都港区南青山6-10-1", "nearest_station": "南青山", "rent": 180000, "has_parking": true }
{ "index": { "_index": "properties" } }
{ "name": "testヒルズレジデンス2", "address": "東京都港区六本木6-12-2", "nearest_station": "六本木", "rent": 260000, "has_parking": true }
'

登録されているか確認

curl -X GET -u "ユーザ名:パスワード" "https://opensearchのドメイン/properties/_search?pretty" \
-H "Content-Type: application/json" \
-d '{
  "query": {
    "match_all": {}
  }
}'

以下のようなレスポンスが返ってくるはず

{
  "took" : 20,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" :6,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "properties",
        "_id" : "bZy3tZEBcMwSyS6Kuphr",
        "_score" : 1.0,
        "_source" : {
          "name" : "testマンション",
          "address" : "大阪府堺市堺区賑町2",
          "nearest_station" : "堺東",
          "rent" : 80000,
          "has_parking" : false
        }
      },
      {
        "_index" : "properties",
        "_id" : "b5y3tZEBcMwSyS6Kuphv",
        "_score" : 1.0,
        "_source" : {
          "name" : "testヒルズレジデンス",
          "address" : "東京都港区六本木6-12-2",
          "nearest_station" : "六本木",
          "rent" : 240000,
          "has_parking" : true
        }
      },
      {
        "_index" : "properties",
        "_id" : "bpy3tZEBcMwSyS6Kuphv",
        "_score" : 1.0,
        "_source" : {
          "name" : "パークコートtestザ・タワー",
          "address" : "東京都港区南青山6-10-1",
          "nearest_station" : "南青山",
          "rent" : 150000,
          "has_parking" : true
        }
      }
    ]
  }
}

動作確認

全文検索、範囲検索、bool検索を含めたリクエストで期待通りのドキュメントが取得できていることを確認できました。

“堺”のような1文字での全文検索も成功することを確認できました。

参考

OpenSearch とは? - オープンソース検索エンジンの説明 - AWS

チュートリアル: Amazon OpenSearch Service を用いて検索アプリケーションを作成する - Amazon OpenSearch サービス

REST APIs | Elasticsearch Guide [8.15] | Elastic

最後に

今回はAWSOpenSearch ServiceとLambdaを使ってシンプルな物件検索APIを作ってみました。

データベースにOpenSearchを使うことで、柔軟な全文検索に対応でき、データ数が増えた際にも簡単にスケールさせることができます。

最後まで読んでくださりありがとうございます。

弊社ではエンジニアを募集しております。

ぜひカジュアル面談でお話ししましょう!

ご興味ありましたら、ご応募ください!

Wantedly / Green