ES批量请求`es_rejected_execution_exception`

2024年 2月 11日 116.2k 0

es批量请求`es_rejected_execution_exception`

php小编苹果为您介绍ES批量请求中的常见问题:`es_rejected_execution_exception`。在使用Elasticsearch进行批量请求时,有时会遇到这个异常。这个异常通常表示请求的并发数超过了Elasticsearch服务器的处理能力,导致请求被拒绝执行。本文将为您解析这个异常的原因,并给出解决方案,帮助您顺利处理该问题。

问题内容

我有一个大约 5M 条目的切片(为简单起见,假设每个条目都是一个字节切片,它使用 getIndexerItem 函数映射到索引器项),我将其平均分配给 200 个 go 例程。然后每个go例程调用push函数,切片长度为5M/200。

根据我对Refresh的理解:wait_for,每当向elastic发出请求时,只有当该请求所做的更改对搜索可见时才会完成(IMO将其转换为不再有此特定请求的批量请求队列) )。那为什么我会收到此错误?

error indexing item: es_rejected_execution_exception:
rejected execution of processing of [358323543][indices:data/write/bulk[s][p]]:
request: BulkShardRequest [[ankit-test][3]] containing [3424] requests blocking until refresh,
target allocation id: someId, primary term: 1 on EsThreadPoolExecutor
[
name = machine_name/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@1f483ca1
[Running, pool size = 32, active threads = 32, queued tasks = 200, completed tasks = 44390708]
]

登录后复制

所有条目都将进入相同的索引,ankit-test

func (e *esClient) getIndexerItem(index string, id string, body []byte) esutil.BulkIndexerItem {
return esutil.BulkIndexerItem{
Index: index,
DocumentID: id,
Body: bytes.NewReader(body),
Action: "index",
DocumentType: "logs",
OnFailure: func(_ context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
if err != nil {
fmt.Printf("error indexing item: %sn", err.Error())
} else {
fmt.Printf("error indexing item: %s: %sn", res.Error.Type, res.Error.Reason)
}
},
}
}

func (e *esClient) push(data []esutil.BulkIndexerItem) (*esutil.BulkIndexerStats, error) {
indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: e.client,
Refresh: "wait_for",
NumWorkers: 1,
OnError: func(ctx context.Context, err error) {
fmt.Printf("received onError %sn", err.Error())
},
})
if err != nil {
return nil, fmt.Errorf("error creating bulk indexer: %s", err)
}

ctx := context.Background()
for _, d := range data {
if err := indexer.Add(ctx, d); err != nil {
fmt.Printf("error adding data to indexer: %sn", err)
}
}
if err := indexer.Close(ctx); err != nil {
fmt.Printf("error flushing and closing indexer: %sn", err)
}

indexerStats := indexer.Stats()
return &indexerStats, nil
}

登录后复制

假设没有其他进程以任何方式与索引交互。

解决方法

使用多个 ES 文档,我能够找到上述问题的解决方案。下面的回答是基于我的理解。如果您发现可以改进/纠正的内容,请发表评论。

这是请求生命周期:

  • golang es客户端将多个请求合并为一个,并以单个批量请求的形式发送到服务器。单个批量请求可以包含发往多个索引和分片的文档。
  • 当批量请求到达集群中的节点(也称为协调节点)时,它会被整体放入批量队列中,并由批量线程池中的线程进行处理。
  • 协调节点根据文档需要路由到的分片来拆分批量请求。
    每个批量子请求都会转发到保存相应主分片的数据节点。批量子请求在该节点的批量队列中排队。如果队列上没有可用空间,则通知协调节点批量子请求已被拒绝。
  • 所有子请求完成或被拒绝后,就会创建响应并将其返回给客户端。有可能甚至很可能批量请求中只有部分文档被拒绝。
  • 我的问题是我使用 refresh = false (默认)发送请求。相反,应该使用 refresh = wait_for 。为什么?刷新提供了3种模式:

  • false:不执行与刷新相关的操作。此请求所做的更改将在请求返回后的某个时刻可见。在收到响应时,请求不必已完成。请求可能仍在节点的队列中。
  • true:操作发生后立即刷新相关主分片和副本分片。确保在发回响应之前请求已完成。请求已从节点队列中删除。
  • wait_for:等待请求所做的更改在回复之前通过刷新可见。与 true 不同的是,这不会强制立即刷新,而是等待刷新发生。比 refresh = true 便宜(就服务器负载而言),但仍然确保在发送回响应之前请求已完成。请求已从节点队列中删除。
  • 所有数据都被重定向到同一节点,并且由于 refresh = false,在现有请求从队列中清除之前返回了响应,这导致了溢出。

    以上就是ES批量请求`es_rejected_execution_exception`的详细内容,更多请关注每日运维网(www.mryunwei.com)其它相关文章!

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论