由于 Datadog 产品的性质,Husky 的存储引擎几乎完全围绕服务大型扫描和聚合查询进行了优化。
虽然它可以执行点查找并运行大海捞针搜索查询,但它并不是为了以高容量和低延迟执行点查找而设计的。
这种设计给摄取方面带来了挑战:我们如何保证数据被准确地摄取到 Husky 中一次,确保永远不会出现重复事件?
同时,Datadog 是一个大规模的多租户平台。
我们的解决方案必须与我们现有的多租户摄取管道配合使用,同时保持合理的摄取延迟且不会增加成本。
shard router
我们为每个租户分配一个分片列表,然后为租户的事件选择一个特定的 shard。
- 容易去重。一个特定的 timestamp + id 的 event 的数据会进入指定的 shard。这意味着我们只需要在单个 shard 内去执行去重。(同时,每个 worker 只有 id 的子集,这使在内存中去重 id 更方便)。
- 更低的存储成本。每个 worker 创建的文件数量与租户的数量成正比。减少文件的数量就减少了成本和 compactor 的工作。
需要解决的一些挑战性问题:
- 处理分配变更。租户的事件路由到的分片集可能需要随着时间的推移而变化,通常是由于租户流量的变化或可用分片总数的变化。个体租户在短时间内突然将其数量(暂时或持续)增加一到两个数量级是很常见的。扩展操作(为租户添加更多分片)或重新平衡操作(为租户使用不同的分片以更好地平衡流量)可以更改分片集。
- 分布式 Shard Router 节点之间达成共识。具体来说,每个分片路由器节点必须始终就将给定事件路由到哪个分片做出相同的决定,否则我们将面临将重复数据引入系统的风险。
- 负载平衡。每个分片应接收大致相等的输入流量切片,以便摄取节点实现负载平衡。
使用有时间限制的分片放置处理分配更改
当我们将租户分配给一组分片时,我们确保分配仅在特定的时间范围内有效:通常只有几分钟,这足以有效地对可能改变理想分片集的环境更新做出反应对于一个租户。
与分片分配器建立共识
为了确保所有分片路由器节点对分配的分片位置具有一致的视图,我们使用称为分片分配器的中央服务
使用 foundationdb 实现。
利用不可变的特性,缓存在 shared router 中。
Sharding Allocator如何选择分片
分片分配器有一些必须牢记的限制:
- 分片分配器必须快速、可靠并且具有最小的依赖性。
- 我们希望最大限度地减少同一租户的连续放置之间的分片更改。例如,如果分片路由器在一个时间间隔要求 5 个分片,在下一个时间间隔要求 6 个分片,则理想的布局将重用与前一个布局相同的 5 个分片并添加 1 个分片。当分片总数发生变化时,同样的要求也适用 - 理想情况下,如果我们从 100 个分片中选择了 5 个分片,那么稍后当有 101 个分片可供选择时,我们应该选择相同的 5 个分片。
- 有时排除分片很有用。我们应该能够跳过已标记为已排除的分片,例如帮助落后的消费者赶上。
h(tenantid + salt, totalshards)。
使用 tenantid 作为 key,选择第一个 shard 以及后续的 shard 作为分片,这样保证了分配是连续均匀的。
通过 salt 更改第一个 shard 的位置,确保了不同 shard 的流量是均衡的。
更改后,每个 shard 的消费速率都变得更为均匀。
适应突发流量
分片放置是不可变的,具有一组固定的分片,并且适用于特定的租户和时间范围。因此,分配的分片数量较少的租户有可能在放置适用的时间范围内急剧增加其流量。足够大量的流量涌入少量分片可能会使相关工作人员超载,从而导致摄取滞后。
通常,一些自动化机制可以减轻流量突发的影响:
-
自动扩容
分片的大小足够小,以便每个写入者通常一次分配多个分片。处理大量流量的影响将体现在 CPU 使用率等重要指标值的增加上。Watermark Pod Autoscaler会观察这些指标,并在观察到的值超过阈值时快速扩大规模。增加工作人员数量会减少分配给每个分片的分片数量,从而增加每个分片的最大潜在吞吐量。换句话说,如果 Writer 之前被分配了 10 个分片,但突然只分配了 5 个分片,那么它现在每秒可以从每个剩余分配的分片中处理大约两倍的事件。
-
分片排除
由于流量激增而过载的分片可以暂时排除。排除分片意味着它不会包含在未来的分片放置中,因此最终新流量不会路由到它。在极端情况下,这可以使分片从大量积压中恢复。
无状态重复数据删除
无状态 Writer 的含义是 Writer 分配的分片可以更改。这意味着其他工作人员可以(暂时)从同一分片接收事件,这可能会导致工作人员之间出现重复事件 - 除了单个分片内的重复事件之外。那么我们如何进行重复数据删除呢?
以下是一些无效的简单解决方案:
- 简单地查询 Husky 的事件 ID 以查看它是否已被写入,效率非常低。Husky 针对搜索和聚合进行了优化,但不适用于低延迟点查找,尤其是在我们的数据摄取管道运行的高吞吐量下。
- 我们不能简单地将所有看到的 ID 存储在内存中,因为它们装不下内存。此外,这并不能帮助我们处理重新启动或协调重复/冲突,当两个编写器在部署和节点重新启动期间临时处理相同的分片时,这些重复/冲突总是会发生。
我们需要某种持久性存储3来存储事件 ID,我们可以从中有效地检索和查找 ID。两个明显的候选人:
- 使用嵌入式本地键值存储,例如 RocksDB。这肯定是高效的,但它也会破坏 Writer 节点的无状态性质。
- 使用 Redis 或 Cassandra 等远程键值存储。这可能效果很好,但它增加了额外的依赖性,并且可能使得在较长的时间窗口(例如数月或数年)内进行重复数据删除变得困难。
这两种解决方案都存在另一个问题:它们在两个事实来源之间分割了一致性和正确性。Husky 的设计采用单一事实来源——元数据存储(由 FoundationDB 支持)。如果事件已在那里提交,则该事件存在;如果没有提交,则该事件不存在。使用额外的存储会增加协调两个系统之间分歧的复杂性:如果我们能够将事件 ID 提交到 Cassandra,但我们无法将事件数据本身提交到元数据存储,该怎么办?
在 Husky 中存储 ID
我们选择了混合方法来存储 ID:我们将事件 ID 保留在 Husky 本身中,但将它们存储在与原始事件数据不同的 Husky 表中。这样一下子就解决了我们最大的两个问题:
为了快速查找,事件 ID 也存储在内存中,并且我们根据需要以事件 ID 的间隔从 Husky 中延迟分页。当内存结构变得太大时,我们借助 LRU(最近最少使用)缓存清除不需要的间隔。
这是一个优雅的解决方案,原因如下:
- 元数据仍然是我们唯一的事实来源。我们在同一个原子事务中提交租户事件数据和事件 ID:因此,一旦事件数据本身和事件 ID 提交到元数据存储,我们就可以保证事件数据本身和事件 ID 之间的一致性。
- Husky 旨在长期存储大量数据,因此我们可以根据需要存储所有事件 ID。
- 事件 ID 表只是常规的 Husky 表,因此 Compactors 会自动优化它们以进行读取(由写入器分页),就像它们对所有其他“常规”事件数据表所做的那样。此外,这些 ID 表可以具有生存时间配置,就像 Husky 中的任何其他表一样,并在可配置的时间后自动过期。
当工作线程重新启动(或接收新的分片分配)时,它必须在许多事件 ID 间隔中进行分页,因为内存中没有事件 ID。在实践中,由于可观察性数据往往按大致顺序到达,因此页面调入速率比 Writer 的实际摄取速率低几个数量级。无论如何,写入器可以以比处理传入事件高约 2 个数量级的速率调入 ID。
## [下一步是什么?](https://www.datadoghq.com/blog/engineering/husky-deep-dive/#whats-next)
在这里,我们回顾了不可变但动态的上游路由与 FoundationDB 支持的冲突解决机制如何使我们能够将大量多租户数据流摄取到 Husky 中并进行重复数据删除,同时保持摄取路径的每个部分都是无状态的,可扩展且高性能。本系列的后续文章将涵盖 Husky 压缩系统和查询路径的有趣技术细节。