Presto Kafka Connector


1 概述

此连接器允许将Apache Kafka中的主题用作Presto中的表。

每条消息在Presto中显示为一行。

主题可以是实时的:行将在数据到达时出现并随着消息被丢弃而消失。如果在单个查询中多次访问同一个表(例如,执行自联接),这可能会导致异常的行为。

注意:支持Apache Kafka 2.3.1+。

2 配置Kafka连接器(Kafka Connector)

要配置Kafka连接器,请创建一个etc/catalog/kafka.properties包含以下内容的目录属性文件,并根据需要替换这些属性:

connector.name=kafka
kafka.table-names=table1,table2
kafka.nodes=host1:port,host2:port

3 多个Kafka集群

您可以根据需要拥有任意数量的目录,因此如果您有其他Kafka集群,只需添加另一个etc/catalog具有不同名称的属性文件(确保以结尾.properties)。例如,如果您将属性文件命名为sales.properties,Presto将创建一个sales使用配置的连接器命名的目录。

4 配置属性

以下配置属性可用:

Preoperty名称

描述

kafka.table-names

目录提供的所有表的列表

kafka.default-schema

表的默认架构名称

kafka.nodes

Kafka集群中的节点列表

kafka.connect-timeout

连接到Kafka集群超时

kafka.max-poll-records

每次投票的最大记录数

kafka.max-partition-fetch-bytes

每次轮询一个分区的最大字节数

kafka.table-description-dir

包含主题描述文件的目录

kafka.hide-internal-columns

控制内部列是否是表架构的一部分

  • kafka.table-names

此目录提供的所有表的逗号分隔列表。表名可以是未限定的(简单名称),并将被放入默认模式(见下文)或使用模式名称(<schema-name>.<table-name>)进行限定。

对于此处定义的每个表,可能存在一个表描述文件(见下文)。如果不存在表描述文件,则在Kafka上以表名作为主题名,没有数据列映射到表中。该表仍将包含所有内部列(见下文)。

此属性是必需的;没有默认值,必须至少定义一张表。

  • kafka.default-schema

定义架构,该架构将包含所有在没有限定架构名称的情况下定义的表。

此属性是可选的;默认为default

  • kafka.nodes

hostname:portKafka数据节点的逗号分隔列表。

此属性是必需的;没有默认值,必须至少定义一个节点。

注意:即使这里只指定了一个子集,Presto仍然必须能够连接到集群的所有节点,因为消息可能仅位于特定节点上。

  • kafka.connect-timeout

连接到数据节点的超时。一个繁忙的Kafka集群在接受连接之前可能需要相当长的时间;当看到由于超时而失败的查询时,增加这个值是一个很好的策略。

此属性是可选的;默认值为10秒(10s)。

  • kafka.max-poll-records

来自Kafka的每个poll的最大记录数。

此属性是可选的;默认为500

  • kafka.max-partition-fetch-bytes

每次轮询一个分区的最大字节数

此属性是可选的;默认为1MB。

  • kafka.table-description-dir

引用Presto部署中的一个文件夹,其中包含一个或多个.json包含表描述文件的JSON文件(必须以结尾)。

此属性是可选的;默认为etc/kafka。

  • kafka.hide-internal-columns

除了表描述文件中定义的数据列之外,连接器还为每个表维护了许多附加列。如果这些列被隐藏,它们仍然可以在查询中使用,但不会显示在或中。DESCRIBE<table-name>SELECT*

此属性是可选的;默认为true。

5 内部列

对于每个定义的表,连接器维护以下列:

列名

类型

描述

_partition_id

大数据

包含此行的Kafka分区的ID。

_partition_offset

大数据

此行在Kafka分区内的偏移量。

_message_corrupt

布尔值

如果解码器无法解码此行的消息,则为真。如果为true,则从消息映射的数据列应视为无效。

_message

VARCHAR

消息字节为UTF-8编码字符串。这仅对文本主题有用。

_message_length

大数据

消息中的字节数。

_key_corrupt

布尔值

如果密钥解码器无法解码此行的密钥,则为真。如果为true,则从键映射的数据列应视为无效。

_key

VARCHAR

关键字节作为UTF-8编码字符串。这仅对文本键有用。

_key_length

大数据

密钥中的字节数。

对于没有表定义文件的表,_key_corrupt和_message_corrupt列将始终为false。、

6 表定义文件

Kafka仅将主题作为字节消息来维护,并将其留给生产者和消费者来定义应如何解释消息。对于Presto,必须将此数据映射到列中以允许对数据进行查询。

注意:

对于包含JSON数据的文本主题,完全可以不使用任何表定义文件,而是使用PrestoJSON函数和运算符来解析_message包含映射到UTF-8字符串的字节的列。但是,这非常麻烦,并且很难编写SQL查询。

表定义文件由表的JSON定义组成。

{
"tableName":...,
"schemaName":...,
"topicName":...,
"key":{
"dataFormat":...,
"fields":[
...
]
},
"message":{
"dataFormat":...,
"fields":[
...
]
}
}

区域

必需的

类型

描述

tableName

必需的

细绳

此文件定义的Presto表名。

schemaName

可选的

细绳

将包含表的架构。如果省略,则使用默认架构名称。

topicName

必需的

细绳

映射的Kafka主题。

key

可选的

JSON对象

映射到消息键的数据列的字段定义。

message

可选的

JSON对象

映射到消息本身的数据列的字段定义。

7 Kafka中的Key和Message

从Kafka0.8开始,主题中的每条消息都可以有一个可选的键。表定义文件包含用于将数据映射到表列的键和消息部分。

表定义中的每个key和message字段都是一个JSON对象,必须包含两个字段:

区域

必需的

类型

描述

dataFormat

必需的

细绳

为这组字段选择解码器。

fields

必需的

JSON数组

字段定义列表。每个字段定义都会在Presto表中创建一个新列。

每个字段定义都是一个JSON对象:

{
"name":...,
"type":...,
"dataFormat":...,
"mapping":...,
"formatHint":...,
"hidden":...,
"comment":...
}

区域

必需的

类型

描述

name

必需的

细绳

Presto表中列的名称。

type

必需的

细绳

列的Presto类型。

dataFormat

可选的

细绳

选择此字段的列解码器。默认为此行数据格式和列类型的默认解码器。

dataSchema

可选的

细绳

Avro架构所在的路径或URL。仅用于Avro解码器。

mapping

可选的

细绳

列的映射信息。这是特定于解码器的,见下文。

formatHint

可选的

细绳

为列解码器设置特定于列的格式提示。

hidden

可选的

布尔值

从和隐藏列。默认为.DESCRIBE<tablename>SELECT*false

comment

可选的

细绳

添加显示为的列注释。DESCRIBE<tablename>

键或消息的字段描述没有限制。

8 行解码

对于键和消息,解码器用于将消息和键数据映射到表列。

  • Kafka连接器包含以下解码器:
  • raw-不解释Kafka消息,原始消息字节范围映射到表列
  • csv-Kafka消息被解释为逗号分隔的消息,字段映射到表列
  • json-Kafka消息被解析为JSON并且JSON字段被映射到表列
  • avro-基于Avro模式解析Kafka消息,并将Avro字段映射到表列

注意:如果表不存在表定义文件,dummy则使用不公开任何列的解码器。

9 raw解码器

原始解码器支持从Kafka消息或密钥读取原始(基于字节的)值并将其转换为Presto列。

  • 对于字段,支持以下属性:
  • dataFormat-选择转换的数据类型的宽度
  • type-Presto数据类型(请参阅下表以获取支持的数据类型列表)
  • mapping-<start>[:<end>];要转换的字节的开始和结束位置(可选)

该dataFormat属性选择转换的字节数。如果不存在,BYTE则假定。所有值都已签名。

支持的值是:

  • BYTE-一个字节
  • SHORT-两个字节(大端)
  • INT-四个字节(大端)
  • LONG-八字节(大端)
  • FLOAT-四字节(IEEE754格式)
  • DOUBLE-八字节(IEEE754格式)

该type属性定义了值映射到的Presto数据类型。

根据分配给列的Presto类型,可以使用不同的dataFormat值:

Presto数据类型

允许dataFormat

BIGINT

BYTE,SHORT,INT,LONG

INTEGER

BYTE,SHORT,INT

SMALLINT

BYTE,SHORT

TINYINT

BYTE

DOUBLE

DOUBLE,FLOAT

BOOLEAN

BYTE,SHORT,INT,LONG

VARCHAR/VARCHAR(x)

BYTE

该mapping属性指定用于解码的密钥或消息中的字节范围。它可以是由冒号(<start>[:<end>])分隔的一个或两个数字。

如果只给出起始位置:

对于固定宽度类型,列将使用指定的适当字节dateFormat(见上文)。
VARCHARvalue被解码时,将从开始位置到消息结束的所有字节都将被使用。

如果给出开始和结束位置,则:

对于固定宽度类型,大小必须等于指定的字节数dataFormat
使用VARCHAR开始(包含)和结束(不包含)之间的所有字节。

如果未mapping指定任何属性,则等效于将开始位置设置为0而未定义结束位置。

数字数据类型(BIGINT,INTEGER,SMALLINT,TINYINT,DOUBLE)的解码方案很简单。从输入消息中读取字节序列并根据以下任一条件进行解码:

大端编码(对于整数类型)
IEEE754格式(forDOUBLE)

解码字节序列的长度由dataFormat.

对于VARCHAR数据类型,根据UTF-8编码解释字节序列。

10 csv解码器

CSV解码器使用UTF-8编码将表示消息或密钥的字节转换为字符串,然后将结果解释为CSV(逗号分隔值)行。

对于字段,必须定义type和mapping属性:

  • type-Presto数据类型(请参阅下表以获取支持的数据类型列表)
  • mapping-CSV记录中字段的索引

dataFormat和formatHint不受支持,必须省略。

下表列出了可用于type解码方案的受支持的Presto类型:

Presto数据类型

解码规则

BIGINT

INTEGER

SMALLINT

TINYINT

使用Java解码Long.parseLong()

DOUBLE

使用Java解码Double.parseDouble()

BOOLEAN

“真”字符序列映射到true;其他字符序列映射到false

VARCHAR/VARCHAR(x)

按原样使用

11 json解码器

JSON解码器根据以下内容将表示消息或密钥的字节转换为JSONRFC4627。请注意,消息或键必须转换为JSON对象,而不是数组或简单类型。

对于字段,支持以下属性:

  • type-Presto类型的列。
  • dataFormat-用于列的字段解码器。
  • mapping-以斜线分隔的字段名称列表,用于从JSON对象中选择一个字段
  • formatHint-仅用于custom-date-time,见下

JSON解码器支持多个字段解码器,_default用于标准表列和许多基于日期和时间类型的解码器。

下表列出了Presto数据类型,可用作intype和匹配字段解码器,可通过dataFormat属性指定。

Presto数据类型

允许dataFormat

BIGINT

INTEGER

SMALLINT

TINYINT

DOUBLE

BOOLEAN

VARCHAR

VARCHAR(x)

默认字段解码器(省略dataFormat属性)

TIMESTAMP

TIMESTAMPWITHTIMEZONE

TIME

TIMEWITHTIMEZONE

custom-date-time,iso8601,rfc2822,milliseconds-since-epoch,seconds-since-epoch

DATE

custom-date-time,iso8601,rfc2822,

12 默认字段解码器

这是支持所有Presto物理数据类型的标准字段解码器。字段值将被JSON转换规则强制转换为boolean、long、double或string值。对于非基于日期/时间的列,应使用此解码器。

13 日期和时间解码器

从JSON对象值变换为的PrestoDATE,TIME,或列,解码器特别必须使用所选择的一个字段定义的属性。TIMEWITHTIMEZONE`,“TIMESTAMPTIMESTAMPWITHTIMEZONEdataFormat

  • iso8601-基于文本,将文本字段解析为ISO8601时间戳。
  • rfc2822-基于文本,将文本字段解析为RFC2822时间戳

custom-date-time-基于文本,根据Joda格式模式解析文本字段

  • 过formatHint属性指定。
  • milliseconds-since-epoch-基于数字,将文本或数字解释为自纪元以来的毫秒数。
  • seconds-since-epoch-基于数字,将文本或数字解释为自纪元以来的毫秒数。

对于和数据类型,如果解码值中存在时区信息,则它将在Presto值中使用。否则结果时区将设置为:TIMESTAMPWITHTIMEZONETIMEWITHTIMEZONEUTC。

14 avro解码器

Avro解码器根据模式转换表示Avro格式的消息或密钥的字节。该消息必须嵌入了Avro架构。Presto不支持无模式Avro解码。

对于密钥/消息,使用avro解码器,dataSchema必须定义。这应该指向需要解码的消息的有效Avro模式文件的位置。此位置可以是远程Web服务器或本地文件系统。如果无法从Presto协调器节点访问此位置,则解码器将失败。dataSchema:’http://example.org/schema/avro_data.avsc’dataSchema:’/usr/local/schema/avro_data.avsc’

对于字段,支持以下属性:

  • name-Presto表中列的名称。
  • type-Presto类型的列。
  • mapping-以斜线分隔的字段名称列表,用于从Avro架构中选择一个字段。如果mapping在原始Avro架构中不存在指定的字段,则读取操作将返回NULL。

下表列出了可用于type等效Avro字段类型的受支持Presto类型。

Presto数据类型

允许的Avro数据类型

BIGINT

INT,LONG

DOUBLE

DOUBLE,FLOAT

BOOLEAN

BOOLEAN

VARCHAR/VARCHAR(x)

STRING

VARBINARY

FIXED,BYTES

ARRAY

ARRAY

MAP

MAP

15 Avro模式演变

Avro解码器支持具有向后兼容性的模式演化功能。通过向后兼容,可以使用较新的架构来读取使用较旧架构创建的Avro数据。Avro架构中的任何更改也必须反映在Presto的主题定义文件中。新添加/重命名的字段必须在Avro架构文件中具有默认值。

模式演化行为如下:

  • 在新模式中添加的列:当表使用新模式时,使用旧模式创建的数据将产生默认值。
  • 在新模式中删除的列:使用旧模式创建的数据将不再从已删除的列中输出数据。
  • 列在新模式中重命名:这相当于删除列并添加新列,当表使用新模式时,使用旧模式创建的数据将产生默认值
  • 更改新模式中的列类型:如果Avro支持类型强制,则转换发生。不兼容的类型会引发错误。