الانتقال إلى المحتوى الرئيسي
الإدخالالإخراجاسم بديل

الوصف

يُعد Apache Avro تنسيق تسلسل موجَّهًا للصفوف يستخدم الترميز الثنائي لمعالجة البيانات بكفاءة. ويدعم تنسيق AvroConfluent قراءة الرسائل المرمّزة بـ Avro وكتابتها باستخدام Confluent Schema Registry (أو الخدمات المتوافقة مع واجهة برمجة التطبيقات). تستخدم كل رسالة تنسيق Confluent wire: بايت سحري (0x00) يتبعه معرّف مخطط بطول 4 بايت بترتيب big-endian، ثم بيانات Avro الثنائية. عند القراءة، يحدّد ClickHouse معرّف مخطط عبر الاستعلام من السجل. وعند الكتابة، يسجّل ClickHouse المخطط المستمد من أعمدة الإخراج ويضيف المعرّف الناتج إلى بداية كل صف. وتُخزَّن المخططات مؤقتًا لتحقيق أفضل أداء.

تعيين أنواع البيانات

يوضح الجدول أدناه جميع أنواع البيانات التي يدعمها تنسيق Apache Avro، وأنواع بيانات ClickHouse المقابلة في استعلامات INSERT وSELECT.
نوع بيانات Avro INSERTنوع بيانات ClickHouseنوع بيانات Avro SELECT
boolean, int, long, float, doubleInt(8\16\32), UInt(8\16\32)int
boolean, int, long, float, doubleInt64, UInt64long
boolean, int, long, float, doubleFloat32float
boolean, int, long, float, doubleFloat64double
bytes, string, fixed, enumStringbytes أو string *
bytes, string, fixedFixedString(N)fixed(N)
enumEnum(8\16)enum
array(T)Array(T)array(T)
map(V, K)Map(V, K)map(string, K)
union(null, T), union(T, null)Nullable(T)union(null, T)
union(T1, T2, …) **Variant(T1, T2, …)union(T1, T2, …) **
nullNullable(Nothing)null
int (date) ***Date, Date32int (date) ***
long (timestamp-millis) ***DateTime64(3)long (timestamp-millis) ***
long (timestamp-micros) ***DateTime64(6)long (timestamp-micros) ***
bytes (decimal) ***DateTime64(N)bytes (decimal) ***
intIPv4int
fixed(16)IPv6fixed(16)
bytes (decimal) ***Decimal(P, S)bytes (decimal) ***
string (uuid) ***UUIDstring (uuid) ***
fixed(16)Int128/UInt128fixed(16)
fixed(32)Int256/UInt256fixed(32)
recordTuplerecord
** يقبل نوع Variant القيمة null ضمنيًا كقيمة للحقل، لذلك سيُحوَّل Avro union(T1, T2, null)، على سبيل المثال، إلى Variant(T1, T2). ونتيجةً لذلك، عند إنشاء Avro من ClickHouse، يجب علينا دائمًا تضمين النوع null ضمن مجموعة أنواع Avro union لأننا لا نعرف أثناء استدلال المخطط ما إذا كانت أي قيمة هي بالفعل null. *** الأنواع المنطقية في Avro أنواع البيانات المنطقية غير المدعومة في Avro:
  • time-millis
  • time-micros
  • duration

إعدادات التنسيق

الإعدادالوصفالقيمة الافتراضية
input_format_avro_allow_missing_fieldsما إذا كان سيتم استخدام قيمة افتراضية بدلًا من إصدار خطأ عند عدم العثور على حقل في المخطط.0
input_format_avro_null_as_defaultما إذا كان سيتم استخدام قيمة افتراضية بدلًا من إصدار خطأ عند إدراج قيمة null في عمود لا يقبل NULL.0
format_avro_schema_registry_urlعنوان URL الخاص بـ Confluent Schema Registry. عند استخدام المصادقة الأساسية، يمكن تضمين بيانات الاعتماد المرمّزة بتنسيق URL مباشرةً في مسار URL.
format_avro_schema_registry_connection_timeoutمهلة الاتصال بالثواني لعميل HTTP الخاص بـ Schema Registry (تُستخدم لكل من جلب المخطط والتسجيل). يجب أن تكون أكبر من 0 وأقل من 600 (10 دقائق).1
format_avro_schema_registry_send_timeoutمهلة الإرسال بالثواني لعميل HTTP الخاص بـ Schema Registry. يجب أن تكون أكبر من 0 وأقل من 600 (10 دقائق).1
format_avro_schema_registry_receive_timeoutمهلة الاستلام بالثواني لعميل HTTP الخاص بـ Schema Registry. يجب أن تكون أكبر من 0 وأقل من 600 (10 دقائق).1
output_format_avro_confluent_subjectللإخراج: اسم subject الذي يُسجَّل المخطط تحته في Schema Registry. وهو مطلوب عند الكتابة.
output_format_avro_string_column_patternللإخراج: تعبير نمطي لأعمدة String لتسلسلها كـ Avro string (القيمة الافتراضية هي bytes).

أمثلة

القراءة من Kafka

لقراءة موضوع Kafka مُشفَّرًا باستخدام Avro عبر محرك جدول Kafka، استخدم الإعداد format_avro_schema_registry_url لتحديد URL لسجل المخططات.
CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';

SELECT * FROM topic1_stream;

الكتابة إلى Kafka

لكتابة رسائل AvroConfluent إلى موضوع Kafka، عيّن عنوان URL لسجل المخططات واسم الـsubject. يُسجَّل المخطط تلقائيًا في السجل عند أول عملية كتابة.
CREATE TABLE topic1_sink
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url',
output_format_avro_confluent_subject = 'topic1-value';

INSERT INTO topic1_sink VALUES ('hello', 'world');

استخدام المصادقة الأساسية

إذا كان سجل المخططات لديك يتطلب المصادقة الأساسية (على سبيل المثال، إذا كنت تستخدم Confluent Cloud)، فيمكنك تمرير بيانات الاعتماد المرمَّزة وفق URL في الإعداد format_avro_schema_registry_url.
CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';

استكشاف الأخطاء وإصلاحها

لمراقبة تقدّم الإدخال واستكشاف أخطاء مستهلك Kafka، يمكنك الاستعلام عن جدول النظام system.kafka_consumers. إذا كان النشر لديك يتضمن عدة نُسخ متماثلة (على سبيل المثال، ClickHouse Cloud)، فيجب عليك استخدام دالة الجدول clusterAllReplicas.
SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;
إذا واجهت مشكلات في التعرّف على المخطط، يمكنك استخدام kafkacat مع clickhouse-local لتشخيص المشكلة:
$ kafkacat -b kafka-broker  -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local   --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String"  -q 'select *  from table'
1 a
2 b
3 c
آخر تعديل في ٢٩ يونيو ٢٠٢٦