Créer une table
À partir d’une table
Distributed pointe vers une table du serveur actuel, vous pouvez reprendre le schéma de cette table :
Paramètres de Distributed
| Paramètre | Description |
|---|---|
cluster | Le nom du cluster dans le fichier de configuration du serveur |
database | Le nom d’une base de données distante |
table | Le nom d’une table distante |
sharding_key (Optional) | La clé de sharding. La spécification de sharding_key est nécessaire dans les cas suivants :
|
policy_name (Optional) | Le nom de la stratégie ; il sera utilisé pour stocker les fichiers temporaires pour l’envoi en arrière-plan |
- le paramètre distributed_foreground_insert
- MergeTree pour des exemples
Paramètres de Distributed
| Setting | Description | Default value |
|---|---|---|
fsync_after_insert | Effectue un fsync des données du fichier après une insertion en arrière-plan dans Distributed. Garantit que l’OS a écrit sur le disque l’ensemble des données insérées dans un fichier sur le disque du nœud initiateur. | false |
fsync_directories | Effectue un fsync des répertoires. Garantit que l’OS a actualisé les métadonnées des répertoires après les opérations liées aux insertions en arrière-plan dans une table Distributed (après l’insertion, après l’envoi des données au shard, etc.). | false |
skip_unavailable_shards | Si true, ClickHouse ignore silencieusement les shards indisponibles. Un shard est marqué comme indisponible lorsque : 1) le shard ne peut pas être joint en raison d’un échec de connexion. 2) le shard ne peut pas être résolu via DNS. 3) la table n’existe pas sur le shard. | false |
bytes_to_throw_insert | Si plus que ce nombre d’octets compressés sont en attente pour un INSERT en arrière-plan, une exception est levée. 0 - ne pas lever d’exception. | 0 |
bytes_to_delay_insert | Si plus que ce nombre d’octets compressés sont en attente pour un INSERT en arrière-plan, la requête est retardée. 0 - ne pas retarder. | 0 |
max_delay_to_insert | Délai maximal, en secondes, pour l’insertion de données dans une table Distributed s’il y a beaucoup d’octets en attente d’envoi en arrière-plan. | 60 |
background_insert_batch | Identique à distributed_background_insert_batch | 0 |
background_insert_split_batch_on_failure | Identique à distributed_background_insert_split_batch_on_failure | 0 |
background_insert_sleep_time_ms | Identique à distributed_background_insert_sleep_time_ms | 0 |
background_insert_max_sleep_time_ms | Identique à distributed_background_insert_max_sleep_time_ms | 0 |
flush_on_detach | Vide les données vers les nœuds distants lors d’un DETACH/DROP/de l’arrêt du serveur. | true |
Paramètres de durabilité (
fsync_...) :- Affectent uniquement les
INSERTen arrière-plan (c.-à-d.distributed_foreground_insert=false) lorsque les données sont d’abord stockées sur le disque du nœud initiateur puis, plus tard, envoyées en arrière-plan aux shards. - Peuvent réduire considérablement les performances des
INSERT - Affectent l’écriture des données stockées dans le dossier de la table distribuée sur le nœud qui a accepté votre insertion. Si vous avez besoin de garanties sur l’écriture des données dans les tables MergeTree sous-jacentes, consultez les paramètres de durabilité (
...fsync...) danssystem.merge_tree_settings
..._insert), voir aussi :- le paramètre
distributed_foreground_insert - le paramètre
prefer_localhost_replica bytes_to_throw_insertest traité avantbytes_to_delay_insert, vous ne devez donc pas le définir sur une valeur inférieure àbytes_to_delay_insert
logs, à partir de la table default.hits présente sur chaque serveur du cluster. Les données ne sont pas seulement lues : elles sont aussi partiellement traitées sur les serveurs distants (dans la mesure du possible). Par exemple, pour une requête avec GROUP BY, les données seront agrégées sur les serveurs distants, puis les états intermédiaires des fonctions d’agrégation seront envoyés au serveur à l’origine de la requête. Les données seront ensuite agrégées plus avant.
Au lieu du nom de la base de données, vous pouvez utiliser une expression constante qui renvoie une chaîne. Par exemple : currentDatabase().
Clusters
logs est défini. Il se compose de deux shards, chacun contenant deux répliques. Les shards correspondent à des serveurs qui contiennent différentes parties des données (pour lire l’ensemble des données, vous devez accéder à tous les shards). Les répliques sont des serveurs dupliqués (pour lire l’ensemble des données, vous pouvez accéder aux données sur n’importe laquelle des répliques).
Les noms de cluster ne doivent pas contenir de points.
Les paramètres host, port et, éventuellement, user, password, secure, compression, bind_host sont spécifiés pour chaque serveur :
| Paramètre | Description | Valeur par défaut |
|---|---|---|
host | Adresse du serveur distant. Vous pouvez utiliser soit le nom de domaine, soit l’adresse IPv4 ou IPv6. Si vous indiquez un domaine, le serveur effectue une requête DNS au démarrage, et le résultat est conservé tant que le serveur reste en cours d’exécution. Si la requête DNS échoue, le serveur ne démarre pas. Si vous modifiez l’enregistrement DNS, redémarrez le serveur. | - |
port | Port TCP utilisé pour l’activité de messagerie (tcp_port dans la config, généralement défini sur 9000). À ne pas confondre avec http_port. | - |
user | Nom de l’utilisateur utilisé pour se connecter à un serveur distant. Cet utilisateur doit disposer des droits nécessaires pour se connecter au serveur spécifié. L’accès est configuré dans le fichier users.xml. Pour plus d’informations, consultez la section Droits d’accès. | default |
password | Mot de passe utilisé pour se connecter à un serveur distant (non masqué). | ” |
secure | Indique s’il faut utiliser une connexion SSL/TLS sécurisée. Cela nécessite généralement aussi de préciser le port (le port sécurisé par défaut est 9440). Le serveur doit écouter sur <tcp_port_secure>9440</tcp_port_secure> et être configuré avec les certificats appropriés. | false |
compression | Utiliser la compression des données. | true |
bind_host | Adresse source à utiliser lors de la connexion au serveur distant depuis ce nœud. Seules les adresses IPv4 sont prises en charge. Destiné aux cas d’usage de déploiement avancés où il est nécessaire de définir l’adresse IP source utilisée par les requêtes distribuées ClickHouse. | - |
remote plutôt que de distributed) ou un nombre quelconque de shards. Dans chaque shard, vous pouvez spécifier d’une à un nombre quelconque de répliques. Vous pouvez spécifier un nombre différent de répliques pour chaque shard.
Vous pouvez spécifier autant de clusters que vous le souhaitez dans la configuration.
Pour afficher vos clusters, utilisez la table system.clusters.
Le moteur Distributed permet de travailler avec un cluster comme avec un serveur local. Cependant, la configuration du cluster ne peut pas être définie dynamiquement : elle doit être configurée dans le fichier de configuration du serveur. En général, tous les serveurs d’un cluster auront la même configuration de cluster (bien que cela ne soit pas obligatoire). Les clusters du fichier de configuration sont mis à jour à la volée, sans redémarrage du serveur.
Si vous devez envoyer une requête à un ensemble inconnu de shards et de répliques à chaque fois, vous n’avez pas besoin de créer une table Distributed – utilisez plutôt la fonction de table remote. Voir la section Fonctions de table.
Écriture des données
INSERT sur les tables distantes du cluster vers lesquelles pointe la table Distributed. C’est la solution la plus flexible, car vous pouvez utiliser n’importe quel schéma de sharding, même complexe en raison des exigences du domaine métier. C’est également la solution la plus optimale, puisque les données peuvent être écrites sur différents shards de façon totalement indépendante.
Deuxièmement, vous pouvez exécuter des instructions INSERT sur une table Distributed. Dans ce cas, la table répartit elle-même les données insérées entre les serveurs. Pour écrire dans une table Distributed, le paramètre sharding_key doit être configuré (sauf s’il n’y a qu’un seul shard).
Chaque shard peut avoir un <weight> défini dans le fichier de configuration. Par défaut, le poids est 1. Les données sont réparties entre les shards en proportion de leur poids. Tous les poids des shards sont additionnés, puis le poids de chaque shard est divisé par le total afin de déterminer la part de chaque shard. Par exemple, s’il y a deux shards et que le premier a un poids de 1 tandis que le second a un poids de 2, un tiers (1 / 3) des lignes insérées sera envoyé au premier et deux tiers (2 / 3) au second.
Chaque shard peut avoir le paramètre internal_replication défini dans le fichier de configuration. Si ce paramètre est défini sur true, l’opération d’écriture sélectionne la première réplique saine et y écrit les données. Utilisez ce paramètre si les tables sous-jacentes à la table Distributed sont des tables répliquées (par exemple, n’importe lequel des moteurs de table Replicated*MergeTree). L’une des répliques de la table recevra l’écriture, puis celle-ci sera automatiquement répliquée vers les autres répliques.
Si internal_replication est défini sur false (valeur par défaut), les données sont écrites sur toutes les répliques. Dans ce cas, la table Distributed réplique elle-même les données. C’est moins bien que d’utiliser des tables répliquées, car la cohérence des répliques n’est pas vérifiée et, avec le temps, elles contiendront des données légèrement différentes.
Pour sélectionner le shard auquel une ligne de données est envoyée, l’expression de sharding est analysée, puis on prend le reste de sa division par le poids total des shards. La ligne est envoyée au shard correspondant au demi-intervalle des restes allant de prev_weights à prev_weights + weight, où prev_weights est le poids total des shards ayant le plus petit numéro, et weight est le poids de ce shard. Par exemple, s’il y a deux shards, et que le premier a un poids de 9 tandis que le second a un poids de 10, la ligne sera envoyée au premier shard pour les restes de l’intervalle [0, 9), et au second pour les restes de l’intervalle [9, 19).
L’expression de sharding peut être n’importe quelle expression composée de constantes et de colonnes de table qui renvoie un entier. Par exemple, vous pouvez utiliser l’expression rand() pour une distribution aléatoire des données, ou UserID pour une distribution selon le reste de la division de l’ID de l’utilisateur (les données d’un même utilisateur résideront alors sur un seul shard, ce qui simplifie l’exécution de IN et JOIN par utilisateur). Si l’une des colonnes n’est pas distribuée de manière suffisamment uniforme, vous pouvez l’encapsuler dans une fonction de hachage, par exemple intHash64(UserID).
Un simple reste de division est une solution limitée pour le sharding et n’est pas toujours approprié. Cela fonctionne pour des volumes de données moyens à importants (des dizaines de serveurs), mais pas pour de très grands volumes de données (des centaines de serveurs ou plus). Dans ce dernier cas, utilisez le schéma de sharding requis par le domaine métier plutôt que de vous appuyer sur des entrées dans des tables Distributed.
Vous devez vous préoccuper du schéma de sharding dans les cas suivants :
- Des requêtes nécessitent de joindre des données (
INouJOIN) selon une clé spécifique. Si les données sont shardées selon cette clé, vous pouvez utiliserINouJOINlocal au lieu deGLOBAL INouGLOBAL JOIN, ce qui est bien plus efficace. - Un grand nombre de serveurs est utilisé (des centaines ou plus) avec un grand nombre de petites requêtes, par exemple des requêtes portant sur les données de clients individuels (par exemple, des sites web, des annonceurs ou des partenaires). Afin que les petites requêtes n’affectent pas l’ensemble du cluster, il est judicieux de placer les données d’un même client sur un seul shard. Vous pouvez également mettre en place un sharding à deux niveaux : diviser l’ensemble du cluster en « couches », où une couche peut être composée de plusieurs shards. Les données d’un même client sont situées sur une seule couche, mais des shards peuvent être ajoutés à une couche selon les besoins, et les données y sont réparties aléatoirement. Des tables
Distributedsont créées pour chaque couche, et une table distribuée partagée unique est créée pour les requêtes globales.
Distributed envoie séparément chaque fichier contenant des données insérées, mais vous pouvez activer l’envoi par lots des fichiers avec le paramètre distributed_background_insert_batch. Ce paramètre améliore les performances du cluster en exploitant mieux les ressources du serveur local et du réseau. Vous devez vérifier que les données ont bien été envoyées en consultant la liste des fichiers (données en attente d’envoi) dans le répertoire de la table : /var/lib/clickhouse/data/database/table/. Le nombre de threads exécutant des tâches en arrière-plan peut être défini à l’aide du paramètre background_distributed_schedule_pool_size.
Si le serveur a disparu ou a subi un redémarrage brutal (par exemple, à cause d’une panne matérielle) après un INSERT dans une table Distributed, les données insérées peuvent être perdues. Si une partie de données endommagée est détectée dans le répertoire de la table, elle est déplacée vers le sous-répertoire broken et n’est plus utilisée.
Lecture des données
Distributed, les requêtes SELECT sont envoyées à tous les shards et fonctionnent quelle que soit la façon dont les données sont réparties entre eux (elles peuvent être distribuées de manière totalement aléatoire). Lorsque vous ajoutez un nouveau shard, vous n’avez pas besoin d’y transférer les anciennes données. Vous pouvez plutôt y écrire les nouvelles données en lui attribuant un poids plus élevé : la répartition des données sera légèrement inégale, mais les requêtes continueront de fonctionner correctement et efficacement.
Lorsque l’option max_parallel_replicas est activée, le traitement des requêtes est parallélisé sur toutes les répliques d’un même shard. Pour plus d’informations, consultez la section max_parallel_replicas.
Pour en savoir plus sur le traitement des requêtes distribuées in et global in, consultez cette documentation.
Colonnes virtuelles
_Shard_num
_shard_num — Contient la valeur shard_num issue de la table system.clusters. Type : UInt32.
Comme les fonctions de table
remote et [cluster](../../../sql-reference/table-functions/cluster.md) créent en interne une table Distributed temporaire, _shard_num` y est aussi disponible.- Description des colonnes virtuelles
- Paramètre
background_distributed_schedule_pool_size - Fonctions
shardNum()etshardCount()