OpenSearch 完整使用指南
OpenSearch 是 Amazon 开发的企业级搜索和分析套件,是 Elasticsearch 和 Kibana 的开源分支。本指南将详细介绍 OpenSearch 的安装、配置、使用和优化方法。
OpenSearch 简介
什么是 OpenSearch
OpenSearch 是一个软件系列,由两个主要组件组成:
- OpenSearch 搜索引擎:基于 Apache Lucene 构建的分布式搜索和分析引擎
- OpenSearch Dashboards:数据可视化仪表板,用于搜索、分析和可视化数据
核心特性
- 全文搜索:支持复杂查询和文本分析
- 分布式架构:水平扩展,支持大数据量
- 实时分析:近实时数据索引和分析
- 向量搜索:支持 KNN(K 最近邻)搜索
- RESTful API:基于 HTTP 的 API 接口
- 多语言支持:支持多种编程语言的客户端
应用场景
- 日志分析:ELK/ELK Stack 替代方案
- 全文搜索:电商、文档搜索
- 实时监控:系统指标监控和分析
- 安全分析:SIEM(安全信息与事件管理)
- 向量搜索:AI 应用、推荐系统
Docker 安装
快速启动
# 拉取 OpenSearch 镜像
docker pull opensearchproject/opensearch:1.2.4
# 运行单节点集群
docker run -d \
--name opensearch \
-p 9200:9200 \
-p 9600:9600 \
-e "discovery.type=single-node" \
-e "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" \
opensearchproject/opensearch:1.2.4
# 验证运行
curl -XGET --insecure -u 'admin:admin' 'https://localhost:9200'
生产环境配置
# 启动生产级集群
docker run -d \
--name opensearch \
-p 9200:9200 \
-p 9600:9600 \
-e "cluster.name=opensearch-cluster" \
-e "node.name=opensearch-node" \
-e "discovery.type=single-node" \
-e "bootstrap.memory_lock=true" \
-e "OPENSEARCH_JAVA_OPTS=-Xms2g -Xmx2g" \
-e "path.logs=/var/log/opensearch" \
-e "path.data=/var/lib/opensearch" \
-e "http.cors.enabled=true" \
-e "http.cors.allow-origin=*" \
-e "http.cors.allow-headers=*" \
-e "http.cors.allow-credentials=true" \
--ulimit nofile=65536:65536 \
--ulimit memlock=-1 \
--security-opt seccomp=unconfined \
opensearchproject/opensearch:1.2.4
使用 Docker Compose
# docker-compose.yml
version: '3'
services:
opensearch:
image: opensearchproject/opensearch:1.2.4
container_name: opensearch
environment:
- cluster.name=opensearch-cluster
- node.name=opensearch-node
- discovery.type=single-node
- bootstrap.memory_lock=true
- "OPENSEARCH_JAVA_OPTS=-Xms2g -Xmx2g"
- path.logs=/var/log/opensearch
- path.data=/var/lib/opensearch
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
volumes:
- opensearch-data:/var/lib/opensearch
- opensearch-logs:/var/log/opensearch
ports:
- 9200:9200
- 9600:9600
networks:
- opensearch-net
opensearch-dashboards:
image: opensearchproject/opensearch-dashboards:1.2.0
container_name: opensearch-dashboards
ports:
- 5601:5601
environment:
- OPENSEARCH_HOSTS=https://opensearch:9200
- OPENSEARCH_USERNAME=admin
- OPENSEARCH_PASSWORD=admin
- OPENSEARCH_SECURITY_SSL_CERTIFICATEKEY=<path-to-your-pem-file>
depends_on:
- opensearch
networks:
- opensearch-net
volumes:
opensearch-data:
opensearch-logs:
networks:
opensearch-net:
driver: bridge
启动服务:
docker-compose up -d
基本操作
创建索引
# 方式 1:通过 curl
curl -XPUT --insecure -u 'admin:admin' \
'https://localhost:9200/my-first-index'
# 方式 2:指定设置
curl -XPUT --insecure -u 'admin:admin' \
'https://localhost:9200/my-index' \
-H 'Content-Type: application/json' \
-d '{
"settings": {
"index": {
"number_of_shards": 3,
"number_of_replicas": 1
}
}
}'
添加文档
# 插入文档(自动生成 ID)
curl -XPOST --insecure -u 'admin:admin' \
'https://localhost:9200/my-index/_doc' \
-H 'Content-Type: application/json' \
-d '{
"title": "First Document",
"content": "This is the content of the document.",
"timestamp": "2022-03-03T18:03:03Z"
}'
# 插入文档(指定 ID)
curl -XPUT --insecure -u 'admin:admin' \
'https://localhost:9200/my-index/_doc/1' \
-H 'Content-Type: application/json' \
-d '{
"title": "Document with ID 1",
"content": "This document has a specific ID.",
"category": "example"
}'
检索文档
# 根 据 ID 检索
curl -XGET --insecure -u 'admin:admin' \
'https://localhost:9200/my-index/_doc/1'
# 搜索所有文档
curl -XGET --insecure -u 'admin:admin' \
'https://localhost:9200/my-index/_search'
# 条件搜索
curl -XGET --insecure -u 'admin:admin' \
'https://localhost:9200/my-index/_search?q=title:First'
删除操作
# 删除文档
curl -XDELETE --insecure -u 'admin:admin' \
'https://localhost:9200/my-index/_doc/1'
# 删除索引
curl -XDELETE --insecure -u 'admin:admin' \
'https://localhost:9200/my-index/'
Python API 使用
基础连接
from opensearchpy import OpenSearch
# 配置连接
host = 'localhost'
port = 9200
auth = ('admin', 'admin')
# 创建客户端
client = OpenSearch(
hosts=[{'host': host, 'port': port}],
http_compress=True,
http_auth=auth,
use_ssl=True,
verify_certs=False,
ssl_assert_hostname=False,
ssl_show_warn=False
)
# 测试连接
print(client.info())
创建索引(Python)
# 创建索引
index_name = 'products'
index_body = {
'settings': {
'index': {
'number_of_shards': 3,
'number_of_replicas': 1,
'knn': True,
'knn.algo_param.ef_search': 100
}
},
'mappings': {
'properties': {
'title': {
'type': 'text',
'analyzer': 'standard'
},
'description': {
'type': 'text',
'analyzer': 'standard'
},
'price': {
'type': 'float'
},
'category': {
'type': 'keyword'
},
'tags': {
'type': 'keyword'
},
'created_at': {
'type': 'date'
}
}
}
}
response = client.indices.create(index_name, body=index_body)
print('\nCreating index:')
print(response)
批量添加数据
# 准备数据
products = [
{'_id': 1, 'title': 'iPhone 13', 'description': 'Latest iPhone model',
'price': 999.99, 'category': 'smartphone', 'tags': ['apple', 'mobile']},
{'_id': 2, 'title': 'Samsung Galaxy', 'description': 'Android smartphone',
'price': 899.99, 'category': 'smartphone', 'tags': ['samsung', 'mobile']},
{'_id': 3, 'title': 'MacBook Pro', 'description': 'Apple laptop',
'price': 1999.99, 'category': 'laptop', 'tags': ['apple', 'computer']},
]
# 批量索引
for product in products:
response = client.index(
index=index_name,
body=product,
id=product['_id'],
refresh=True
)
print('\nAdding document:')
print(response)
搜索查询
# 1. 全文搜索
query = {
'query': {
'match': {
'title': 'iPhone'
}
}
}
response = client.search(
body=query,
index=index_name
)
print('\nSearch results:')
for hit in response['hits']['hits']:
print(hit['_source'])
# 2. 多字段搜索
query = {
'query': {
'multi_match': {
'query': 'iPhone Apple',
'fields': ['title', 'description']
}
}
}
# 3. 布尔查询
query = {
'query': {
'bool': {
'must': [
{'match': {'title': 'iPhone'}}
],
'filter': [
{'range': {'price': {'gte': 500}}}
]
}
}
}
# 4. 聚合查询
query = {
'aggs': {
'categories': {
'terms': {'field': 'category'}
},
'avg_price': {
'avg': {'field': 'price'}
}
}
}
高级搜索示例
# 复杂的电商搜索
def search_products(query_text, filters=None, sort=None, size=10):
query_body = {
'query': {
'bool': {
'must': []
}
},
'size': size,
'sort': []
}
# 添加全文搜索
if query_text:
query_body['query']['bool']['must'].append({
'multi_match': {
'query': query_text,
'fields': ['title^3', 'description', 'tags'],
'type': 'best_fields',
'fuzziness': 'AUTO'
}
})
# 添加过滤条件
if filters:
query_body['query']['bool']['filter'] = []
for field, value in filters.items():
if isinstance(value, list):
query_body['query']['bool']['filter'].append({
'terms': {field: value}
})
else:
query_body['query']['bool']['filter'].append({
'term': {field: value}
})
# 添加排序
if sort:
query_body['sort'] = [sort]
# 执行搜索
response = client.search(
body=query_body,
index=index_name
)
return response
# 使用示例
results = search_products(
query_text='iPhone',
filters={'category': 'smartphone'},
sort=[{'price': {'order': 'desc'}}]
)
print(f"Found {results['hits']['total']['value']} results")
向量搜索(KNN)
创建向量索引
from sentence_transformers import SentenceTransformer
# 创建向量索引
index_name = 'vector_search_index'
index_body = {
'settings': {
'index': {
'knn': True,
'knn.algo_param.ef_search': 100
}
},
'mappings': {
'properties': {
'text': {
'type': 'text'
},
'text_vector': {
'type': 'knn_vector',
'dimension': 384,
'method': {
'name': 'hnsw',
'space_type': 'l2',
'engine': 'nmslib',
'parameters': {
'ef_construction': 128,
'm': 24
}
}
}
}
}
}
client.indices.create(index_name, body=index_body)
向量数据索引
# 使用 Sentence Transformers 生成向量
model = SentenceTransformer('paraphrase-MiniLM-L6-v2')
# 文档数据
documents = [
{'_id': 1, 'text': 'OpenSearch is a powerful search engine'},
{'_id': 2, 'text': 'Elasticsearch is also a search platform'},
{'_id': 3, 'text': 'Machine learning is fascinating'},
{'_id': 4, 'text': 'Natural language processing helps in text analysis'},
{'_id': 5, 'text': 'Vector search enables semantic understanding'},
]
# 为每个文档生成向量并索引
for doc in documents:
# 生成向量
vector = model.encode(doc['text']).tolist()
# 添加到索引
client.index(
index=index_name,
body={
'text': doc['text'],
'text_vector': vector
},
id=doc['_id'],
refresh=True
)
print("Vector indexing completed!")
向量搜索查询
# 向量搜索函数
def vector_search(query_text, k=5):
# 生成查询向量
query_vector = model.encode(query_text).tolist()
# 构建查询
query = {
'size': k,
'query': {
'knn': {
'text_vector': {
'vector': query_vector,
'k': k
}
}
}
}
# 执行搜索
response = client.search(
body=query,
index=index_name
)
return response
# 使用示例
results = vector_search('What is OpenSearch?', k=5)
print(f"\nFound {results['hits']['total']['value']} similar documents:")
for hit in results['hits']['hits']:
print(f"Score: {hit['_score']:.3f} | Text: {hit['_source']['text']}")
# 混合搜索(向量 + 关键词)
def hybrid_search(query_text, k=10):
query_vector = model.encode(query_text).tolist()
query = {
'size': k,
'query': {
'bool': {
'should': [
{
'knn': {
'text_vector': {
'vector': query_vector,
'k': 50
}
}
},
{
'multi_match': {
'query': query_text,
'fields': ['text^2'],
'type': 'best_fields'
}
}
]
}
}
}
response = client.search(
body=query,
index=index_name
)
return response
# 使用混合搜索
results = hybrid_search('search engine technology')
数据管理
批量操作
from opensearchpy.helpers import bulk
# 准备数据
actions = []
for i in range(100):
actions.append({
'_index': index_name,
'_id': i,
'title': f'Document {i}',
'content': f'This is the content of document {i}',
'category': 'example',
'price': i * 10.5
})
# 批量索引
success, failed = bulk(
client,
actions,
index=index_name,
refresh=True
)
print(f"Successfully indexed {success} documents")
print(f"Failed to index {failed} documents")
数据更新
# 更新单个文档
response = client.update(
index=index_name,
id=1,
body={
'doc': {
'price': 1099.99,
'updated_at': '2022-03-03T18:03:03Z'
}
},
refresh=True
)
# 批量更新
def bulk_update(index_name, updates):
for update in updates:
client.update(
index=index_name,
id=update['id'],
body={'doc': update['doc']},
refresh=False
)
# 使用
updates = [
{'id': 1, 'doc': {'price': 1099.99}},
{'id': 2, 'doc': {'price': 999.99}},
]
bulk_update(index_name, updates)
数据删除
# 删除文档
client.delete(index=index_name, id=1)
# 删除匹配条件的文档
query = {
'query': {
'term': {'category': 'old'}
}
}
response = client.delete_by_query(
index=index_name,
body=query
)
print(f"Deleted {response['deleted']} documents")
索引管理
索引别名
# 创建别名
client.indices.put_alias(
index=index_name,
name='current_index'
)
# 切换别名(零停机)
client.indices.update_aliases({
'actions': [
{'remove': {'index': 'old_index', 'alias': 'current_index'}},
{'add': {'index': 'new_index', 'alias': 'current_index'}}
]
})
# 查看别名
aliases = client.indices.get_alias(name='current_index')
索引模板
# 创建索引模板
template_body = {
'index_patterns': ['logs-*'],
'settings': {
'number_of_shards': 3,
'number_of_replicas': 1
},
'mappings': {
'properties': {
'@timestamp': {'type': 'date'},
'message': {'type': 'text'},
'level': {'type': 'keyword'}
}
}
}
client.indices.put_template(
name='logs_template',
body=template_body
)
# 创建符合模板的索引
client.indices.create(index='logs-2022-03-03')
索引设置优化
# 更新索引设置
client.indices.put_settings(
index=index_name,
body={
'settings': {
'number_of_replicas': 2,
'refresh_interval': '30s'
}
}
)
# 查看当前设置
settings = client.indices.get_settings(index=index_name)
print(settings)
集群管理
节点信息
# 查看集群健康状态
health = client.cluster.health()
print(f"Cluster status: {health['status']}")
# 查看节点信息
nodes = client.nodes.info()
for node_id, node_info in nodes['nodes'].items():
print(f"Node {node_id}: {node_info['name']}")
# 查看集群统计
stats = client.cluster.stats()
print(f"Total indices: {stats['indices']['count']}")
print(f"Total documents: {stats['indices']['total']['docs']['count']}")
备份与恢复
# 创建快照仓库
curl -XPUT --insecure -u 'admin:admin' \
'https://localhost:9200/_snapshot/my_backup' \
-H 'Content-Type: application/json' \
-d '{
"type": "fs",
"settings": {
"location": "/path/to/backup/directory"
}
}'
# 创建快照
curl -XPUT --insecure -u 'admin:admin' \
'https://localhost:9200/_snapshot/my_backup/snapshot_1' \
-d '{
"indices": "my-index",
"ignore_unavailable": true,
"include_global_state": false
}'
# 恢复快照
curl -XPOST --insecure -u 'admin:admin' \
'https://localhost:9200/_snapshot/my_backup/snapshot_1/_restore'
性能优化
索引优化
# 关闭副本(批量导入时)
client.indices.put_settings(
index=index_name,
body={
'settings': {
'number_of_replicas': 0
}
}
)
# 执行批量操作...
# 重新开启副本
client.indices.put_settings(
index=index_name,
body={
'settings': {
'number_of_replicas': 1
}
}
)
# 强制合并段
client.indices.forcemerge(
index=index_name,
max_num_segments=1
)
查询优化
# 使用过滤器而不是查询
query = {
'query': {
'bool': {
'must': [{'match': {'title': 'iPhone'}}],
'filter': [
{'range': {'price': {'gte': 500, 'lte': 1500}}},
{'term': {'category': 'smartphone'}}
]
}
}
}
# 使用时间筛选减少结果集
query = {
'query': {
'bool': {
'must': [{'match': {'content': 'search'}}],
'filter': [
{'range': {'@timestamp': {'gte': 'now-7d'}}}
]
}
}
}
# 使用 source filtering 减少传输数据
query = {
'_source': ['title', 'price'],
'query': {'match': {'title': 'iPhone'}}
}
内存优化
# 调整缓存大小
client.cluster.put_settings(
body={
'persistent': {
'indices.fielddata.cache.size': '30%',
'indices.queries.cache.size': '10%'
}
}
)
监控与调试
性能指标
# 获取集群统计
stats = client.cluster.stats()
print(json.dumps(stats, indent=2))
# 获取索引统计
index_stats = client.indices.stats(index=index_name)
print(f"Index size: {index_stats['indices']['total']['store']['size_in_bytes']} bytes")
# 获取节点统计
node_stats = client.nodes.stats()
for node_id, stats in node_stats['nodes'].items():
print(f"Node {node_id}: {stats['indices']['search']['query_total']} queries")
慢查询日志
# 启用慢查询日志
client.cluster.put_settings(
body={
'persistent': {
'index.search.slowlog.threshold.query.warn': '5s',
'index.search.slowlog.threshold.fetch.warn': '1s'
}
}
)
# 查看慢查询
curl -XGET --insecure -u 'admin:admin' \
'https://localhost:9200/_search?pretty&search_type=dfs_query_then_fetch'
实战案例
案例 1:电商搜索引擎
class EcommerceSearch:
def __init__(self, client):
self.client = client
self.index_name = 'products'
def index_product(self, product):
"""索引商品"""
return self.client.index(
index=self.index_name,
body=product,
id=product['id'],
refresh=True
)
def search_products(self, query, filters=None, sort=None):
"""搜索商品"""
query_body = {
'query': {
'bool': {
'must': [],
'should': []
}
},
'aggs': {
'categories': {
'terms': {'field': 'category', 'size': 10}
},
'price_range': {
'histogram': {
'field': 'price',
'interval': 100
}
}
}
}
# 添加全文搜索
if query:
query_body['query']['bool']['must'].append({
'multi_match': {
'query': query,
'fields': [
'title^3',
'description',
'brand^2',
'tags'
],
'type': 'best_fields',
'fuzziness': 'AUTO'
}
})
# 添加过滤
if filters:
query_body['query']['bool']['filter'] = []
for field, value in filters.items():
if field == 'price_range':
query_body['query']['bool']['filter'].append({
'range': {
'price': {
'gte': value.get('min'),
'lte': value.get('max')
}
}
})
elif field == 'in_stock':
query_body['query']['bool']['filter'].append({
'term': {'in_stock': value}
})
else:
query_body['query']['bool']['filter'].append({
'terms': {field: value} if isinstance(value, list) else {'term': {field: value}}
})
# 添加排序
if sort:
query_body['sort'] = [sort]
return self.client.search(
body=query_body,
index=self.index_name
)
def get_recommendations(self, product_id, size=5):
"""商品推荐(基于类别)"""
# 获取商品信息
product = self.client.get(index=self.index_name, id=product_id)
category = product['_source']['category']
# 基于类别推荐
query = {
'size': size,
'query': {
'bool': {
'must': [
{'term': {'category': category}}
],
'must_not': [
{'term': {'_id': product_id}}
]
}
}
}
return self.client.search(
body=query,
index=self.index_name
)
# 使用示例
search = EcommerceSearch(client)
# 索引商品
product = {
'id': 1001,
'title': 'iPhone 13 Pro',
'description': 'Latest iPhone with advanced features',
'brand': 'Apple',
'category': 'smartphone',
'price': 999.99,
'in_stock': True,
'tags': ['apple', 'mobile', '5g']
}
search.index_product(product)
# 搜索商品
results = search.search_products(
query='iPhone Apple',
filters={'category': ['smartphone'], 'in_stock': True},
sort=[{'price': {'order': 'desc'}}]
)
print(f"Found {results['hits']['total']['value']} products")
案例 2:日志分析系统
class LogAnalyzer:
def __init__(self, client):
self.client = client
self.index_name = 'logs-*'
def index_log(self, log_entry):
"""索引日志"""
return self.client.index(
index=f"logs-{log_entry['date']}",
body=log_entry,
refresh=False
)
def search_logs(self, query, time_range=None, filters=None):
"""搜索日志"""
query_body = {
'query': {
'bool': {
'must': []
}
},
'sort': [{'@timestamp': {'order': 'desc'}}],
'size': 100
}
# 添加搜索条件
if query:
query_body['query']['bool']['must'].append({
'multi_match': {
'query': query,
'fields': ['message', 'level', 'service']
}
})
# 添加时间范围
if time_range:
query_body['query']['bool']['filter'] = [{
'range': {
'@timestamp': time_range
}
}]
# 添加其他过滤
if filters:
if 'query' not in query_body['query']['bool']:
query_body['query']['bool']['filter'] = []
for field, value in filters.items():
query_body['query']['bool']['filter'].append({
'term': {field: value}
})
return self.client.search(
body=query_body,
index=self.index_name
)
def get_error_summary(self, hours=1):
"""获取错误汇总"""
query = {
'query': {
'bool': {
'filter': [
{'term': {'level': 'ERROR'}},
{'range': {'@timestamp': {'gte': f'now-{hours}h'}}}
]
}
},
'aggs': {
'errors_by_service': {
'terms': {'field': 'service', 'size': 10},
'aggs': {
'error_types': {
'terms': {'field': 'level', 'size': 5}
}
}
},
'errors_over_time': {
'date_histogram': {
'field': '@timestamp',
'calendar_interval': 'minute'
}
}
}
}
return self.client.search(
body=query,
index=self.index_name
)
# 使用示例
analyzer = LogAnalyzer(client)
# 索引日志
log = {
'date': '2022-03-03',
'@timestamp': '2022-03-03T18:03:03Z',
'level': 'ERROR',
'service': 'api-server',
'message': 'Database connection failed',
'trace_id': 'abc123'
}
analyzer.index_log(log)
# 搜索日志
results = analyzer.search_logs(
query='database error',
time_range={'gte': 'now-1h'},
filters={'level': 'ERROR'}
)
最佳实践
1. 索引设计
- 选择合适的分片数:数据量的 1.5-3 倍
- 设置副本数:生产环境至少 1 个副本
- 使用别名:方便索引切换和零停机维护
- 字段类型选择:
keyword:精确匹配、聚合text:全文搜索date:时间字段float/double:数值
2. 查询优化
# 使用过滤器缓存
query = {
'query': {
'bool': {
'must': [{'match': {'title': 'iPhone'}}],
'filter': [
{'range': {'price': {'gte': 500}}} # 会被缓存
]
}
}
}
# 使用 bool 查询
query = {
'query': {
'bool': {
'should': [
{'match': {'title': 'iPhone'}},
{'match': {'brand': 'Apple'}}
],
'minimum_should_match': 1
}
}
}
3. 数据建模
# 避免深度嵌套
bad_mapping = {
'properties': {
'user': {
'properties': {
'address': {
'properties': {
'street': {'type': 'text'},
'city': {'type': 'text'}
}
}
}
}
}
}
# 使用 join 类型
good_mapping = {
'properties': {
'user_name': {'type': 'text'},
'join_field': {
'type': 'join',
'relations': {
'user': 'address'
}
}
}
}
4. 监控建议
- 集群健康:定期检查
cluster.health - 索引大小:监控
indices.stats - 查询性能:启用慢查询日志
- 磁盘使用:关注节点磁盘使用率
- 内存使用:监控 JVM 堆内存使用
常见问题解决
问题 1:集群红色状态
# 查看未分配的分片
curl -XGET --insecure -u 'admin:admin' \
'https://localhost:9200/_cat/shards?v'
# 查看失败的分片
curl -XGET --insecure -u 'admin:admin' \
'https://localhost:9200/_cat/recovery?v'
问题 2:内存不足
# 调整 JVM 堆大小
# 在启动时设置:-Xms2g -Xmx2g
# 清理缓存
client.cluster.post_settings(
body={'transient': {'action.auto_create_index': False}}
)
问题 3:查询速度慢
# 1. 添加更多副本
client.indices.put_settings(
index=index_name,
body={'settings': {'index.number_of_replicas': 2}}
)
# 2. 使用路由
client.search(
body=query,
index=index_name,
routing='user_id'
)
# 3. 优化查询
# - 使用过滤器
# - 减少返回字段
# - 限制结果数量
总结
OpenSearch 是一个功能强大的搜索和分析平台,掌握其核心概念和使用方法,能够帮助您构建高效的搜索和分析系统。
通过本指南,您已经学习了:
- OpenSearch 的基本概念和特性
- Docker 安装和配置方法
- RESTful API 的使用
- Python 客户端的高级用法
- 向量搜索和 KNN 查询
- 索引管理和性能优化
- 集群监控和维护
- 实战案例和应用场景
在实际应用中,建议:
- 根据业务需求设计索引 结构
- 使用模板管理索引
- 定期监控集群健康
- 优化查询和索引策略
- 做好数据备份和恢复
持续实践和探索,您将能够充分发挥 OpenSearch 的强大功能!