kafka offset что значит
Русские Блоги
Кафка офсетное (Offset) управление
1. Определение
Каждый раздел в Kafka состоит из серии упорядоченных и неизменяемых сообщений, которые последовательно добавляются к разделу. Каждое сообщение в разделе имеет непрерывный порядковый номер, который используется для уникальной идентификации сообщения в разделе.
Смещение записывает порядковый номер следующего сообщения, которое должно быть отправлено потребителю.
Три общие семантики в системах потоковой обработки:
Максимум однажды | Каждая запись обрабатывается один раз или не обрабатывается совсем |
Хотя бы один раз | Это лучше, чем один раз, потому что это гарантирует, что никакие данные не будут потеряны. Но могут быть дубликаты |
Да и только один раз | Каждая запись будет обработана ровно один раз, данные не будут потеряны, и никакие данные не будут обрабатываться несколько раз. |
The semantics of streaming systems are often captured in terms of how many times each record can be processed by the system. There are three types of guarantees that a system can provide under all possible operating conditions (despite failures, etc.)
2.Kafka offset Management with Spark Streaming
В первую очередь рекомендуется сохранять смещение в Zookeeper. Zookeeper более легкий, чем HBASE и т. Д., И он используется для HA (высокая доступность), а смещение более безопасно.
Стандартные двухэтапные операции для управления смещением:
3. Экологическая подготовка
Запустите продюсер Kafka и протестируйте тему: tp_kafka:
Запустите потребителя Kafka:
Создавайте данные в IDEA:
4. Первый метод управления смещением: наименьшее
Количество Spark Streaming link статистика Kafka:
Но если Spark Streaming останавливается и перезапускается в это время:
Вы обнаружите, что отсчет начинается здесь снова, потому что значение auto.offset.reset установлено на наименьшее значение в коде. (До версии kafka-0.10.1.X)
5. Второй способ управления смещением: контрольно-пропускной пункт
Создайте папку / offset в HDFS:
Использовать контрольную точку:
Примечание. IDEA изменяет пользователя HDFS в параметрах виртуальной машины в настройках:
Установил, что предыдущие 100 штук были израсходованы. Это после остановки, производства 100 штук, а затем запуска:
Было обнаружено, что здесь было прочитано только 100 элементов между концом последнего раза и началом этого времени, вместо того, чтобы читать все предыдущие элементы, как самые маленькие.
Но у checkpiont есть проблема: если смещение управляется таким образом, пока бизнес-логика меняется, контрольная точка не действует. Потому что он вызывает getOrCreate ().
6. Третий тип управления смещением: ручное управление смещением.
Решение 1: реализовать идемпотент
Характеристика идемпотентной операции в программировании состоит в том, что эффект от любого количества выполнений такой же, как от одного выполнения.
Решение 2: транзакция (транзакция)
1. Транзакция базы данных может включать в себя одну или несколько операций с базой данных, но эти операции составляют логическое целое.
2. Либо все эти операции с базой данных, составляющие логическое целое, выполняются успешно, либо все не выполняются.
3. Все операции, составляющие транзакцию, либо все влияют на базу данных, либо ни одну из них, то есть независимо от того, успешно ли выполнена транзакция, база данных всегда может поддерживать согласованное состояние.
4. Сказанное выше остается верным, даже если база данных выходит из строя и существуют параллельные транзакции.
Сохраните бизнес-логику и смещение в одной транзакции и выполните ее только один раз.
7. auto.kafka.reset после версии Kafka-0.10.1.X:
Что такое фиксации смещений и почему они так важны для Kafka
В прошлый раз мы говорили про потоки событий в брокере Kafka. Сегодня поговорим про фиксации смещений в Kafka. Читайте далее про особенности фиксаций смещений, благодаря которым брокер Kafka может легко добавлять новые записи Big Data в топики, а также получать доступ к старым.
Как работают фиксации смещений в брокере Apache Kafka: особенности обработки записей Big Data
Каждый из этих видов мы подробнее рассмотрим далее.
Синхронная фиксация смещений
Синхронная фиксация смещений — это автоматическая фиксация текущего смещения записи в момент ее появления. Как только смещение успешно фиксируется, выполнение процедуры завершается. В случае сбоя синхронной фиксации генерируется исключение, и фиксация возобновляется, выполняясь до тех пор, пока смещение не зафиксируется. Следующий код на языке Java отвечает за выполнение синхронной фиксации смещений записей с помощью метода commitSync() [2]:
Из кода видно, что фиксация смещения происходит после получения приложением каждой записи (с помощью метода poll() ). Метод commitSync() будет повторять фиксацию каждой новой записи до тех пор, пока не возникнет непоправимая ошибка типа CommitFailedException (например, полный выход из строя Kafka-сервера). Если произойдет такая ошибка, сведения о ней автоматически запишутся в журнал логирования (logging), который содержит информацию об этапах выполнения программы.
Асинхронная фиксация смещений
При синхронной фиксации смещений приложение блокируется (остальные функции и запросы становятся недоступны) до тех пор, пока брокер Kafka не подтвердит успешную фиксацию. Это ограничивает пропускную способность (количество информации, передаваемое в единицу времени) приложения. В этом случае можно использовать асинхронную фиксацию. Асинхронная фиксация смещений — это фиксация, которая выполняется независимо (параллельно) от выполнения остальных функций приложения и не требует обязательного подтверждения факта успешной фиксации от Kafka-сервера. Следующий код на языке Java отвечает за выполнение асинхронной фиксации смещения записей в топике с помощью метода commitAsync() [2]:
Главное отличие между асинхронной и синхронной фиксациями состоит в том, что синхронная фиксация будет повторять попытку фиксации смещения до тех пор, пока она не завершится успешно (за исключением полного выхода из строя сервера Kafka). Асинхронная фиксация, в случае возникновения ошибочной ситуации (например, истечение времени ожидания или временный сбой Kafka-сервера), не станет повторять попытку фиксации смещения текущей записи, а сразу перейдет к фиксации смещения следующей доступной (или поступившей) записи [2].
Таким образом, благодаря механизму управления фиксациями, брокер Kafka может весьма эффективно регистрировать новые записи Big Data в топиках, обращаться к старым, а также удалять из топиков записи, которые более не используются. Это делает Apache Kafka универсальным и надежным средством для хранения и обмена большими потоками данных, что позволяет активно использовать этот брокер сообщений в задачах Data Science и разработке распределенных приложений. В следующей статье мы поговорим про перебалансировку разделов в Kafka.
Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:
Краткий обзор Apache Kafka
Apache Kafka — брокер сообщений, реализующий паттерн Producer-Consumer с хорошими способностями к горизонтальному масштабированию. Это Open Source разработка, созданная компанией LinkedIn на JVM стеке (Scala).
Горизонтально масштабируя какую-либо систему, вы поневоле делаете её распределённой, а работа с распределённой системой имеет свои особенности.
Формально, для описания свойств распределённых систем существует CAP-теорема.
В распределённой системе невозможно обеспечить одновременное выполнение всех трёх свойств: консистентности, доступности, устойчивости к сбоям узлов.
Что это за свойства:
Консистентность (Consistency) Говорит о том, что система всегда выдаёт только логически непротиворечивые ответы. Не бывает такого, что вы добавили в корзину товар, а после рефреша страницы его там не видите. Доступность (Availability) Означает, что сервис отвечает на запросы, а не выдаёт ошибки о том, что он недоступен. Устойчивость к сбоям сети (Partition tolerance) Означает, что распределённая по кластеру система работает в расчёте на случаи произвольной потери пакетов внутри сети.
С точки зрения CAP-теоремы, Kafka имеет CA*, т.е. выполняются условия консистентности и доступности, но не гарантируется устойчивость к сбоям в сети — по отзывам пользователей, Kafka не очень устойчива к netsplit (моменту, когда ваш кластер, например, разваливается пополам), хотя официальной документации на этот счёт мы не нашли.
На самом низком уровне Kafka — это просто распределённый лог-файл. То есть, по сути, файл, разбитый на несколько частей (партиций) и «раскатанный» на несколько узлов кластера. Запись в этот файл всегда происходит в конец. Разделение файла на части необходимо для ускорения чтения из очереди и горизонтального масштабирования. Ваш Topic (тема) может быть «порезан» на сколько угодно частей. Соответственно, вы можете разделить Topic на сколько угодно серверов. Из каждой партиции может читать не более одного Consumer (читатель). Это значит, что максимальное число параллельных читателей равно количеству частей, на которые разбит ваш Topic.
Соответственно, для одной партиции топика гарантируется очерёдность сообщений, так как из каждой партиции может читать не более одного читателя.
У каждого сообщения есть свой сквозной номер внутри патриции. В терминах Kafka это называется offset. При чтении из партиции читатель делает коммит оффсета. Это необходимо для того, чтобы, если, например, текущий читатель упадёт, то следующий (новый читатель) начнёт с последнего коммита.
Читатели объединяются в группы, что так и называется — consumer group. При добавлении нового читателя или падении текущего, группа перебалансирутся. Это занимает какое-то время, поэтому лучший способ чтения — подключить читателя и не переподключать его без необходимости.
Что касается доступности, Kafka обеспечивает репликацию сообщений и disk persistence, сохраняя сообщения на диск.
Формат репликации называется InSync. Это значит, что слейвы (в терминах Kafka это фолловеры) сами постоянно спрашивают мастера о новых сообщениях. Это pull-модель. Синхронностью/асинхронностью репликации вы можете управлять сами, указывая какие гарантии (acknowledgement) вы хотите получить при записи в очередь. Kafka поддерживает три режима:
Вы должны найти компромисс между возможностью потери сообщений и минимальным откликом приложения. Чем выше гарантии доставки, тем, соответственно, дольше запись в очередь.
Поскольку Kafka гарантирует консистентность, для читателей сообщение будет видно только после записи по всем репликам. Репликация происходит отдельно для каждой партиции в топике.
Если вспомнить про disk persistence, то он вытекает из устройства Kafka. Так как вся система — это просто лог, то все сообщения в любом случае попадают на диск и это невозможно выключить, но в конфигурации можно подкрутить ручку, какими периодами сообщения падают на диск. Что, соответственно, уменьшит ваши гарантии на потерю сообщений, но увеличит производительность.
Клиенты для Kafka достаточно интеллектуальные и работают на уровне TCP. В коробке с Kafka лежит клиент на Java (так как сама Kafka написана на Scala) и библиотека на C.
Выводы: Apache Kafka менее удобна, чем тот же RabbitMQ, но если вы не можете терять сообщения, то вариант с Kafka подходит больше. К тому же у Kafka гораздо больше scalability (расширяемость).
Как получать записи с заданными смещениями в Kafka
В прошлый раз мы говорили про цикл опроса в Big Data брокере Apache Kafka. Сегодня поговорим про механизм получения записей с заданными смещениями в Kafka. Читайте далее про механизм заданных смещений, благодаря которому брокеры Kafka могут получать любые необходимые данные из топиков независимо от того, в какой момент времени они появились.
Как работает механизм заданных смещений в Apache Kafka: особенности получения Big Data
Заданное смещение (current offset) в Kafka — это порядковый номер записи (индекс), который указывается для получения конкретной записи в данный момент времени независимо от того, когда она была создана. Заданные пользователем смещения позволяют переходить к конкретной записи минуя все остальные. Брокер Kafka устроен таким образом, что каждой записи при создании присваивается порядковый номер, на который указывает курсор (метка, которая перемещается при обращении или создании записи). Как только пользователь обращается к конкретной записи, указывая соответствующий индекс, курсор перемещается в данном направлении и фиксируется на заданном элементе (записи) [1].
Обращение к записи по заданному смещению
Работа с заданными смещениями: несколько практических примеров
Однако стоит отметить, что данный способ не гарантирует корректной работы при аварийном сбое приложения, так как при каждом сбое все обращения к серверу будут инициироваться заново, и при повторном заполнении коллекции в ней появятся дубликаты. Для того, чтобы гарантировать корректное обращение к записям по заданным смещениям во время сбоя, необходимо предусмотреть возможность перебалансировки (передача раздела от неактивного подписчика активному) и каждый раз, когда она происходит, заново выполнять поиск заданного смещения с помощью метода seek() [1]:
Таким образом, благодаря механизму заданных смещений, брокер Apache Kafka гарантирует возможность долгосрочного хранения и получения записей Big Data с необходимой позиции независимо от того, в какой момент времени они были сформированы. Это делает Apache Kafka универсальным и надежным средством для хранения и обмена большими потоками данных, что позволяет активно использовать этот брокер сообщений в задачах Data Science и разработке распределенных приложений. В следующей статье мы поговорим про десериализаторы в Kafka.
Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:
Понимание брокеров сообщений. Изучение механики обмена сообщениями посредством ActiveMQ и Kafka. Глава 3. Kafka
Продолжение перевода небольшой книги:
«Understanding Message Brokers»,
автор: Jakub Korab, издательство: O’Reilly Media, Inc., дата издания: June 2017, ISBN: 9781492049296.
ГЛАВА 3
Kafka
Kafka была разработана в LinkedIn для того, чтобы обойти некоторые ограничения традиционных брокеров сообщений и избежать необходимости настраивать несколько брокеров сообщений для разных взаимодействий «точка-точка», что описано в данной книге в разделе «Вертикальное и горизонтальное масштабирование» на странице 28. Сценарии использования в LinkedIn в основном основывались на однонаправленном поглощении очень больших объемов данных, таких как клики на страницах и журналы доступа, в то же время позволяя использовать эти данные нескольким системам, не влияя на производительность продюсеров или других консюмеров. Фактически, причина существования Kafka заключается в том, чтобы получить такую архитектуру обмена сообщениями, которую описывает Universal Data Pipeline.
С учетом этой конечной цели, естественно, возникли и другие требования. Kafka должна:
Унифицированная модель адресата
В оставшейся части этой главы, если мы явно не укажем иное, термин «топик» будет относиться к топику Kafka.
Термины «журнал» и «указатель» не встречаются в документации Kafka. Эти хорошо известные термины используются здесь, чтобы помочь пониманию.
Эта модель полностью отличается от ActiveMQ, где сообщения из всех очередей хранятся в одном журнале, а брокер помечает сообщения, как удаленные, после того как они были прочитаны.
Давайте теперь немного углубимся и рассмотрим журнал топика более подробно.
Журнал Kafka состоит из нескольких партиций (Figure 3-1). Kafka гарантирует строгую упорядоченность в каждой партиции. Это означает, что сообщения, записанные в партицию в определенном порядке, будут прочитаны в том же порядке. Каждая партиция реализована в виде цикличного (rolling) файла журнала, который содержит подмножество (subset) всех сообщений, отправленных в топик его продюсерами. Созданный топик содержит по-умолчанию одну партицию. Идея партиций — это центральная идея Kafka для горизонтального масштабирования.
Figure 3-1. Партиции Kafka
Когда продюсер отправляет сообщение в топик Kafka, он решает, в какую партицию отправить сообщение. Мы рассмотрим это более подробно позже.
Чтение сообщений
Клиент, который хочет прочитать сообщения, управляет именованным указателем, называемым группа консюмеров (consumer group), который указывает на смещение (offset) сообщения в партиции. Смещение — это позиция с возрастающим номером, которая начинается с 0 в начале партиции. Эта группа консюмеров, на которую ссылаются в API через определяемый пользователем идентификатор group_id, соответствует одному логическому потребителю или системе.
Большинство систем, использующих обмен сообщениями, читают данные из адресата посредством нескольких экземпляров и потоков для параллельной обработки сообщений. Таким образом, обычно будет много экземпляров консюмеров, совместно использующих одну и ту же группу консюмеров.
Проблему чтения можно представить следующим образом:
Консюмеры и группы консюмеров
Давайте возьмем в качестве отправной точки топик с одной партицией (Figure 3-2).
Figure 3-2. Консюмер читает из партиции
Когда экземпляр консюмера подключается со своим собственным group_id к этому топику, ему назначается партиция для чтения и смещение в этой партиции. Положение этого смещения конфигурируется в клиенте, как указатель на самую последнюю позицию (самое новое сообщение) или самую раннюю позицию (самое старое сообщение). Консюмер запрашивает (polls) сообщения из топика, что приводит к их последовательному чтению из журнала.
Позиция смещения регулярно коммитится обратно в Kafka и сохраняется, как сообщения во внутреннем топике _consumer_offsets. Прочитанные сообщения все равно не удаляются, в отличие от обычного брокера, и клиент может перемотать (rewind) смещение, чтобы повторно обработать уже просмотренные сообщения.
Когда подключается второй логический консюмер, используя другой group_id, он управляет вторым указателем, который не зависит от первого (Figure 3-3). Таким образом, топик Kafka действует как очередь, в которой существует один консюмер и, как обычный топик издатель-подписчик (pub-sub), на который подписаны несколько консюмеров, с дополнительным преимуществом, что все сообщения сохраняются и могут обрабатываться несколько раз.
Figure 3-3. Два консюмера в разных группах консюмеров читают из одной партиции
Консюмеры в группе консюмеров
Когда один экземпляр консюмера читает данные из партиции, он полностью контролирует указатель и обрабатывает сообщения, как описано в предыдущем разделе.
Если несколько экземпляров консюмеров были подключены с одним и тем же group_id к топику с одной партицией, то экземпляру, который подключился последним, будет передан контроль над указателем и с этого момента он будет получать все сообщения (Figure 3-4).
Figure 3-4. Два консюмера в одной и той же группе консюмеров читают из одной партиции
Этот режим обработки, в котором количество экземпляров консюмеров превышает число партиций, можно рассматривать как разновидность монопольного потребителя. Это может быть полезно, если вам нужна «активно-пассивная» (или «горячая-теплая») кластеризация ваших экземпляров консюмеров, хотя параллельная работа нескольких консюмеров («активно-активная» или «горячая-горячая») намного более типична, чем консюмеры в режиме ожидания.
Такое поведение распределения сообщений, описанное выше, может вызывать удивление в сравнении с тем, как ведет себя обычная очередь JMS. В этой модели сообщения, отправленные в очередь, будут равномерно распределены между двумя консюмерами.
Чаще всего, когда мы создаем несколько экземпляров консюмеров, мы делаем это либо для параллельной обработки сообщений, либо для увеличения скорости чтения, либо для повышения устойчивости процесса чтения. Поскольку читать данные из партиции может одновременно только один экземпляр консюмера, то как это достигается в Kafka?
Один из способов сделать это — использовать один экземпляр консюмера, чтобы прочитать все сообщения и передать их в пул потоков. Хотя этот подход увеличивает пропускную способность обработки, он увеличивает сложность логики консюмеров и ничего не делает для повышения устойчивости системы чтения. Если один экземпляр консюмера отключается из-за сбоя питания или аналогичного события, то вычитка прекращается.
Каноническим способом решения этой проблемы в Kafka является использование бОльшего количества партиций.
Партиционирование
Партиции являются основным механизмом распараллеливания чтения и масштабирования топика за пределы пропускной способности одного экземпляра брокера. Чтобы лучше понять это, давайте рассмотрим ситуацию, когда существует топик с двумя партициями и на этот топик подписывается один консюмер (Figure 3-5).
Figure 3-5. Один консюмер читает из нескольких партиций
В этом сценарии консюмеру дается контроль над указателями, соответствующими его group_id в обоих партициях, и начинается чтение сообщений из обеих партиций.
Когда в этот топик добавляется дополнительный консюмер для того же group_id, Kafka переназначает (reallocate) одну из партиций с первого на второй консюмер. После чего каждый экземпляр консюмера будет вычитывать из одной партиции топика (Figure 3-6).
Чтобы обеспечить обработку сообщений параллельно в 20 потоков, вам потребуется как минимум 20 партиций. Если партиций будет меньше, у вас останутся консюмеры, которым не над чем работать, что описано ранее в обсуждении монопольных консюмеров.
Figure 3-6. Два консюмера в одной и той же группе консюмеров читают из разных партиций
Эта схема значительно снижает сложность работы брокера Kafka по сравнению с распределением сообщений, необходимым для поддержки очереди JMS. Здесь не нужно заботится о следующих моментах:
Однако, требования к распараллеливанию вычитки и повторной отправке неудачных сообщений никуда не деваются — ответственность за них просто переходит от брокера к клиенту. Это означает, что они должны быть учтены в вашем коде.
Отправка сообщений
Ответственность за решение, в какую партицию отправить сообщение, возлагается на продюсер этого сообщения. Чтобы понять механизм, с помощью которого это делается, сначала нужно рассмотреть, что именно мы на самом деле отправляем.
В то время, как в JMS мы используем структуру сообщения с метаданными (заголовками и свойствами) и телом, содержащим полезную нагрузку (payload), в Kafka сообщение является парой «ключ-значение». Полезная нагрузка сообщения отправляется, как значение (value). Ключ, с другой стороны, используется главным образом для партиционирования и должен содержать специфичный для бизнес-логики ключ, чтобы поместить связанные сообщений в ту же партицию.
В Главе 2 мы обсуждали сценарий онлайн-ставок, когда связанные события должны обрабатываться по порядку одним консюмером:
Этот интерфейс выглядит следующим образом:
Реализация Partitioner для определения партиции использует по-умолчанию алгоритм хеширования ключа (general-purpose hashing algorithm over the key) или циклический перебор (round-robin), если ключ не указан. Это значение по-умолчанию работает хорошо в большинстве случаев. Однако, в будущем вы захотите написать свой собственный.
Написание собственной стратегии партиционирования
Давайте рассмотрим пример, когда вы хотите отправить метаданные вместе с полезной нагрузкой сообщения. Полезная нагрузка в нашем примере — это инструкция для внесения депозита на игровой счет. Инструкция — это то, что мы хотели бы гарантированно не модифицировать при передаче и хотим быть уверены, что только доверенная вышестоящая система может инициировать эту инструкцию. В этом случае отправляющая и принимающая системы согласовывают использование подписи для проверки подлинности сообщения.
В обычном JMS мы просто определяем свойство «подпись сообщения» и добавляем его к сообщению. Тем не менее, Kafka не предоставляет нам механизм для передачи метаданных — только ключ и значение.
Поскольку значение — это полезная нагрузка банковского перевода (bank transfer payload), целостность которой мы хотим сохранить, у нас не остается другого выбора, кроме определения структуры данных для использования в ключе. Предполагая, что нам нужен идентификатор учетной записи для партиционирования, так как все сообщения, относящиеся к учетной записи, должны обрабатываться по порядку, мы придумаем следующую структуру JSON:
Поскольку значение подписи будет варьироваться в зависимости от полезной нагрузки, дефолтная стратегия хеширования интерфейса Partitioner не будет надежно группировать связанные сообщения. Поэтому нам нужно будет написать свою собственную стратегию, которая будет анализировать этот ключ и разделять (partition) значение accountId.
Kafka включает контрольные суммы для обнаружения повреждения сообщений в хранилище и имеет полный набор функций безопасности. Даже в этом случае иногда появляются специфические для отрасли требования, такие как приведенное выше.
Пользовательская стратегия партиционирования должна гарантировать, что все связанные сообщения окажутся в одной партиции. Хотя это кажется простым, но требование может быть усложнено из-за важности упорядочивания связанных сообщений и того, насколько фиксировано количество партиций в топике.
Количество партиций в топике может изменяться со временем, так как их можно добавить, если трафик выходит за пределы первоначальных ожиданий. Таким образом, ключи сообщений могут быть связаны с партицией, в которую они были первоначально отправлены, подразумевая часть состояния, которое должно быть распределено между экземплярами продюсера.
Другим фактором, который следует учитывать, является равномерность распределения сообщений между партициями. Как правило, ключи не распределяются равномерно по сообщениям, и хеш-функции не гарантируют справедливое распределение сообщений для небольшого набора ключей.
Важно отметить, что, как бы вы ни решили разделить сообщения, сам разделитель, возможно, придется использовать повторно.
Рассмотрим требование репликации данных между кластерами Kafka в разных географических расположениях. Для этой цели Kafka поставляется с инструментом командной строки под названием MirrorMaker, который используется для чтения сообщений из одного кластера и передачи их в другой.
MirrorMaker должен понимать ключи реплицируемого топика, чтобы поддерживать относительный порядок между сообщениями при репликации между кластерами, поскольку количество партиций для этого топика может не совпадать в двух кластерах.
Пользовательские стратегии партиционирования встречаются относительно редко, так как дефолтные хеширование или циклический перебор успешно работают в большинстве сценариев. Однако, если вам требуются строгие гарантии упорядочивания или вам необходимо извлечь метаданные из полезных нагрузок, то партиционирование — это то, на что вам следует взглянуть более подробно.
Преимущества масштабируемости и производительности Kafka обусловлены переносом некоторых обязанностей традиционного брокера на клиента. В этом случае принимается решение о распределении потенциально связанных сообщений по нескольким консюмерам, работающим параллельно.
JMS брокеры также должны иметь дело с такими требованиями. Интересно, что механизм отправки связанных сообщений одному и тому же консюмеру, реализованный через JMS Message Groups (разновидность стратегии балансировки sticky load balancing (SLB)), также требует, чтобы отправитель помечал сообщения, как связанные. В случае JMS, брокер отвечает за отправку этой группы связанных сообщений одному консюмеру из многих и передачу прав собственности на группу если консюмер отвалился.
Соглашения по продюсеру
Партиционирование — это не единственное, что необходимо учитывать при отправке сообщений. Давайте рассмотрим методы send () класса Producer в Java API:
Следует сразу отметить, что оба метода возвращают Future, что указывает на то, что операция отправки не выполняется немедленно. В результате получается, что сообщение (ProducerRecord) записывается в буфер отправки для каждой активной партиции и передается брокеру фоновым потоком в библиотеке клиента Kafka. Хотя это делает работу невероятно быстрой, это означает, что неопытно написанное приложение может потерять сообщения, если его процесс будет остановлен.
Как всегда, есть способ сделать операцию отправки более надежной за счет производительности. Размер этого буфера можно установить в 0, и поток отправляющего приложения будет вынужден ждать, пока передача сообщения брокеру не будет завершена, следующим образом:
Еще раз о чтении сообщений
Чтение сообщений имеет дополнительные сложности, о которых необходимо порассуждать. В отличие от API JMS, который может запускать слушателя сообщений (message listener) в ответ на поступление сообщения, интерфейс Consumer Kafka только опрашивает (polling). Давайте подробнее рассмотрим метод poll (), используемый для этой цели:
Возвращаемое значение метода — это контейнерная структура, содержащая несколько объектов ConsumerRecord из потенциально нескольких партиций. ConsumerRecord сам по себе является объектом-холдером для пары ключ-значение с соответствующими метаданными, такими, как партиция, из которой он получен.
Как обсуждалось в Главе 2, мы должны постоянно помнить, что происходит с сообщениями после их успешной или неуспешной обработки, например, если клиент не может обработать сообщение или если он прерывает работу. В JMS это обрабатывалось через режим подтверждения (acknowledgement mode). Брокер либо удалит успешно обработанное сообщение, либо повторно доставит необработанное или зафейленное (при условии, что были использованы транзакции).
Kafka работает совсем по-другому. Сообщения не удаляются в брокере после вычитки и ответственность за то, что происходит при сбое, лежит на самом вычитывающем коде.
Как мы уже говорили, группа консюмеров связана со смещением в журнале. Позиция в журнале, связанная с этим смещением, соответствует следующему сообщению, которое будет выдано в ответ на poll (). Решающее значение при чтении имеет момент времени, когда это смещение увеличивается.
Возвращаясь к модели чтения, рассмотренной ранее, обработка сообщения состоит из трех этапов:
До Kafka 0.10 клиент, использовавший этот параметр, отправлял смещение последнего прочитанного сообщения при следующем вызове poll () после обработки. Это означало, что любые сообщения, которые уже были извлечены (fetched), могли быть повторно обработаны, если клиент их уже обработал, но был неожиданно уничтожен перед вызовом poll (). Поскольку брокер не сохраняет никакого состояния относительно того, сколько раз сообщение было прочитано, следующий консюмер, который извлекает это сообщение, не будет знать, что произошло что-то плохое. Это поведение было псевдо-транзакционным. Смещение коммитилось только в случае успешной обработки сообщения, но если клиент прерывал работу, брокер снова отправлял то же самое сообщение другому клиенту. Такое поведение соответствовало гарантии доставки сообщений «по крайней мере один раз«.
В Kafka 0.10 код клиента был изменен таким образом, что коммит стал периодически запускаться библиотекой клиента, в соответствии с настройкой auto.commit.interval.ms. Это поведение находится где-то между режимами JMS AUTO_ACKNOWLEDGE и DUPS_OK_ACKNOWLEDGE. При использовании автокоммита сообщения могли быть подтверждены независимо от того, были ли они фактически обработаны — это могло произойти в случае медленного консюмера. Если консюмер прерывал работу, сообщения извлекались следующим консюмером, начиная с закоммиченной позиции, что могло привести к пропуску сообщения. В этом случае Kafka не теряла сообщения, читающий код просто не обрабатывал их.
Этот режим имеет те же перспективы, что и в версии 0.9: сообщения могут быть обработаны, но в случае сбоя, смещение может быть не закоммичено, что потенциально может привести к задвоению доставки. Чем больше сообщений вы извлекаете при выполнении poll (), тем больше эта проблема.
Как обсуждалось в разделе «Вычитка сообщений из очереди» в Главе 2, в системе обмена сообщениями нет такого понятия, как однократная доставка сообщения, если принять во внимание режимы сбоев.
В Kafka есть два способа зафиксировать (закоммитить) смещение (оффсет): автоматически и вручную. В обоих случаях сообщения могут обрабатываться несколько раз, в том случае, если сообщение было обработано, но произошел сбой до коммита. Вы также можете вообще не обрабатывать сообщение, если коммит произошел в фоне и ваш код был завершен до того, как он приступил к обработке (возможно в Kafka 0.9 и более ранних версиях).
Управлять процессом коммита смещения вручную можно в API консюмера Kafka, установив параметр enable.auto.commit в значение false и явно вызвав один из следующих методов:
Если вы стремитесь обработать сообщение «хотя бы один раз», вы должны закоммитить смещение вручную с помощью commitSync (), выполнив эту команду сразу после обработки сообщений.
Эти методы не позволяют подтверждать (acknowledged) сообщения до того, как они будут обработаны, но они ничего не делают для устранения потенциального задвоения обработки, в то же время создавая видимость транзакционности. В Kafka отсутствуют транзакции. У клиента нет возможности сделать следующее:
Если существует вероятность того, что смещение консюмера может увеличиться до того, как сообщение было обработано, например, во время сбоя консюмера, то у консюмера нет способа узнать, пропустила ли его группа консюмеров сообщения, когда ей назначают партицию. Таким образом, одна из стратегий заключается в перемотке (rewind) смещения на предыдущую позицию. API консюмера Kafka предоставляет следующие методы для этого:
Метод seek () может использоваться с методом
offsetsForTimes (Map timestampsToSearch) для перемотки в состояние в какой-либо определенный момент в прошлом.
Неявно, использование этого подхода означает, что, весьма вероятно, некоторые сообщения, которые были обработаны ранее, будут прочитаны и обработаны заново. Чтобы избежать этого, мы можем использовать идемпотентное чтение, как описано в Главе 4, для отслеживания ранее просмотренных сообщений и исключения дубликатов.
Как альтернатива, код вашего консюмера может быть простым, если допустима потеря или дублирование сообщений. Когда мы рассматриваем сценарии использования, для которых обычно используется Kafka, например, обработка событий логов, метрик, отслеживание кликов и т.д., мы понимаем, что потеря отдельных сообщений вряд ли окажет значимое влияние на окружающие приложения. В таких случаях значения по-умолчанию вполне допустимы. С другой стороны, если вашему приложению нужно передавать платежи, вы должны тщательно заботиться о каждом отдельном сообщении. Все сводится к контексту.
Личные наблюдения показывают, что с ростом интенсивности сообщений, ценность каждого отдельного сообщения снижается. Сообщения большого объема становятся, как правило, ценными, если их рассматривать в агрегированной форме.
Высокая доступность (High Availability)
Подход Kafka в отношении высокой доступности существенно отличается от подхода ActiveMQ. Kafka разработана на базе горизонтально масштабируемых кластеров, в которых все экземпляры брокера принимают и раздают сообщения одновременно.
Кластер Kafka состоит из нескольких экземпляров брокера, работающих на разных серверах. Kafka была разработана для работы на обычном автономном железе, где каждый узел имеет свое собственное выделенное хранилище. Использование сетевых хранилищ (SAN) не рекомендуется, поскольку множественные вычислительные узлы могут конкурировать за временнЫе интервалы хранилища и создавать конфликты.
Kafka — это постоянно включенная система. Многие крупные пользователи Kafka никогда не гасят свои кластеры и программное обеспечение всегда обеспечивает обновление путем последовательного рестарта. Это достигается за счет гарантирования совместимости с предыдущей версией для сообщений и взаимодействий между брокерами.
Брокеры подключены к кластеру серверов ZooKeeper, который действует, как реестр конфигурационных данный и используется для координации ролей каждого брокера. ZooKeeper сам является распределенной системой, которая обеспечивает высокую доступность посредством репликации информации путем установления кворума.
В базовом случае топик создается в кластере Kafka со следующими свойствами:
В рантайме для каждой партиции топика Контроллер назначает брокеру роли лидера (leader, master, ведущего) и последователей (followers, slaves, подчиненных). Брокер, выступающий в качестве лидера для данной партиции, отвечает за прием всех сообщений, отправленных ему продюсерами, и распространение сообщений по консюмерам. При отправке сообщений в партицию топика они реплицируются на все узлы брокера, выступающие в качестве последователей для этой партиции. Каждый узел, содержащий журналы для партиции, называется репликой. Брокер может выступать в качестве лидера для одних партиций и в качестве последователя для других.
Последователь, содержащий все сообщения, хранящиеся у лидера, называется синхронизированной репликой (репликой, находящейся в синхронизированном состоянии, in-sync replica). Если брокер, выступающий в качестве лидера для партиции, отключается, любой брокер, который находится в актуализированном или синхронизированном состоянии для этой партиции, может взять на себя роль лидера. Это невероятно устойчивый дизайн.
Частью конфигурации продюсера является параметр acks, который определяет, сколько реплик должно подтвердить (acknowledge) получение сообщения, прежде чем поток приложения продолжит отправку: 0, 1 или все. Если задано значение all, то при получении сообщения лидер отправит подтверждение (confirmation) обратно продюсеру, как только получит подтверждение (acknowledgements) записи от нескольких реплик (включая саму себя), определенных настройкой топика min.insync.replicas (по умолчанию 1). Если сообщение не может быть успешно реплицировано, то продюсер вызовет исключение для приложения (NotEnoughReplicas или NotEnoughReplicasAfterAppend).
В типичной конфигурации создается топик с коэффициентом репликации 3 (1 лидер, 2 последователя для каждой партиции) и параметр min.insync.replicas устанавливается в значение 2. В этом случае, кластер будет допускать, чтобы один из брокеров, управляющих партицией топика, мог отключаться без влияния на клиентские приложения.
Это возвращает нас к уже знакомому компромиссу между производительностью и надежностью. Репликация происходит за счет дополнительного времени ожидания подтверждений (acknowledgments) от последователей. Хотя, поскольку она выполняется параллельно, репликация, как минимум на три узла, имеет такую же производительность, как и на два (игнорируя увеличение использования пропускной способности сети).
Используя эту схему репликации, Kafka ловко избегает необходимости обеспечивать физическую запись каждого сообщения на диск с помощью операции sync (). Каждое сообщение, отправленное продюсером, будет записано в журнал партиции, но, как обсуждалось в Главе 2, запись в файл первоначально выполняется в буфер операционной системы. Если это сообщение реплицировано на другой экземпляр Kafka и находится в его памяти, потеря лидера не означает, что само сообщение было потеряно — его может взять на себя синхронизированная реплика.
Отказ от необходимости выполнять операцию sync () означает, что Kafka может принимать сообщения со скоростью, с которой она может записывать их в память. И наоборот, чем дольше можно избежать сброса (flushing) памяти на диск, тем лучше. По этой причине нередки случаи, когда брокерам Kafka выделяется 64 Гб памяти или более. Такое использование памяти означает, что один экземпляр Kafka может легко работать на скоростях во много тысяч раз быстрее, чем традиционный брокер сообщений.
Kafka также можно настроить для применения операции sync () к пакетам сообщений. Поскольку всё в Kafka ориентировано на работу с пакетами, это на самом деле работает довольно хорошо для многих сценариев использования и является полезным инструментом для пользователей, которые требуют очень сильных гарантий. Большая часть чистой производительности Kafka связана с сообщениями, которые отправляются брокеру в виде пакетов, и с тем, что эти сообщения считываются из брокера последовательными блоками с помощью zero-copy операций (операциями, в ходе которых не выполняется задача копирования данных из одной области памяти в другую). Последнее является большим выигрышем с точки зрения производительности и ресурсов и возможно только благодаря использованию лежащей в основе структуры данных журнала, определяющей схему партиции.
В кластере Kafka возможна гораздо более высокая производительность, чем при использовании одного брокера Kafka, поскольку партиции топика могут горизонтально масштабироваться на множестве отдельных машин.
Итоги
В этой главе мы рассмотрели, как архитектура Kafka переосмысливает отношения между клиентами и брокерами, чтобы обеспечить невероятно устойчивый конвейер обмена сообщениями, с пропускной способностью во много раз большей, чем у обычного брокера сообщений. Мы обсудили функциональность, которую она использует для достижения этой цели, и кратко рассмотрели архитектуру приложений, обеспечивающих эту функциональность. В следующей главе мы рассмотрим общие проблемы, которые необходимо решать приложениям на основе обмена сообщениями, и обсудим стратегии их решения. Мы завершим главу, обрисовав, как рассуждать о технологиях обмена сообщениями в целом, чтобы вы могли оценить их пригодность для ваших сценариев использования.