package alluxio.security.authentication;

import alluxio.core.client.runtime.com.google.common.util.concurrent.SettableFuture;
import alluxio.core.client.runtime.io.grpc.Status;
import alluxio.core.client.runtime.io.grpc.StatusRuntimeException;
import alluxio.core.client.runtime.io.grpc.stub.StreamObserver;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.UnauthenticatedException;
import alluxio.exception.status.UnavailableException;
import alluxio.exception.status.UnknownException;
import alluxio.grpc.SaslMessage;
import alluxio.util.LogUtils;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.security.sasl.SaslException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:alluxio/security/authentication/SaslStreamClientDriver.class */
public class SaslStreamClientDriver implements StreamObserver<SaslMessage> {
    private static final Logger LOG = LoggerFactory.getLogger(SaslStreamClientDriver.class);
    private StreamObserver<SaslMessage> mRequestObserver;
    private SaslHandshakeClientHandler mSaslHandshakeClientHandler;
    private SettableFuture<Boolean> mHandshakeFuture = SettableFuture.create();
    private AtomicBoolean mAuthenticated;
    private UUID mChannelId;
    private final long mGrpcAuthTimeoutMs;

    public SaslStreamClientDriver(SaslHandshakeClientHandler saslHandshakeClientHandler, AtomicBoolean atomicBoolean, UUID uuid, long j) {
        this.mSaslHandshakeClientHandler = saslHandshakeClientHandler;
        this.mChannelId = uuid;
        this.mGrpcAuthTimeoutMs = j;
        this.mAuthenticated = atomicBoolean;
    }

    public void setServerObserver(StreamObserver<SaslMessage> streamObserver) {
        this.mRequestObserver = streamObserver;
    }

    @Override // alluxio.core.client.runtime.io.grpc.stub.StreamObserver
    public void onNext(SaslMessage saslMessage) {
        try {
            LOG.debug("SaslClientDriver received message: {} for channel: {}", saslMessage, this.mChannelId);
            SaslMessage handleSaslMessage = this.mSaslHandshakeClientHandler.handleSaslMessage(saslMessage);
            if (handleSaslMessage != null) {
                this.mRequestObserver.onNext(handleSaslMessage);
            } else {
                this.mHandshakeFuture.set(true);
            }
        } catch (Exception e) {
            LOG.debug("Exception while handling SASL message: {} for channel: {}. Error: {}", saslMessage, this.mChannelId, e);
            this.mHandshakeFuture.setException(e);
            this.mRequestObserver.onError(e);
        }
    }

    @Override // alluxio.core.client.runtime.io.grpc.stub.StreamObserver
    public void onError(Throwable th) {
        LOG.warn("Received error on client driver for channel: {}. Error: {}", this.mChannelId, th);
        this.mHandshakeFuture.setException(th);
    }

    @Override // alluxio.core.client.runtime.io.grpc.stub.StreamObserver
    public void onCompleted() {
        LOG.debug("Client authentication closed by server for channel: {}", this.mChannelId);
        this.mAuthenticated.set(false);
    }

    public void start() throws AlluxioStatusException {
        try {
            LOG.debug("Starting SASL handshake for channel: {}", this.mChannelId);
            this.mRequestObserver.onNext(this.mSaslHandshakeClientHandler.getInitialMessage(this.mChannelId));
            this.mAuthenticated.set(this.mHandshakeFuture.get(this.mGrpcAuthTimeoutMs, TimeUnit.MILLISECONDS).booleanValue());
        } catch (SaslException e) {
            throw new UnauthenticatedException(e.getMessage(), e);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            throw new UnavailableException(e2.getMessage(), e2);
        } catch (ExecutionException e3) {
            Throwable cause = e3.getCause() != null ? e3.getCause() : e3;
            if (cause == null || !(cause instanceof StatusRuntimeException)) {
                throw new UnknownException(cause.getMessage(), cause);
            }
            if (((StatusRuntimeException) cause).getStatus().getCode() != Status.Code.UNIMPLEMENTED) {
                throw AlluxioStatusException.fromStatusRuntimeException((StatusRuntimeException) cause);
            }
            throw new UnauthenticatedException("Authentication is disabled on target host.");
        } catch (TimeoutException e4) {
            throw new UnavailableException(e4);
        }
    }

    public void stop() {
        LOG.debug("Closing client driver for channel: {}", this.mChannelId);
        try {
            if (this.mAuthenticated.get()) {
                this.mRequestObserver.onCompleted();
            }
        } catch (Exception e) {
            LogUtils.warnWithException(LOG, "Failed stopping authentication session with server.", e);
        }
    }
}
