package FieldFox;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Charsets;
import java.util.Arrays;
import java.util.List;
import net.sf.xenqtt.client.AsyncClientListener;
import net.sf.xenqtt.client.MqttClient;
import net.sf.xenqtt.client.PublishMessage;
import net.sf.xenqtt.client.Subscription;
import net.sf.xenqtt.message.ConnectReturnCode;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.buffers.Buffer;
import org.graylog2.plugin.inputs.MessageInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:FieldFox/AsyncMQTTClientListener.class */
public class AsyncMQTTClientListener implements AsyncClientListener {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncMQTTClientListener.class);
    private final MessageInput messageInput;
    private final List<Subscription> subscriptions;
    private final Meter incomingMessages;
    private final Meter incompleteMessages;
    private final Meter processedMessages;
    private final Buffer processBuffer;

    public AsyncMQTTClientListener(Buffer buffer, MessageInput messageInput, List<Subscription> list, MetricRegistry metricRegistry) {
        this.processBuffer = buffer;
        this.messageInput = messageInput;
        this.subscriptions = list;
        String uniqueReadableId = messageInput.getUniqueReadableId();
        this.incomingMessages = metricRegistry.meter(MetricRegistry.name(uniqueReadableId, "incomingMessages"));
        this.incompleteMessages = metricRegistry.meter(MetricRegistry.name(uniqueReadableId, "incompleteMessages"));
        this.processedMessages = metricRegistry.meter(MetricRegistry.name(uniqueReadableId, "processedMessages"));
    }

    @Override // net.sf.xenqtt.client.AsyncClientListener
    public void connected(MqttClient mqttClient, ConnectReturnCode connectReturnCode) {
        if (connectReturnCode != ConnectReturnCode.ACCEPTED) {
            LOG.error("MQTT client not connected! Reason: {}", connectReturnCode);
        } else {
            LOG.info("Connected MQTT client");
            mqttClient.subscribe(this.subscriptions);
        }
    }

    @Override // net.sf.xenqtt.client.AsyncClientListener
    public void subscribed(MqttClient mqttClient, Subscription[] subscriptionArr, Subscription[] subscriptionArr2, boolean z) {
        if (!z) {
            LOG.warn("Couldn't subscribe to all requested topics: {}", Arrays.toString(subscriptionArr));
        }
        LOG.info("Subscribed to topics: {}", Arrays.toString(subscriptionArr2));
    }

    @Override // net.sf.xenqtt.client.AsyncClientListener
    public void unsubscribed(MqttClient mqttClient, String[] strArr) {
        LOG.info("Unsubscribed from topics: {}", Arrays.toString(strArr));
    }

    @Override // net.sf.xenqtt.client.AsyncClientListener
    public void published(MqttClient mqttClient, PublishMessage publishMessage) {
        LOG.trace("Published message {}", publishMessage);
    }

    @Override // net.sf.xenqtt.client.MqttClientListener
    public void publishReceived(MqttClient mqttClient, PublishMessage publishMessage) {
        LOG.info("Received message: {}", publishMessage);
        this.incomingMessages.mark();
        if (publishMessage.isEmpty()) {
            LOG.debug("Received message is empty. Not processing.");
            this.incompleteMessages.mark();
            return;
        }
        if (publishMessage.isDuplicate()) {
            LOG.debug("Received message is a duplicate. Not processing.");
            this.incompleteMessages.mark();
            return;
        }
        try {
            Message message = new Message(new String(publishMessage.getPayload(), Charsets.UTF_8), "MQTT Broker", Tools.dateTimeFromDouble(publishMessage.getReceivedTimestamp() / 1000.0d));
            message.addField("_mqtt_topic", publishMessage.getTopic());
            message.addField("_mqtt_received_timestamp", Long.valueOf(publishMessage.getReceivedTimestamp()));
            LOG.debug("Parsed message successfully, message id: <{}>, complete: {}. Inserting into process buffer.", message.getId(), Boolean.valueOf(message.isComplete()));
            this.processBuffer.insertFailFast(message, this.messageInput);
            publishMessage.ack();
            this.processedMessages.mark();
            LOG.info("Parsed message successfully inserted into process buffer. Length {}", Long.valueOf(this.processBuffer.getUsage()));
        } catch (Exception e) {
            LOG.info("Unable to insert message into process buffer: ", (Throwable) e);
            this.incompleteMessages.mark();
        }
    }

    @Override // net.sf.xenqtt.client.MqttClientListener
    public void disconnected(MqttClient mqttClient, Throwable th, boolean z) {
        LOG.info("Disconnected MQTT client", th);
    }
}
