Message Queue cơ bản - part 5

Message Queue cơ bản - part 5

·

5 min read

Message Serialization là quá trình chuyển đổi dữ liệu hoặc trạng thái của một đối tượng thành một dạng có thể truyền tải qua mạng hoặc lưu trữ vào bộ nhớ hoặc ổ đĩa. Trong ngữ cảnh của Message Queues, Serialization đảm bảo rằng dữ liệu được đóng gói thành các thông điệp có thể truyền đi qua mạng hoặc lưu trữ mà không gặp vấn đề về định dạng hoặc độ lớn.

Tại sao cần Message Serialization?

  • Truyền tải dữ liệu: Khi giao tiếp giữa các thành phần của hệ thống, đặc biệt là trong các hệ thống phân tán, việc truyền tải dữ liệu qua mạng là cần thiết. Message Serialization đảm bảo rằng dữ liệu có thể được đóng gói và gửi đi một cách hiệu quả.

  • Lưu trữ dữ liệu: Khi cần lưu trữ dữ liệu vào cơ sở dữ liệu hoặc bộ nhớ ngoại tuyến, việc Serialization giúp biến đổi dữ liệu thành định dạng phù hợp để lưu trữ và khôi phục sau này.

Các thành phần của Message Serialization:

  1. Encode: Đây là quá trình chuyển đổi dữ liệu từ dạng đối tượng hoặc cấu trúc dữ liệu sang dạng chuỗi hoặc byte stream có thể truyền đi.

  2. Decode: Ngược lại với quá trình Encode, Decode chuyển đổi dữ liệu từ dạng chuỗi hoặc byte stream thành dạng đối tượng hoặc cấu trúc dữ liệu ban đầu.

Quá trình Message Serialization thường bao gồm các thành phần sau:

  • Đối tượng nguồn: Là đối tượng cần được chuyển đổi thành định dạng dữ liệu khác.

  • Bộ mã hóa: Là phần mềm chịu trách nhiệm chuyển đổi đối tượng nguồn thành định dạng dữ liệu đích.

  • Định dạng dữ liệu đích: Là định dạng dữ liệu mà đối tượng nguồn sẽ được chuyển đổi thành, ví dụ như JSON, XML, hoặc YAML.

  • Bộ giải mã: Là phần mềm chịu trách nhiệm chuyển đổi dữ liệu ở định dạng đích thành đối tượng gốc.

Ví Dụ 1

Dưới đây là một ví dụ về cách sử dụng Kafka để serialize và deserialize dữ liệu bằng Python, sử dụng Avro schema để đảm bảo tính nhất quán và hiệu suất khi truyền tải dữ liệu:

  1. Cài đặt các thư viện cần thiết:
pip install confluent-kafka avro-python3
  1. Tạo một Avro schema cho dữ liệu của bạn. Ví dụ, chúng ta sẽ tạo một schema đơn giản cho dữ liệu người dùng:
from avro import schema, io

avro_schema_str = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "city", "type": "string"}
  ]
}
"""

user_schema = schema.Parse(avro_schema_str)
  1. Serialize dữ liệu bằng Avro schema và gửi vào Kafka topic:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

avro_producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}

avro_producer = AvroProducer(avro_producer_conf, default_value_schema=user_schema)

user_data = {"name": "John", "age": 30, "city": "New York"}
avro_producer.produce(topic='user-topic', value=user_data)
avro_producer.flush()
  1. Deserialize dữ liệu từ Kafka topic sử dụng Avro schema:
from confluent_kafka import avro
from confluent_kafka.avro import AvroConsumer

avro_consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'user-consumer-group',
    'auto.offset.reset': 'earliest',
    'schema.registry.url': 'http://localhost:8081'
}

avro_consumer = AvroConsumer(avro_consumer_conf)
avro_consumer.subscribe(['user-topic'])

while True:
    msg = avro_consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    value = msg.value()
    print("Received user data: {}".format(value))

Trong ví dụ này, chúng ta sử dụng AvroProducer để serialize dữ liệu và gửi nó vào Kafka topic, sau đó sử dụng AvroConsumer để deserialize dữ liệu từ Kafka topic bằng Avro schema đã được xác định trước. Điều này đảm bảo rằng dữ liệu được truyền tải giữa các producer và consumer thông qua Kafka là nhất quán và có hiệu suất cao.

Ví Dụ 2:

Ví dụ này minh họa việc sử dụng Message Serialization với Apache Kafka trong Python, sử dụng định dạng Avro và Schema Registry. Avro là một định dạng dữ liệu nhị phân hiệu quả và linh hoạt, được thiết kế để lưu trữ và truyền tải dữ liệu cấu trúc. Schema Registry là một dịch vụ lưu trữ và quản lý các schema Avro cho các ứng dụng Kafka.

1. Cài đặt:

Ngoài thư viện kafka-python được đề cập trong ví dụ trước, bạn cần cài đặt các thư viện sau:

  • confluent-avro: Thư viện Avro cho Python

  • avro-schema-registry: Thư viện Schema Registry cho Python

Bash

pip install confluent-avro avro-schema-registry

2. Khởi động Schema Registry:

Khởi động Schema Registry với chế độ đơn (single-node) và cung cấp URL để truy cập:

avro-schema-registry start --auto-register --http-port 8081 --url http://localhost:8081

3. Tạo Schema Avro:

Tạo một file message.avsc để định nghĩa schema Avro cho tin nhắn:

{
  "type": "record",
  "name": "Message",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "email",
      "type": "string"
    }
  ]
}

4. Tạo lớp dữ liệu:

Tạo một lớp Message tương ứng với schema Avro đã định nghĩa:

from confluent_avro.schema import AvroSchema

class Message:
    def __init__(self, id, name, email):
        self.id = id
        self.name = name
        self.email = email

    def to_avro(self):
        schema = AvroSchema(open('message.avsc').read())
        return schema.dumps(self.__dict__)

    @classmethod
    def from_avro(cls, data):
        schema = AvroSchema(open('message.avsc').read())
        return cls(**schema.loads(data))

5. Producer:

from confluent_avro import AvroProducer

producer = AvroProducer({
    'bootstrap.servers': ['localhost:9092'],
    'schema.registry.url': 'http://localhost:8081',
    'schema.registry.auto.register': True
})

6. Gửi tin nhắn:

Tạo một tin nhắn mới và gửi nó đến chủ đề my-topic:

message = Message(1, "Bob", "bob@example.com")
serialized_message = message.to_avro()

producer.send('my-topic', serialized_message)
print(f"Sent message: {message}")

7. Tạo người tiêu dùng (Consumer):

Tạo một consumer với cấu hình sử dụng Schema Registry:

from confluent_avro import AvroConsumer

consumer = AvroConsumer({
    'bootstrap.servers': ['localhost:9092'],
    'schema.registry.url': 'http://localhost:8081',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['my-topic'])

8. Nhận tin nhắn:

Nhận tin nhắn từ chủ đề my-topic và in ra nội dung:

while True:
    try:
        message = consumer.poll(1.0)
        if message:
            deserialized_message = Message.from_avro(message.value())
            print(f"Received message: {deserialized_message}")
    except KeyboardInterrupt:
        break

9. Chạy chương trình:

Chạy hai chương trình riêng biệt: một chương trình cho producer và một chương trình cho consumer. Producer sẽ gửi tin nhắn đến Kafka, và consumer sẽ nhận tin nhắn và in ra nội dung.