/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiPredicate;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.springframework.core.NestedRuntimeException;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaBackoffException;
import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.listener.LoggingCommitCallback;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.RecoveryStrategy;
import org.springframework.lang.Nullable;
import org.springframework.util.ObjectUtils;
import org.springframework.util.backoff.FixedBackOff;

/**
 * Static helpers that reposition a Kafka {@link Consumer} after a listener
 * failure so that unprocessed records are fetched again, optionally letting a
 * {@link RecoveryStrategy} decide that the first (failed) record is recovered
 * and must not be re-fetched.
 *
 * <p>The class is not instantiable.
 */
public final class SeekUtils {

    /**
     * Default failure limit. Within this class it only determines the number
     * of attempts configured on {@link #DEFAULT_BACK_OFF}.
     */
    public static final int DEFAULT_MAX_FAILURES = 10;

    /**
     * Default back off: a {@link FixedBackOff} constructed with a zero
     * interval and {@code DEFAULT_MAX_FAILURES - 1} attempts.
     */
    public static final FixedBackOff DEFAULT_BACK_OFF = new FixedBackOff(0L, DEFAULT_MAX_FAILURES - 1);

    /** Fallback callback for asynchronous commits when the container supplies none. */
    private static final LoggingCommitCallback LOGGING_COMMIT_CALLBACK = new LoggingCommitCallback();

    private SeekUtils() {
    }

    /**
     * Variant of {@link #doSeeks(List, Consumer, Exception, boolean, RecoveryStrategy, MessageListenerContainer, LogAccessor)}
     * that adapts a two-argument predicate to a {@link RecoveryStrategy} and
     * passes no container.
     *
     * @param records the remaining records, the first one being the failed record
     * @param consumer the consumer to seek
     * @param exception the exception raised for the first record
     * @param recoverable whether the first record may be skipped at all
     * @param skipper tested with the first record and the exception; {@code true} means "skip it"
     * @param logger the logger used for diagnostics
     * @return {@code true} if the first record was skipped
     */
    public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, Exception exception, boolean recoverable, BiPredicate<ConsumerRecord<?, ?>, Exception> skipper, LogAccessor logger) {
        RecoveryStrategy recovery = (rec, ex, cont, cons) -> skipper.test(rec, ex);
        return doSeeks(records, consumer, exception, recoverable, recovery, null, logger);
    }

    /**
     * Seeks every partition that appears in {@code records} back to the offset
     * of the first record seen for that partition, so those records are
     * delivered again.
     *
     * <p>When {@code recoverable} is {@code true} the recovery strategy is
     * consulted for the first record only. If it reports the record as
     * recovered, that record is left out of the seek computation; if the
     * strategy throws, the failure is logged and the record is included.
     *
     * @param records the remaining records, the first one being the failed record
     * @param consumer the consumer to seek
     * @param exception the exception raised for the first record
     * @param recoverable whether the first record may be skipped at all
     * @param recovery decides whether the first record is recovered
     * @param container the container handed to the recovery strategy, may be {@code null}
     * @param logger the logger used for diagnostics
     * @return {@code true} if the first record was reported recovered and skipped
     */
    public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, Exception exception, boolean recoverable, RecoveryStrategy recovery, @Nullable MessageListenerContainer container, LogAccessor logger) {
        // Insertion-ordered so seeks are issued in the order partitions first appear.
        Map<TopicPartition, Long> partitions = new LinkedHashMap<>();
        boolean skipped = false;
        boolean first = true;
        for (ConsumerRecord<?, ?> record : records) {
            if (recoverable && first) {
                skipped = attemptRecovery(record, exception, recovery, container, consumer, logger);
                if (skipped) {
                    logger.debug(() -> "Skipping seek of: " + ListenerUtils.recordToString(record));
                }
            }
            boolean omitFromSeeks = recoverable && first && skipped;
            if (!omitFromSeeks) {
                // computeIfAbsent keeps the earliest-listed offset of each partition.
                partitions.computeIfAbsent(new TopicPartition(record.topic(), record.partition()), tp -> record.offset());
            }
            first = false;
        }
        seekPartitions(consumer, partitions, logger);
        return skipped;
    }

    /**
     * Asks the recovery strategy whether the record is recovered, converting
     * any exception into a logged "not recovered" result.
     */
    private static boolean attemptRecovery(ConsumerRecord<?, ?> record, Exception exception, RecoveryStrategy recovery, @Nullable MessageListenerContainer container, Consumer<?, ?> consumer, LogAccessor logger) {
        try {
            return recovery.recovered(record, exception, container, consumer);
        }
        catch (Exception ex) {
            if (isBackoffException(ex)) {
                // A back off in progress is expected; keep it at debug level.
                logger.debug(ex, () -> ListenerUtils.recordToString(record) + " included in seeks due to retry back off");
            }
            else {
                logger.error(ex, () -> "Failed to determine if this record (" + ListenerUtils.recordToString(record) + ") should be recovered, including in seeks");
            }
            return false;
        }
    }

    /**
     * Seeks the consumer to the given offset for each partition. A failure to
     * seek one partition is logged and does not prevent the remaining seeks.
     *
     * @param consumer the consumer to seek
     * @param partitions the target offset per partition
     * @param logger the logger used for diagnostics
     */
    public static void seekPartitions(Consumer<?, ?> consumer, Map<TopicPartition, Long> partitions, LogAccessor logger) {
        partitions.forEach((topicPartition, offset) -> {
            try {
                logger.trace(() -> "Seeking: " + topicPartition + " to: " + offset);
                consumer.seek(topicPartition, offset.longValue());
            }
            catch (Exception e) {
                logger.error(e, () -> "Failed to seek " + topicPartition + " to " + offset);
            }
        });
    }

    /**
     * Variant of {@link #seekOrRecover(Exception, List, Consumer, MessageListenerContainer, boolean, RecoveryStrategy, LogAccessor, KafkaException.Level)}
     * that adapts a two-argument predicate to a {@link RecoveryStrategy}.
     *
     * @param thrownException the exception raised by the listener
     * @param records the remaining records, the first one being the failed record
     * @param consumer the consumer to seek
     * @param container the listener container
     * @param commitRecovered whether to commit the offset of a recovered record
     * @param skipPredicate tested with the first record and the exception; {@code true} means "recovered"
     * @param logger the logger used for diagnostics
     * @param level the level attached to the {@link KafkaException} thrown when the record is not recovered
     */
    public static void seekOrRecover(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container, boolean commitRecovered, BiPredicate<ConsumerRecord<?, ?>, Exception> skipPredicate, LogAccessor logger, KafkaException.Level level) {
        RecoveryStrategy recovery = (rec, ex, cont, cons) -> skipPredicate.test(rec, ex);
        seekOrRecover(thrownException, records, consumer, container, commitRecovered, recovery, logger, level);
    }

    /**
     * Performs the seeks for the remaining records and then either throws (the
     * first record was not recovered) or, if requested, commits the offset
     * following the recovered record.
     *
     * <p>The commit only happens when {@code commitRecovered} is {@code true}
     * and the container's ack mode is {@code MANUAL_IMMEDIATE}; for any other
     * ack mode the request is ignored with a debug message. The commit is
     * synchronous or asynchronous according to the container properties.
     *
     * @param thrownException the exception raised by the listener
     * @param records the remaining records, the first one being the failed record
     * @param consumer the consumer to seek and commit with
     * @param container the listener container
     * @param commitRecovered whether to commit the offset of a recovered record
     * @param recovery decides whether the first record is recovered
     * @param logger the logger used for diagnostics
     * @param level the level attached to the {@link KafkaException} thrown when the record is not recovered
     * @throws IllegalStateException if {@code records} is {@code null} or empty
     * @throws KafkaException if the first record was not recovered
     */
    public static void seekOrRecover(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container, boolean commitRecovered, RecoveryStrategy recovery, LogAccessor logger, KafkaException.Level level) {
        if (ObjectUtils.isEmpty(records)) {
            if (thrownException instanceof SerializationException) {
                throw new IllegalStateException("This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer", thrownException);
            }
            throw new IllegalStateException("This error handler cannot process '" + thrownException.getClass().getName() + "'s; no record information is available", thrownException);
        }
        // The emptiness check above has already rejected null, so records is non-null and non-empty here.
        if (!doSeeks(records, consumer, thrownException, true, recovery, container, logger)) {
            throw new KafkaException("Seek to current after exception", level, thrownException);
        }
        if (!commitRecovered) {
            return;
        }
        ContainerProperties containerProperties = container.getContainerProperties();
        ContainerProperties.AckMode ackMode = containerProperties.getAckMode();
        if (ackMode != ContainerProperties.AckMode.MANUAL_IMMEDIATE) {
            logger.debug(() -> "'commitRecovered' ignored, container AckMode must be MANUAL_IMMEDIATE, not " + ackMode);
            return;
        }
        // Commit the position just past the recovered (first) record.
        ConsumerRecord<?, ?> record = records.get(0);
        Map<TopicPartition, OffsetAndMetadata> offsetToCommit = Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), ListenerUtils.createOffsetAndMetadata(container, record.offset() + 1L));
        if (containerProperties.isSyncCommits()) {
            consumer.commitSync(offsetToCommit, containerProperties.getSyncCommitTimeout());
        }
        else {
            OffsetCommitCallback commitCallback = containerProperties.getCommitCallback();
            if (commitCallback == null) {
                commitCallback = LOGGING_COMMIT_CALLBACK;
            }
            consumer.commitAsync(offsetToCommit, commitCallback);
        }
    }

    /**
     * Tells whether the exception is a {@link NestedRuntimeException} that
     * contains a {@link KafkaBackoffException}.
     *
     * @param exception the exception to inspect
     * @return {@code true} if a back off exception is contained; {@code false}
     * otherwise, including for a {@code null} argument
     */
    public static boolean isBackoffException(Exception exception) {
        return exception instanceof NestedRuntimeException
                && ((NestedRuntimeException) exception).contains(KafkaBackoffException.class);
    }
}

