Passer au contenu principal
Le client Rust officiel pour se connecter à ClickHouse, développé à l’origine par Paul Loyd. Le code source du client est disponible dans le dépôt GitHub.

Vue d’ensemble

  • Utilise serde pour l’encodage/décodage des lignes.
  • Prend en charge les attributs serde : skip_serializing, skip_deserializing, rename.
  • Utilise le format RowBinary via HTTP.
    • Il est prévu de passer à Native via TCP.
  • Prend en charge TLS (via les fonctionnalités native-tls et rustls-tls).
  • Prend en charge la compression et la décompression (LZ4).
  • Fournit des API pour interroger ou insérer des données, exécuter des DDLs et faire du batching côté client.
  • Fournit des mocks pratiques pour les tests unitaires.

Installation

Pour utiliser ce crate, ajoutez ce qui suit à votre Cargo.toml :
[dependencies]
clickhouse = "0.12.2"

[dev-dependencies]
clickhouse = { version = "0.12.2", features = ["test-util"] }
Voir aussi : la page crates.io.

Fonctionnalités de Cargo

  • lz4 (activée par défaut) — active les variantes Compression::Lz4 et Compression::Lz4Hc(_). Si elle est activée, Compression::Lz4 est utilisée par défaut pour toutes les requêtes, sauf pour WATCH.
  • native-tls — prend en charge les URL dont le schéma est HTTPS via hyper-tls, qui s’appuie sur OpenSSL.
  • rustls-tls — prend en charge les URL dont le schéma est HTTPS via hyper-rustls, qui ne s’appuie pas sur OpenSSL.
  • inserter — active client.inserter().
  • test-util — ajoute des mocks. Voir l’exemple. À utiliser uniquement dans dev-dependencies.
  • watch — active la fonctionnalité client.watch. Voir la section correspondante pour plus de détails.
  • uuid — ajoute serde::uuid pour fonctionner avec le crate uuid.
  • time — ajoute serde::time pour fonctionner avec le crate time.
Lorsque vous vous connectez à ClickHouse via une URL HTTPS, l’une des fonctionnalités native-tls ou rustls-tls doit être activée. Si les deux sont activées, la fonctionnalité rustls-tls sera prioritaire.

Compatibilité des versions de ClickHouse

Le client est compatible avec les versions LTS de ClickHouse et les versions ultérieures, ainsi qu’avec ClickHouse Cloud. Les versions du serveur ClickHouse antérieures à la v22.6 gèrent RowBinary de manière incorrecte dans de rares cas. Vous pouvez utiliser la v0.11+ et activer la fonctionnalité wa-37420 pour résoudre ce problème. Remarque : cette fonctionnalité ne doit pas être utilisée avec des versions plus récentes de ClickHouse.

Exemples

Nous cherchons à couvrir différents scénarios d’utilisation du client à l’aide des exemples du dépôt du client. Une vue d’ensemble est disponible dans le README des exemples. Si un point n’est pas clair ou s’il manque quelque chose dans les exemples ou dans la documentation ci-dessous, n’hésitez pas à nous contacter.

Utilisation

Le crate ch2rs permet de générer un type de ligne à partir de ClickHouse.

Création d’une instance de client

Réutilisez les clients créés ou clonez-les afin de réutiliser le pool de connexions hyper sous-jacent.
use clickhouse::Client;

let client = Client::default()
    // should include both protocol and port
    .with_url("http://localhost:8123")
    .with_user("name")
    .with_password("123")
    .with_database("test");

Connexion HTTPS ou à ClickHouse Cloud

HTTPS fonctionne avec les fonctionnalités Cargo rustls-tls ou native-tls. Ensuite, créez le client comme d’habitude. Dans cet exemple, les variables d’environnement sont utilisées pour stocker les informations de connexion :
L’URL doit inclure à la fois le protocole et le port, par exemple https://instance.clickhouse.cloud:8443.
fn read_env_var(key: &str) -> String {
    env::var(key).unwrap_or_else(|_| panic!("{key} env variable should be set"))
}

let client = Client::default()
    .with_url(read_env_var("CLICKHOUSE_URL"))
    .with_user(read_env_var("CLICKHOUSE_USER"))
    .with_password(read_env_var("CLICKHOUSE_PASSWORD"));
Voir aussi :

Sélection de lignes

use serde::Deserialize;
use clickhouse::Row;
use clickhouse::sql::Identifier;

#[derive(Row, Deserialize)]
struct MyRow<'a> {
    no: u32,
    name: &'a str,
}

let table_name = "some";
let mut cursor = client
    .query("SELECT ?fields FROM ? WHERE no BETWEEN ? AND ?")
    .bind(Identifier(table_name))
    .bind(500)
    .bind(504)
    .fetch::<MyRow<'_>>()?;

while let Some(row) = cursor.next().await? { .. }
  • L’espace réservé ?fields est remplacé par no, name (champs de Row).
  • L’espace réservé ? est remplacé par les valeurs dans les appels bind() suivants.
  • Les méthodes pratiques fetch_one::<Row>() et fetch_all::<Row>() peuvent être utilisées pour obtenir respectivement la première ligne ou l’ensemble des lignes.
  • sql::Identifier peut être utilisé pour lier des noms de table.
NB : comme la réponse entière est transmise en flux, les curseurs peuvent renvoyer une erreur même après avoir produit certaines lignes. Si cela se produit dans votre cas, vous pouvez essayer query(...).with_option("wait_end_of_query", "1") afin d’activer la mise en tampon des réponses côté serveur. Plus de détails. L’option buffer_size peut également être utile.
Utilisez wait_end_of_query avec prudence lors de la sélection de lignes, car cela peut entraîner une consommation de mémoire plus élevée côté serveur et réduira probablement les performances globales.

Insertion de lignes

use serde::Serialize;
use clickhouse::Row;

#[derive(Row, Serialize)]
struct MyRow {
    no: u32,
    name: String,
}

let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
  • Si end() n’est pas appelé, l’INSERT est annulé.
  • Les lignes sont envoyées progressivement sous forme de flux afin de répartir la charge réseau.
  • ClickHouse n’effectue des inserts par lot de manière atomique que si toutes les lignes tiennent dans la même partition et que leur nombre est inférieur à max_insert_block_size.

Async insert (batching côté serveur)

Vous pouvez utiliser les insertions asynchrones de ClickHouse pour éviter le batching des données entrantes côté client. Pour ce faire, il suffit de fournir l’option async_insert à la méthode insert (ou même directement à l’instance Client, afin qu’elle s’applique à tous les appels à insert).
let client = Client::default()
    .with_url("http://localhost:8123")
    .with_option("async_insert", "1")
    .with_option("wait_for_async_insert", "0");
Voir aussi :

Fonctionnalité Inserter (batching côté client)

Nécessite la fonctionnalité Cargo inserter.
let mut inserter = client.inserter("some")?
    .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
    .with_max_bytes(50_000_000)
    .with_max_rows(750_000)
    .with_period(Some(Duration::from_secs(15)));

inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
    println!(
        "{} bytes, {} rows, {} transactions have been inserted",
        stats.bytes, stats.rows, stats.transactions,
    );
}

// don't forget to finalize the inserter during the application shutdown
// and commit the remaining rows. `.end()` will provide stats as well.
inserter.end().await?;
  • Inserter termine l’insertion active dans commit() si l’un des seuils (max_bytes, max_rows, period) est atteint.
  • L’intervalle entre la fin des INSERT actives peut être décalé à l’aide de with_period_bias afin d’éviter des pics de charge dus à des inserters parallèles.
  • Inserter::time_left() peut être utilisé pour détecter la fin de la période en cours. Appelez à nouveau Inserter::commit() pour vérifier les limites si votre flux émet rarement des éléments.
  • Les seuils temporels sont implémentés à l’aide de la crate quanta pour accélérer inserter. Ils ne sont pas utilisés si test-util est activé (le temps peut ainsi être géré par tokio::time::advance() dans des tests personnalisés).
  • Toutes les lignes entre les appels à commit() sont insérées dans la même instruction INSERT.
N’oubliez pas d’effectuer un flush si vous voulez arrêter/finaliser l’insertion :
inserter.end().await?;

Exécution des DDLs

Avec un déploiement mono-nœud, il suffit d’exécuter les DDLs comme suit :
client.query("DROP TABLE IF EXISTS some").execute().await?;
Cependant, dans les déploiements en cluster avec un équilibreur de charge ou ClickHouse Cloud, il est recommandé d’attendre que le DDL soit appliqué à toutes les répliques à l’aide de l’option wait_end_of_query. Voici comment procéder :
client
    .query("DROP TABLE IF EXISTS some")
    .with_option("wait_end_of_query", "1")
    .execute()
    .await?;

Paramètres ClickHouse

Vous pouvez appliquer différents paramètres ClickHouse grâce à la méthode with_option. Par exemple :
let numbers = client
    .query("SELECT number FROM system.numbers")
    // This setting will be applied to this particular query only;
    // it will override the global client setting.
    .with_option("limit", "3")
    .fetch_all::<u64>()
    .await?;
Comme avec query, cela fonctionne de la même manière avec les méthodes insert et inserter ; de plus, cette même méthode peut être appelée sur l’instance Client pour définir des paramètres globaux pour toutes les requêtes.

ID de requête

Avec .with_option, vous pouvez définir l’option query_id afin d’identifier les requêtes dans le journal des requêtes de ClickHouse.
let numbers = client
    .query("SELECT number FROM system.numbers LIMIT 1")
    .with_option("query_id", "some-query-id")
    .fetch_all::<u64>()
    .await?;
Outre query, cela fonctionne de façon similaire avec les méthodes insert et inserter.
Si vous définissez query_id manuellement, assurez-vous qu’il est unique. Les UUIDs sont un bon choix pour cela.
Voir aussi : exemple de query_id dans le dépôt du client.

ID de session

Comme pour query_id, vous pouvez définir session_id pour exécuter les instructions dans une même session. session_id peut être défini soit globalement, au niveau du client, soit pour chaque appel à query, insert ou inserter.
let client = Client::default()
    .with_url("http://localhost:8123")
    .with_option("session_id", "my-session");
Avec les déploiements en cluster, en l’absence de “sticky sessions”, vous devez être connecté à un nœud précis du cluster pour utiliser correctement cette fonctionnalité, car, par exemple, un équilibreur de charge round-robin ne garantit pas que les requêtes suivantes seront traitées par le même nœud ClickHouse.
Voir aussi : l’exemple session_id dans le dépôt du client.

En-têtes HTTP personnalisés

Si vous utilisez l’authentification par proxy ou devez transmettre des en-têtes personnalisés, vous pouvez procéder comme suit :
let client = Client::default()
    .with_url("http://localhost:8123")
    .with_header("X-My-Header", "hello");
Voir aussi : exemple d’en-têtes HTTP personnalisés dans le dépôt du client.

Client HTTP personnalisé

Cela peut être utile pour ajuster les paramètres du pool de connexions HTTP sous-jacent.
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client as HyperClient;
use hyper_util::rt::TokioExecutor;

let connector = HttpConnector::new(); // or HttpsConnectorBuilder
let hyper_client = HyperClient::builder(TokioExecutor::new())
    // For how long keep a particular idle socket alive on the client side (in milliseconds).
    // It is supposed to be a fair bit less that the ClickHouse server KeepAlive timeout,
    // which was by default 3 seconds for pre-23.11 versions, and 10 seconds after that.
    .pool_idle_timeout(Duration::from_millis(2_500))
    // Sets the maximum idle Keep-Alive connections allowed in the pool.
    .pool_max_idle_per_host(4)
    .build(connector);

let client = Client::with_http_client(hyper_client).with_url("http://localhost:8123");
Cet exemple repose sur l’ancienne API Hyper et est susceptible d’évoluer à l’avenir.
Voir aussi : exemple de client HTTP personnalisé dans le dépôt du client.

Types de données

  • (U)Int(8|16|32|64|128) correspond, dans les deux sens, aux types (u|i)(8|16|32|64|128) correspondants ou à des newtypes qui les encapsulent.
  • (U)Int256 ne sont pas pris en charge directement, mais il existe une solution de contournement.
  • Float(32|64) correspond, dans les deux sens, aux types f(32|64) correspondants ou à des newtypes qui les encapsulent.
  • Decimal(32|64|128) correspond, dans les deux sens, aux types i(32|64|128) correspondants ou à des newtypes qui les encapsulent. Il est plus pratique d’utiliser fixnum ou une autre implémentation de nombres signés à virgule fixe.
  • Boolean correspond, dans les deux sens, à bool ou à des newtypes qui l’encapsulent.
  • String correspond, dans les deux sens, à n’importe quel type de chaîne ou d’octets, par exemple &str, &[u8], String, Vec<u8> ou SmartString. Les newtypes sont également pris en charge. Pour stocker des octets, envisagez d’utiliser serde_bytes, car c’est plus efficace.
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow<'a> {
    str: &'a str,
    string: String,
    #[serde(with = "serde_bytes")]
    bytes: Vec<u8>,
    #[serde(with = "serde_bytes")]
    byte_slice: &'a [u8],
}
  • FixedString(N) est pris en charge en tant que tableau d’octets, par exemple [u8; N].
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow {
    fixed_str: [u8; 16], // FixedString(16)
}
use serde_repr::{Deserialize_repr, Serialize_repr};

#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    level: Level,
}

#[derive(Debug, Serialize_repr, Deserialize_repr)]
#[repr(u8)]
enum Level {
    Debug = 1,
    Info = 2,
    Warn = 3,
    Error = 4,
}
  • UUID est converti depuis et vers uuid::Uuid à l’aide de serde::uuid. Nécessite la fonctionnalité uuid.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::uuid")]
    uuid: uuid::Uuid,
}
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::ipv4")]
    ipv4: std::net::Ipv4Addr,
}
  • Date se convertit depuis/vers u16 ou un newtype l’encapsulant, et représente un nombre de jours écoulés depuis 1970-01-01. time::Date est également pris en charge via serde::time::date, ce qui nécessite l’activation de la fonctionnalité time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    days: u16,
    #[serde(with = "clickhouse::serde::time::date")]
    date: Date,
}
  • Date32 se convertit vers/depuis i32 ou un newtype l’encapsulant, et représente un nombre de jours écoulés depuis 1970-01-01. time::Date est également pris en charge via serde::time::date32, ce qui nécessite la fonctionnalité time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    days: i32,
    #[serde(with = "clickhouse::serde::time::date32")]
    date: Date,
}
  • DateTime se convertit en/depuis u32 ou un newtype qui l’encapsule, et représente un nombre de secondes écoulées depuis l’époque Unix. time::OffsetDateTime est également pris en charge via serde::time::datetime, ce qui nécessite la feature time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    ts: u32,
    #[serde(with = "clickhouse::serde::time::datetime")]
    dt: OffsetDateTime,
}
  • DateTime64(_) se convertit en/depuis i32 ou un newtype qui l’encapsule, et représente le temps écoulé depuis l’époque UNIX. De plus, time::OffsetDateTime est pris en charge via serde::time::datetime64::*, ce qui nécessite la fonctionnalité time.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    ts: i64, // elapsed s/us/ms/ns depending on `DateTime64(X)`
    #[serde(with = "clickhouse::serde::time::datetime64::secs")]
    dt64s: OffsetDateTime,  // `DateTime64(0)`
    #[serde(with = "clickhouse::serde::time::datetime64::millis")]
    dt64ms: OffsetDateTime, // `DateTime64(3)`
    #[serde(with = "clickhouse::serde::time::datetime64::micros")]
    dt64us: OffsetDateTime, // `DateTime64(6)`
    #[serde(with = "clickhouse::serde::time::datetime64::nanos")]
    dt64ns: OffsetDateTime, // `DateTime64(9)`
}
  • Tuple(A, B, ...) se convertit vers/depuis (A, B, ...) ou un newtype qui l’encapsule.
  • Array(_) se convertit vers/depuis n’importe quel slice, par ex. Vec<_>, &[_]. Les nouveaux types sont également pris en charge.
  • Map(K, V) se comporte comme Array((K, V)).
  • LowCardinality(_) est pris en charge de manière transparente.
  • Nullable(_) se convertit vers/depuis Option<_>. Pour les helpers clickhouse::serde::*, ajoutez ::option.
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::ipv4::option")]
    ipv4_opt: Option<Ipv4Addr>,
}
  • Nested est pris en charge en fournissant plusieurs tableaux avec changement de nom.
// CREATE TABLE test(items Nested(name String, count UInt32))
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(rename = "items.name")]
    items_name: Vec<String>,
    #[serde(rename = "items.count")]
    items_count: Vec<u32>,
}
  • Les types Geo sont pris en charge. Point se comporte comme un tuple (f64, f64), et les autres types ne sont que des slices de points.
type Point = (f64, f64);
type Ring = Vec<Point>;
type Polygon = Vec<Ring>;
type MultiPolygon = Vec<Polygon>;
type LineString = Vec<Point>;
type MultiLineString = Vec<LineString>;

#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    point: Point,
    ring: Ring,
    polygon: Polygon,
    multi_polygon: MultiPolygon,
    line_string: LineString,
    multi_line_string: MultiLineString,
}
  • Les types de données Variant, Dynamic et JSON (nouveau) ne sont pas encore pris en charge.

Simulation

Le crate fournit des utilitaires pour simuler un serveur ClickHouse et tester les requêtes DDL, SELECT, INSERT et WATCH. Cette fonctionnalité peut être activée avec la fonctionnalité test-util. Ne l’utilisez que comme dépendance de développement. Voir l’exemple.

Dépannage

CANNOT_READ_ALL_DATA

La cause la plus fréquente de l’erreur CANNOT_READ_ALL_DATA est que la définition de la ligne côté application ne correspond pas à celle de ClickHouse. Considérez la table suivante :
CREATE OR REPLACE TABLE event_log (id UInt32)
ENGINE = MergeTree
ORDER BY timestamp
Ensuite, si EventLog est défini côté application avec des types incompatibles, par exemple :
#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
    id: String, // <- should be u32 instead!
}
Lors de l’insertion des données, l’erreur suivante peut survenir :
Error: BadResponse("Code: 33. DB::Exception: Cannot read all data. Bytes read: 5. Bytes expected: 23.: (at row 1)\n: While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)")
Dans cet exemple, cela est résolu par la définition correcte de la structure EventLog :
#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
    id: u32
}

Limitations connues

  • Les types de données Variant, Dynamic et JSON (nouveau) ne sont pas encore pris en charge.
  • Le binding de paramètres côté serveur n’est pas encore pris en charge ; voir ce ticket pour en suivre l’avancement.

Contactez-nous

Si vous avez des questions ou besoin d’aide, n’hésitez pas à nous contacter sur le Slack de la communauté ou via les issues GitHub.
Dernière modification le 29 juin 2026