助手
一系列简单的辅助函数,抽象了原始 API 的一些细节。
连接
from elasticsearch import Elasticsearch
client = Elasticsearch("https://.../", api_key="YOUR_API_KEY")
批量助手
对于 bulk
API,有几个助手,因为其对特定格式和其它考虑因素的要求,如果直接使用会很麻烦。
所有批量助手都接受一个 Elasticsearch
类实例和一个可迭代的 actions
(任何可迭代对象,也可以是生成器,在大多数情况下这是理想的选择,因为它允许您索引大型数据集,而无需将它们加载到内存中)。
在 action
可迭代对象中的项目应该是我们希望以多种格式索引的文档。最常见的一种与 search()
返回的结果相同,例如
{
'_index': 'index-name',
'_id': 42,
'_routing': 5,
'pipeline': 'my-ingest-pipeline',
'_source': {
"title": "Hello World!",
"body": "..."
}
}
或者,如果 _source 不存在,它将从 doc 中弹出所有元数据字段,并使用其余部分作为文档数据
{
"_id": 42,
"_routing": 5,
"title": "Hello World!",
"body": "..."
}
The bulk()
api 接受 index
、create
、delete
和 update
操作。使用 _op_type
字段指定操作(_op_type
默认值为 index
)
{
'_op_type': 'delete',
'_index': 'index-name',
'_id': 42,
}
{
'_op_type': 'update',
'_index': 'index-name',
'_id': 42,
'doc': {'question': 'The life, universe and everything.'}
}
示例:
假设我们有一个可迭代的数据。假设一个名为 mywords
的单词列表,我们希望将这些单词索引到单个文档中,其中文档的结构类似于 {"word": "<myword>"}
。
from elasticsearch.helpers import bulk
def gendata():
mywords = ['foo', 'bar', 'baz']
for word in mywords:
yield {
"_index": "mywords",
"word": word,
}
bulk(client, gendata())
有关更完整和更复杂的示例,请查看 https://github.com/elastic/elasticsearch-py/blob/main/examples/bulk-ingest
The parallel_bulk()
api 是 bulk()
api 的包装器,用于提供线程。 parallel_bulk()
返回一个生成器,必须使用它来生成结果。
要查看结果,请使用
for success, info in parallel_bulk(...):
if not success:
print('A document failed:', info)
如果您不关心结果,可以使用 collections 中的 deque
from collections import deque
deque(parallel_bulk(...), maxlen=0)
注意
从文件读取原始 json 字符串时,也可以直接传入它们(无需先解码为字典)。但是,在这种情况下,您将失去在每个记录的基础上指定任何内容(索引、op_type 甚至 id)的能力,所有文档都将被发送到 elasticsearch 以按原样索引。
- elasticsearch.helpers.streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=104857600, raise_on_error=True, expand_action_callback=<function expand_action>, raise_on_exception=True, max_retries=0, initial_backoff=2, max_backoff=600, yield_ok=True, ignore_status=(), *args, **kwargs)
Streaming bulk 从传入的可迭代对象中消耗操作,并为每个操作生成结果。对于非流式用例,请使用
bulk()
,它是 streaming bulk 的包装器,在整个输入被消耗并发送后返回有关批量操作的摘要信息。如果您指定
max_retries
,它还会重试任何被拒绝的文档,其状态代码为429
。为此,它将等待(**通过调用 time.sleep,这将阻塞**)initial_backoff
秒,然后,对于同一块的每次后续拒绝,每次等待两倍的时间,直到max_backoff
秒。- 参数:
client (Elasticsearch) –
Elasticsearch
的实例,用于使用actions (Iterable[bytes | str | Dict[str, Any]]) – 包含要执行的操作的可迭代对象
chunk_size (int) – 发送到 es 的一个块中的文档数量(默认:500)
max_chunk_bytes (int) – 请求的最大大小(字节)(默认:100MB)
raise_on_error (bool) – 当出现错误时,引发包含错误(作为 .errors)的
BulkIndexError
。默认情况下,我们引发错误。raise_on_exception (bool) – 如果为
False
,则不从对bulk
的调用中传播异常,只报告失败的项目。expand_action_callback (Callable[[bytes | str | Dict[str, Any]], Tuple[Dict[str, Any], None | bytes | Dict[str, Any]]]) – 在传入的每个操作上执行的回调,应返回一个元组,其中包含操作行和数据行(None 如果数据行应该被省略)。
max_retries (int) – 当收到
429
时,文档将被重试的最大次数,设置为 0(默认)表示在429
上不重试initial_backoff (float) – 我们在第一次重试之前应该等待的秒数。任何后续重试都将是
initial_backoff * 2**retry_number
的幂max_backoff (float) – 重试将等待的最大秒数
yield_ok (bool) – 如果设置为 False,将跳过输出中的成功文档
ignore_status (int | Collection[int]) – 您想要忽略的 HTTP 状态代码列表
client
actions
chunk_size
max_chunk_bytes
raise_on_error
expand_action_callback
raise_on_exception
max_retries
initial_backoff
max_backoff
yield_ok
ignore_status
args (Any)
kwargs (Any)
- 返回类型:
- elasticsearch.helpers.parallel_bulk(client, actions, thread_count=4, chunk_size=500, max_chunk_bytes=104857600, queue_size=4, expand_action_callback=<function expand_action>, ignore_status=(), *args, **kwargs)
批量助手并行版本,在多个线程中同时运行。
- 参数:
client (Elasticsearch) –
Elasticsearch
的实例,用于使用thread_count (整数) – 用于批量请求的线程池大小
chunk_size (int) – 发送到 es 的一个块中的文档数量(默认:500)
max_chunk_bytes (int) – 请求的最大大小(字节)(默认:100MB)
raise_on_error – 当出现错误时,引发包含错误(作为 .errors)的
BulkIndexError
。默认情况下,我们引发错误。raise_on_exception – 如果为
False
,则不从调用bulk
中传播异常,只报告失败的项目。expand_action_callback (Callable[[bytes | str | Dict[str, Any]], Tuple[Dict[str, Any], None | bytes | Dict[str, Any]]]) – 在传入的每个操作上执行的回调,应返回一个元组,其中包含操作行和数据行(None 如果数据行应该被省略)。
queue_size (整数) – 主线程(生成要发送的块)和处理线程之间的任务队列大小。
ignore_status (int | Collection[int]) – 您想要忽略的 HTTP 状态代码列表
client
actions
thread_count
chunk_size
max_chunk_bytes
queue_size
expand_action_callback
ignore_status
args (Any)
kwargs (Any)
- 返回类型:
- elasticsearch.helpers.bulk(client, actions, stats_only=False, ignore_status=(), *args, **kwargs)
用于
bulk()
API 的助手,提供更人性化的界面 - 它使用操作迭代器,并将它们以块的形式发送到 Elasticsearch。它返回一个包含摘要信息的元组 - 成功执行的操作数量,以及错误列表或错误数量(如果stats_only
设置为True
)。请注意,默认情况下,当遇到错误时,我们会引发BulkIndexError
,因此像stats_only
这样的选项仅在raise_on_error
设置为False
时适用。当收集错误时,原始文档数据将包含在错误字典中,这会导致内存使用量过高。如果您需要处理大量数据并希望忽略/收集错误,请考虑使用
streaming_bulk()
助手,它只返回错误,不会将它们存储在内存中。- 参数:
client (Elasticsearch) –
Elasticsearch
的实例,用于使用stats_only (布尔值) – 如果为 True,则仅报告成功/失败操作的数量,而不是仅报告成功操作的数量和错误响应列表
ignore_status (int | Collection[int]) – 您想要忽略的 HTTP 状态代码列表
client
actions
stats_only
ignore_status
args (Any)
kwargs (Any)
- 返回类型:
任何其他关键字参数都将传递给
streaming_bulk()
,它用于执行操作,有关更多接受的参数,请参见streaming_bulk()
。
扫描
- elasticsearch.helpers.scan(client, query=None, scroll='5m', raise_on_error=True, preserve_order=False, size=1000, request_timeout=None, clear_scroll=True, scroll_kwargs=None, **kwargs)
在
scroll()
API 之上的简单抽象 - 一个简单的迭代器,它会生成由底层滚动请求返回的所有命中结果。默认情况下,扫描不会以任何预定的顺序返回结果。为了在滚动时在返回的文档中具有标准顺序(按分数或显式排序定义),请使用
preserve_order=True
。这可能是一个昂贵的操作,并且会抵消使用scan
带来的性能优势。- 参数:
client (Elasticsearch) –
Elasticsearch
的实例,用于使用scroll (字符串) – 指定应为滚动搜索维护索引的一致视图的时间长度
raise_on_error (布尔值) – 如果遇到错误(某些分片无法执行),则引发异常 (
ScanError
)。默认情况下,我们引发错误。preserve_order (布尔值) – 不要将
search_type
设置为scan
- 这将导致滚动进行分页,同时保留顺序。请注意,这可能是一个非常昂贵的操作,并且很容易导致不可预测的结果,请谨慎使用。size (整数) – 每次迭代发送的批次大小(每个分片)。
request_timeout (浮点数 | 无) – 对每次调用
scan
的显式超时时间clear_scroll (bool) – 在方法完成或出错时,通过清除滚动 API 显式调用滚动 ID 上的删除操作,默认为 true。
scroll_kwargs (MutableMapping[str, Any] | None) – 要传递给
scroll()
的其他关键字参数client
查询
滚动
raise_on_error
保留顺序
大小
请求超时
清除滚动
滚动关键字参数
kwargs (Any)
- 返回类型:
任何其他关键字参数都将传递给初始的
search()
调用scan(client, query={"query": {"match": {"title": "python"}}}, index="orders-*", doc_type="books" )
重新索引
- elasticsearch.helpers.reindex(client, source_index, target_index, query=None, target_client=None, chunk_size=500, scroll='5m', op_type=None, scan_kwargs={}, bulk_kwargs={})
将满足给定查询的一个索引中的所有文档重新索引到另一个索引,可能(如果指定了 target_client)在不同的集群上。如果您没有指定查询,您将重新索引所有文档。
从
2.3
开始,reindex()
API 作为 Elasticsearch 本身的一部分可用。建议尽可能使用 API 而不是此帮助程序。此帮助程序主要用于向后兼容性和需要更多灵活性的情况。注意
此帮助程序不传输映射,只传输数据。
- 参数:
client (Elasticsearch) – 要使用的
Elasticsearch
实例(如果指定了 target_client,则用于读取)source_index (str | Collection[str]) – 要从中读取文档的索引(或索引列表)
target_index (str) – 要填充的目标集群中索引的名称
target_client (Elasticsearch | None) – 可选,如果指定,将用于写入(从而启用跨集群重新索引)
chunk_size (int) – 发送到 es 的一个块中的文档数量(默认:500)
scroll (字符串) – 指定应为滚动搜索维护索引的一致视图的时间长度
op_type (str | None) – 显式操作类型。默认为 ‘_index’。数据流必须设置为 ‘create’。如果未指定,将自动检测 target_index 是否为数据流。
scan_kwargs (MutableMapping[str, Any]) – 要传递给
scan()
的其他关键字参数bulk_kwargs (MutableMapping[str, Any]) – 要传递给
bulk()
的其他关键字参数client
源索引
目标索引
查询
目标客户端
chunk_size
滚动
操作类型
扫描关键字参数
批量关键字参数
- 返回类型: