package org.eclipse.hono.client.kafka.producer;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.producer.KafkaProducer;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
import org.eclipse.hono.client.kafka.metrics.KafkaClientMetricsSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/client/kafka/producer/CachingKafkaProducerFactory.class */
/**
 * A {@link KafkaProducerFactory} that caches the producers it creates, keyed by producer name.
 * <p>
 * A producer stays in the cache until it is closed via {@link #closeProducer(String)} or until its
 * exception handler is invoked with an error that {@link #isFatalError(Throwable)} classifies as fatal.
 * In the latter case the producer is removed from the cache and closed.
 * <p>
 * Instances are obtained via {@link #sharedFactory(Vertx)}, {@link #nonSharedFactory(Vertx)} or
 * {@link #testFactory(Vertx, BiFunction)}.
 *
 * @param <K> The type of the keys of the records sent by the producers.
 * @param <V> The type of the values of the records sent by the producers.
 */
public class CachingKafkaProducerFactory<K, V> implements KafkaProducerFactory<K, V> {

    private static final Logger LOG = LoggerFactory.getLogger(CachingKafkaProducerFactory.class);

    /** Key of the producer configuration entry holding the client identifier (used for logging only). */
    private static final String CLIENT_ID_CONFIG_KEY = "client.id";

    /** The currently cached producers, keyed by producer name. */
    private final Map<String, KafkaProducer<K, V>> activeProducers = new ConcurrentHashMap<>();
    private final KafkaClientFactory kafkaClientFactory;
    /** Creates a new producer instance from a producer name and a producer configuration. */
    private final BiFunction<String, Map<String, String>, KafkaProducer<K, V>> producerInstanceSupplier;
    /** Optional metrics support; {@code null} if none has been set. */
    private KafkaClientMetricsSupport metricsSupport;

    private CachingKafkaProducerFactory(
            final Vertx vertx,
            final BiFunction<String, Map<String, String>, KafkaProducer<K, V>> producerInstanceSupplier) {
        this.producerInstanceSupplier = producerInstanceSupplier;
        this.kafkaClientFactory = new KafkaClientFactory(vertx);
    }

    /**
     * Creates a factory whose producers are created using
     * {@code KafkaProducer.createShared(vertx, producerName, config)}.
     *
     * @param vertx The Vert.x instance to use.
     * @param <K> The type of the record keys.
     * @param <V> The type of the record values.
     * @return The new factory.
     */
    public static <K, V> CachingKafkaProducerFactory<K, V> sharedFactory(final Vertx vertx) {
        return new CachingKafkaProducerFactory<>(vertx,
                (producerName, config) -> KafkaProducer.createShared(vertx, producerName, config));
    }

    /**
     * Creates a factory whose producers are created using {@code KafkaProducer.create(vertx, config)}.
     * The producer name is only used as the cache key in this case.
     *
     * @param vertx The Vert.x instance to use.
     * @param <K> The type of the record keys.
     * @param <V> The type of the record values.
     * @return The new factory.
     */
    public static <K, V> CachingKafkaProducerFactory<K, V> nonSharedFactory(final Vertx vertx) {
        return new CachingKafkaProducerFactory<>(vertx,
                (producerName, config) -> KafkaProducer.create(vertx, config));
    }

    /**
     * Creates a factory that uses the given function for creating producer instances.
     *
     * @param vertx The Vert.x instance to use.
     * @param producerInstanceSupplier The function invoked with the producer name and the producer
     *                                 configuration in order to create a new producer.
     * @param <K> The type of the record keys.
     * @param <V> The type of the record values.
     * @return The new factory.
     */
    public static <K, V> CachingKafkaProducerFactory<K, V> testFactory(
            final Vertx vertx,
            final BiFunction<String, Map<String, String>, KafkaProducer<K, V>> producerInstanceSupplier) {
        return new CachingKafkaProducerFactory<>(vertx, producerInstanceSupplier);
    }

    /**
     * Sets the metrics support with which created producers get registered and closed producers
     * get unregistered.
     *
     * @param metricsSupport The metrics support to use, may be {@code null}.
     */
    @Override
    public final void setMetricsSupport(final KafkaClientMetricsSupport metricsSupport) {
        this.metricsSupport = metricsSupport;
    }

    /**
     * Gets the cached producer with the given name or creates, caches and returns a new one.
     * <p>
     * A newly created producer gets an exception handler that closes and evicts the producer on
     * fatal errors. If metrics support has been set, a newly created producer is registered with it.
     *
     * @param producerName The name under which the producer is cached.
     * @param config The properties from which the producer configuration is obtained if a new
     *               producer needs to be created.
     * @return The cached or newly created producer.
     */
    @Override
    public KafkaProducer<K, V> getOrCreateProducer(
            final String producerName,
            final KafkaProducerConfigProperties config) {

        // remembers the instance created by this very invocation (if any) so that only
        // newly created producers get registered with the metrics support below
        final AtomicReference<KafkaProducer<K, V>> createdProducer = new AtomicReference<>();
        final KafkaProducer<K, V> kafkaProducer = this.activeProducers.computeIfAbsent(producerName, name -> {
            final Map<String, String> producerConfig = config.getProducerConfig(name);
            final String clientId = producerConfig.get(CLIENT_ID_CONFIG_KEY);
            final KafkaProducer<K, V> producer = this.producerInstanceSupplier.apply(name, producerConfig);
            createdProducer.set(producer);
            return producer.exceptionHandler(getExceptionHandler(name, producer, clientId));
        });
        if (this.metricsSupport != null && kafkaProducer == createdProducer.get()) {
            this.metricsSupport.registerKafkaProducer(kafkaProducer.unwrap());
        }
        return kafkaProducer;
    }

    /**
     * Gets or creates a producer like {@link #getOrCreateProducer(String, KafkaProducerConfigProperties)},
     * delegating the retry handling to the {@code KafkaClientFactory}.
     *
     * @param producerName The name under which the producer is cached.
     * @param config The properties from which the producer configuration and the bootstrap servers
     *               are obtained.
     * @param keepTrying The condition passed on to the client factory's retry logic.
     * @param retriesTimeout The timeout passed on to the client factory's retry logic.
     * @return The future returned by the client factory.
     * @throws NullPointerException if producerName, config or keepTrying is {@code null}.
     */
    @Override
    public Future<KafkaProducer<K, V>> getOrCreateProducerWithRetries(
            final String producerName,
            final KafkaProducerConfigProperties config,
            final BooleanSupplier keepTrying,
            final Duration retriesTimeout) {
        Objects.requireNonNull(producerName);
        Objects.requireNonNull(config);
        Objects.requireNonNull(keepTrying);
        return this.kafkaClientFactory.createClientWithRetries(
                () -> getOrCreateProducer(producerName, config),
                keepTrying,
                config.getBootstrapServers(),
                retriesTimeout);
    }

    /**
     * Creates the exception handler to be set on a newly created producer.
     * <p>
     * Non-fatal errors are only logged. On a fatal error the cache entry for the producer name is
     * removed and the producer is closed; once closing has completed (successfully or not) the
     * producer is unregistered from the metrics support, if set.
     *
     * @param producerName The name under which the producer is cached.
     * @param producer The producer to close on fatal errors.
     * @param clientId The client identifier to include in log messages.
     * @return The handler.
     */
    private Handler<Throwable> getExceptionHandler(
            final String producerName,
            final KafkaProducer<K, V> producer,
            final String clientId) {
        return error -> {
            if (!isFatalError(error)) {
                LOG.error("producer error occurred [clientId: {}]", clientId, error);
                return;
            }
            LOG.error("fatal producer error occurred, closing producer [clientId: {}]", clientId, error);
            this.activeProducers.remove(producerName);
            producer.close().onComplete(closeResult -> unregisterFromMetrics(producer));
        };
    }

    /**
     * Gets the cached producer with the given name, if any.
     *
     * @param producerName The name under which the producer is cached.
     * @return An Optional containing the cached producer or an empty Optional if there is none.
     */
    @Override
    public Optional<KafkaProducer<K, V>> getProducer(final String producerName) {
        return Optional.ofNullable(this.activeProducers.get(producerName));
    }

    /**
     * Removes the producer with the given name from the cache and closes it.
     * <p>
     * Once closing has completed (successfully or not) the producer is unregistered from the
     * metrics support, if set.
     *
     * @param producerName The name under which the producer is cached.
     * @return A succeeded future if no producer with the given name is cached, otherwise a future
     *         reflecting the outcome of closing the producer.
     */
    @Override
    public Future<Void> closeProducer(final String producerName) {
        final KafkaProducer<K, V> producer = this.activeProducers.remove(producerName);
        if (producer == null) {
            return Future.succeededFuture();
        }
        final Promise<Void> closePromise = Promise.promise();
        producer.close(closePromise);
        return closePromise.future().onComplete(closeResult -> unregisterFromMetrics(producer));
    }

    /**
     * Unregisters the given producer from the metrics support, if one is set at the time of invocation.
     *
     * @param producer The producer to unregister.
     */
    private void unregisterFromMetrics(final KafkaProducer<K, V> producer) {
        Optional.ofNullable(this.metricsSupport)
                .ifPresent(support -> support.unregisterKafkaProducer(producer.unwrap()));
    }

    /**
     * Checks whether the given error is one after which this factory closes the affected producer.
     *
     * @param error The error to check.
     * @return {@code true} if the error is a {@code ProducerFencedException},
     *         {@code OutOfOrderSequenceException}, {@code AuthorizationException},
     *         {@code UnsupportedVersionException} or {@code UnsupportedForMessageFormatException}.
     */
    public static boolean isFatalError(final Throwable error) {
        return error instanceof ProducerFencedException
                || error instanceof OutOfOrderSequenceException
                || error instanceof AuthorizationException
                || error instanceof UnsupportedVersionException
                || error instanceof UnsupportedForMessageFormatException;
    }
}
