Druid数据摄取 概述


1 前言

Druid中,所有的数据都会被分割成段文件,每个文件中最多能够有几百万行数据。在环境下加载数据的操作一般被称为“摄取”或“索引”,即从源数据系统读取数据并为该数据创建段文件的过程。

在Druid的数据摄取过程中,大多数的任务会由MiddleManager或Indexer进程来完成。

当然也有例外:Druid中页存在基于Hadoop的摄取操作,即使用Hadoop MapReduce在YARN上完成整个过程,即便如此,整个Hadoop的作业过程仍会由MiddleManager或Indexer进程来进行监控。

2 摄取方式

1)流式摄取

最推荐、也是最流行的流式摄取方法是直接从Kafka读取数据的Kafka索引服务。当然,您也可以根据需要选择Kinersis和Tranquility,下表简要比较了三种主要的流式摄取方法:

Method

Kafka

Kinesis

Tranquility

Supervisor类型

kafka

kinesis

N/A

如何工作

Druid直接从 Apache Kafka读取数据

Druid直接从Amazon Kinesis中读取数据

Tranquility, 一个独立于Druid的库,用来将数据推送到Druid

可以摄入迟到的数据

Yes

Yes

No(迟到的数据将会被基于 windowPeriod 的配置丢弃掉)

保证不重不丢(Exactly-once)

Yes

Yes

No

2)批量摄取

在使用“批量摄取”方法加载文件时,应使用一次性任务,其中有三个选项需要进行设置:index_parallel(本地并行批任务)、index_hadoop(基于hadoop)或index(本地简单批任务)。

一般来说,如果本地批处理能满足您的需要时我们建议使用它,因为设置更为简单(它不依赖于外部Hadoop集群)。但仍有一些情况是基于Hadoop的批摄取会更好,例如,当您已经有一个正在运行的Hadoop集群并希望使用现有集群的集群资源进行批摄取时,可以选择这种方式。

下表对三种任务类型进行了简单比较:

方式

本地批任务(并行)

基于Hadoop

本地批任务(简单)

任务类型

index_parallel

index_hadoop

index

并行?

如果 inputFormat 是可分割的且 tuningConfig 中的 maxNumConcurrentSubTasks > 1, 则 Yes

Yes

No,每个任务都是单线程的

支持追加或者覆盖

都支持

只支持覆盖

都支持

外部依赖

Hadoop集群,用来提交Map-Reduce任务

输入位置

任何输入数据源

任何Hadoop文件系统或者Druid数据源

任何输入数据源

文件格式

任何输入格式

任何Hadoop输入格式

任何输入格式

Rollup modes

如果 tuningConfig 中的 forceGuaranteedRollup = true, 则为Perfect(最佳rollup)

总是Perfect(最佳rollup)

如果 tuningConfig 中的 forceGuaranteedRollup = true, 则为 Perfect(最佳rollup)

分区选项

可选的有Dynamichash-based 和 range-based 三种分区方式

通过partitionsSpec中指定 hash-based 和 range-based分区

可选的有Dynamichash-based二种分区方式

3 Druid数据模型

1)数据源

与传统RDBMS中的表类似,Druid的数据存储在数据源中。

Druid提供了一个与关系模型、时间序列模型类似的数据建模系统

2)主时间戳列(Timestamp)

Druid Schema必须始终包含一个主时间戳

  • 主时间戳列的作用

主时间戳用于对数据进行分区和排序

通过主时间戳,Druid能够快速识别、检索到与其相对应的数据,此外Druid还可以根据主时间戳对数据进行基于时间的简单操作,如删除、覆盖等。

  • 主时间戳列的解析

主时间戳是基于timestampSpec进行解析的。此外,granularitySpec控制基于主时间戳的其他重要操作。无论从哪个输入字段读取主时间戳,它都将作为列名为__time的数据列存储在Druid数据源中。

  • 辅助时间戳

如果有多个时间戳列,则其他列存储可视作为辅助时间戳

3)维度(Dimensions)

维度是按数据原有格式进行存储的列,用于在查询中对指定维度进行分组、筛选或聚合。

  • 维度的设置

通过dimensionSpec配置维度。

4)指标(Metrics)

指标是以聚合形式存储的列。

用户可以指定任意指标来对数据进行聚合操作。建议在启动rollup的环境下使用,因为:

  1. 即使需要保留摘要信息,也可以将多个数据行折叠为一行。在Rollup教程中,这用于将netflow数据折叠为每(minutesrcIPdstIP)元组一行,同时保留有关总数据包和字节计数的聚合信息。
  2. 对于一些聚合器(特别是是近似聚合器)而言,即使在接收非汇总数据过程中进行局部计算,Druid也可以在查询过程中进行更加快速的计算。
  • 指标的配置

通过metricsSpec配置。

4 Rollup

1)什么是rollup

Druid可以在接收数据过程中对数据进行汇总操作,以对原始数据压缩至最小化的操作。

而Rollup则是一种汇总预聚合的形式,它可以极大程度地减少需要存储的数据大小,从而减少数据行的数量级。

但在使用rollup进行汇总数据时,我们无法对某个单独事件进行查询。

  • 当禁用rollup,Druid将按数据原样加载所有的行,且不进行任何形式的聚合。
  • 当启用rollup,那么任何具有相同维度和时间戳的行(在基于queryGranularity的截断之后)都可以在Druid中折叠或汇总为一行。
  • 系统默认的rollup模式为启用状态。
2)启用或者禁用rollup

Rollup由granularitySpec中的rollup选项进行配置。

如果你想让Druid按原样存储每条记录,而不需要任何汇总,将该值设置为false

3)最大化rollup比率

通过比较Druid中的行数与接收的事件数量,可以测量数据源的汇总率,汇总率越高说明rollup越奏效。

汇总率可以通过Druid SQL语句来查询得到:

SELECT SUM("cnt") / COUNT(*) * 1.0 FROM datasource

在这个查询中,cnt引用在摄取数据时所指定的“count”类型指标。

最大化rollup过程中需要注意的地方:

  • 一般来说,维度越少,其基数也越低。
  • 为了不对rollup比率造成负面影响,可以使用Sketches来避免存储高基数维度
  • 在摄入时调整queryGranularity,例如使用PT5M而不是PT1M会增加Druid中两行具有匹配时间戳的可能性,并可以提高汇总率。
  • 将相同的数据加载到多个Druid数据源中是有益的。有些用户选择创建禁用rollup(或启用rollup,但汇总率设为最小)的“完整”数据源和具有较少维度和较高汇总率的“压缩”数据源。当查询请求只涉及“压缩”数据集的维度时,查询过程能够在更快的时间内完成——这种方案只需稍微增加存储空间即可完成。
  • 如果您使用的best-effort rollup,摄取配置不能保证完全汇总(perfect rollup),则可以通过切换到有保障的完全汇总选项,或在初始摄取后在后台重新编制(reindex)数据索引,从而潜在提高汇总率。
5)perfect rollup VS best-effort rollup
  • perfect rollup

使用该rollup方式意味着输入数据在摄取时会被“完美”地聚合。

  • best-effort rollup

使用该rollup方式意味着输入的数据可能无法完全聚合,因此可能有多个段保存了具有相同时间戳维度值的行。一般而言,选择best-effort rollup的用户是出于一定目的的,他们想要获取没有经过清晰步骤的数据。

5 分区

1)为什么分区

通常情况下,对数据进行分区能够优化压缩性能,往往会提高查询性能

Druid中支持对指定时间块儿内的段文件做进一步的分区。一般而言,使用指定维度进行辅助分区能够改善一定的操作性能,这意味着具有该相同维度值的行将存储在一起并支持快速访问。

2)如何设置分区

并不是所有的摄入方式都支持显式的分区配置,也不是所有的方法都具有同样的灵活性。

在当前的Druid版本中,如果您是通过一个不太灵活的方法(如Kafka)进行初始摄取,那么您可以使用重新索引的技术(reindex),在初始摄取数据后再对其进行重新分区。这是一种强大的技术:即使您不断地从流中添加新数据, 也可以使用它来确保任何早于某个阈值的数据都得到最佳分区。

下表显示了每个摄取方法如何处理分区:

方法

如何工作

本地批

通过tuningConfig中的partitionsSpec

Hadoop批

通过tuningConfig中的partitionsSpec

Kafka索引服务

Druid中的分区是由Kafka主题的分区方式决定的。您可以在初次摄入后 重新索引的技术(reindex)以重新分区

Kinesis索引服务

Druid中的分区是由Kinesis流的分区方式决定的。您可以在初次摄入后 重新索引的技术(reindex)以重新分区

6 摄入规范

无论使用哪一种摄入方式,数据要么是通过一次性tasks或者通过持续性的supervisor(运行并监控一段时间内的一系列任务)来被加载到Druid中。

在任意一种情况下,task或者supervisor都需要在摄入规范中进行定义。

摄入规范包括以下三个主要的部分:

  • dataSchema, 包含了 数据源名称主时间戳列维度指标 和 转换与过滤
  • ioConfig, 该部分告诉Druid如何去连接数据源系统以及如何去解析数据。
  • tuningConfig, 该部分控制着每一种摄入方法的不同的特定调整参数

一个 index_parallel 类型任务的示例摄入规范如下:

{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "wikipedia",
      "timestampSpec": {
        "column": "timestamp",
        "format": "auto"
      },
      "dimensionsSpec": {
        "dimensions": [
          { "type": "string", "page" },
          { "type": "string", "language" },
          { "type": "long", "name": "userId" }
        ]
      },
      "metricsSpec": [
        { "type": "count", "name": "count" },
        { "type": "doubleSum", "name": "bytes_added_sum", "fieldName": "bytes_added" },
        { "type": "doubleSum", "name": "bytes_deleted_sum", "fieldName": "bytes_deleted" }
      ],
      "granularitySpec": {
        "segmentGranularity": "day",
        "queryGranularity": "none",
        "intervals": [
          "2013-08-31/2013-09-01"
        ]
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "local",
        "baseDir": "examples/indexing/",
        "filter": "wikipedia_data.json"
      },
      "inputFormat": {
        "type": "json",
        "flattenSpec": {
          "useFieldDiscovery": true,
          "fields": [
            { "type": "path", "name": "userId", "expr": "$.user.id" }
          ]
        }
      }
    },
    "tuningConfig": {
      "type": "index_parallel"
    }
  }
}
  • dataSchema

dataSchema 包含了以下部分:数据源名称、主时间戳列、维度、指标、转换与过滤

一个dataSchema的示例如下:

"dataSchema": {
  "dataSource": "wikipedia",
  "timestampSpec": {
    "column": "timestamp",
    "format": "auto"
  },
  "dimensionsSpec": {
    "dimensions": [
      { "type": "string", "page" },
      { "type": "string", "language" },
      { "type": "long", "name": "userId" }
    ]
  },
  "metricsSpec": [
    { "type": "count", "name": "count" },
    { "type": "doubleSum", "name": "bytes_added_sum", "fieldName": "bytes_added" },
    { "type": "doubleSum", "name": "bytes_deleted_sum", "fieldName": "bytes_deleted" }
  ],
  "granularitySpec": {
    "segmentGranularity": "day",
    "queryGranularity": "none",
    "intervals": [
      "2013-08-31/2013-09-01"
    ]
  }
}
  • dataSource

dataSource位于dataSchema -> dataSource中,简单的标识了数据将被写入的数据源的名称,示例如下:

"dataSource": "my-first-datasource"
  • timestampSpec

timestampSpec位于dataSchema -> timestampSpec中,用来配置主时间戳,示例如下:

"timestampSpec": {
  "column": "timestamp",
  "format": "auto"
}

timestampSpec可以包含以下的部分:

字段

描述

默认值

column

要从中读取主时间戳的输入行字段。

不管这个输入字段的名称是什么,主时间戳总是作为一个名为”__time”的列存储在您的Druid数据源中

timestamp

format

时间戳格式,可选项有:

  • iso: 使用”T”分割的ISO8601,像”2000-01-01T01:02:03.456″
  • posix: 自纪元以来的秒数
  • millis: 自纪元以来的毫秒数
  • micro: 自纪元以来的微秒数
  • nano: 自纪元以来的纳秒数
  • auto: 自动检测ISO或者毫秒格式
  • 任何Joda DateTimeFormat字符串

auto

missingValue

用于具有空或缺少时间戳列的输入记录的时间戳。应该是ISO8601格式,如“2000-01-01T01:02:03.456”。由于Druid需要一个主时间戳,因此此设置对于接收根本没有任何时间戳的数据集非常有用。

none

  • dimensionSpec

dimensionsSpec位于dataSchema -> dimensionsSpec,用来配置维度。示例如下:

 "dimensionsSpec" : {
  "dimensions": [
    "page",
    "language",
    { "type": "long", "name": "userId" }
  ],
  "dimensionExclusions" : [],
  "spatialDimensions" : []
}

dimensionsSpec可以包括以下部分:

字段

描述

默认值

dimensions

维度名称或者对象的列表,在 dimensions 和 dimensionExclusions 中不能包含相同的列。

如果该配置为一个空数组,Druid将会把所有未出现在 dimensionExclusions 中的非时间、非指标列当做字符串类型的维度列。

[]

dimensionExclusions

在摄取中需要排除的列名称,在该配置中只支持名称,不支持对象。在 dimensions 和 dimensionExclusions 中不能包含相同的列。

[]

spatialDimensions

一个空间维度的数组

[]

  • Dimension objects

在dimensions列的每一个维度可以是一个名称,也可以是一个对象。 提供一个名称等价于提供了一个给定名称的string类型的维度对象。例如: page 等价于 {“name”: “page”, “type”: “string”}。

维度对象可以有以下的部分:

字段

描述

默认值

type

stringlongfloat 或者 double

string

name

维度名称,将用作从输入记录中读取的字段名,以及存储在生成的段中的列名。

注意: 如果想在摄取的时候重新命名列,可以使用 transformSpec

none(必填)

createBitmapIndex

对于字符串类型的维度,是否应为生成的段中的列创建位图索引。创建位图索引需要更多存储空间,但会加快某些类型的筛选(特别是相等和前缀筛选)。仅支持字符串类型的维度。

true

  • Inclusions and exclusions

Druid以两种可能的方式来解释 dimensionsSpec : normal 和 schemaless

当 dimensions 或者 spatialDimensions 为非空时, 将会采用正常的解释方式。 在该情况下, 前边说的两个列表结合起来的集合当做摄入的维度集合。

当 dimensions 和 spatialDimensions 同时为空或者null时候,将会采用无模式的解释方式。 在该情况下,维度集合由以下方式决定:

  1. 首先,从 inputFormat (或者 flattenSpec, 如果正在使用 )中所有输入字段集合开始
  2. 排除掉任何在 dimensionExclusions 中的列
  3. 排除掉在 timestampSpec 中的时间列
  4. 排除掉 metricsSpec 中用于聚合器输入的列
  5. 排除掉 metricsSpec 中任何与聚合器同名的列
  6. 所有的其他字段都被按照默认配置摄入为 string 类型的维度
  • metricsSpec

metricsSpec位于dataSchema -> metricsSpec中,是一个在摄入阶段要应用的聚合器列表。 在启用了rollup时是很有用的,因为它将配置如何在摄入阶段进行聚合。

一个metricsSpec实例如下:

"metricsSpec": [
  { "type": "count", "name": "count" },
  { "type": "doubleSum", "name": "bytes_added_sum", "fieldName": "bytes_added" },
  { "type": "doubleSum", "name": "bytes_deleted_sum", "fieldName": "bytes_deleted" }
]
  • granularitySpec

granularitySpec位于dataSchema -> granularitySpec, 用来配置以下操作:

  1. 通过 segmentGranularity 来将数据源分区到时间块
  2. 如果需要的话,通过 queryGranularity 来截断时间戳
  3. 通过 interval 来指定批摄取中应创建段的时间块
  4. 通过 rollup 来指定是否在摄取时进行汇总
  5. 除了 rollup, 这些操作都是基于主时间戳列

一个granularitySpec实例如下:

"granularitySpec": {
  "segmentGranularity": "day",
  "queryGranularity": "none",
  "intervals": [
    "2013-08-31/2013-09-01"
  ],
  "rollup": true
}
  • transformSpec

transformSpec位于dataSchema -> transformSpec,用来摄取时转换和过滤输入数据。 一个transformSpec实例如下:

"transformSpec": {
  "transforms": [
    { "type": "expression", "name": "countryUpper", "expression": "upper(country)" }
  ],
  "filter": {
    "type": "selector",
    "dimension": "country",
    "value": "San Serriffe"
  }
}

除了这些属性之外,每个摄取方法都有自己的特定调整属性