/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.share;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
import org.apache.kafka.common.requests.InitializeShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorEventProcessor;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetrics;
import org.apache.kafka.coordinator.common.runtime.CoordinatorOperationExceptionHelper;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetrics;
import org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilderSupplier;
import org.apache.kafka.coordinator.common.runtime.MultiThreadedEventProcessor;
import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
import org.apache.kafka.coordinator.common.runtime.Serializer;
import org.apache.kafka.coordinator.share.ShareCoordinator;
import org.apache.kafka.coordinator.share.ShareCoordinatorConfig;
import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde;
import org.apache.kafka.coordinator.share.ShareCoordinatorShard;
import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ShareVersion;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.slf4j.Logger;

public class ShareCoordinatorService
implements ShareCoordinator {
    private final ShareCoordinatorConfig config;
    private final Logger log;
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    private final CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime;
    private final ShareCoordinatorMetrics shareCoordinatorMetrics;
    private volatile int numPartitions = -1;
    private final Time time;
    private final Timer timer;
    private final PartitionWriter writer;
    private final Map<TopicPartition, Long> lastPrunedOffsets;
    private final Supplier<Boolean> shareGroupConfigEnabledSupplier;
    private volatile boolean shouldRunPeriodicJob;

    public ShareCoordinatorService(LogContext logContext, ShareCoordinatorConfig config, CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime, ShareCoordinatorMetrics shareCoordinatorMetrics, Time time, Timer timer, PartitionWriter writer, Supplier<Boolean> shareGroupConfigEnabledSupplier) {
        this.log = logContext.logger(ShareCoordinatorService.class);
        this.config = config;
        this.runtime = runtime;
        this.shareCoordinatorMetrics = shareCoordinatorMetrics;
        this.time = time;
        this.timer = timer;
        this.writer = writer;
        this.lastPrunedOffsets = new ConcurrentHashMap<TopicPartition, Long>();
        this.shareGroupConfigEnabledSupplier = shareGroupConfigEnabledSupplier;
    }

    @Override
    public int partitionFor(SharePartitionKey key) {
        this.throwIfNotActive();
        return Utils.abs((int)key.asCoordinatorKey().hashCode()) % this.numPartitions;
    }

    @Override
    public Properties shareGroupStateTopicConfigs() {
        Properties properties = new Properties();
        properties.put("cleanup.policy", "delete");
        properties.put("compression.type", BrokerCompressionType.PRODUCER.name);
        properties.put("segment.bytes", (Object)this.config.shareCoordinatorStateTopicSegmentBytes());
        properties.put("min.insync.replicas", (Object)this.config.shareCoordinatorStateTopicMinIsr());
        properties.put("retention.ms", (Object)-1);
        return properties;
    }

    @Override
    public void startup(IntSupplier shareGroupTopicPartitionCount) {
        if (!this.isActive.compareAndSet(false, true)) {
            this.log.warn("Share coordinator is already running.");
            return;
        }
        this.log.info("Starting up.");
        this.numPartitions = shareGroupTopicPartitionCount.getAsInt();
        this.log.info("Startup complete.");
    }

    private void setupPeriodicJobs() {
        this.setupRecordPruning();
        this.setupSnapshotColdPartitions();
    }

    void setupRecordPruning() {
        this.log.debug("Scheduling share-group state topic prune job.");
        this.timer.add(new TimerTask(this.config.shareCoordinatorTopicPruneIntervalMs()){

            public void run() {
                if (!ShareCoordinatorService.this.shouldRunPeriodicJob) {
                    return;
                }
                ArrayList futures = new ArrayList();
                ShareCoordinatorService.this.runtime.activeTopicPartitions().forEach(tp -> futures.add(ShareCoordinatorService.this.performRecordPruning((TopicPartition)tp)));
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((res, exp) -> {
                    if (exp != null) {
                        ShareCoordinatorService.this.log.error("Received error in share-group state topic prune.", exp);
                    }
                    ShareCoordinatorService.this.setupRecordPruning();
                });
            }
        });
    }

    private CompletableFuture<Void> performRecordPruning(TopicPartition tp) {
        CompletableFuture<Void> fut = new CompletableFuture<Void>();
        this.runtime.scheduleWriteOperation("write-state-record-prune", tp, Duration.ofMillis(this.config.shareCoordinatorWriteTimeoutMs()), ShareCoordinatorShard::lastRedundantOffset).whenComplete((result, exception) -> {
            if (exception != null) {
                this.log.debug("Last redundant offset for tp {} lookup threw an error.", (Object)tp, exception);
                Errors error = Errors.forException((Throwable)exception);
                if (!error.equals((Object)Errors.COORDINATOR_LOAD_IN_PROGRESS) && !error.equals((Object)Errors.NOT_COORDINATOR)) {
                    this.log.error("Last redundant offset lookup for tp {} threw an error.", (Object)tp, exception);
                    fut.completeExceptionally((Throwable)exception);
                    return;
                }
                fut.complete(null);
                return;
            }
            if (result.isPresent()) {
                Long off = (Long)result.get();
                Long lastPrunedOffset = this.lastPrunedOffsets.get(tp);
                if (lastPrunedOffset != null && lastPrunedOffset.longValue() == off.longValue()) {
                    this.log.debug("{} already pruned till offset {}", (Object)tp, (Object)off);
                    fut.complete(null);
                    return;
                }
                this.log.debug("Pruning records in {} till offset {}.", (Object)tp, (Object)off);
                this.writer.deleteRecords(tp, off.longValue()).whenComplete((res, exp) -> {
                    if (exp != null) {
                        this.log.error("Exception while deleting records in {} till offset {}.", new Object[]{tp, off, exp});
                        fut.completeExceptionally((Throwable)exp);
                        return;
                    }
                    this.shareCoordinatorMetrics.recordPrune(off.longValue(), tp);
                    fut.complete(null);
                    this.lastPrunedOffsets.put(tp, off);
                });
            } else {
                this.log.debug("No offset value for tp {} found.", (Object)tp);
                fut.complete(null);
            }
        });
        return fut;
    }

    void setupSnapshotColdPartitions() {
        this.log.debug("Scheduling cold share-partition snapshotting.");
        this.timer.add(new TimerTask(this.config.shareCoordinatorColdPartitionSnapshotIntervalMs()){

            public void run() {
                if (!ShareCoordinatorService.this.shouldRunPeriodicJob) {
                    return;
                }
                List futures = ShareCoordinatorService.this.runtime.scheduleWriteAllOperation("snapshot-cold-partitions", Duration.ofMillis(ShareCoordinatorService.this.config.shareCoordinatorWriteTimeoutMs()), ShareCoordinatorShard::snapshotColdPartitions);
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((__, exp) -> {
                    if (exp != null) {
                        ShareCoordinatorService.this.log.error("Received error while snapshotting cold partitions.", exp);
                    }
                    ShareCoordinatorService.this.setupSnapshotColdPartitions();
                });
            }
        });
    }

    @Override
    public void shutdown() {
        if (!this.isActive.compareAndSet(true, false)) {
            this.log.warn("Share coordinator is already shutting down.");
            return;
        }
        this.log.info("Shutting down.");
        Utils.closeQuietly(this.runtime, (String)"coordinator runtime");
        Utils.closeQuietly((AutoCloseable)this.shareCoordinatorMetrics, (String)"share coordinator metrics");
        this.log.info("Shutdown complete.");
    }

    @Override
    public CompletableFuture<WriteShareGroupStateResponseData> writeState(RequestContext context, WriteShareGroupStateRequestData request) {
        if (ShareCoordinatorService.isEmpty(request.topics())) {
            this.log.error("Topic Data is empty: {}", (Object)request);
            return CompletableFuture.completedFuture(new WriteShareGroupStateResponseData());
        }
        for (WriteShareGroupStateRequestData.WriteStateData topicData2 : request.topics()) {
            if (!ShareCoordinatorService.isEmpty(topicData2.partitions())) continue;
            this.log.error("Partition Data for topic {} is empty: {}", (Object)topicData2.topicId(), (Object)request);
            return CompletableFuture.completedFuture(new WriteShareGroupStateResponseData());
        }
        String groupId = request.groupId();
        if (ShareCoordinatorService.isGroupIdEmpty(groupId)) {
            this.log.error("Group id must be specified and non-empty: {}", (Object)request);
            return CompletableFuture.completedFuture(new WriteShareGroupStateResponseData());
        }
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(this.generateErrorWriteStateResponse(request, Errors.COORDINATOR_NOT_AVAILABLE, "Share coordinator is not available."));
        }
        HashMap futureMap = new HashMap();
        long startTimeMs = this.time.hiResClockMs();
        request.topics().forEach(topicData -> {
            Map partitionFut = futureMap.computeIfAbsent(topicData.topicId(), k -> new HashMap());
            topicData.partitions().forEach(partitionData -> {
                CompletionStage future = this.runtime.scheduleWriteOperation("write-share-group-state", this.topicPartitionFor(SharePartitionKey.getInstance((String)groupId, (Uuid)topicData.topicId(), (int)partitionData.partition())), Duration.ofMillis(this.config.shareCoordinatorWriteTimeoutMs()), coordinator -> coordinator.writeState(new WriteShareGroupStateRequestData().setGroupId(groupId).setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData().setTopicId(topicData.topicId()).setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData().setPartition(partitionData.partition()).setStartOffset(partitionData.startOffset()).setLeaderEpoch(partitionData.leaderEpoch()).setStateEpoch(partitionData.stateEpoch()).setStateBatches(partitionData.stateBatches()))))))).exceptionally(exception -> (WriteShareGroupStateResponseData)CoordinatorOperationExceptionHelper.handleOperationException((String)"write-share-group-state", (Object)request, (Throwable)exception, (error, message) -> WriteShareGroupStateResponse.toErrorResponseData((Uuid)topicData.topicId(), (int)partitionData.partition(), (Errors)error, (String)("Unable to write share group state: " + exception.getMessage())), (Logger)this.log));
                partitionFut.put(partitionData.partition(), future);
            });
        });
        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf((CompletableFuture[])futureMap.values().stream().flatMap(partMap -> partMap.values().stream()).toArray(CompletableFuture[]::new));
        return combinedFuture.thenApply(v -> {
            ArrayList writeStateResults = new ArrayList(futureMap.size());
            futureMap.forEach((topicId, topicEntry) -> {
                ArrayList partitionResults = new ArrayList(topicEntry.size());
                topicEntry.forEach((partitionId, responseFut) -> {
                    WriteShareGroupStateResponseData partitionData = responseFut.getNow(null);
                    partitionResults.addAll(((WriteShareGroupStateResponseData.WriteStateResult)partitionData.results().get(0)).partitions());
                });
                writeStateResults.add(WriteShareGroupStateResponse.toResponseWriteStateResult((Uuid)topicId, partitionResults));
            });
            this.shareCoordinatorMetrics.record("ShareCoordinatorWriteLatency", this.time.hiResClockMs() - startTimeMs);
            return new WriteShareGroupStateResponseData().setResults(writeStateResults);
        });
    }

    @Override
    public CompletableFuture<ReadShareGroupStateResponseData> readState(RequestContext context, ReadShareGroupStateRequestData request) {
        String groupId = request.groupId();
        HashMap<Uuid, Map> futureMap = new HashMap<Uuid, Map>();
        if (ShareCoordinatorService.isEmpty(request.topics())) {
            this.log.error("Topic Data is empty: {}", (Object)request);
            return CompletableFuture.completedFuture(new ReadShareGroupStateResponseData());
        }
        for (ReadShareGroupStateRequestData.ReadStateData topicData : request.topics()) {
            if (!ShareCoordinatorService.isEmpty(topicData.partitions())) continue;
            this.log.error("Partition Data for topic {} is empty: {}", (Object)topicData.topicId(), (Object)request);
            return CompletableFuture.completedFuture(new ReadShareGroupStateResponseData());
        }
        if (ShareCoordinatorService.isGroupIdEmpty(groupId)) {
            this.log.error("Group id must be specified and non-empty: {}", (Object)request);
            return CompletableFuture.completedFuture(new ReadShareGroupStateResponseData());
        }
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(this.generateErrorReadStateResponse(request, Errors.COORDINATOR_NOT_AVAILABLE, "Share coordinator is not available."));
        }
        for (ReadShareGroupStateRequestData.ReadStateData topicData : request.topics()) {
            Uuid topicId = topicData.topicId();
            for (ReadShareGroupStateRequestData.PartitionData partitionData : topicData.partitions()) {
                SharePartitionKey coordinatorKey = SharePartitionKey.getInstance((String)request.groupId(), (Uuid)topicId, (int)partitionData.partition());
                ReadShareGroupStateRequestData requestForCurrentPartition = new ReadShareGroupStateRequestData().setGroupId(groupId).setTopics(List.of(new ReadShareGroupStateRequestData.ReadStateData().setTopicId(topicId).setPartitions(List.of(partitionData))));
                CompletionStage readFuture = this.runtime.scheduleWriteOperation("read-update-leader-epoch-state", this.topicPartitionFor(coordinatorKey), Duration.ofMillis(this.config.shareCoordinatorWriteTimeoutMs()), coordinator -> coordinator.readStateAndMaybeUpdateLeaderEpoch(requestForCurrentPartition)).exceptionally(readException -> (ReadShareGroupStateResponseData)CoordinatorOperationExceptionHelper.handleOperationException((String)"read-update-leader-epoch-state", (Object)request, (Throwable)readException, (error, message) -> ReadShareGroupStateResponse.toErrorResponseData((Uuid)topicData.topicId(), (int)partitionData.partition(), (Errors)error, (String)("Unable to read share group state: " + readException.getMessage())), (Logger)this.log));
                futureMap.computeIfAbsent(topicId, k -> new HashMap()).put(partitionData.partition(), readFuture);
            }
        }
        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf((CompletableFuture[])futureMap.values().stream().flatMap(map -> map.values().stream()).toArray(CompletableFuture[]::new));
        return combinedFuture.thenApply(v -> {
            ArrayList readStateResult = new ArrayList(futureMap.size());
            futureMap.forEach((topicId, topicEntry) -> {
                ArrayList partitionResults = new ArrayList(topicEntry.size());
                topicEntry.forEach((partitionId, responseFut) -> partitionResults.add((ReadShareGroupStateResponseData.PartitionResult)((ReadShareGroupStateResponseData.ReadStateResult)((ReadShareGroupStateResponseData)responseFut.getNow(null)).results().get(0)).partitions().get(0)));
                readStateResult.add(ReadShareGroupStateResponse.toResponseReadStateResult((Uuid)topicId, partitionResults));
            });
            return new ReadShareGroupStateResponseData().setResults(readStateResult);
        });
    }

    @Override
    public CompletableFuture<ReadShareGroupStateSummaryResponseData> readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData request) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(this.generateErrorReadStateSummaryResponse(request, Errors.COORDINATOR_NOT_AVAILABLE, "Share coordinator is not available."));
        }
        String groupId = request.groupId();
        if (ShareCoordinatorService.isGroupIdEmpty(groupId)) {
            this.log.error("Group id must be specified and non-empty: {}", (Object)request);
            return CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResponseData());
        }
        if (ShareCoordinatorService.isEmpty(request.topics())) {
            this.log.error("Topic Data is empty: {}", (Object)request);
            return CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResponseData());
        }
        for (Object topicData : request.topics()) {
            if (!ShareCoordinatorService.isEmpty(topicData.partitions())) continue;
            this.log.error("Partition Data for topic {} is empty: {}", (Object)topicData.topicId(), (Object)request);
            return CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResponseData());
        }
        HashMap<Uuid, Map> futureMap = new HashMap<Uuid, Map>();
        for (ReadShareGroupStateSummaryRequestData.ReadStateSummaryData topicData : request.topics()) {
            Uuid topicId = topicData.topicId();
            for (ReadShareGroupStateSummaryRequestData.PartitionData partitionData : topicData.partitions()) {
                SharePartitionKey coordinatorKey = SharePartitionKey.getInstance((String)request.groupId(), (Uuid)topicId, (int)partitionData.partition());
                ReadShareGroupStateSummaryRequestData requestForCurrentPartition = new ReadShareGroupStateSummaryRequestData().setGroupId(groupId).setTopics(List.of(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData().setTopicId(topicId).setPartitions(List.of(partitionData))));
                CompletionStage readFuture = this.runtime.scheduleWriteOperation("read-share-group-state-summary", this.topicPartitionFor(coordinatorKey), Duration.ofMillis(this.config.shareCoordinatorWriteTimeoutMs()), coordinator -> coordinator.readStateSummary(requestForCurrentPartition)).exceptionally(readException -> (ReadShareGroupStateSummaryResponseData)CoordinatorOperationExceptionHelper.handleOperationException((String)"read-share-group-state-summary", (Object)request, (Throwable)readException, (error, message) -> ReadShareGroupStateSummaryResponse.toErrorResponseData((Uuid)topicData.topicId(), (int)partitionData.partition(), (Errors)error, (String)("Unable to read share group state summary: " + readException.getMessage())), (Logger)this.log));
                futureMap.computeIfAbsent(topicId, k -> new HashMap()).put(partitionData.partition(), readFuture);
            }
        }
        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf((CompletableFuture[])futureMap.values().stream().flatMap(map -> map.values().stream()).toArray(CompletableFuture[]::new));
        return combinedFuture.thenApply(v -> {
            ArrayList readStateSummaryResult = new ArrayList(futureMap.size());
            futureMap.forEach((topicId, topicEntry) -> {
                ArrayList partitionResults = new ArrayList(topicEntry.size());
                topicEntry.forEach((partitionId, responseFut) -> partitionResults.add((ReadShareGroupStateSummaryResponseData.PartitionResult)((ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult)((ReadShareGroupStateSummaryResponseData)responseFut.getNow(null)).results().get(0)).partitions().get(0)));
                readStateSummaryResult.add(ReadShareGroupStateSummaryResponse.toResponseReadStateSummaryResult((Uuid)topicId, partitionResults));
            });
            return new ReadShareGroupStateSummaryResponseData().setResults(readStateSummaryResult);
        });
    }

    @Override
    public CompletableFuture<DeleteShareGroupStateResponseData> deleteState(RequestContext context, DeleteShareGroupStateRequestData request) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(this.generateErrorDeleteStateResponse(request, Errors.COORDINATOR_NOT_AVAILABLE, "Share coordinator is not available."));
        }
        String groupId = request.groupId();
        if (ShareCoordinatorService.isGroupIdEmpty(groupId)) {
            this.log.error("Group id must be specified and non-empty: {}", (Object)request);
            return CompletableFuture.completedFuture(new DeleteShareGroupStateResponseData());
        }
        if (ShareCoordinatorService.isEmpty(request.topics())) {
            this.log.error("Topic Data is empty: {}", (Object)request);
            return CompletableFuture.completedFuture(new DeleteShareGroupStateResponseData());
        }
        for (Object topicData : request.topics()) {
            if (!ShareCoordinatorService.isEmpty(topicData.partitions())) continue;
            this.log.error("Partition Data for topic {} is empty: {}", (Object)topicData.topicId(), (Object)request);
            return CompletableFuture.completedFuture(new DeleteShareGroupStateResponseData());
        }
        HashMap<Uuid, Map> futureMap = new HashMap<Uuid, Map>();
        for (DeleteShareGroupStateRequestData.DeleteStateData topicData : request.topics()) {
            Uuid topicId = topicData.topicId();
            for (DeleteShareGroupStateRequestData.PartitionData partitionData : topicData.partitions()) {
                SharePartitionKey coordinatorKey = SharePartitionKey.getInstance((String)request.groupId(), (Uuid)topicId, (int)partitionData.partition());
                DeleteShareGroupStateRequestData requestForCurrentPartition = new DeleteShareGroupStateRequestData().setGroupId(groupId).setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData().setTopicId(topicId).setPartitions(List.of(partitionData))));
                CompletionStage deleteFuture = this.runtime.scheduleWriteOperation("delete-share-group-state", this.topicPartitionFor(coordinatorKey), Duration.ofMillis(this.config.shareCoordinatorWriteTimeoutMs()), coordinator -> coordinator.deleteState(requestForCurrentPartition)).exceptionally(deleteException -> (DeleteShareGroupStateResponseData)CoordinatorOperationExceptionHelper.handleOperationException((String)"delete-share-group-state", (Object)request, (Throwable)deleteException, (error, message) -> DeleteShareGroupStateResponse.toErrorResponseData((Uuid)topicData.topicId(), (int)partitionData.partition(), (Errors)error, (String)("Unable to delete share group state: " + deleteException.getMessage())), (Logger)this.log));
                futureMap.computeIfAbsent(topicId, k -> new HashMap()).put(partitionData.partition(), deleteFuture);
            }
        }
        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf((CompletableFuture[])futureMap.values().stream().flatMap(map -> map.values().stream()).toArray(CompletableFuture[]::new));
        return combinedFuture.thenApply(v -> {
            ArrayList deleteStateResult = new ArrayList(futureMap.size());
            futureMap.forEach((topicId, topicEntry) -> {
                ArrayList partitionResults = new ArrayList(topicEntry.size());
                topicEntry.forEach((partitionId, responseFut) -> partitionResults.add((DeleteShareGroupStateResponseData.PartitionResult)((DeleteShareGroupStateResponseData.DeleteStateResult)((DeleteShareGroupStateResponseData)responseFut.getNow(null)).results().get(0)).partitions().get(0)));
                deleteStateResult.add(DeleteShareGroupStateResponse.toResponseDeleteStateResult((Uuid)topicId, partitionResults));
            });
            return new DeleteShareGroupStateResponseData().setResults(deleteStateResult);
        });
    }

    @Override
    public CompletableFuture<InitializeShareGroupStateResponseData> initializeState(RequestContext context, InitializeShareGroupStateRequestData request) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(this.generateErrorInitStateResponse(request, Errors.COORDINATOR_NOT_AVAILABLE, "Share coordinator is not available."));
        }
        String groupId = request.groupId();
        if (ShareCoordinatorService.isGroupIdEmpty(groupId)) {
            this.log.error("Group id must be specified and non-empty: {}", (Object)request);
            return CompletableFuture.completedFuture(new InitializeShareGroupStateResponseData());
        }
        if (ShareCoordinatorService.isEmpty(request.topics())) {
            this.log.error("Topic Data is empty: {}", (Object)request);
            return CompletableFuture.completedFuture(new InitializeShareGroupStateResponseData());
        }
        HashMap<Uuid, Map> futureMap = new HashMap<Uuid, Map>();
        for (InitializeShareGroupStateRequestData.InitializeStateData topicData : request.topics()) {
            Uuid topicId = topicData.topicId();
            for (InitializeShareGroupStateRequestData.PartitionData partitionData : topicData.partitions()) {
                SharePartitionKey coordinatorKey = SharePartitionKey.getInstance((String)request.groupId(), (Uuid)topicId, (int)partitionData.partition());
                InitializeShareGroupStateRequestData requestForCurrentPartition = new InitializeShareGroupStateRequestData().setGroupId(groupId).setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData().setTopicId(topicId).setPartitions(List.of(partitionData))));
                CompletionStage initializeFuture = this.runtime.scheduleWriteOperation("initialize-share-group-state", this.topicPartitionFor(coordinatorKey), Duration.ofMillis(this.config.shareCoordinatorWriteTimeoutMs()), coordinator -> coordinator.initializeState(requestForCurrentPartition)).exceptionally(initializeException -> (InitializeShareGroupStateResponseData)CoordinatorOperationExceptionHelper.handleOperationException((String)"initialize-share-group-state", (Object)request, (Throwable)initializeException, (error, message) -> InitializeShareGroupStateResponse.toErrorResponseData((Uuid)topicData.topicId(), (int)partitionData.partition(), (Errors)error, (String)("Unable to initialize share group state: " + initializeException.getMessage())), (Logger)this.log));
                futureMap.computeIfAbsent(topicId, k -> new HashMap()).put(partitionData.partition(), initializeFuture);
            }
        }
        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf((CompletableFuture[])futureMap.values().stream().flatMap(map -> map.values().stream()).toArray(CompletableFuture[]::new));
        return combinedFuture.thenApply(v -> {
            ArrayList initializeStateResult = new ArrayList(futureMap.size());
            futureMap.forEach((topicId, topicEntry) -> {
                ArrayList partitionResults = new ArrayList(topicEntry.size());
                topicEntry.forEach((partitionId, responseFut) -> partitionResults.add((InitializeShareGroupStateResponseData.PartitionResult)((InitializeShareGroupStateResponseData.InitializeStateResult)((InitializeShareGroupStateResponseData)responseFut.getNow(null)).results().get(0)).partitions().get(0)));
                initializeStateResult.add(InitializeShareGroupStateResponse.toResponseInitializeStateResult((Uuid)topicId, partitionResults));
            });
            return new InitializeShareGroupStateResponseData().setResults(initializeStateResult);
        });
    }

    private ReadShareGroupStateResponseData generateErrorReadStateResponse(ReadShareGroupStateRequestData request, Errors error, String errorMessage) {
        return new ReadShareGroupStateResponseData().setResults(request.topics().stream().map(topicData -> {
            ReadShareGroupStateResponseData.ReadStateResult resultData = new ReadShareGroupStateResponseData.ReadStateResult();
            resultData.setTopicId(topicData.topicId());
            resultData.setPartitions(topicData.partitions().stream().map(partitionData -> ReadShareGroupStateResponse.toErrorResponsePartitionResult((int)partitionData.partition(), (Errors)error, (String)errorMessage)).toList());
            return resultData;
        }).toList());
    }

    private ReadShareGroupStateSummaryResponseData generateErrorReadStateSummaryResponse(ReadShareGroupStateSummaryRequestData request, Errors error, String errorMessage) {
        return new ReadShareGroupStateSummaryResponseData().setResults(request.topics().stream().map(topicData -> {
            ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult resultData = new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult();
            resultData.setTopicId(topicData.topicId());
            resultData.setPartitions(topicData.partitions().stream().map(partitionData -> ReadShareGroupStateSummaryResponse.toErrorResponsePartitionResult((int)partitionData.partition(), (Errors)error, (String)errorMessage)).toList());
            return resultData;
        }).toList());
    }

    private WriteShareGroupStateResponseData generateErrorWriteStateResponse(WriteShareGroupStateRequestData request, Errors error, String errorMessage) {
        return new WriteShareGroupStateResponseData().setResults(request.topics().stream().map(topicData -> {
            WriteShareGroupStateResponseData.WriteStateResult resultData = new WriteShareGroupStateResponseData.WriteStateResult();
            resultData.setTopicId(topicData.topicId());
            resultData.setPartitions(topicData.partitions().stream().map(partitionData -> WriteShareGroupStateResponse.toErrorResponsePartitionResult((int)partitionData.partition(), (Errors)error, (String)errorMessage)).toList());
            return resultData;
        }).toList());
    }

    private DeleteShareGroupStateResponseData generateErrorDeleteStateResponse(DeleteShareGroupStateRequestData request, Errors error, String errorMessage) {
        return new DeleteShareGroupStateResponseData().setResults(request.topics().stream().map(topicData -> {
            DeleteShareGroupStateResponseData.DeleteStateResult resultData = new DeleteShareGroupStateResponseData.DeleteStateResult();
            resultData.setTopicId(topicData.topicId());
            resultData.setPartitions(topicData.partitions().stream().map(partitionData -> DeleteShareGroupStateResponse.toErrorResponsePartitionResult((int)partitionData.partition(), (Errors)error, (String)errorMessage)).toList());
            return resultData;
        }).toList());
    }

    private InitializeShareGroupStateResponseData generateErrorInitStateResponse(InitializeShareGroupStateRequestData request, Errors error, String errorMessage) {
        return new InitializeShareGroupStateResponseData().setResults(request.topics().stream().map(topicData -> {
            InitializeShareGroupStateResponseData.InitializeStateResult resultData = new InitializeShareGroupStateResponseData.InitializeStateResult();
            resultData.setTopicId(topicData.topicId());
            resultData.setPartitions(topicData.partitions().stream().map(partitionData -> InitializeShareGroupStateResponse.toErrorResponsePartitionResult((int)partitionData.partition(), (Errors)error, (String)errorMessage)).toList());
            return resultData;
        }).toList());
    }

    private static boolean isGroupIdEmpty(String groupId) {
        return groupId == null || groupId.isEmpty();
    }

    @Override
    public void onElection(int partitionIndex, int partitionLeaderEpoch) {
        this.throwIfNotActive();
        this.runtime.scheduleLoadOperation(new TopicPartition("__share_group_state", partitionIndex), partitionLeaderEpoch);
    }

    @Override
    public void onResignation(int partitionIndex, OptionalInt partitionLeaderEpoch) {
        this.throwIfNotActive();
        TopicPartition tp = new TopicPartition("__share_group_state", partitionIndex);
        this.lastPrunedOffsets.remove(tp);
        this.runtime.scheduleUnloadOperation(tp, partitionLeaderEpoch);
    }

    @Override
    public void onTopicsDeleted(Set<Uuid> deletedTopicIds, BufferSupplier bufferSupplier) throws ExecutionException, InterruptedException {
        this.throwIfNotActive();
        if (deletedTopicIds.isEmpty()) {
            return;
        }
        CompletableFuture.allOf(FutureUtils.mapExceptionally((List)this.runtime.scheduleWriteAllOperation("on-topics-deleted", Duration.ofMillis(this.config.shareCoordinatorWriteTimeoutMs()), coordinator -> coordinator.maybeCleanupShareState(deletedTopicIds)), exception -> {
            this.log.error("Received error while trying to cleanup deleted topics.", exception);
            return null;
        }).toArray(new CompletableFuture[0])).get();
    }

    @Override
    public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
        this.throwIfNotActive();
        this.runtime.onNewMetadataImage(newImage, delta);
        boolean enabled = this.isShareGroupsEnabled(newImage);
        if (enabled ^ this.shouldRunPeriodicJob) {
            this.shouldRunPeriodicJob = enabled;
            if (enabled) {
                this.setupPeriodicJobs();
            }
        }
    }

    TopicPartition topicPartitionFor(SharePartitionKey key) {
        return new TopicPartition("__share_group_state", this.partitionFor(key));
    }

    private static <P> boolean isEmpty(List<P> list) {
        return list == null || list.isEmpty();
    }

    private void throwIfNotActive() {
        if (!this.isActive.get()) {
            throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
        }
    }

    private boolean isShareGroupsEnabled(MetadataImage image) {
        return this.shareGroupConfigEnabledSupplier.get() != false || ShareVersion.fromFeatureLevel((short)image.features().finalizedVersions().getOrDefault("share.version", (short)0)).supportsShareGroups();
    }

    boolean shouldRunPeriodicJob() {
        return this.shouldRunPeriodicJob;
    }

    public static class Builder {
        private final int nodeId;
        private final ShareCoordinatorConfig config;
        private PartitionWriter writer;
        private CoordinatorLoader<CoordinatorRecord> loader;
        private Time time;
        private Timer timer;
        private Supplier<Boolean> shareGroupConfigEnabledSupplier;
        private ShareCoordinatorMetrics coordinatorMetrics;
        private CoordinatorRuntimeMetrics coordinatorRuntimeMetrics;

        public Builder(int nodeId, ShareCoordinatorConfig config) {
            this.nodeId = nodeId;
            this.config = config;
        }

        public Builder withWriter(PartitionWriter writer) {
            this.writer = writer;
            return this;
        }

        public Builder withLoader(CoordinatorLoader<CoordinatorRecord> loader) {
            this.loader = loader;
            return this;
        }

        public Builder withTime(Time time) {
            this.time = time;
            return this;
        }

        public Builder withTimer(Timer timer) {
            this.timer = timer;
            return this;
        }

        public Builder withCoordinatorMetrics(ShareCoordinatorMetrics coordinatorMetrics) {
            this.coordinatorMetrics = coordinatorMetrics;
            return this;
        }

        public Builder withCoordinatorRuntimeMetrics(CoordinatorRuntimeMetrics coordinatorRuntimeMetrics) {
            this.coordinatorRuntimeMetrics = coordinatorRuntimeMetrics;
            return this;
        }

        public Builder withShareGroupEnabledConfigSupplier(Supplier<Boolean> shareGroupConfigEnabledSupplier) {
            this.shareGroupConfigEnabledSupplier = shareGroupConfigEnabledSupplier;
            return this;
        }

        public ShareCoordinatorService build() {
            if (this.config == null) {
                throw new IllegalArgumentException("Config must be set.");
            }
            if (this.writer == null) {
                throw new IllegalArgumentException("Writer must be set.");
            }
            if (this.loader == null) {
                throw new IllegalArgumentException("Loader must be set.");
            }
            if (this.time == null) {
                throw new IllegalArgumentException("Time must be set.");
            }
            if (this.timer == null) {
                throw new IllegalArgumentException("Timer must be set.");
            }
            if (this.coordinatorMetrics == null) {
                throw new IllegalArgumentException("Share Coordinator metrics must be set.");
            }
            if (this.coordinatorRuntimeMetrics == null) {
                throw new IllegalArgumentException("Coordinator runtime metrics must be set.");
            }
            if (this.shareGroupConfigEnabledSupplier == null) {
                throw new IllegalArgumentException("Share group enabled config enabled supplier must be set.");
            }
            String logPrefix = String.format("ShareCoordinator id=%d", this.nodeId);
            LogContext logContext = new LogContext(String.format("[%s] ", logPrefix));
            CoordinatorShardBuilderSupplier supplier = () -> new ShareCoordinatorShard.Builder(this.config);
            MultiThreadedEventProcessor processor = new MultiThreadedEventProcessor(logContext, "share-coordinator-event-processor-", this.config.shareCoordinatorNumThreads(), this.time, this.coordinatorRuntimeMetrics);
            CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime(this.time).withTimer(this.timer).withLogPrefix(logPrefix).withLogContext(logContext).withEventProcessor((CoordinatorEventProcessor)processor).withPartitionWriter(this.writer).withLoader(this.loader).withCoordinatorShardBuilderSupplier(supplier).withTime(this.time).withDefaultWriteTimeOut(Duration.ofMillis(this.config.shareCoordinatorWriteTimeoutMs())).withCoordinatorRuntimeMetrics(this.coordinatorRuntimeMetrics).withCoordinatorMetrics((CoordinatorMetrics)this.coordinatorMetrics).withSerializer((Serializer)new ShareCoordinatorRecordSerde()).withCompression(Compression.of((CompressionType)this.config.shareCoordinatorStateTopicCompressionType()).build()).withAppendLingerMs(this.config.shareCoordinatorAppendLingerMs()).withExecutorService(Executors.newSingleThreadExecutor()).build();
            return new ShareCoordinatorService(logContext, this.config, (CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord>)runtime, this.coordinatorMetrics, this.time, this.timer, this.writer, this.shareGroupConfigEnabledSupplier);
        }
    }
}

