/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.KeyValueIterators;
import org.apache.kafka.streams.state.internals.StoreQueryUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A non-persistent {@link KeyValueStore} that keeps {@link Bytes} keys and {@code byte[]} values in a
 * sorted in-memory {@link TreeMap}.
 *
 * <p>Most accessors and mutators are {@code synchronized} on the store instance. Updates to the
 * {@link Position} are additionally performed while holding the monitor of the position object.
 * Iterators returned by this store walk a snapshot of the key set taken when the iterator was created
 * and look the values up in the live map as they advance.
 */
public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
    private static final Logger LOG = LoggerFactory.getLogger(InMemoryKeyValueStore.class);
    private final String name;
    private final NavigableMap<Bytes, byte[]> map = new TreeMap<Bytes, byte[]>();
    private final Position position = Position.emptyPosition();
    private volatile boolean open = false;
    private StateStoreContext context;

    /**
     * Creates an empty, not-yet-opened store.
     *
     * @param name the name reported by {@link #name()} and used in iterator error messages
     */
    public InMemoryKeyValueStore(String name) {
        this.name = name;
    }

    /** Returns the name given at construction time. */
    @Override
    public String name() {
        return this.name;
    }

    /**
     * Opens the store and remembers the supplied context.
     *
     * <p>When {@code root} is non-null, a restore callback is registered with the context. The callback
     * writes every restored record through {@link #put(Bytes, byte[])} and then updates the position
     * from that record, all while holding the position monitor.
     *
     * @param stateStoreContext the context used for registration and for later position updates
     * @param root              the store to register; registration is skipped when {@code null}
     */
    @Override
    public void init(StateStoreContext stateStoreContext, StateStore root) {
        if (root != null) {
            boolean consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(stateStoreContext.appConfigs(), "__iq.consistency.offset.vector.enabled__", false);
            // The store is flagged open before the restore callback is registered.
            this.open = true;
            // NOTE(review): this file is decompiler output; the single-argument lambda presumably targets a
            // batch-style restore callback interface - confirm the target type against the original source.
            stateStoreContext.register(root, records -> {
                synchronized (this.position) {
                    for (ConsumerRecord record : records) {
                        this.put(Bytes.wrap((byte[]) record.key()), (byte[]) record.value());
                        ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition((ConsumerRecord<byte[], byte[]>) record, consistencyEnabled, this.position);
                    }
                }
            });
        }
        this.open = true;
        this.context = stateStoreContext;
    }

    /** Always {@code false}: the data lives only in memory. */
    @Override
    public boolean persistent() {
        return false;
    }

    /** Returns {@code true} between {@link #init} and {@link #close()}. */
    @Override
    public boolean isOpen() {
        return this.open;
    }

    /** Returns the live position object of this store (not a copy). */
    @Override
    public Position getPosition() {
        return this.position;
    }

    /** Delegates query handling to {@link StoreQueryUtils#handleBasicQueries}. */
    @Override
    public <R> QueryResult<R> query(Query<R> query, PositionBound positionBound, QueryConfig config) {
        return StoreQueryUtils.handleBasicQueries(query, positionBound, config, this, this.position, this.context);
    }

    /**
     * Looks up the value stored for {@code key}.
     *
     * @return the stored value, or {@code null} if the map holds no entry for the key
     */
    @Override
    public synchronized byte[] get(Bytes key) {
        return this.map.get(key);
    }

    /**
     * Stores {@code value} under {@code key}; a {@code null} value removes the entry instead.
     * The position is updated as part of the write.
     */
    @Override
    public synchronized void put(Bytes key, byte[] value) {
        this.putInternal(key, value);
    }

    /**
     * Stores {@code value} only if {@link #get(Bytes)} currently returns {@code null} for {@code key}.
     *
     * @return the value that was present before the call, or {@code null} if there was none
     */
    @Override
    public synchronized byte[] putIfAbsent(Bytes key, byte[] value) {
        byte[] originalValue = this.get(key);
        if (originalValue == null) {
            this.put(key, value);
        }
        return originalValue;
    }

    /**
     * Applies a single write (or a removal when {@code value} is {@code null}) and updates the position,
     * both under the position monitor.
     */
    private void putInternal(Bytes key, byte[] value) {
        synchronized (this.position) {
            if (value == null) {
                this.map.remove(key);
            } else {
                this.map.put(key, value);
            }
            StoreQueryUtils.updatePosition(this.position, this.context);
        }
    }

    /** Applies every entry in order, with the same semantics as {@link #put(Bytes, byte[])}. */
    @Override
    public synchronized void putAll(List<KeyValue<Bytes, byte[]>> entries) {
        for (KeyValue<Bytes, byte[]> entry : entries) {
            this.putInternal(entry.key, entry.value);
        }
    }

    /**
     * Returns a forward iterator over all keys in {@code [serializedPrefix, increment(serializedPrefix))}.
     *
     * @param prefix              the prefix object
     * @param prefixKeySerializer serializer used (with a {@code null} topic) to turn the prefix into bytes
     */
    @Override
    public synchronized <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(P prefix, PS prefixKeySerializer) {
        Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
        Bytes to = Bytes.increment(from);
        return new InMemoryKeyValueIterator(this.map.subMap(from, true, to, false).keySet(), true);
    }

    /**
     * Removes the entry for {@code key} directly from the map.
     *
     * @return the removed value, or {@code null} if there was no entry
     */
    @Override
    public synchronized byte[] delete(Bytes key) {
        return this.map.remove(key);
    }

    /** Returns an ascending iterator over the inclusive range; a {@code null} bound is open-ended. */
    @Override
    public synchronized KeyValueIterator<Bytes, byte[]> range(Bytes from, Bytes to) {
        return this.range(from, to, true);
    }

    /** Returns a descending iterator over the inclusive range; a {@code null} bound is open-ended. */
    @Override
    public synchronized KeyValueIterator<Bytes, byte[]> reverseRange(Bytes from, Bytes to) {
        return this.range(from, to, false);
    }

    /**
     * Shared implementation of the range queries.
     *
     * @param from    inclusive lower bound, or {@code null} for no lower bound
     * @param to      inclusive upper bound, or {@code null} for no upper bound
     * @param forward {@code true} for ascending key order, {@code false} for descending
     * @return an iterator over the selected keys, or an empty iterator (after logging a warning) when
     *         both bounds are given and {@code from} sorts after {@code to}
     */
    private KeyValueIterator<Bytes, byte[]> range(Bytes from, Bytes to, boolean forward) {
        if (from == null && to == null) {
            return this.getKeyValueIterator(this.map.keySet(), forward);
        }
        if (from == null) {
            return this.getKeyValueIterator(this.map.headMap(to, true).keySet(), forward);
        }
        if (to == null) {
            return this.getKeyValueIterator(this.map.tailMap(from, true).keySet(), forward);
        }
        if (from.compareTo(to) > 0) {
            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. This may be due to range arguments set in the wrong order, or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }
        return this.getKeyValueIterator(this.map.subMap(from, true, to, true).keySet(), forward);
    }

    /** Wraps the given key set in a new snapshot iterator. */
    private KeyValueIterator<Bytes, byte[]> getKeyValueIterator(Set<Bytes> rangeSet, boolean forward) {
        return new InMemoryKeyValueIterator(rangeSet, forward);
    }

    /** Returns an ascending iterator over every key in the store. */
    @Override
    public synchronized KeyValueIterator<Bytes, byte[]> all() {
        return this.range(null, null, true);
    }

    /** Returns a descending iterator over every key in the store. */
    @Override
    public synchronized KeyValueIterator<Bytes, byte[]> reverseAll() {
        return this.range(null, null, false);
    }

    /** Returns the current size of the backing map. */
    @Override
    public long approximateNumEntries() {
        return this.map.size();
    }

    /** No-op: this implementation performs no work on flush. */
    @Override
    public void flush() {
    }

    /** Clears all entries and marks the store as closed. */
    @Override
    public void close() {
        this.map.clear();
        this.open = false;
    }

    /**
     * Iterator over a snapshot of keys. The key set is copied at construction time; values are read
     * from the enclosing store's live map, and keys that are no longer present in the map are skipped.
     */
    private class InMemoryKeyValueIterator implements KeyValueIterator<Bytes, byte[]> {
        private final Iterator<Bytes> iter;
        // The key that the next call to next()/peekNextKey() will use, or null if none has been staged yet.
        private Bytes currentKey;
        private boolean iteratorOpen = true;

        /**
         * @param keySet  the keys to iterate; copied so later changes to the store do not affect the walk
         * @param forward {@code true} for ascending order, {@code false} for descending
         */
        private InMemoryKeyValueIterator(Set<Bytes> keySet, boolean forward) {
            TreeSet<Bytes> snapshot = new TreeSet<Bytes>(keySet);
            this.iter = forward ? snapshot.iterator() : snapshot.descendingIterator();
        }

        /**
         * Advances to the next snapshot key that is still present in the store's map.
         *
         * @return {@code true} if such a key exists
         * @throws IllegalStateException if the iterator has been closed
         */
        @Override
        public boolean hasNext() {
            if (!this.iteratorOpen) {
                throw new IllegalStateException(String.format("Iterator for store %s has already been closed.", InMemoryKeyValueStore.this.name));
            }
            // Loop rather than recurse: skipping a long run of keys that were removed after the snapshot
            // was taken must not grow the call stack.
            while (true) {
                if (this.currentKey != null) {
                    if (InMemoryKeyValueStore.this.map.containsKey(this.currentKey)) {
                        return true;
                    }
                    // Staged key has disappeared from the map; drop it and try the next one.
                    this.currentKey = null;
                }
                if (!this.iter.hasNext()) {
                    return false;
                }
                this.currentKey = this.iter.next();
            }
        }

        /**
         * Returns the next key together with the value currently stored for it in the map.
         *
         * @throws NoSuchElementException if no further key is available
         * @throws IllegalStateException  if the iterator has been closed
         */
        @Override
        public KeyValue<Bytes, byte[]> next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            KeyValue<Bytes, byte[]> ret = new KeyValue<Bytes, byte[]>(this.currentKey, InMemoryKeyValueStore.this.map.get(this.currentKey));
            // Clear the staged key so the following hasNext() moves on.
            this.currentKey = null;
            return ret;
        }

        /** Marks the iterator closed; subsequent calls to {@link #hasNext()} throw. */
        @Override
        public void close() {
            this.iteratorOpen = false;
        }

        /**
         * Returns the key that {@link #next()} would return, without consuming it.
         *
         * @throws NoSuchElementException if no further key is available
         * @throws IllegalStateException  if the iterator has been closed
         */
        @Override
        public Bytes peekNextKey() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            return this.currentKey;
        }
    }
}

