/*
 * Decompiled with CFR 0.152.
 */
package azkaban.executor;

import azkaban.db.DatabaseOperator;
import azkaban.db.EncodingType;
import azkaban.db.SQLTransaction;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionReference;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.utils.GZIPUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.log4j.Logger;

@Singleton
public class ExecutionFlowDao {
    /** Class-wide log4j logger; also used by the nested {@code FetchQueuedExecutableFlows} handler. */
    private static final Logger logger = Logger.getLogger(ExecutionFlowDao.class);
    /** Database operator through which every query, update and transaction of this DAO is issued. */
    private final DatabaseOperator dbOperator;

    /**
     * Creates the DAO around the given database operator.
     *
     * @param dbOperator operator used for every query, update and transaction issued by this DAO
     */
    @Inject
    public ExecutionFlowDao(DatabaseOperator dbOperator) {
        this.dbOperator = dbOperator;
    }

    /**
     * Registers a new execution for {@code flow}.
     *
     * <p>The flow is marked {@link Status#PREPARING}, a row is inserted into
     * {@code execution_flows} and committed inside one transaction, and the generated id is
     * written back onto the flow (narrowed to {@code int}). Finally the serialized flow is
     * stored through {@link #updateExecutableFlow(ExecutableFlow)}.
     *
     * @param flow the flow to register; its status and execution id are mutated
     * @throws ExecutorManagerException if the insert transaction fails with a {@link SQLException},
     *     or if the follow-up update of the flow data fails
     */
    public synchronized void uploadExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException {
        final String insertSql = "INSERT INTO execution_flows (project_id, flow_id, version, status, submit_time, submit_user, update_time) values (?,?,?,?,?,?,?)";
        final long now = System.currentTimeMillis();
        flow.setStatus(Status.PREPARING);

        // The same timestamp is used for both submit_time and update_time.
        SQLTransaction insertReturningId = tx -> {
            Object[] row = new Object[]{
                flow.getProjectId(),
                flow.getFlowId(),
                flow.getVersion(),
                Status.PREPARING.getNumVal(),
                now,
                flow.getSubmitUser(),
                now
            };
            tx.update(insertSql, row);
            tx.getConnection().commit();
            return tx.getLastInsertId();
        };

        try {
            long generatedId = (Long)this.dbOperator.transaction(insertReturningId);
            logger.info((Object)("Flow given " + flow.getFlowId() + " given id " + generatedId));
            flow.setExecutionId((int)generatedId);
            this.updateExecutableFlow(flow);
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error creating execution.", e);
        }
    }

    /**
     * Fetches a page of executions across all flows, ordered by {@code exec_id} descending.
     *
     * @param skip value bound to the first {@code LIMIT ?, ?} placeholder
     * @param num value bound to the second {@code LIMIT ?, ?} placeholder
     * @return the decoded flows produced by {@link FetchExecutableFlows}
     * @throws ExecutorManagerException if the query fails with a {@link SQLException}
     */
    List<ExecutableFlow> fetchFlowHistory(int skip, int num) throws ExecutorManagerException {
        final String sql = FetchExecutableFlows.FETCH_ALL_EXECUTABLE_FLOW_HISTORY;
        final Object[] paging = new Object[]{skip, num};
        try {
            return (List)this.dbOperator.query(sql, (ResultSetHandler)new FetchExecutableFlows(), paging);
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching flow History", e);
        }
    }

    /**
     * Fetches a page of executions of one flow, ordered by {@code exec_id} descending.
     *
     * @param projectId value matched against {@code project_id}
     * @param flowId value matched against {@code flow_id}
     * @param skip value bound to the first {@code LIMIT ?, ?} placeholder
     * @param num value bound to the second {@code LIMIT ?, ?} placeholder
     * @return the decoded flows produced by {@link FetchExecutableFlows}
     * @throws ExecutorManagerException if the query fails with a {@link SQLException}
     */
    List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num) throws ExecutorManagerException {
        final String sql = FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_HISTORY;
        final Object[] args = new Object[]{projectId, flowId, skip, num};
        try {
            return (List)this.dbOperator.query(sql, (ResultSetHandler)new FetchExecutableFlows(), args);
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching flow history", e);
        }
    }

    /**
     * Fetches every execution that has no executor assigned ({@code executor_id is NULL}) and is
     * still in {@link Status#PREPARING}.
     *
     * <p>Rows whose {@code flow_data} is {@code NULL} are logged and skipped by the handler.
     *
     * @return pairs of an {@link ExecutionReference} built from the execution id and the decoded flow
     * @throws ExecutorManagerException if the query fails with a {@link SQLException}
     */
    public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows() throws ExecutorManagerException {
        try {
            return (List)this.dbOperator.query(FetchQueuedExecutableFlows.FETCH_QUEUED_EXECUTABLE_FLOW, (ResultSetHandler)new FetchQueuedExecutableFlows(), new Object[0]);
        }
        catch (SQLException e) {
            // The message previously said "active flows" (copy-paste); this method reads queued flows.
            throw new ExecutorManagerException("Error fetching queued flows", e);
        }
    }

    /**
     * Fetches a page of executions of one flow that are in the given status, ordered by
     * {@code exec_id} descending.
     *
     * @param projectId value matched against {@code project_id}
     * @param flowId value matched against {@code flow_id}
     * @param skip value bound to the first {@code LIMIT ?, ?} placeholder
     * @param num value bound to the second {@code LIMIT ?, ?} placeholder
     * @param status status whose numeric value is matched against the {@code status} column
     * @return the decoded flows produced by {@link FetchExecutableFlows}
     * @throws ExecutorManagerException if the query fails with a {@link SQLException}
     */
    List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num, Status status) throws ExecutorManagerException {
        try {
            return (List)this.dbOperator.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS, (ResultSetHandler)new FetchExecutableFlows(), new Object[]{projectId, flowId, status.getNumVal(), skip, num});
        }
        catch (SQLException e) {
            // The message previously said "active flows" (copy-paste); this is a history lookup.
            throw new ExecutorManagerException("Error fetching flow history by status", e);
        }
    }

    /**
     * Fetches executions whose {@code end_time} is later than "now minus {@code maxAge}" and whose
     * status is {@link Status#SUCCEEDED}, {@link Status#KILLED} or {@link Status#FAILED}.
     *
     * @param maxAge how far back from the current wall-clock time to look
     * @return the decoded flows produced by {@link FetchRecentlyFinishedFlows}
     * @throws ExecutorManagerException if the query fails with a {@link SQLException}
     */
    List<ExecutableFlow> fetchRecentlyFinishedFlows(Duration maxAge) throws ExecutorManagerException {
        try {
            final long cutoffMillis = System.currentTimeMillis() - maxAge.toMillis();
            final Object[] args = new Object[]{
                cutoffMillis,
                Status.SUCCEEDED.getNumVal(),
                Status.KILLED.getNumVal(),
                Status.FAILED.getNumVal()
            };
            return (List)this.dbOperator.query(FetchRecentlyFinishedFlows.FETCH_RECENTLY_FINISHED_FLOW, (ResultSetHandler)new FetchRecentlyFinishedFlows(), args);
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching recently finished flows", e);
        }
    }

    /**
     * Fetches executions matching an optional set of filters. Each filter that is "unset" is
     * simply left out of the generated SQL.
     *
     * <p>NOTE(review): the {@code LIKE} patterns are built as {@code %value%} without escaping,
     * so {@code %} and {@code _} inside the inputs act as wildcards. Also, when the project
     * filter is used the query joins {@code projects} while the other column names stay
     * unqualified; confirm against the schema that none of them is ambiguous.
     *
     * @param projContain substring matched against {@code name} via a join on {@code projects};
     *     ignored when {@code null} or empty
     * @param flowContains substring matched against {@code flow_id}; ignored when {@code null} or empty
     * @param userNameContains substring matched against {@code submit_user}; ignored when
     *     {@code null} or empty
     * @param status numeric status matched against {@code status}; {@code 0} disables the filter
     * @param startTime exclusive lower bound on {@code start_time}; ignored unless positive
     * @param endTime exclusive upper bound on {@code end_time}; ignored unless positive
     * @param skip first {@code LIMIT} argument; ordering and paging are only applied when
     *     {@code skip > -1} and {@code num > 0}
     * @param num second {@code LIMIT} argument
     * @return the decoded flows produced by {@link FetchExecutableFlows}
     * @throws ExecutorManagerException if the query fails with a {@link SQLException}
     */
    List<ExecutableFlow> fetchFlowHistory(String projContain, String flowContains, String userNameContains, int status, long startTime, long endTime, int skip, int num) throws ExecutorManagerException {
        StringBuilder query = new StringBuilder(FetchExecutableFlows.FETCH_BASE_EXECUTABLE_FLOW_QUERY);
        List<Object> params = new ArrayList<>();
        // Tracks whether a WHERE clause has been opened so later filters are chained with AND.
        boolean hasWhere = false;

        if (projContain != null && !projContain.isEmpty()) {
            // The project filter is always first, so it opens the WHERE clause itself.
            query.append(" ef JOIN projects p ON ef.project_id = p.id WHERE name LIKE ?");
            params.add('%' + projContain + '%');
            hasWhere = true;
        }
        if (flowContains != null && !flowContains.isEmpty()) {
            ExecutionFlowDao.appendFilter(query, hasWhere, " flow_id LIKE ?");
            params.add('%' + flowContains + '%');
            hasWhere = true;
        }
        if (userNameContains != null && !userNameContains.isEmpty()) {
            ExecutionFlowDao.appendFilter(query, hasWhere, " submit_user LIKE ?");
            params.add('%' + userNameContains + '%');
            hasWhere = true;
        }
        if (status != 0) {
            ExecutionFlowDao.appendFilter(query, hasWhere, " status = ?");
            params.add(status);
            hasWhere = true;
        }
        if (startTime > 0L) {
            ExecutionFlowDao.appendFilter(query, hasWhere, " start_time > ?");
            params.add(startTime);
            hasWhere = true;
        }
        if (endTime > 0L) {
            ExecutionFlowDao.appendFilter(query, hasWhere, " end_time < ?");
            params.add(endTime);
        }
        if (skip > -1 && num > 0) {
            query.append("  ORDER BY exec_id DESC LIMIT ?, ?");
            params.add(skip);
            params.add(num);
        }

        try {
            return (List)this.dbOperator.query(query.toString(), (ResultSetHandler)new FetchExecutableFlows(), params.toArray());
        }
        catch (SQLException e) {
            // The message previously said "active flows" (copy-paste); this is a history lookup.
            throw new ExecutorManagerException("Error fetching flow history", e);
        }
    }

    /** Appends {@code condition} to {@code query}, preceded by " WHERE " for the first filter and " AND " otherwise. */
    private static void appendFilter(StringBuilder query, boolean hasWhere, String condition) {
        query.append(hasWhere ? " AND " : " WHERE ").append(condition);
    }

    /**
     * Persists the current state of {@code flow}, storing its serialized form GZIP-compressed.
     *
     * @param flow the flow whose row (matched by execution id) is updated
     * @throws ExecutorManagerException if encoding the flow or the database update fails
     */
    void updateExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException {
        this.updateExecutableFlow(flow, EncodingType.GZIP);
    }

    /**
     * Serializes {@code flow} to JSON, encodes it as UTF-8 bytes (gzipped when {@code encType} is
     * {@link EncodingType#GZIP}) and writes status, timestamps, encoding type and data to the
     * {@code execution_flows} row with the flow's execution id.
     *
     * @param flow the flow to persist
     * @param encType encoding recorded in {@code enc_type}; only {@code GZIP} alters the bytes
     * @throws ExecutorManagerException if encoding fails (the {@link IOException} is kept as the
     *     cause) or the update fails with a {@link SQLException}
     */
    private void updateExecutableFlow(ExecutableFlow flow, EncodingType encType) throws ExecutorManagerException {
        final String updateSql = "UPDATE execution_flows SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? WHERE exec_id=?";
        String json = JSONUtils.toJSON(flow.toObject());
        final byte[] data;
        try {
            byte[] plainBytes = json.getBytes("UTF-8");
            data = encType == EncodingType.GZIP ? GZIPUtils.gzipBytes(plainBytes) : plainBytes;
        }
        catch (IOException e) {
            // Keep the underlying failure as the cause; it used to be dropped here.
            throw new ExecutorManagerException("Error encoding the execution flow.", e);
        }
        try {
            this.dbOperator.update(updateSql, new Object[]{flow.getStatus().getNumVal(), flow.getUpdateTime(), flow.getStartTime(), flow.getEndTime(), encType.getNumVal(), data, flow.getExecutionId()});
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error updating flow.", e);
        }
    }

    /**
     * Looks up a single execution by id.
     *
     * @param execId value matched against {@code exec_id}
     * @return the first decoded flow, or {@code null} when the handler produced no flows
     * @throws ExecutorManagerException if the query fails with a {@link SQLException}
     */
    public ExecutableFlow fetchExecutableFlow(int execId) throws ExecutorManagerException {
        final FetchExecutableFlows rowHandler = new FetchExecutableFlows();
        try {
            final List<ExecutableFlow> matches = (List)this.dbOperator.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW, (ResultSetHandler)rowHandler, new Object[]{execId});
            return matches.isEmpty() ? null : matches.get(0);
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching flow id " + execId, e);
        }
    }

    /**
     * Result-set handler that decodes {@code (exec_id, enc_type, flow_data)} rows into
     * {@link ExecutableFlow} objects. Rows whose data blob is {@code NULL} are skipped silently.
     */
    private static class FetchRecentlyFinishedFlows
    implements ResultSetHandler<List<ExecutableFlow>> {
        private static final String FETCH_RECENTLY_FINISHED_FLOW = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE end_time > ? AND status IN (?, ?, ?)";

        private FetchRecentlyFinishedFlows() {
        }

        /**
         * Decodes every row of {@code rs}.
         *
         * @param rs result set positioned before its first row
         * @return an immutable empty list when there are no rows, otherwise the decoded flows
         * @throws SQLException on database errors, or wrapping an {@link IOException} raised
         *     while decoding a row's data
         */
        public List<ExecutableFlow> handle(ResultSet rs) throws SQLException {
            if (!rs.next()) {
                return Collections.emptyList();
            }
            List<ExecutableFlow> flows = new ArrayList<>();
            // The cursor is already on the first row; advance only after each row is handled.
            for (boolean hasRow = true; hasRow; hasRow = rs.next()) {
                int execId = rs.getInt(1);
                int encCode = rs.getInt(2);
                byte[] blob = rs.getBytes(3);
                if (blob != null) {
                    EncodingType encoding = EncodingType.fromInteger(encCode);
                    try {
                        flows.add(ExecutableFlow.createExecutableFlowFromObject(GZIPUtils.transformBytesToObject(blob, encoding)));
                    }
                    catch (IOException e) {
                        throw new SQLException("Error retrieving flow data " + execId, e);
                    }
                }
            }
            return flows;
        }
    }

    private static class FetchQueuedExecutableFlows
    implements ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
        private static final String FETCH_QUEUED_EXECUTABLE_FLOW = "SELECT exec_id, enc_type, flow_data FROM execution_flows Where executor_id is NULL AND status = " + Status.PREPARING.getNumVal();

        private FetchQueuedExecutableFlows() {
        }

        public List<Pair<ExecutionReference, ExecutableFlow>> handle(ResultSet rs) throws SQLException {
            if (!rs.next()) {
                return Collections.emptyList();
            }
            ArrayList<Pair<ExecutionReference, ExecutableFlow>> execFlows = new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
            do {
                int id = rs.getInt(1);
                int encodingType = rs.getInt(2);
                byte[] data = rs.getBytes(3);
                if (data == null) {
                    logger.error((Object)("Found a flow with empty data blob exec_id: " + id));
                    continue;
                }
                EncodingType encType = EncodingType.fromInteger((int)encodingType);
                try {
                    ExecutableFlow exFlow = ExecutableFlow.createExecutableFlowFromObject(GZIPUtils.transformBytesToObject(data, encType));
                    ExecutionReference ref = new ExecutionReference(id);
                    execFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref, exFlow));
                }
                catch (IOException e) {
                    throw new SQLException("Error retrieving flow data " + id, e);
                }
            } while (rs.next());
            return execFlows;
        }
    }

    /**
     * Result-set handler that decodes {@code (exec_id, enc_type, flow_data)} rows into
     * {@link ExecutableFlow} objects, and holder of the SQL used by the history lookups.
     * Rows whose data blob is {@code NULL} are skipped silently.
     */
    public static class FetchExecutableFlows
    implements ResultSetHandler<List<ExecutableFlow>> {
        // These are constants; declaring them final prevents accidental reassignment of shared SQL.
        /** Base SELECT without a WHERE clause; callers append joins and filters to it. */
        static final String FETCH_BASE_EXECUTABLE_FLOW_QUERY = "SELECT exec_id, enc_type, flow_data FROM execution_flows ";
        /** Lookup of a single execution by {@code exec_id}. */
        static final String FETCH_EXECUTABLE_FLOW = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE exec_id=?";
        /** Paged history across all flows, highest {@code exec_id} first. */
        static final String FETCH_ALL_EXECUTABLE_FLOW_HISTORY = "SELECT exec_id, enc_type, flow_data FROM execution_flows ORDER BY exec_id DESC LIMIT ?, ?";
        /** Paged history of one flow in one project, highest {@code exec_id} first. */
        static final String FETCH_EXECUTABLE_FLOW_HISTORY = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE project_id=? AND flow_id=? ORDER BY exec_id DESC LIMIT ?, ?";
        /** Paged history of one flow restricted to a single status, highest {@code exec_id} first. */
        static final String FETCH_EXECUTABLE_FLOW_BY_STATUS = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE project_id=? AND flow_id=? AND status=? ORDER BY exec_id DESC LIMIT ?, ?";

        /**
         * Decodes every row of {@code rs}.
         *
         * @param rs result set positioned before its first row
         * @return an immutable empty list when there are no rows, otherwise the decoded flows
         * @throws SQLException on database errors, or wrapping an {@link IOException} raised
         *     while decoding a row's data
         */
        public List<ExecutableFlow> handle(ResultSet rs) throws SQLException {
            if (!rs.next()) {
                return Collections.emptyList();
            }
            List<ExecutableFlow> execFlows = new ArrayList<>();
            do {
                int id = rs.getInt(1);
                int encodingType = rs.getInt(2);
                byte[] data = rs.getBytes(3);
                if (data == null) {
                    // No serialized flow stored for this row; nothing to decode.
                    continue;
                }
                EncodingType encType = EncodingType.fromInteger(encodingType);
                try {
                    execFlows.add(ExecutableFlow.createExecutableFlowFromObject(GZIPUtils.transformBytesToObject(data, encType)));
                }
                catch (IOException e) {
                    throw new SQLException("Error retrieving flow data " + id, e);
                }
            } while (rs.next());
            return execFlows;
        }
    }
}

