/*
 * Decompiled with CFR 0.152.
 */
package com.chdu.ai.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.chdu.ai.canal.CanalClientService;
import com.chdu.ai.canal.CanalEventListener;
import com.chdu.ai.config.CanalConfig;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Spring service that opens one Canal connection per configured destination and
 * forwards every ROWDATA entry it receives to the injected {@link CanalEventListener}.
 *
 * <p>Lifecycle: {@link #init()} connects and subscribes each destination and starts one
 * daemon worker per connected destination; {@link #destroy()} stops the workers and
 * disconnects every connector.
 *
 * <p>NOTE(review): {@code DestinationConnector} is declared outside this view. Its
 * {@code connector} and {@code running} fields are written by one thread and read by
 * another here, so they should be {@code volatile} - confirm against its declaration.
 */
@Service
public class CanalClientService {
    private static final Logger log = LoggerFactory.getLogger(CanalClientService.class);
    @Autowired
    private CanalConfig canalConfig;
    @Autowired
    private CanalEventListener canalEventListener;
    /** Destinations that were connected and subscribed successfully during {@link #init()}. */
    private final List<DestinationConnector> connectors = new ArrayList<>();
    private ExecutorService executorService;
    /** Global stop flag checked by every worker loop. */
    private volatile boolean running = false;

    /**
     * Connects to every configured destination, subscribes with its filter and starts
     * one worker thread per successfully connected destination.
     *
     * <p>A destination that fails to connect, or that has no filter configured, is
     * skipped and its connector is disconnected so no connection is left dangling.
     *
     * @throws RuntimeException if no destination could be connected or any other
     *                          initialization step fails (the cause is preserved)
     */
    @PostConstruct
    public void init() {
        try {
            List<String> destinations = this.canalConfig.getDestinationList();
            log.info("\u5f00\u59cb\u521d\u59cb\u5316Canal\u5ba2\u6237\u7aef\uff0c\u670d\u52a1\u5668\u5730\u5740: {}:{}, destinations: {}", this.canalConfig.getHost(), this.canalConfig.getPort(), destinations);
            for (String destination : destinations) {
                CanalConnector connector = null;
                try {
                    log.info("\u6b63\u5728\u8fde\u63a5destination: {}", destination);
                    connector = this.newConnector(destination);
                    connector.connect();
                    log.info("Canal\u5ba2\u6237\u7aef\u8fde\u63a5\u6210\u529f: destination={}", destination);
                    String filter = this.canalConfig.getFilterByDestination(destination);
                    if (filter == null || filter.isEmpty()) {
                        log.warn("destination={} \u672a\u914d\u7f6e\u8fc7\u6ee4\u89c4\u5219\uff0c\u8df3\u8fc7\u8ba2\u9605", destination);
                        // The connector is already connected but will never be tracked: release it.
                        this.disconnectQuietly(connector, destination);
                        continue;
                    }
                    connector.subscribe(filter);
                    log.info("Canal\u8ba2\u9605\u8868\u8fc7\u6ee4\u89c4\u5219: destination={}, filter={}", destination, filter);
                    // Roll back so any batch fetched but not acked by a previous session is redelivered.
                    connector.rollback();
                    this.connectors.add(new DestinationConnector(destination, connector));
                }
                catch (Exception e) {
                    log.error("\u8fde\u63a5destination\u5931\u8d25: destination={}", destination, e);
                    // Do not leak a half-initialized connection for the failed destination.
                    this.disconnectQuietly(connector, destination);
                }
            }
            if (this.connectors.isEmpty()) {
                throw new RuntimeException("\u6240\u6709destination\u8fde\u63a5\u5931\u8d25");
            }
            this.running = true;
            this.executorService = Executors.newFixedThreadPool(this.connectors.size(), r -> {
                Thread thread = new Thread(r, "canal-client-thread");
                thread.setDaemon(true);
                return thread;
            });
            for (DestinationConnector destConnector : this.connectors) {
                destConnector.running = true;
                this.executorService.execute(() -> this.process(destConnector));
            }
            log.info("Canal\u5ba2\u6237\u7aef\u542f\u52a8\u6210\u529f\uff0c\u5171{}\u4e2adestination", this.connectors.size());
        }
        catch (Exception e) {
            log.error("Canal\u5ba2\u6237\u7aef\u521d\u59cb\u5316\u5931\u8d25", e);
            throw new RuntimeException("Canal\u5ba2\u6237\u7aef\u521d\u59cb\u5316\u5931\u8d25", e);
        }
    }

    /**
     * Worker loop for a single destination: fetches a batch without acking, dispatches
     * every ROWDATA entry to the listener and acks the batch once all entries are handled.
     *
     * <p>The loop ends when either running flag is cleared or the thread is interrupted.
     *
     * @param destConnector the destination/connector pair this worker owns
     */
    private void process(DestinationConnector destConnector) {
        String destination = destConnector.destination;
        log.info("\u5f00\u59cb\u5904\u7406destination\u7684\u6d88\u606f: {}", destination);
        while (destConnector.running && this.running) {
            try {
                CanalConnector connector = destConnector.connector;
                Message message = connector.getWithoutAck(this.canalConfig.getBatchSize().intValue());
                long batchId = message.getId();
                List<CanalEntry.Entry> entries = message.getEntries();
                int size = entries.size();
                if (batchId == -1L || size == 0) {
                    if (batchId != -1L) {
                        // A real batch that happens to be empty must still be acked,
                        // otherwise it stays outstanding ahead of the next batch.
                        connector.ack(batchId);
                    }
                    Thread.sleep(1000L);
                    continue;
                }
                log.debug("\u6536\u5230Canal\u6d88\u606f: destination={}, batchId={}, size={}", destination, batchId, size);
                for (CanalEntry.Entry entry : entries) {
                    if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                        continue;
                    }
                    this.canalEventListener.onEvent(entry);
                }
                connector.ack(batchId);
                log.debug("Canal\u6d88\u606f\u5904\u7406\u5b8c\u6210: destination={}, batchId={}", destination, batchId);
            }
            catch (InterruptedException ie) {
                // Interrupted while idling: restore the flag and stop instead of
                // treating the interrupt as a processing error.
                Thread.currentThread().interrupt();
                break;
            }
            catch (Exception e) {
                log.error("\u5904\u7406Canal\u6d88\u606f\u65f6\u53d1\u751f\u9519\u8bef: destination={}", destination, e);
                if (!this.recoverAfterFailure(destConnector)) {
                    break;
                }
            }
        }
        log.info("\u505c\u6b62\u5904\u7406destination\u7684\u6d88\u606f: {}", destination);
    }

    /**
     * Recovers from a failed fetch/dispatch cycle: rolls the batch back and, if the
     * rollback itself fails, attempts a reconnect.
     *
     * <p>Waits 5 seconds after a successful rollback and 10 seconds after a failed
     * reconnect; a successful reconnect resumes immediately.
     *
     * @param destConnector the destination whose last cycle failed
     * @return {@code true} if the worker loop should continue, {@code false} if the
     *         thread was interrupted while backing off
     */
    private boolean recoverAfterFailure(DestinationConnector destConnector) {
        String destination = destConnector.destination;
        long backoffMillis = 5000L;
        try {
            destConnector.connector.rollback();
        }
        catch (Exception rollbackException) {
            log.error("Canal\u56de\u6eda\u5931\u8d25\uff0c\u53ef\u80fd\u662f\u8fde\u63a5\u65ad\u5f00: destination={}", destination, rollbackException);
            log.warn("\u68c0\u6d4b\u5230\u8fde\u63a5\u5f02\u5e38\uff0c\u5f00\u59cb\u91cd\u8fde: destination={}", destination);
            if (this.reconnect(destConnector)) {
                log.info("\u91cd\u8fde\u6210\u529f: destination={}", destination);
                return true;
            }
            log.error("\u91cd\u8fde\u5931\u8d25\uff0c\u7b49\u5f85\u540e\u7ee7\u7eed\u91cd\u8bd5: destination={}", destination);
            backoffMillis = 10000L;
        }
        try {
            Thread.sleep(backoffMillis);
            return true;
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Replaces the connector of a destination with a freshly connected and subscribed one.
     *
     * <p>The old connector is disconnected first (errors are logged and ignored). If the
     * new connector cannot be fully set up it is disconnected again and the destination
     * keeps its old connector reference.
     *
     * @param destConnector the destination to reconnect
     * @return {@code true} if the new connector is connected, subscribed and installed
     */
    private boolean reconnect(DestinationConnector destConnector) {
        String destination = destConnector.destination;
        CanalConnector connector = null;
        try {
            log.info("\u5f00\u59cb\u91cd\u8fdeCanal: destination={}", destination);
            try {
                if (destConnector.connector != null) {
                    destConnector.connector.disconnect();
                    log.info("\u5df2\u65ad\u5f00\u65e7\u8fde\u63a5: destination={}", destination);
                }
            }
            catch (Exception e) {
                log.warn("\u65ad\u5f00\u65e7\u8fde\u63a5\u65f6\u53d1\u751f\u5f02\u5e38\uff08\u53ef\u5ffd\u7565\uff09: destination={}", destination, e);
            }
            connector = this.newConnector(destination);
            connector.connect();
            log.info("Canal\u5ba2\u6237\u7aef\u91cd\u8fde\u6210\u529f: destination={}", destination);
            String filter = this.canalConfig.getFilterByDestination(destination);
            if (filter == null || filter.isEmpty()) {
                log.warn("destination={} \u672a\u914d\u7f6e\u8fc7\u6ee4\u89c4\u5219\uff0c\u8df3\u8fc7\u8ba2\u9605", destination);
                // The new connection will not be used: release it before giving up.
                this.disconnectQuietly(connector, destination);
                return false;
            }
            connector.subscribe(filter);
            log.info("Canal\u91cd\u65b0\u8ba2\u9605\u8868\u8fc7\u6ee4\u89c4\u5219: destination={}, filter={}", destination, filter);
            connector.rollback();
            // Install the new connector only after it is fully set up.
            destConnector.connector = connector;
            log.info("Canal\u91cd\u8fde\u5b8c\u6210: destination={}", destination);
            return true;
        }
        catch (Exception e) {
            log.error("\u91cd\u8fdeCanal\u5931\u8d25: destination={}", destination, e);
            // Release the partially initialized replacement connection.
            this.disconnectQuietly(connector, destination);
            return false;
        }
    }

    /**
     * Stops all workers, waits up to 10 seconds for them to finish (interrupting them
     * afterwards) and then disconnects every tracked connector.
     */
    @PreDestroy
    public void destroy() {
        log.info("\u5f00\u59cb\u505c\u6b62Canal\u5ba2\u6237\u7aef");
        this.running = false;
        for (DestinationConnector destConnector : this.connectors) {
            destConnector.running = false;
        }
        if (this.executorService != null) {
            this.executorService.shutdown();
            try {
                if (!this.executorService.awaitTermination(10L, TimeUnit.SECONDS)) {
                    this.executorService.shutdownNow();
                }
            }
            catch (InterruptedException e) {
                this.executorService.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
        for (DestinationConnector destConnector : this.connectors) {
            try {
                if (destConnector.connector == null) {
                    continue;
                }
                destConnector.connector.disconnect();
                log.info("Canal\u5ba2\u6237\u7aef\u5df2\u65ad\u5f00\u8fde\u63a5: destination={}", destConnector.destination);
            }
            catch (Exception e) {
                log.error("\u65ad\u5f00Canal\u8fde\u63a5\u65f6\u53d1\u751f\u9519\u8bef: destination={}", destConnector.destination, e);
            }
        }
        this.connectors.clear();
        log.info("Canal\u5ba2\u6237\u7aef\u5df2\u5168\u90e8\u505c\u6b62");
    }

    /**
     * Builds a not-yet-connected single-node connector for the given destination using
     * the host, port and credentials from the configuration.
     */
    private CanalConnector newConnector(String destination) {
        InetSocketAddress address = new InetSocketAddress(this.canalConfig.getHost(), (int)this.canalConfig.getPort());
        return CanalConnectors.newSingleConnector(address, destination, this.canalConfig.getUsername(), this.canalConfig.getPassword());
    }

    /**
     * Best-effort disconnect used on cleanup paths: a failure is logged and never propagated.
     *
     * @param connector   the connector to release; {@code null} is ignored
     * @param destination the destination name, used for logging only
     */
    private void disconnectQuietly(CanalConnector connector, String destination) {
        if (connector == null) {
            return;
        }
        try {
            connector.disconnect();
        }
        catch (Exception e) {
            log.warn("\u65ad\u5f00Canal\u8fde\u63a5\u65f6\u53d1\u751f\u9519\u8bef: destination={}", destination, e);
        }
    }
}

