Создание сообщения avro с использованием php-enqueue

Я изучаю способ создания сообщений avro с php на kafka, используя php-enqueue .

В их документации указано, что вы можете использовать другие форматы, включая Apache Avro.

По умолчанию транспорт сериализует сообщения в формате json, но вы можете использовать другой формат, например Apache Avro. Для этого вам нужно реализовать интерфейс Serializer и установить его в контекст, производителя или потребителя. Если сериализатор настроен на контекст, он будет введен всем потребителям и производителям, созданным контекстом.

<?php
use Enqueue\RdKafka\Serializer;
use Enqueue\RdKafka\RdKafkaMessage;

class FooSerializer implements Serializer
{
    public function toMessage($string) {}

    public function toString(RdKafkaMessage $message) {}
}

/** @var \Enqueue\RdKafka\RdKafkaContext $context */

$context->setSerializer(new FooSerializer());

Сериализатор в примере преобразует в строки и обратно. Насколько я понимаю, формат Avro является двоичным, так как же в этом случае должен работать пользовательский сериализатор?


person Vic    schedule 08.07.2020    source источник


Ответы (1)


Строки Php могут содержать двоичные данные. Вот частичная реализация создания сообщения avro с использованием идентификатора схемы, который был зарегистрирован в реестре схемы. Сериализация в avro выполняется с использованием реализации jaumo/avro.

public function toString(RdKafkaMessage $message): string
{
    ...

    $message = json_decode($message->getBody(), true);

    $encodedHeader = $this->createAvroHeader($schemaId);
    $encodedMessage = Serde::encodeMessage($parsedSchema, $message);

    return $encodedHeader . $encodedMessage;
}

private function createAvroHeader(int $schemaId): string
{
    $binarySchemaId = hex2bin(sprintf("%08s", dechex($schemaId)));
    return pack("C", 0) . $binarySchemaId;
}
person Vic    schedule 26.08.2020