Когда вызывать метод закрытия KafkaProducer?

Как сказано в документе Кафки,

Производитель является потокобезопасным, и совместное использование одного экземпляра производителя между потоками, как правило, будет быстрее, чем наличие нескольких экземпляров.

Итак, у меня есть следующий код, и я хочу иметь только один экземпляр KafkaProducer для каждого запроса на отправку. Но в каком месте кода лучше всего вызывать для него метод close? Поскольку я не могу вызвать метод close в методе отправки. Как мне написать код для обработки?

public class Producer {
    private final KafkaProducer<Integer, String> producer;

    public Producer(String topic, Boolean isAsync) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<>(props);
    }

    public void send(String message) {
        producer.send(new ProducerRecord<>(topic, messageNo, messageStr);
    }
}

person ttt    schedule 25.07.2018    source источник


Ответы (3)


Кафка производитель реализует интерфейс AutoClosable. Таким образом, вы можете объявить его в блоке try-with-resources, и он должен позаботиться об освобождении ресурсов, когда ваш код выходит за рамки блока.

person Bitswazsky    schedule 26.07.2018
comment
Я не хочу закрывать производителя каждый раз после вызова отправки, иначе мне придется каждый раз создавать экземпляр производителя. - person ttt; 26.07.2018
comment
Вы используете несколько потоков? Если да, то являются ли они запускаемыми или вызываемыми? Они постоянно бегают? Если возможно, обновите свой вопрос с помощью этой информации. - person Bitswazsky; 26.07.2018
comment
не несколько потоков, только один. Но есть конечная точка POST, и каждый раз, когда она вызывается, здесь будет вызываться метод send. Если в таком случае, то когда пора делать close? - person ttt; 26.07.2018
comment
Если это постоянно работающее приложение, то вызов метода close() может не иметь особого смысла. - person Bitswazsky; 26.07.2018
comment
Да, это загрузочное веб-приложение Spring. Значит, не нужно вызывать метод close? - person ttt; 26.07.2018
comment
Я так думаю. В нашем производстве у нас есть приложение на основе отдыха, которое принимает данные в Kafka, и я почти уверен, что оно никогда не закрывается. До сих пор мы не видели никаких утечек ресурсов, связанных с производителем, так что все должно быть в порядке. - person Bitswazsky; 26.07.2018
comment
Потрясающе, спасибо. Вот что я хочу знать. Я также думаю реализовать интерфейс Closeable моего класса Producer, если есть утечка некоторых ресурсов. - person ttt; 26.07.2018

Вы можете сначала создать производителя и передать (внедрить) его в свой веб-ресурс или ресурсы.

Затем вы можете использовать хук выключения, чтобы закрыть производителя с его ссылкой.

Еще лучше, если вы можете использовать хуки остановки жизненного цикла в своей структуре. Пример в dropwizard: https://www.dropwizard.io/en/latest/manual/core.html?highlight=managed#managed-objects

person Smalltalkguy    schedule 15.11.2020

Вам когда-нибудь приходилось менять параметры производителя во время выполнения? Например, при изменении URL-адресов брокера или во время тестов?

Если вам это нужно и у вас есть одиночный производитель, обязательно предоставьте хук для закрытия и повторного создания производителя с новыми параметрами.

person ntucci    schedule 05.07.2021