MATERIALIZED VIEW 关联到该引擎时,S3Queue 表引擎就会开始在后台收集数据。
CREATE 表
S3Queue 的参数与 S3 表引擎支持的参数相同。请参见此处的参数部分。
示例
设置
system.s3_queue_settings 表。该功能从 24.10 版本开始可用。
设置名称 (24.7+) 从 24.7 版本开始,S3Queue 设置既可以使用
s3queue_ 前缀指定,也可以不使用:- 现代语法 (24.7+) :
processing_threads_num、tracked_file_ttl_sec等。 - 旧语法 (所有版本) :
s3queue_processing_threads_num、s3queue_tracked_file_ttl_sec等。
模式
- unordered — 在 unordered 模式下,会通过 ZooKeeper 中的持久节点跟踪所有已处理文件的集合。
- ordered — 在有序模式下,文件按字典序处理。这意味着,如果某个名为 ‘BBB’ 的文件已在某个时间点被处理,之后又有一个名为 ‘AA’ 的文件被添加到存储桶中,那么它会被忽略。ZooKeeper 中只会存储已成功消费文件里名称最大的文件名 (按字典序) ,以及因加载失败而需要重试的文件名。
ordered。从 24.6 开始不再提供默认值,必须手动指定该设置。对于在更早版本中创建的表,出于兼容性考虑,默认值将继续保持为 Ordered。
after_processing
- keep。
- delete。
- move。
- tag。
keep。
移动需要额外设置。如果是在同一个 bucket 内移动,则必须通过 after_processing_move_prefix 提供新的 path prefix。
移动到另一个 S3 bucket 时,需要通过 after_processing_move_uri 指定目标 bucket URI,并通过 after_processing_move_access_key_id 和 after_processing_move_secret_access_key 提供 S3 凭证。
示例:
after_processing_move_connection_string,并将容器名称指定为 after_processing_move_container。请参阅 AzureQueue 设置。
添加标签时,需要通过 after_processing_tag_key 和 after_processing_tag_value 提供标签键和值。
after_processing_retries
- 非负整数。
10。
after_processing_move_access_key_id
- String。
after_processing_move_prefix
- String。
after_processing_move_preserve_path
true,在移动已成功处理的文件时,会将完整的源对象路径追加到 after_processing_move_prefix 后面,从而在目标端保留 bucket 下的源目录结构。如果为 false,则只使用文件名,源目录结构会被展平。
可能的值:
true/false。
false。
after_processing_move_secret_access_key
- String。
after_processing_move_uri
- String。
after_processing_tag_key
after_processing='tag' 时,用于为处理成功的文件添加标签的标签键。
可能的值:
- String。
after_processing_tag_value
after_processing='tag' 时,用于为成功处理的文件添加标签的标签值。
可选值:
- String。
keeper_path
s3queue_default_zookeeper_path、数据库 UUID 和表 UUID 构建该路径。绝对路径值 (以 / 开头) 会按原样使用,而相对路径值会追加到已配置的前缀后。诸如 {database} 或 {uuid} 这样的宏会在引擎连接到 ZooKeeper 之前展开。
如需指定辅助 ZooKeeper cluster,请在该值前加上已配置的名称前缀,例如 analytics_keeper:/clickhouse/queue/orders。该名称必须存在于 <auxiliary_zookeepers> 中;否则引擎会报错 Unknown auxiliary ZooKeeper name ...。完整字符串 (包括前缀) 会保留在 SHOW CREATE TABLE 中,因此该语句可以被原样复制。
可能的值:
- String。
/。
loading_retries
- 非负整数。
10。
processing_threads_num
Unordered 模式。
默认值:CPU 数量或 16。
parallel_inserts
processing_threads_num 只会产生一个 INSERT,因此只会用多线程下载文件并进行解析。
但这会限制并行度,因此为了获得更好的吞吐量,建议使用 parallel_inserts=true,这样就可以并行写入数据 (但请注意,这会导致 MergeTree 家族生成更多的数据分区片段) 。
INSERT 会根据 max_process*_before_commit 设置来创建。false。
enable_logging_to_queue_log
system.s3queue_log 写入日志。
默认值:1。
polling_min_timeout_ms
- 正整数。
1000。
polling_max_timeout_ms
- 正整数。
600000。
polling_backoff_ms
- 正整数。
30000。
tracked_files_limit
- 正整数。
1000。
tracked_file_ttl_sec
- 正整数。
0。
cleanup_interval_min_ms
60000。
cleanup_interval_max_ms
60000。
buckets
24.6 起可用。如果 S3Queue 表存在多个副本,且每个副本都使用 Keeper 中相同的元数据目录,那么 buckets 的值至少应等于副本数量。如果同时还使用了 processing_threads 设置,则通常还应进一步增大 buckets 的值,因为它决定了 S3Queue 处理的实际并行度。
use_persistent_processing_nodes
persistent_processing_node_ttl_seconds
use_persistent_processing_nodes,则可能会遗留未被移除的处理中节点。此设置定义了一个时间段,超过该时间后,可以安全地清理这些处理中节点。同样的生存时间 (TTL) 也用于 Ordered 模式下的存储桶锁,而该锁的持有时间可能会长于单个处理中节点,因此设置该值时也应将这一点考虑在内。
默认值:21600 (6 小时) 。
S3 相关设置
S3 基于角色的访问
extra_credentials 参数传入 roleARN,如下所示:
S3Queue ordered 模式
S3Queue 处理模式可以在 ZooKeeper 中存储更少的元数据,但有一个限制:按时间后添加的文件,其名称按字母数字顺序必须更大。
S3Queue 的 ordered 模式和 unordered 模式都支持 (s3queue_)processing_threads_num 设置 (s3queue_ 前缀可选) ,可用于控制在服务器本地处理 S3 文件的线程数。
对于不带分区的 ordered 模式,ClickHouse 可以从上一个已处理的键继续列出 S3 内容,以避免重新列出整个前缀下的历史内容。在按桶划分的 ordered 模式中,为避免跳过未处理的文件,恢复点会保守地选择为所有桶中最小的已处理键。
这种恢复列出优化仅用于不带分区的 ordered 模式下、以 S3 为后端的队列 (不适用于 AzureQueue,也不适用于设置了 partitioning_mode 的情况) 。
此外,ordered 模式还引入了另一个名为 (s3queue_)buckets 的设置,表示“逻辑线程”。也就是说,在分布式场景下,当存在多个带有 S3Queue 表副本的服务器时,该设置定义了处理单元的数量。例如,每个 S3Queue 副本上的每个处理线程都会尝试锁定某个 bucket 进行处理,而每个 bucket 会根据文件名的哈希分配到特定文件。因此,在分布式场景中,强烈建议将 (s3queue_)buckets 设置为至少等于副本数,或更大。桶的数量大于副本数也是完全没问题的。最理想的情况是,(s3queue_)buckets 设置等于 number_of_replicas 与 (s3queue_)processing_threads_num 的乘积。
不建议在 24.6 版本之前使用 (s3queue_)processing_threads_num 设置。
(s3queue_)buckets 设置从 24.6 版本开始可用。
从 S3Queue 表引擎中 SELECT
stream_like_engine_allow_direct_select 设为 True。
S3Queue 引擎针对 SELECT 查询有一个特殊设置:commit_on_select。将其设为 False 可在读取后保留队列中的数据,设为 True 则会将其移除。
描述
SELECT 对流式导入并没有太大用处 (调试除外) ,因为每个文件只能导入一次。更实用的做法是使用 materialized views 创建实时处理线程。为此:
- 使用该 engine 创建一个表,用于从 S3 中指定路径消费数据,并将其视为数据 stream。
- 创建一个具有所需结构的表。
- 创建一个 materialized view,将来自该 engine 的数据转换后写入先前创建的表。
MATERIALIZED VIEW 关联到该 engine 后,它就会开始在后台收集数据。
示例:
虚拟列
_path— 文件路径。_file— 文件名。_size— 文件大小。_time— 文件创建时间。
路径中的通配符
path 参数可以使用类似 bash 的通配符来指定多个文件。要被处理的文件必须存在,并且与整个路径模式匹配。文件列表是在执行 SELECT 时确定的 (而不是在 CREATE 时) 。
*— 匹配任意数量的任意字符,但不包括/,也可以是空字符串。**— 匹配任意数量的任意字符,包括/,也可以是空字符串。?— 匹配任意单个字符。{some_string,another_string,yet_another_one}— 匹配字符串'some_string'、'another_string'、'yet_another_one'中的任意一个。{N..M}— 匹配从 N 到 M 范围内的任意数字,包括两个端点。N 和 M 可以带前导零,例如000..078。
{} 的写法与 remote 表函数类似。
限制
- 出现重复行可能是由以下原因造成的:
-
在文件处理过程中,解析在中途发生异常,且已通过
s3queue_loading_retries启用重试; -
S3Queue配置在多个服务器上,并指向 zookeeper 中的同一路径,而某台服务器尚未来得及提交已处理文件时,keeper 会话就已过期,这可能导致另一台服务器接手处理该文件,而该文件可能已经被第一台服务器部分或全部处理;不过,如果设置了use_persistent_processing_nodes = 1,则从 25.8 版本起不再存在这一问题。 - 服务器异常终止。
- 如果
S3Queue配置在多个服务器上并指向 zookeeper 中的同一路径,同时使用了Ordered模式,则s3queue_loading_retries将不起作用。该问题很快会得到修复。
内部信息
system.s3queue_metadata_cache 和持久化表 system.s3queue_log。
system.s3queue_metadata_cache。此表不是持久化的,用于显示S3Queue的内存状态:当前正在处理哪些文件,以及哪些文件已处理或处理失败。
system.s3queue_log。持久化表。包含与system.s3queue_metadata_cache相同的信息,但针对processed和failed文件。
system.s3queue_log,请在服务器配置文件中定义相应配置: