助手

一系列简单的辅助函数,抽象了原始 API 的一些细节。

连接

from elasticsearch import Elasticsearch

client = Elasticsearch("https://.../", api_key="YOUR_API_KEY")

批量助手

对于 bulk API,有几个助手,因为其对特定格式和其它考虑因素的要求,如果直接使用会很麻烦。

所有批量助手都接受一个 Elasticsearch 类实例和一个可迭代的 actions(任何可迭代对象,也可以是生成器,在大多数情况下这是理想的选择,因为它允许您索引大型数据集,而无需将它们加载到内存中)。

action 可迭代对象中的项目应该是我们希望以多种格式索引的文档。最常见的一种与 search() 返回的结果相同,例如

{
    '_index': 'index-name',
    '_id': 42,
    '_routing': 5,
    'pipeline': 'my-ingest-pipeline',
    '_source': {
        "title": "Hello World!",
        "body": "..."
    }
}

或者,如果 _source 不存在,它将从 doc 中弹出所有元数据字段,并使用其余部分作为文档数据

{
    "_id": 42,
    "_routing": 5,
    "title": "Hello World!",
    "body": "..."
}

The bulk() api 接受 indexcreatedeleteupdate 操作。使用 _op_type 字段指定操作(_op_type 默认值为 index

{
    '_op_type': 'delete',
    '_index': 'index-name',
    '_id': 42,
}
{
    '_op_type': 'update',
    '_index': 'index-name',
    '_id': 42,
    'doc': {'question': 'The life, universe and everything.'}
}

示例:

假设我们有一个可迭代的数据。假设一个名为 mywords 的单词列表,我们希望将这些单词索引到单个文档中,其中文档的结构类似于 {"word": "<myword>"}

from elasticsearch.helpers import bulk

def gendata():
    mywords = ['foo', 'bar', 'baz']
    for word in mywords:
        yield {
            "_index": "mywords",
            "word": word,
        }

bulk(client, gendata())

有关更完整和更复杂的示例,请查看 https://github.com/elastic/elasticsearch-py/blob/main/examples/bulk-ingest

The parallel_bulk() api 是 bulk() api 的包装器,用于提供线程。 parallel_bulk() 返回一个生成器,必须使用它来生成结果。

要查看结果,请使用

for success, info in parallel_bulk(...):
if not success:
    print('A document failed:', info)

如果您不关心结果,可以使用 collections 中的 deque

from collections import deque
deque(parallel_bulk(...), maxlen=0)

注意

从文件读取原始 json 字符串时,也可以直接传入它们(无需先解码为字典)。但是,在这种情况下,您将失去在每个记录的基础上指定任何内容(索引、op_type 甚至 id)的能力,所有文档都将被发送到 elasticsearch 以按原样索引。

elasticsearch.helpers.streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=104857600, raise_on_error=True, expand_action_callback=<function expand_action>, raise_on_exception=True, max_retries=0, initial_backoff=2, max_backoff=600, yield_ok=True, ignore_status=(), *args, **kwargs)

Streaming bulk 从传入的可迭代对象中消耗操作,并为每个操作生成结果。对于非流式用例,请使用 bulk(),它是 streaming bulk 的包装器,在整个输入被消耗并发送后返回有关批量操作的摘要信息。

如果您指定 max_retries,它还会重试任何被拒绝的文档,其状态代码为 429。为此,它将等待(**通过调用 time.sleep,这将阻塞**)initial_backoff 秒,然后,对于同一块的每次后续拒绝,每次等待两倍的时间,直到 max_backoff 秒。

参数:
  • client (Elasticsearch) – Elasticsearch 的实例,用于使用

  • actions (Iterable[bytes | str | Dict[str, Any]]) – 包含要执行的操作的可迭代对象

  • chunk_size (int) – 发送到 es 的一个块中的文档数量(默认:500)

  • max_chunk_bytes (int) – 请求的最大大小(字节)(默认:100MB)

  • raise_on_error (bool) – 当出现错误时,引发包含错误(作为 .errors)的 BulkIndexError。默认情况下,我们引发错误。

  • raise_on_exception (bool) – 如果为 False,则不从对 bulk 的调用中传播异常,只报告失败的项目。

  • expand_action_callback (Callable[[bytes | str | Dict[str, Any]], Tuple[Dict[str, Any], None | bytes | Dict[str, Any]]]) – 在传入的每个操作上执行的回调,应返回一个元组,其中包含操作行和数据行(None 如果数据行应该被省略)。

  • max_retries (int) – 当收到 429 时,文档将被重试的最大次数,设置为 0(默认)表示在 429 上不重试

  • initial_backoff (float) – 我们在第一次重试之前应该等待的秒数。任何后续重试都将是 initial_backoff * 2**retry_number 的幂

  • max_backoff (float) – 重试将等待的最大秒数

  • yield_ok (bool) – 如果设置为 False,将跳过输出中的成功文档

  • ignore_status (int | Collection[int]) – 您想要忽略的 HTTP 状态代码列表

  • client

  • actions

  • chunk_size

  • max_chunk_bytes

  • raise_on_error

  • expand_action_callback

  • raise_on_exception

  • max_retries

  • initial_backoff

  • max_backoff

  • yield_ok

  • ignore_status

  • args (Any)

  • kwargs (Any)

返回类型:

可迭代[元组[布尔值, 字典[字符串, 任意]]]

elasticsearch.helpers.parallel_bulk(client, actions, thread_count=4, chunk_size=500, max_chunk_bytes=104857600, queue_size=4, expand_action_callback=<function expand_action>, ignore_status=(), *args, **kwargs)

批量助手并行版本,在多个线程中同时运行。

参数:
  • client (Elasticsearch) – Elasticsearch 的实例,用于使用

  • actions (可迭代[字节 | 字符串 | 字典[字符串, 任意]]) – 包含操作的迭代器

  • thread_count (整数) – 用于批量请求的线程池大小

  • chunk_size (int) – 发送到 es 的一个块中的文档数量(默认:500)

  • max_chunk_bytes (int) – 请求的最大大小(字节)(默认:100MB)

  • raise_on_error – 当出现错误时,引发包含错误(作为 .errors)的 BulkIndexError。默认情况下,我们引发错误。

  • raise_on_exception – 如果为 False,则不从调用 bulk 中传播异常,只报告失败的项目。

  • expand_action_callback (Callable[[bytes | str | Dict[str, Any]], Tuple[Dict[str, Any], None | bytes | Dict[str, Any]]]) – 在传入的每个操作上执行的回调,应返回一个元组,其中包含操作行和数据行(None 如果数据行应该被省略)。

  • queue_size (整数) – 主线程(生成要发送的块)和处理线程之间的任务队列大小。

  • ignore_status (int | Collection[int]) – 您想要忽略的 HTTP 状态代码列表

  • client

  • actions

  • thread_count

  • chunk_size

  • max_chunk_bytes

  • queue_size

  • expand_action_callback

  • ignore_status

  • args (Any)

  • kwargs (Any)

返回类型:

可迭代[元组[布尔值, 任意]]

elasticsearch.helpers.bulk(client, actions, stats_only=False, ignore_status=(), *args, **kwargs)

用于 bulk() API 的助手,提供更人性化的界面 - 它使用操作迭代器,并将它们以块的形式发送到 Elasticsearch。它返回一个包含摘要信息的元组 - 成功执行的操作数量,以及错误列表或错误数量(如果 stats_only 设置为 True)。请注意,默认情况下,当遇到错误时,我们会引发 BulkIndexError,因此像 stats_only 这样的选项仅在 raise_on_error 设置为 False 时适用。

当收集错误时,原始文档数据将包含在错误字典中,这会导致内存使用量过高。如果您需要处理大量数据并希望忽略/收集错误,请考虑使用 streaming_bulk() 助手,它只返回错误,不会将它们存储在内存中。

参数:
返回类型:

元组[整数, 整数 | 列表[字典[字符串, 任意]]]

任何其他关键字参数都将传递给 streaming_bulk(),它用于执行操作,有关更多接受的参数,请参见 streaming_bulk()

扫描

elasticsearch.helpers.scan(client, query=None, scroll='5m', raise_on_error=True, preserve_order=False, size=1000, request_timeout=None, clear_scroll=True, scroll_kwargs=None, **kwargs)

scroll() API 之上的简单抽象 - 一个简单的迭代器,它会生成由底层滚动请求返回的所有命中结果。

默认情况下,扫描不会以任何预定的顺序返回结果。为了在滚动时在返回的文档中具有标准顺序(按分数或显式排序定义),请使用 preserve_order=True。这可能是一个昂贵的操作,并且会抵消使用 scan 带来的性能优势。

参数:
  • client (Elasticsearch) – Elasticsearch 的实例,用于使用

  • query (任意 | ) – 用于 search() API 的主体

  • scroll (字符串) – 指定应为滚动搜索维护索引的一致视图的时间长度

  • raise_on_error (布尔值) – 如果遇到错误(某些分片无法执行),则引发异常 (ScanError)。默认情况下,我们引发错误。

  • preserve_order (布尔值) – 不要将 search_type 设置为 scan - 这将导致滚动进行分页,同时保留顺序。请注意,这可能是一个非常昂贵的操作,并且很容易导致不可预测的结果,请谨慎使用。

  • size (整数) – 每次迭代发送的批次大小(每个分片)。

  • request_timeout (浮点数 | ) – 对每次调用 scan 的显式超时时间

  • clear_scroll (bool) – 在方法完成或出错时,通过清除滚动 API 显式调用滚动 ID 上的删除操作,默认为 true。

  • scroll_kwargs (MutableMapping[str, Any] | None) – 要传递给 scroll() 的其他关键字参数

  • client

  • 查询

  • 滚动

  • raise_on_error

  • 保留顺序

  • 大小

  • 请求超时

  • 清除滚动

  • 滚动关键字参数

  • kwargs (Any)

返回类型:

Iterable[Dict[str, Any]]

任何其他关键字参数都将传递给初始的 search() 调用

scan(client,
    query={"query": {"match": {"title": "python"}}},
    index="orders-*",
    doc_type="books"
)

重新索引

elasticsearch.helpers.reindex(client, source_index, target_index, query=None, target_client=None, chunk_size=500, scroll='5m', op_type=None, scan_kwargs={}, bulk_kwargs={})

将满足给定查询的一个索引中的所有文档重新索引到另一个索引,可能(如果指定了 target_client)在不同的集群上。如果您没有指定查询,您将重新索引所有文档。

2.3 开始,reindex() API 作为 Elasticsearch 本身的一部分可用。建议尽可能使用 API 而不是此帮助程序。此帮助程序主要用于向后兼容性和需要更多灵活性的情况。

注意

此帮助程序不传输映射,只传输数据。

参数:
  • client (Elasticsearch) – 要使用的 Elasticsearch 实例(如果指定了 target_client,则用于读取)

  • source_index (str | Collection[str]) – 要从中读取文档的索引(或索引列表)

  • target_index (str) – 要填充的目标集群中索引的名称

  • query (任意 | ) – 用于 search() API 的主体

  • target_client (Elasticsearch | None) – 可选,如果指定,将用于写入(从而启用跨集群重新索引)

  • chunk_size (int) – 发送到 es 的一个块中的文档数量(默认:500)

  • scroll (字符串) – 指定应为滚动搜索维护索引的一致视图的时间长度

  • op_type (str | None) – 显式操作类型。默认为 ‘_index’。数据流必须设置为 ‘create’。如果未指定,将自动检测 target_index 是否为数据流。

  • scan_kwargs (MutableMapping[str, Any]) – 要传递给 scan() 的其他关键字参数

  • bulk_kwargs (MutableMapping[str, Any]) – 要传递给 bulk() 的其他关键字参数

  • client

  • 源索引

  • 目标索引

  • 查询

  • 目标客户端

  • chunk_size

  • 滚动

  • 操作类型

  • 扫描关键字参数

  • 批量关键字参数

返回类型:

元组[整数, 整数 | 列表[字典[字符串, 任意]]]