Passer au contenu principal
Moteur Distributed dans CloudPour créer un moteur de table distribué dans ClickHouse Cloud, vous pouvez utiliser les fonctions de table remote et remoteSecure. La syntaxe Distributed(...) ne peut pas être utilisée dans ClickHouse Cloud.
Les tables utilisant le moteur Distributed ne stockent aucune donnée en propre, mais permettent l’exécution distribuée de requêtes sur plusieurs serveurs. La lecture est automatiquement parallélisée. Lors de la lecture, les index de table sur les serveurs distants sont utilisés, s’ils existent.

Créer une table

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
[SETTINGS name=value, ...]

À partir d’une table

Lorsque la table Distributed pointe vers une table du serveur actuel, vous pouvez reprendre le schéma de cette table :
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] AS [db2.]name2 ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]]) [SETTINGS name=value, ...]

Paramètres de Distributed

ParamètreDescription
clusterLe nom du cluster dans le fichier de configuration du serveur
databaseLe nom d’une base de données distante
tableLe nom d’une table distante
sharding_key (Optional)La clé de sharding.
La spécification de sharding_key est nécessaire dans les cas suivants :
  • Pour les INSERTs dans une table distribuée (car le moteur de la table a besoin de sharding_key pour déterminer comment répartir les données). Cependant, si le paramètre insert_distributed_one_random_shard est activé, les INSERTs n’ont pas besoin de clé de sharding.
  • Pour l’utiliser avec optimize_skip_unused_shards, car sharding_key est nécessaire pour déterminer quels shards doivent être interrogés
policy_name (Optional)Le nom de la stratégie ; il sera utilisé pour stocker les fichiers temporaires pour l’envoi en arrière-plan
Voir aussi

Paramètres de Distributed

SettingDescriptionDefault value
fsync_after_insertEffectue un fsync des données du fichier après une insertion en arrière-plan dans Distributed. Garantit que l’OS a écrit sur le disque l’ensemble des données insérées dans un fichier sur le disque du nœud initiateur.false
fsync_directoriesEffectue un fsync des répertoires. Garantit que l’OS a actualisé les métadonnées des répertoires après les opérations liées aux insertions en arrière-plan dans une table Distributed (après l’insertion, après l’envoi des données au shard, etc.).false
skip_unavailable_shardsSi true, ClickHouse ignore silencieusement les shards indisponibles. Un shard est marqué comme indisponible lorsque : 1) le shard ne peut pas être joint en raison d’un échec de connexion. 2) le shard ne peut pas être résolu via DNS. 3) la table n’existe pas sur le shard.false
bytes_to_throw_insertSi plus que ce nombre d’octets compressés sont en attente pour un INSERT en arrière-plan, une exception est levée. 0 - ne pas lever d’exception.0
bytes_to_delay_insertSi plus que ce nombre d’octets compressés sont en attente pour un INSERT en arrière-plan, la requête est retardée. 0 - ne pas retarder.0
max_delay_to_insertDélai maximal, en secondes, pour l’insertion de données dans une table Distributed s’il y a beaucoup d’octets en attente d’envoi en arrière-plan.60
background_insert_batchIdentique à distributed_background_insert_batch0
background_insert_split_batch_on_failureIdentique à distributed_background_insert_split_batch_on_failure0
background_insert_sleep_time_msIdentique à distributed_background_insert_sleep_time_ms0
background_insert_max_sleep_time_msIdentique à distributed_background_insert_max_sleep_time_ms0
flush_on_detachVide les données vers les nœuds distants lors d’un DETACH/DROP/de l’arrêt du serveur.true
Paramètres de durabilité (fsync_...) :
  • Affectent uniquement les INSERT en arrière-plan (c.-à-d. distributed_foreground_insert=false) lorsque les données sont d’abord stockées sur le disque du nœud initiateur puis, plus tard, envoyées en arrière-plan aux shards.
  • Peuvent réduire considérablement les performances des INSERT
  • Affectent l’écriture des données stockées dans le dossier de la table distribuée sur le nœud qui a accepté votre insertion. Si vous avez besoin de garanties sur l’écriture des données dans les tables MergeTree sous-jacentes, consultez les paramètres de durabilité (...fsync...) dans system.merge_tree_settings
Pour les paramètres de limite d’insertion (..._insert), voir aussi :
Exemple
CREATE TABLE hits_all AS hits
ENGINE = Distributed(logs, default, hits[, sharding_key[, policy_name]])
SETTINGS
    fsync_after_insert=0,
    fsync_directories=0;
Les données seront lues sur tous les serveurs du cluster logs, à partir de la table default.hits présente sur chaque serveur du cluster. Les données ne sont pas seulement lues : elles sont aussi partiellement traitées sur les serveurs distants (dans la mesure du possible). Par exemple, pour une requête avec GROUP BY, les données seront agrégées sur les serveurs distants, puis les états intermédiaires des fonctions d’agrégation seront envoyés au serveur à l’origine de la requête. Les données seront ensuite agrégées plus avant. Au lieu du nom de la base de données, vous pouvez utiliser une expression constante qui renvoie une chaîne. Par exemple : currentDatabase().

Clusters

Les clusters se configurent dans le fichier de configuration du serveur:
<remote_servers>
    <logs>
        <!-- Inter-server per-cluster secret for Distributed queries
             default: no secret (no authentication will be performed)

             If set, then Distributed queries will be validated on shards, so at least:
             - such cluster should exist on the shard,
             - such cluster should have the same secret.

             And also (and which is more important), the initial_user will
             be used as current user for the query.
        -->
        <!-- <secret></secret> -->
        
        <!-- Optional. Whether distributed DDL queries (ON CLUSTER clause) are allowed for this cluster. Default: true (allowed). -->        
        <!-- <allow_distributed_ddl_queries>true</allow_distributed_ddl_queries> -->
        
        <shard>
            <!-- Optional. Shard weight when writing data. Default: 1. -->
            <weight>1</weight>
            <!-- Optional. The shard name.  Must be non-empty and unique among shards in the cluster. If not specified, will be empty. -->
            <name>shard_01</name>
            <!-- Optional. Whether to write data to just one of the replicas. Default: false (write data to all replicas). -->
            <internal_replication>false</internal_replication>
            <replica>
                <!-- Optional. Priority of the replica for load balancing (see also load_balancing setting). Default: 1 (less value has more priority). -->
                <priority>1</priority>
                <host>example01-01-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-01-2</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <weight>2</weight>
            <name>shard_02</name>
            <internal_replication>false</internal_replication>
            <replica>
                <host>example01-02-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-02-2</host>
                <secure>1</secure>
                <port>9440</port>
            </replica>
        </shard>
    </logs>
</remote_servers>
Ici, un cluster nommé logs est défini. Il se compose de deux shards, chacun contenant deux répliques. Les shards correspondent à des serveurs qui contiennent différentes parties des données (pour lire l’ensemble des données, vous devez accéder à tous les shards). Les répliques sont des serveurs dupliqués (pour lire l’ensemble des données, vous pouvez accéder aux données sur n’importe laquelle des répliques). Les noms de cluster ne doivent pas contenir de points. Les paramètres host, port et, éventuellement, user, password, secure, compression, bind_host sont spécifiés pour chaque serveur :
ParamètreDescriptionValeur par défaut
hostAdresse du serveur distant. Vous pouvez utiliser soit le nom de domaine, soit l’adresse IPv4 ou IPv6. Si vous indiquez un domaine, le serveur effectue une requête DNS au démarrage, et le résultat est conservé tant que le serveur reste en cours d’exécution. Si la requête DNS échoue, le serveur ne démarre pas. Si vous modifiez l’enregistrement DNS, redémarrez le serveur.-
portPort TCP utilisé pour l’activité de messagerie (tcp_port dans la config, généralement défini sur 9000). À ne pas confondre avec http_port.-
userNom de l’utilisateur utilisé pour se connecter à un serveur distant. Cet utilisateur doit disposer des droits nécessaires pour se connecter au serveur spécifié. L’accès est configuré dans le fichier users.xml. Pour plus d’informations, consultez la section Droits d’accès.default
passwordMot de passe utilisé pour se connecter à un serveur distant (non masqué).
secureIndique s’il faut utiliser une connexion SSL/TLS sécurisée. Cela nécessite généralement aussi de préciser le port (le port sécurisé par défaut est 9440). Le serveur doit écouter sur <tcp_port_secure>9440</tcp_port_secure> et être configuré avec les certificats appropriés.false
compressionUtiliser la compression des données.true
bind_hostAdresse source à utiliser lors de la connexion au serveur distant depuis ce nœud. Seules les adresses IPv4 sont prises en charge. Destiné aux cas d’usage de déploiement avancés où il est nécessaire de définir l’adresse IP source utilisée par les requêtes distribuées ClickHouse.-
Lors de la spécification des répliques, l’une des répliques disponibles sera sélectionnée pour chacun des shards lors de la lecture. Vous pouvez configurer l’algorithme d’équilibrage de charge (la préférence pour déterminer à quelle réplique accéder) – voir le paramètre load_balancing. Si la connexion au serveur n’est pas établie, une tentative de connexion sera effectuée avec un délai d’attente court. Si la connexion échoue, la réplique suivante sera sélectionnée, et ainsi de suite pour toutes les répliques. Si la tentative de connexion échoue pour toutes les répliques, elle sera répétée de la même manière, plusieurs fois. Cela améliore la résilience, mais n’assure pas une tolérance aux pannes complète : un serveur distant peut accepter la connexion, mais ne pas fonctionner, ou mal fonctionner. Vous pouvez spécifier un seul shard (dans ce cas, le traitement des requêtes devrait être qualifié de remote plutôt que de distributed) ou un nombre quelconque de shards. Dans chaque shard, vous pouvez spécifier d’une à un nombre quelconque de répliques. Vous pouvez spécifier un nombre différent de répliques pour chaque shard. Vous pouvez spécifier autant de clusters que vous le souhaitez dans la configuration. Pour afficher vos clusters, utilisez la table system.clusters. Le moteur Distributed permet de travailler avec un cluster comme avec un serveur local. Cependant, la configuration du cluster ne peut pas être définie dynamiquement : elle doit être configurée dans le fichier de configuration du serveur. En général, tous les serveurs d’un cluster auront la même configuration de cluster (bien que cela ne soit pas obligatoire). Les clusters du fichier de configuration sont mis à jour à la volée, sans redémarrage du serveur. Si vous devez envoyer une requête à un ensemble inconnu de shards et de répliques à chaque fois, vous n’avez pas besoin de créer une table Distributed – utilisez plutôt la fonction de table remote. Voir la section Fonctions de table.

Écriture des données

Il existe deux méthodes pour écrire des données dans un cluster : Premièrement, vous pouvez définir quelles données écrire sur quels serveurs et effectuer l’écriture directement sur chaque shard. En d’autres termes, exécuter directement des instructions INSERT sur les tables distantes du cluster vers lesquelles pointe la table Distributed. C’est la solution la plus flexible, car vous pouvez utiliser n’importe quel schéma de sharding, même complexe en raison des exigences du domaine métier. C’est également la solution la plus optimale, puisque les données peuvent être écrites sur différents shards de façon totalement indépendante. Deuxièmement, vous pouvez exécuter des instructions INSERT sur une table Distributed. Dans ce cas, la table répartit elle-même les données insérées entre les serveurs. Pour écrire dans une table Distributed, le paramètre sharding_key doit être configuré (sauf s’il n’y a qu’un seul shard). Chaque shard peut avoir un <weight> défini dans le fichier de configuration. Par défaut, le poids est 1. Les données sont réparties entre les shards en proportion de leur poids. Tous les poids des shards sont additionnés, puis le poids de chaque shard est divisé par le total afin de déterminer la part de chaque shard. Par exemple, s’il y a deux shards et que le premier a un poids de 1 tandis que le second a un poids de 2, un tiers (1 / 3) des lignes insérées sera envoyé au premier et deux tiers (2 / 3) au second. Chaque shard peut avoir le paramètre internal_replication défini dans le fichier de configuration. Si ce paramètre est défini sur true, l’opération d’écriture sélectionne la première réplique saine et y écrit les données. Utilisez ce paramètre si les tables sous-jacentes à la table Distributed sont des tables répliquées (par exemple, n’importe lequel des moteurs de table Replicated*MergeTree). L’une des répliques de la table recevra l’écriture, puis celle-ci sera automatiquement répliquée vers les autres répliques. Si internal_replication est défini sur false (valeur par défaut), les données sont écrites sur toutes les répliques. Dans ce cas, la table Distributed réplique elle-même les données. C’est moins bien que d’utiliser des tables répliquées, car la cohérence des répliques n’est pas vérifiée et, avec le temps, elles contiendront des données légèrement différentes. Pour sélectionner le shard auquel une ligne de données est envoyée, l’expression de sharding est analysée, puis on prend le reste de sa division par le poids total des shards. La ligne est envoyée au shard correspondant au demi-intervalle des restes allant de prev_weights à prev_weights + weight, où prev_weights est le poids total des shards ayant le plus petit numéro, et weight est le poids de ce shard. Par exemple, s’il y a deux shards, et que le premier a un poids de 9 tandis que le second a un poids de 10, la ligne sera envoyée au premier shard pour les restes de l’intervalle [0, 9), et au second pour les restes de l’intervalle [9, 19). L’expression de sharding peut être n’importe quelle expression composée de constantes et de colonnes de table qui renvoie un entier. Par exemple, vous pouvez utiliser l’expression rand() pour une distribution aléatoire des données, ou UserID pour une distribution selon le reste de la division de l’ID de l’utilisateur (les données d’un même utilisateur résideront alors sur un seul shard, ce qui simplifie l’exécution de IN et JOIN par utilisateur). Si l’une des colonnes n’est pas distribuée de manière suffisamment uniforme, vous pouvez l’encapsuler dans une fonction de hachage, par exemple intHash64(UserID). Un simple reste de division est une solution limitée pour le sharding et n’est pas toujours approprié. Cela fonctionne pour des volumes de données moyens à importants (des dizaines de serveurs), mais pas pour de très grands volumes de données (des centaines de serveurs ou plus). Dans ce dernier cas, utilisez le schéma de sharding requis par le domaine métier plutôt que de vous appuyer sur des entrées dans des tables Distributed. Vous devez vous préoccuper du schéma de sharding dans les cas suivants :
  • Des requêtes nécessitent de joindre des données (IN ou JOIN) selon une clé spécifique. Si les données sont shardées selon cette clé, vous pouvez utiliser IN ou JOIN local au lieu de GLOBAL IN ou GLOBAL JOIN, ce qui est bien plus efficace.
  • Un grand nombre de serveurs est utilisé (des centaines ou plus) avec un grand nombre de petites requêtes, par exemple des requêtes portant sur les données de clients individuels (par exemple, des sites web, des annonceurs ou des partenaires). Afin que les petites requêtes n’affectent pas l’ensemble du cluster, il est judicieux de placer les données d’un même client sur un seul shard. Vous pouvez également mettre en place un sharding à deux niveaux : diviser l’ensemble du cluster en « couches », où une couche peut être composée de plusieurs shards. Les données d’un même client sont situées sur une seule couche, mais des shards peuvent être ajoutés à une couche selon les besoins, et les données y sont réparties aléatoirement. Des tables Distributed sont créées pour chaque couche, et une table distribuée partagée unique est créée pour les requêtes globales.
Les données sont écrites en arrière-plan. Lorsqu’elles sont insérées dans la table, le bloc de données est simplement écrit sur le système de fichiers local. Les données sont ensuite envoyées aux serveurs distants en arrière-plan dès que possible. La périodicité d’envoi des données est gérée par les paramètres distributed_background_insert_sleep_time_ms et distributed_background_insert_max_sleep_time_ms. Le moteur Distributed envoie séparément chaque fichier contenant des données insérées, mais vous pouvez activer l’envoi par lots des fichiers avec le paramètre distributed_background_insert_batch. Ce paramètre améliore les performances du cluster en exploitant mieux les ressources du serveur local et du réseau. Vous devez vérifier que les données ont bien été envoyées en consultant la liste des fichiers (données en attente d’envoi) dans le répertoire de la table : /var/lib/clickhouse/data/database/table/. Le nombre de threads exécutant des tâches en arrière-plan peut être défini à l’aide du paramètre background_distributed_schedule_pool_size. Si le serveur a disparu ou a subi un redémarrage brutal (par exemple, à cause d’une panne matérielle) après un INSERT dans une table Distributed, les données insérées peuvent être perdues. Si une partie de données endommagée est détectée dans le répertoire de la table, elle est déplacée vers le sous-répertoire broken et n’est plus utilisée.

Lecture des données

Lorsqu’on interroge une table Distributed, les requêtes SELECT sont envoyées à tous les shards et fonctionnent quelle que soit la façon dont les données sont réparties entre eux (elles peuvent être distribuées de manière totalement aléatoire). Lorsque vous ajoutez un nouveau shard, vous n’avez pas besoin d’y transférer les anciennes données. Vous pouvez plutôt y écrire les nouvelles données en lui attribuant un poids plus élevé : la répartition des données sera légèrement inégale, mais les requêtes continueront de fonctionner correctement et efficacement. Lorsque l’option max_parallel_replicas est activée, le traitement des requêtes est parallélisé sur toutes les répliques d’un même shard. Pour plus d’informations, consultez la section max_parallel_replicas. Pour en savoir plus sur le traitement des requêtes distribuées in et global in, consultez cette documentation.

Colonnes virtuelles

_Shard_num

_shard_num — Contient la valeur shard_num issue de la table system.clusters. Type : UInt32.
Comme les fonctions de table remote et [cluster](../../../sql-reference/table-functions/cluster.md) créent en interne une table Distributed temporaire, _shard_num` y est aussi disponible.
Voir aussi
Dernière modification le 29 juin 2026