Введение

Обмен сообщениями с малой задержкой в ​​больших распределенных системах никогда не был простым. Разработка систем с множеством микросервисов, использующих обмен сообщениями для взаимодействия между сервисами, особенно когда сервисы разрабатываются разными командами, сопряжена с множеством проблем. Это требует дисциплины и четкого биржевого контракта. Последнее, с чем вам нужно иметь дело, — это обратно несовместимое изменение в сообщении, отправляемом в тему, используемую вашим сервисом. Наличие четкой схемы тем уменьшит, а в идеале устранит возможность возникновения этой проблемы.

В этом посте я объясню, как этого добиться с помощью Protocol Buffers (или просто Protobuf) и Apache Kafka. Он основан на недавно сделанном PoC, который можно найти на GitHub. Для простоты я буду использовать Scala (хотя я постараюсь использовать как можно меньше фрагментов кода), но этот подход можно использовать и с другими языками, имеющими клиент Kafka и компилятор Protobuf.

Дизайн темы

Существует несколько подходов к оформлению тем, но их можно разделить на следующие категории: один тип сообщения на тему, несколько типов сообщений на тему или многие из них завернуты в общий конверт.

1. Несколько типов сообщений на тему.

Если у вас есть несколько событий и порядок должен быть сохранен, то очень уместно иметь несколько типов сообщений для каждой темы. Этот подход очень распространен в источниках событий, но не ограничивается этим. В этом случае вы будете отправлять события для определенного объекта в один и тот же раздел (Kafka гарантирует порядок сообщений только в том же разделе, но не в теме), установив общий ключ сообщения (например, идентификатор объекта).

2. Один тип сообщения на тему

Хотя гарантии упорядочения являются потенциально важным преимуществом использования одной и той же темы, не имеет особого смысла отправлять несвязанные данные в одну и ту же тему. В этом случае более целесообразно разделить вещи таким образом, чтобы каждая тема имела только один тип сообщения. Таким образом, будет легче рассуждать о данных, записываемых в тему и потребляемых из нее. Это также уменьшает количество нежелательных сообщений, которые потребителям, возможно, придется игнорировать.

3. Конверты с Any полезной нагрузкой

В больших распределенных системах вы, вероятно, захотите отправить некоторые метаданные в дополнение к фактическим данным приложения, будь то идентификатор корреляции для целей распределенной трассировки или подписи сообщений, чтобы гарантировать, что содержимому можно доверять. Конечно, также можно использовать заголовки Kafka, но наличие метаданных в необработанном сообщении, отправляемом в Kafka, упрощает определение контракта во всей системе и дает дополнительное преимущество, заключающееся в независимости от брокера сообщений.

Этого можно добиться, заключив сообщение приложения в конверт. Обычный подход состоит в том, чтобы иметь конверт с несколькими полями метаданных, а затем полезную нагрузку, выраженную с использованием типа Any Protobuf:

import "google/protobuf/any.proto";
message Envelope {  
  string correlation_id = 1;
  string auth_token = 2;
  // some other metadata
  google.protobuf.Any payload = 10;
}

Примечание. google.protobuf.Any нельзя сравнивать напрямую с Object в Java или Any в Scala. Лучше всего это понять, посмотрев на схему Protobuf:

message Any {  
  string type_url = 1;
  bytes value = 2;
}

type_url используется для уникальной идентификации типа сериализованного сообщения Protobuf, а value представляет сериализованную полезную нагрузку. Поскольку value — это просто последовательность байтов, требуется дополнительная десериализация. В результате у нас получается двухэтапная десериализация:

  1. Десериализовать сам конверт. После десериализации конверта мы получаем доступ к нашим полям метаданных и полезной нагрузке типа google.protobuf.Any.
  2. Десериализуйте value байт в реальный тип. Для этого мы будем использовать type_url, чтобы определить, какой десериализатор мы должны использовать для этих value байтов.

Этот конверт можно использовать независимо от того, отправляете ли вы один тип сообщения в тему или несколько. Это также удобный способ обеспечить структуру сообщений в системе. Одним из недостатков типа Any является то, что десериализацию необходимо выполнять в два этапа. В некоторых случаях имеет смысл иметь конверт с общим типом полезной нагрузки, но мы также должны учитывать сложность, которую он вносит.

4. Конверты конкретных/тематических схем

Основное назначение конкретных конвертов — определить схему темы. Это включает в себя определение конверта и список всех возможных типов полезной нагрузки, которые отправляются через тему. Давайте посмотрим на следующий пример, определяющий конкретную оболочку:

message UserEnvelope {  
  string correlation_id = 1;
  string auth_token = 2;
  // some other metadata
  User payload = 10; // Where user is a protobuf message that represents the user state
}
message User {  
  string first_name = 1;
  string last_name = 2;
}

Здесь мы определили UserEnvelope, который напоминает общий Envelope. Интересно, что полезная нагрузка имеет более конкретный тип. Вы можете легко определить тип полезной нагрузки и то, как будет выглядеть десериализация и обработка сообщений. И, что удобно, нам нужно выполнить только один шаг десериализации.

Но тогда возникает вопрос: «Как мы можем работать с несколькими типами полезной нагрузки?» Вот где начинается самое интересное!

Несколько типов полезной нагрузки с использованием oneof

Protobuf позволяет нам определить набор полей, из которых одновременно может быть задано не более одного, используя конструкцию oneof:

message SampleMessage {  
  oneof sample_field {
    string field_1 = 10;
    string field_2 = 11;
    CustomSubMessage field_3 = 12;
  }
}

На стороне записи Protobuf гарантирует, что установка поля oneof автоматически очистит все остальные члены поля oneof. На стороне чтения, когда синтаксический анализатор встречает несколько элементов одного и того же oneof, будет установлен только последний увиденный элемент.

Таким образом, мы можем использовать oneof для поддержки нескольких типов полезной нагрузки, будь то источник событий, текущее состояние конечного автомата или какая-то другая модель. Вы даже можете использовать его для управления версиями в рамках темы, но является ли это хорошей идеей, является предметом другого обсуждения.

Ради этого поста я собираюсь предположить, что мы используем источник событий и определяем набор событий для пользовательского объекта:

syntax = "proto3";
package com.tudorzgureanu.protocol.users.v1;
message UsersEnvelope {  
    string correlation_id = 1;
    // some other metadata
    oneof payload {
        UserCreated user_created = 11;
        UserUpdated user_updated = 12;
        UserActivated user_activated = 13;
    }
}
message UserCreated {  
    string id = 1;
    string first_name = 2;
    string last_name = 3;
}
message UserUpdated {  
    string id = 1;
    string first_name = 2;
    string last_name = 3;
}
message UserActivated {  
    string user_id = 1;
}

Используя oneof для определения конкретного конверта темы, мы можем иметь только один тип сообщения верхнего уровня для каждой темы, но все возможные типы полезной нагрузки перечислены как часть этого определения сообщения. Это хороший способ определить схемы тем. Это дает нам лучший и более четкий контракт, поскольку в нем точно указано, какие типы полезной нагрузки следует ожидать. Это также повышает безопасность типов. Теперь, когда у нас есть это четкое определение допустимых входных данных для темы, все, что не соответствует этой схеме сообщений верхнего уровня, может быть просто отброшено потребителями как недействительные данные.

Единственное, что осталось сделать, это получить это определение сообщения в вашем сервисе, сгенерировать классы Scala (или другого языка), и вы готовы к работе.

Обработка полезной нагрузки в Scala

Обработка полезной нагрузки относительно проста. ScalaPB генерирует тип суммы для всех полей oneof и случай Empty для случая, когда полезная нагрузка вообще не установлена ​​или установлена ​​с полем, которое не ожидается или не поддерживается потребителем (в других языках «пустой» случай может называться по-другому ). Один сценарий, в котором это ожидается, заключается в том, что новые типы событий вводятся и отправляются производителями, но принимаются потребителем, который еще не был обновлен для их обработки.

В Scala самый простой способ обработки всех типов полезной нагрузки — использовать сопоставление с образцом. Благодаря типам сумм сопоставления с образцом у нас есть то преимущество, что компилятор Scala сообщит нам, является ли сопоставление с образцом исчерпывающим или нет.

Вот так выглядит обработка конверта. Я опустил здесь потребительский код Kafka, потому что сообщения потребляются и десериализуются обычным способом, но вы можете ознакомиться с полным UserConsumerActor кодом здесь.

def processUserEvent(key: Option[String], envelope: UsersEnvelope): Future[Either[String, UserEvent]] = {  
  envelope.payload match {
    case Payload.UserCreated(userCreatedProto) =>
      log.info(s"[correlationId: ${envelope.correlationId}] User created $userCreatedProto")
      userService
        .persistUserEvent(UserCreated.fromProtoV1(userCreatedProto))
        .map(Right(_))
    case Payload.UserUpdated(userUpdatedProto) =>
      log.info(s"[correlationId: ${envelope.correlationId}] User updated $userUpdatedProto")
      userService
        .persistUserEvent(UserUpdated.fromProtoV1(userUpdatedProto))
        .map(Right(_))
    case Payload.UserActivated(userActivatedProto) =>
      log.info(s"[correlationId: ${envelope.correlationId}] User activated $userActivatedProto")
      userService.persistUserEvent(UserActivated.fromProtoV1(userActivatedProto)).map(Right(_))
    case Payload.Empty =>
      log.info(
        s"[correlationId: ${envelope.correlationId}] Unexpected payload with key: ${key.getOrElse("null")}. Payload ignored."
      )
      Future.successful(Left("Couldn't not process payload."))
  }
}

Как видите, это довольно удобный способ обработки полезной нагрузки. Если в определение protobuf добавляется новый тип полезной нагрузки, компилятор Scala сообщит нам, что нам нужно обработать новый добавленный тип. В Java вам придется использовать instanceof, за которым следует приведение к правильному типу (пожалуйста, дайте мне знать, если вы хотите увидеть Java-версию этого PoC).

Проблемы

Сжатие журнала

Если вы решите использовать уплотненные темы, над конвертами может потребоваться дополнительная работа.

Но сначала краткий обзор того, что такое сжатие журналов, из Начало работы с Kafka с использованием Scala Kafka Client и Akka:

Когда темы, работающие в обычном режиме работы (этот режим называется "удаление"), достигают установленного предела длины (будь то ограничение по времени и/или пространству), сообщения удаляются из конца журнала.

В режиме сжатия журнала (этот режим называется «компактный») вместо удаления сообщений из хвоста журнала при достижении ограничения длины Kafka создает новую версию журнала, сохраняя по ключу key на основе только самого последнего сообщения, которое было записано в старый несжатый журнал.

Удаление достигается путем отправки пустой полезной нагрузки, которая будет действовать как надгробие для этого ключа. После сжатия надгробия очищаются, не оставляя следов и, следовательно, удаляя этот ключ.

Поскольку удаление достигается путем отправки ключа с сообщением null, метаданные, такие как идентификатор корреляции, также теряются. Чтобы сохранить его, удаление должно быть выполнено в два этапа:

  1. Во-первых, мы отправляем сообщение-конверт со всеми соответствующими заголовками, но с «пустой» полезной нагрузкой (ниже мы рассмотрим два способа сделать это). Это даст нам необходимые метаданные, но также даст нам знать, что этот объект был помечен как удаленный и должен быть обработан соответствующим образом (например, путем удаления объекта из базы данных). Недостатком этого подхода является то, что все стороны должны знать, как сущности помечаются как удаленные.
  2. Сразу же после отправки указанного выше сообщения мы отправляем второе сообщение с тем же ключом, но с пустым конвертом. Это делается только для того, чтобы Kafka знал, что этот ключ нужно удалить из журнала. Когда произойдет уплотнение, Kafka удалит ключ из журнала.

Давайте вернемся к шагу 1 и посмотрим, как мы можем представить удаленную сущность в конверте:

Без oneof

Только с одним типом полезной нагрузки для каждой темы все просто. Давайте вернемся к примеру с одним типом сообщения для каждой темы:

message UserEnvelope {  
  string correlation_id = 1;
  User payload = 2;  
}

Как описано выше, удаление требует двух шагов:

  1. В Protobuf 3 нескалярные поля допускают значение null, и ScalaPB представляет их в сгенерированном коде как Option. Таким образом, можно сообщить приложению об удалении, отправив полезную нагрузку null (None). В качестве альтернативы у нас мог бы быть флаг (например, bool deleted = 3;), который сообщал бы нам, что это представляет собой удаление.
  2. Сразу после шага 1 мы готовы отправить сообщение с тем же ключом и пустым конвертом, чтобы пометить его для удаления.

С oneof

oneof дает нам еще один вариант. Мы можем легко определить отдельное событие/подсообщение об удалении объекта, которое сообщает нам, что объект был удален. Это особенно полезно, когда у вас есть более одного типа сущности:

message UsersEnvelope {  
    string correlation_id = 1;
    // some other metadata
    oneof payload {
        User user = 11;
        UserRemoved user_removed = 12; // Contains the user id. Or a bool could be used if the key sent to Kafka is the entity id
        // more payload types
    }
}

Несмотря на то, что для этого по-прежнему требуется 2 шага (но только 1 десериализация), основное преимущество заключается в том, что вместо установки полезной нагрузки на null (в Java и других языках) или None в Scala мы используем другой случай oneof. Таким образом, у нас может быть несколько типов сущностей и соответствующий случай deletion для каждого из них. Еще одно существенное преимущество этого заключается в удобочитаемости: UserRemoved оставляет мало места для интерпретации читателем, тогда как значение null или None зависит от контекста.

Выводы

В этом посте мы обсудили преимущества и недостатки нескольких тематических дизайнов, уделив особое внимание «конкретным оболочкам». Они обеспечивают лучший контракт между сторонами и безопасный способ обработки полезной нагрузки. Тем не менее, этот подход может не подходить для всех вариантов использования, и вам решать, какой дизайн темы лучше подходит для вашей системы.

Полный код этого PoC доступен здесь. Кроме того, я рекомендую ознакомиться с этим хорошим Учебником по Kafka с использованием Scala Kafka Client.

P.S. Я хотел бы поблагодарить David Piggott и Simon Souter за их ценные отзывы!

Удачного взлома!

Тюдор

Ссылки и дополнительная литература