يُعد Apache Avro تنسيق تسلسل موجَّهًا للصفوف يستخدم الترميز الثنائي لمعالجة البيانات بكفاءة. ويدعم تنسيق AvroConfluent قراءة الرسائل المرمّزة بـ Avro وكتابتها باستخدام Confluent Schema Registry (أو الخدمات المتوافقة مع واجهة برمجة التطبيقات).
تستخدم كل رسالة تنسيق Confluent wire: بايت سحري (0x00) يتبعه معرّف مخطط بطول 4 بايت بترتيب big-endian، ثم بيانات Avro الثنائية. عند القراءة، يحدّد ClickHouse معرّف مخطط عبر الاستعلام من السجل. وعند الكتابة، يسجّل ClickHouse المخطط المستمد من أعمدة الإخراج ويضيف المعرّف الناتج إلى بداية كل صف. وتُخزَّن المخططات مؤقتًا لتحقيق أفضل أداء.
يوضح الجدول أدناه جميع أنواع البيانات التي يدعمها تنسيق Apache Avro، وأنواع بيانات ClickHouse المقابلة في استعلامات INSERT وSELECT.
نوع بيانات Avro INSERT | نوع بيانات ClickHouse | نوع بيانات Avro SELECT |
|---|
boolean, int, long, float, double | Int(8\16\32), UInt(8\16\32) | int |
boolean, int, long, float, double | Int64, UInt64 | long |
boolean, int, long, float, double | Float32 | float |
boolean, int, long, float, double | Float64 | double |
bytes, string, fixed, enum | String | bytes أو string * |
bytes, string, fixed | FixedString(N) | fixed(N) |
enum | Enum(8\16) | enum |
array(T) | Array(T) | array(T) |
map(V, K) | Map(V, K) | map(string, K) |
union(null, T), union(T, null) | Nullable(T) | union(null, T) |
union(T1, T2, …) ** | Variant(T1, T2, …) | union(T1, T2, …) ** |
null | Nullable(Nothing) | null |
int (date) *** | Date, Date32 | int (date) *** |
long (timestamp-millis) *** | DateTime64(3) | long (timestamp-millis) *** |
long (timestamp-micros) *** | DateTime64(6) | long (timestamp-micros) *** |
bytes (decimal) *** | DateTime64(N) | bytes (decimal) *** |
int | IPv4 | int |
fixed(16) | IPv6 | fixed(16) |
bytes (decimal) *** | Decimal(P, S) | bytes (decimal) *** |
string (uuid) *** | UUID | string (uuid) *** |
fixed(16) | Int128/UInt128 | fixed(16) |
fixed(32) | Int256/UInt256 | fixed(32) |
record | Tuple | record |
** يقبل نوع Variant القيمة null ضمنيًا كقيمة للحقل، لذلك سيُحوَّل Avro union(T1, T2, null)، على سبيل المثال، إلى Variant(T1, T2).
ونتيجةً لذلك، عند إنشاء Avro من ClickHouse، يجب علينا دائمًا تضمين النوع null ضمن مجموعة أنواع Avro union لأننا لا نعرف أثناء استدلال المخطط ما إذا كانت أي قيمة هي بالفعل null.
*** الأنواع المنطقية في Avro
أنواع البيانات المنطقية غير المدعومة في Avro:
time-millis
time-micros
duration
| الإعداد | الوصف | القيمة الافتراضية |
|---|
input_format_avro_allow_missing_fields | ما إذا كان سيتم استخدام قيمة افتراضية بدلًا من إصدار خطأ عند عدم العثور على حقل في المخطط. | 0 |
input_format_avro_null_as_default | ما إذا كان سيتم استخدام قيمة افتراضية بدلًا من إصدار خطأ عند إدراج قيمة null في عمود لا يقبل NULL. | 0 |
format_avro_schema_registry_url | عنوان URL الخاص بـ Confluent Schema Registry. عند استخدام المصادقة الأساسية، يمكن تضمين بيانات الاعتماد المرمّزة بتنسيق URL مباشرةً في مسار URL. | |
format_avro_schema_registry_connection_timeout | مهلة الاتصال بالثواني لعميل HTTP الخاص بـ Schema Registry (تُستخدم لكل من جلب المخطط والتسجيل). يجب أن تكون أكبر من 0 وأقل من 600 (10 دقائق). | 1 |
format_avro_schema_registry_send_timeout | مهلة الإرسال بالثواني لعميل HTTP الخاص بـ Schema Registry. يجب أن تكون أكبر من 0 وأقل من 600 (10 دقائق). | 1 |
format_avro_schema_registry_receive_timeout | مهلة الاستلام بالثواني لعميل HTTP الخاص بـ Schema Registry. يجب أن تكون أكبر من 0 وأقل من 600 (10 دقائق). | 1 |
output_format_avro_confluent_subject | للإخراج: اسم subject الذي يُسجَّل المخطط تحته في Schema Registry. وهو مطلوب عند الكتابة. | |
output_format_avro_string_column_pattern | للإخراج: تعبير نمطي لأعمدة String لتسلسلها كـ Avro string (القيمة الافتراضية هي bytes). | |
لقراءة موضوع Kafka مُشفَّرًا باستخدام Avro عبر محرك جدول Kafka، استخدم الإعداد format_avro_schema_registry_url لتحديد URL لسجل المخططات.
CREATE TABLE topic1_stream
(
field1 String,
field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';
SELECT * FROM topic1_stream;
لكتابة رسائل AvroConfluent إلى موضوع Kafka، عيّن عنوان URL لسجل المخططات واسم الـsubject. يُسجَّل المخطط تلقائيًا في السجل عند أول عملية كتابة.
CREATE TABLE topic1_sink
(
field1 String,
field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url',
output_format_avro_confluent_subject = 'topic1-value';
INSERT INTO topic1_sink VALUES ('hello', 'world');
استخدام المصادقة الأساسية
إذا كان سجل المخططات لديك يتطلب المصادقة الأساسية (على سبيل المثال، إذا كنت تستخدم Confluent Cloud)، فيمكنك تمرير بيانات الاعتماد المرمَّزة وفق URL في الإعداد format_avro_schema_registry_url.
CREATE TABLE topic1_stream
(
field1 String,
field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';
لمراقبة تقدّم الإدخال واستكشاف أخطاء مستهلك Kafka، يمكنك الاستعلام عن جدول النظام system.kafka_consumers. إذا كان النشر لديك يتضمن عدة نُسخ متماثلة (على سبيل المثال، ClickHouse Cloud)، فيجب عليك استخدام دالة الجدول clusterAllReplicas.
SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;
إذا واجهت مشكلات في التعرّف على المخطط، يمكنك استخدام kafkacat مع clickhouse-local لتشخيص المشكلة:
$ kafkacat -b kafka-broker -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String" -q 'select * from table'
1 a
2 b
3 c
آخر تعديل في ٢٩ يونيو ٢٠٢٦