/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.internals.KeyValueIterators;
import org.apache.kafka.streams.state.internals.SessionKeySchema;
import org.apache.kafka.streams.state.internals.StoreQueryUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InMemorySessionStore
implements SessionStore<Bytes, byte[]> {
    private static final Logger LOG = LoggerFactory.getLogger(InMemorySessionStore.class);
    private final String name;
    private final String metricScope;
    private Sensor expiredRecordSensor;
    private InternalProcessorContext<?, ?> context;
    private long observedStreamTime = -1L;
    private final long retentionPeriod;
    private static final String INVALID_RANGE_WARN_MSG = "Returning empty iterator for fetch with invalid key range: from > to. This may be due to range arguments set in the wrong order, or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers";
    private final ConcurrentNavigableMap<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>> endTimeMap = new ConcurrentSkipListMap<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>>();
    private final Set<InMemorySessionStoreIterator> openIterators = ConcurrentHashMap.newKeySet();
    private volatile boolean open = false;
    private StateStoreContext stateStoreContext;
    private final Position position;

    InMemorySessionStore(String name, long retentionPeriod, String metricScope) {
        this.name = name;
        this.retentionPeriod = retentionPeriod;
        this.metricScope = metricScope;
        this.position = Position.emptyPosition();
    }

    @Override
    public String name() {
        return this.name;
    }

    @Override
    public void init(StateStoreContext stateStoreContext, StateStore root) {
        this.stateStoreContext = stateStoreContext;
        String threadId = Thread.currentThread().getName();
        String taskName = stateStoreContext.taskId().toString();
        if (stateStoreContext instanceof InternalProcessorContext) {
            this.context = (InternalProcessorContext)stateStoreContext;
            StreamsMetricsImpl metrics = this.context.metrics();
            this.expiredRecordSensor = TaskMetrics.droppedRecordsSensor(threadId, taskName, metrics);
        } else {
            this.context = null;
            this.expiredRecordSensor = null;
        }
        if (root != null) {
            boolean consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(stateStoreContext.appConfigs(), "__iq.consistency.offset.vector.enabled__", false);
            stateStoreContext.register(root, records -> {
                Position position = this.position;
                synchronized (position) {
                    for (ConsumerRecord record : records) {
                        this.put(SessionKeySchema.from(Bytes.wrap((byte[])((byte[])record.key()))), (byte[])record.value());
                        ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition((ConsumerRecord<byte[], byte[]>)record, consistencyEnabled, this.position);
                    }
                }
            });
        }
        this.open = true;
    }

    @Override
    public Position getPosition() {
        return this.position;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void put(Windowed<Bytes> sessionKey, byte[] aggregate) {
        this.removeExpiredSegments();
        long windowEndTimestamp = sessionKey.window().end();
        this.observedStreamTime = Math.max(this.observedStreamTime, windowEndTimestamp);
        Position position = this.position;
        synchronized (position) {
            if (windowEndTimestamp <= this.observedStreamTime - this.retentionPeriod) {
                if (this.expiredRecordSensor != null && this.context != null) {
                    this.expiredRecordSensor.record(1.0, this.context.currentSystemTimeMs());
                }
                LOG.warn("Skipping record for expired segment.");
            } else if (aggregate != null) {
                this.endTimeMap.computeIfAbsent(windowEndTimestamp, t -> new ConcurrentSkipListMap());
                ConcurrentNavigableMap keyMap = (ConcurrentNavigableMap)this.endTimeMap.get(windowEndTimestamp);
                keyMap.computeIfAbsent(sessionKey.key(), t -> new ConcurrentSkipListMap());
                ((ConcurrentNavigableMap)keyMap.get(sessionKey.key())).put(sessionKey.window().start(), aggregate);
            } else {
                this.remove(sessionKey);
            }
            StoreQueryUtils.updatePosition(this.position, this.stateStoreContext);
        }
    }

    @Override
    public void remove(Windowed<Bytes> sessionKey) {
        ConcurrentNavigableMap keyMap = (ConcurrentNavigableMap)this.endTimeMap.get(sessionKey.window().end());
        if (keyMap == null) {
            return;
        }
        ConcurrentNavigableMap startTimeMap = (ConcurrentNavigableMap)keyMap.get(sessionKey.key());
        if (startTimeMap == null) {
            return;
        }
        startTimeMap.remove(sessionKey.window().start());
        if (startTimeMap.isEmpty()) {
            keyMap.remove(sessionKey.key());
            if (keyMap.isEmpty()) {
                this.endTimeMap.remove(sessionKey.window().end());
            }
        }
    }

    @Override
    public byte[] fetchSession(Bytes key, long sessionStartTime, long sessionEndTime) {
        ConcurrentNavigableMap startTimeMap;
        ConcurrentNavigableMap keyMap;
        this.removeExpiredSegments();
        Objects.requireNonNull(key, "key cannot be null");
        if (sessionEndTime > this.observedStreamTime - this.retentionPeriod && (keyMap = (ConcurrentNavigableMap)this.endTimeMap.get(sessionEndTime)) != null && (startTimeMap = (ConcurrentNavigableMap)keyMap.get(key)) != null) {
            return (byte[])startTimeMap.get(sessionStartTime);
        }
        return null;
    }

    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(long earliestSessionEndTime, long latestSessionEndTime) {
        this.removeExpiredSegments();
        NavigableMap endTimSubMap = this.endTimeMap.subMap((Object)earliestSessionEndTime, true, (Object)latestSessionEndTime, true);
        return this.registerNewIterator(null, null, Long.MAX_VALUE, endTimSubMap.entrySet().iterator(), true);
    }

    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(Bytes key, long earliestSessionEndTime, long latestSessionStartTime) {
        Objects.requireNonNull(key, "key cannot be null");
        this.removeExpiredSegments();
        return this.registerNewIterator(key, key, latestSessionStartTime, this.endTimeMap.tailMap((Object)earliestSessionEndTime, true).entrySet().iterator(), true);
    }

    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(Bytes key, long earliestSessionEndTime, long latestSessionStartTime) {
        Objects.requireNonNull(key, "key cannot be null");
        this.removeExpiredSegments();
        return this.registerNewIterator(key, key, latestSessionStartTime, this.endTimeMap.tailMap((Object)earliestSessionEndTime, true).descendingMap().entrySet().iterator(), false);
    }

    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(Bytes keyFrom, Bytes keyTo, long earliestSessionEndTime, long latestSessionStartTime) {
        this.removeExpiredSegments();
        if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
            LOG.warn(INVALID_RANGE_WARN_MSG);
            return KeyValueIterators.emptyIterator();
        }
        return this.registerNewIterator(keyFrom, keyTo, latestSessionStartTime, this.endTimeMap.tailMap((Object)earliestSessionEndTime, true).entrySet().iterator(), true);
    }

    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(Bytes keyFrom, Bytes keyTo, long earliestSessionEndTime, long latestSessionStartTime) {
        this.removeExpiredSegments();
        if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
            LOG.warn(INVALID_RANGE_WARN_MSG);
            return KeyValueIterators.emptyIterator();
        }
        return this.registerNewIterator(keyFrom, keyTo, latestSessionStartTime, this.endTimeMap.tailMap((Object)earliestSessionEndTime, true).descendingMap().entrySet().iterator(), false);
    }

    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes key) {
        Objects.requireNonNull(key, "key cannot be null");
        this.removeExpiredSegments();
        return this.registerNewIterator(key, key, Long.MAX_VALUE, this.endTimeMap.entrySet().iterator(), true);
    }

    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(Bytes key) {
        Objects.requireNonNull(key, "key cannot be null");
        this.removeExpiredSegments();
        return this.registerNewIterator(key, key, Long.MAX_VALUE, this.endTimeMap.descendingMap().entrySet().iterator(), false);
    }

    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes keyFrom, Bytes keyTo) {
        this.removeExpiredSegments();
        return this.registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, this.endTimeMap.entrySet().iterator(), true);
    }

    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(Bytes keyFrom, Bytes keyTo) {
        this.removeExpiredSegments();
        return this.registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, this.endTimeMap.descendingMap().entrySet().iterator(), false);
    }

    @Override
    public boolean persistent() {
        return false;
    }

    @Override
    public boolean isOpen() {
        return this.open;
    }

    @Override
    public <R> QueryResult<R> query(Query<R> query, PositionBound positionBound, QueryConfig config) {
        return StoreQueryUtils.handleBasicQueries(query, positionBound, config, this, this.position, this.context);
    }

    @Override
    public void flush() {
    }

    @Override
    public void close() {
        if (this.openIterators.size() != 0) {
            LOG.warn("Closing {} open iterators for store {}", (Object)this.openIterators.size(), (Object)this.name);
            for (InMemorySessionStoreIterator it : this.openIterators) {
                it.close();
            }
        }
        this.endTimeMap.clear();
        this.openIterators.clear();
        this.open = false;
    }

    private void removeExpiredSegments() {
        long minLiveTime = Math.max(0L, this.observedStreamTime - this.retentionPeriod + 1L);
        for (InMemorySessionStoreIterator it : this.openIterators) {
            minLiveTime = Math.min(minLiveTime, it.minTime());
        }
        this.endTimeMap.headMap((Object)minLiveTime, false).clear();
    }

    private InMemorySessionStoreIterator registerNewIterator(Bytes keyFrom, Bytes keyTo, long latestSessionStartTime, Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>>> endTimeIterator, boolean forward) {
        InMemorySessionStoreIterator iterator = new InMemorySessionStoreIterator(keyFrom, keyTo, latestSessionStartTime, endTimeIterator, this.openIterators::remove, forward);
        this.openIterators.add(iterator);
        return iterator;
    }

    private static class InMemorySessionStoreIterator
    implements KeyValueIterator<Windowed<Bytes>, byte[]> {
        private final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>>> endTimeIterator;
        private Iterator<Map.Entry<Bytes, ConcurrentNavigableMap<Long, byte[]>>> keyIterator;
        private Iterator<Map.Entry<Long, byte[]>> recordIterator;
        private KeyValue<Windowed<Bytes>, byte[]> next;
        private Bytes currentKey;
        private long currentEndTime;
        private final Bytes keyFrom;
        private final Bytes keyTo;
        private final long latestSessionStartTime;
        private final ClosingCallback callback;
        private final boolean forward;

        InMemorySessionStoreIterator(Bytes keyFrom, Bytes keyTo, long latestSessionStartTime, Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>>> endTimeIterator, ClosingCallback callback, boolean forward) {
            this.keyFrom = keyFrom;
            this.keyTo = keyTo;
            this.latestSessionStartTime = latestSessionStartTime;
            this.endTimeIterator = endTimeIterator;
            this.callback = callback;
            this.forward = forward;
            this.setAllIterators();
        }

        @Override
        public boolean hasNext() {
            if (this.next != null) {
                return true;
            }
            if (this.recordIterator == null) {
                return false;
            }
            this.next = this.getNext();
            return this.next != null;
        }

        @Override
        public Windowed<Bytes> peekNextKey() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            return (Windowed)this.next.key;
        }

        @Override
        public KeyValue<Windowed<Bytes>, byte[]> next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            KeyValue<Windowed<Bytes>, byte[]> ret = this.next;
            this.next = null;
            return ret;
        }

        @Override
        public void close() {
            this.next = null;
            this.recordIterator = null;
            this.callback.deregisterIterator(this);
        }

        Long minTime() {
            return this.currentEndTime;
        }

        private KeyValue<Windowed<Bytes>, byte[]> getNext() {
            if (!this.recordIterator.hasNext()) {
                this.getNextIterators();
            }
            if (this.recordIterator == null) {
                return null;
            }
            Map.Entry<Long, byte[]> nextRecord = this.recordIterator.next();
            SessionWindow sessionWindow = new SessionWindow(nextRecord.getKey(), this.currentEndTime);
            Windowed<Bytes> windowedKey = new Windowed<Bytes>(this.currentKey, sessionWindow);
            return new KeyValue<Windowed<Bytes>, byte[]>(windowedKey, nextRecord.getValue());
        }

        private void setAllIterators() {
            while (this.endTimeIterator.hasNext()) {
                Map.Entry<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>> nextEndTimeEntry = this.endTimeIterator.next();
                this.currentEndTime = nextEndTimeEntry.getKey();
                NavigableMap<Object, ConcurrentNavigableMap<Long, Object>> subKVMap = this.keyFrom == null && this.keyTo == null ? nextEndTimeEntry.getValue() : (this.keyFrom == null ? nextEndTimeEntry.getValue().headMap((Object)this.keyTo, true) : (this.keyTo == null ? nextEndTimeEntry.getValue().tailMap((Object)this.keyFrom, true) : nextEndTimeEntry.getValue().subMap((Object)this.keyFrom, true, (Object)this.keyTo, true)));
                this.keyIterator = this.forward ? subKVMap.entrySet().iterator() : subKVMap.descendingMap().entrySet().iterator();
                if (!this.setInnerIterators()) continue;
                return;
            }
            this.recordIterator = null;
        }

        private boolean setInnerIterators() {
            while (this.keyIterator.hasNext()) {
                Map.Entry<Bytes, ConcurrentNavigableMap<Long, byte[]>> nextKeyEntry = this.keyIterator.next();
                this.currentKey = nextKeyEntry.getKey();
                this.recordIterator = this.latestSessionStartTime == Long.MAX_VALUE ? (this.forward ? nextKeyEntry.getValue().descendingMap().entrySet().iterator() : nextKeyEntry.getValue().entrySet().iterator()) : (this.forward ? nextKeyEntry.getValue().headMap((Object)this.latestSessionStartTime, true).descendingMap().entrySet().iterator() : nextKeyEntry.getValue().headMap((Object)this.latestSessionStartTime, true).entrySet().iterator());
                if (!this.recordIterator.hasNext()) continue;
                return true;
            }
            return false;
        }

        private void getNextIterators() {
            if (this.setInnerIterators()) {
                return;
            }
            this.setAllIterators();
        }
    }

    static interface ClosingCallback {
        public void deregisterIterator(InMemorySessionStoreIterator var1);
    }
}

