// CorpusBatch.java

/*******************************************************************************
 * Copyright (C) 2020 Ram Sadasiv
 * 
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 ******************************************************************************/
package io.outofprintmagazine.corpus.batch;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ArrayNode;

import io.outofprintmagazine.corpus.batch.model.CorpusBatchModel;
import io.outofprintmagazine.corpus.batch.model.CorpusBatchStepModel;
import io.outofprintmagazine.corpus.storage.IBatchStorage;
import io.outofprintmagazine.corpus.storage.IScratchStorage;
import io.outofprintmagazine.util.IParameterStore;


public class CorpusBatch {
	
	private static final Logger logger = LogManager.getLogger(CorpusBatch.class);
	
	private Logger getLogger() {
		return logger;
	}
	
	public CorpusBatch() {
		super();
	}
	
	private CorpusBatchModel data;
	
	public CorpusBatchModel getData() {
		return data;
	}

	public void setData(CorpusBatchModel data) {
		this.data = data;
	}
	
	private IScratchStorage scratchStorage = null;
	
	public IScratchStorage getScratchStorage() throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException {
		if (scratchStorage == null) {
			scratchStorage = (IScratchStorage) Class.forName(getData().getScratchStorageClass()).getConstructor().newInstance();
			scratchStorage.setParameterStore(getParameterStore());
		}
		return scratchStorage;
	}
	
	public void setScratchStorage(IScratchStorage storage) {
		this.scratchStorage = storage;
	}
	
	private IBatchStorage batchStorage = null;
	
	public IBatchStorage getBatchStorage() throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException {
		if (batchStorage == null) {
			batchStorage = (IBatchStorage) Class.forName(getData().getBatchStorageClass()).getConstructor().newInstance();
			batchStorage.setParameterStore(getParameterStore());
		}
		return batchStorage;
	}
	
	public void setBatchStorage(IBatchStorage batchStorage) {
		this.batchStorage = batchStorage;
	}
	
	private IParameterStore parameterStore = null;
	
	public IParameterStore getParameterStore() throws IOException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException, ClassNotFoundException {
		if (parameterStore == null) {
			parameterStore = (IParameterStore) Class.forName(getData().getParameterStoreClass()).getConstructor().newInstance();
			parameterStore.init(getData().getProperties());
		}
		return parameterStore;
	}
	
	public void setBatchStorage(IParameterStore parameterStore) {
		this.parameterStore = parameterStore;
	}
	
	public static CorpusBatch buildFromTemplate(String templateLocation) throws IOException {
    	CorpusBatch corpusBatch = new CorpusBatch();
    	ObjectMapper mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
    	corpusBatch.setData(
    			mapper.readValue(
    					corpusBatch.getClass().getClassLoader().getResourceAsStream(templateLocation),
    					CorpusBatchModel.class
    			)
    	);
    	return corpusBatch;
	}
	
	private List<String> readStreamToList(InputStream in) throws IOException {
		try {
			List<String> allLines = IOUtils.readLines(
					in,
					StandardCharsets.UTF_8.name()
			);
			List<String> retval = new ArrayList<String>();
			for (String line : allLines) {
				if (!line.startsWith("#") && !line.startsWith("/")) {
					retval.add(line);
				}
			}
			return retval;
		}
		finally {
			in.close();
		}
	}
	
	public void appendAnalyzeStep() throws IOException {
		CorpusBatchStepModel analyzeStep = new CorpusBatchStepModel();
		analyzeStep.setCorpusBatchId(getData().getCorpusBatchId());
		analyzeStep.setCorpusBatchStepSequenceId(Integer.valueOf(getData().getCorpusBatchSteps().size()));
		analyzeStep.setCorpusBatchStepId("Analyze");
		analyzeStep.setCorpusBatchStepClass("io.outofprintmagazine.corpus.batch.impl.Analyze");
		ArrayNode customAnnotators = analyzeStep.getProperties().arrayNode();
		List<String> customAnnotatorList = readStreamToList(this.getClass().getClassLoader().getResourceAsStream("io/outofprintmagazine/util/annotators.txt"));
		for (String customAnnotator : customAnnotatorList) {
			customAnnotators.add(customAnnotator);
		}
		analyzeStep.getProperties().set("customAnnotators", customAnnotators);
		getData().getCorpusBatchSteps().add(analyzeStep);
	}
	
	public void appendAggregateStep() throws IOException {
		CorpusBatchStepModel aggregateStep = new CorpusBatchStepModel();
		aggregateStep.setCorpusBatchId(getData().getCorpusBatchId());
		aggregateStep.setCorpusBatchStepSequenceId(Integer.valueOf(getData().getCorpusBatchSteps().size()));
		aggregateStep.setCorpusBatchStepId("CorpusAggregate");
		aggregateStep.setCorpusBatchStepClass("io.outofprintmagazine.corpus.batch.impl.CorpusAggregate");
		getData().getCorpusBatchSteps().add(aggregateStep);

		CorpusBatchStepModel aggregateIdfStep = new CorpusBatchStepModel();
		aggregateIdfStep.setCorpusBatchId(getData().getCorpusBatchId());
		aggregateIdfStep.setCorpusBatchStepSequenceId(Integer.valueOf(getData().getCorpusBatchSteps().size()));
		aggregateIdfStep.setCorpusBatchStepId("CoreNLPTfidf");
		aggregateIdfStep.setCorpusBatchStepClass("io.outofprintmagazine.corpus.batch.impl.CoreNLPTfidfScores");
		getData().getCorpusBatchSteps().add(aggregateIdfStep);		

		CorpusBatchStepModel aggregateZStep = new CorpusBatchStepModel();
		aggregateZStep.setCorpusBatchId(getData().getCorpusBatchId());
		aggregateZStep.setCorpusBatchStepSequenceId(Integer.valueOf(getData().getCorpusBatchSteps().size()));
		aggregateZStep.setCorpusBatchStepId("CoreNLPZ");
		aggregateZStep.setCorpusBatchStepClass("io.outofprintmagazine.corpus.batch.impl.CoreNLPZScores");
		getData().getCorpusBatchSteps().add(aggregateZStep);	

		CorpusBatchStepModel aggregateMBStep = new CorpusBatchStepModel();
		aggregateMBStep.setCorpusBatchId(getData().getCorpusBatchId());
		aggregateMBStep.setCorpusBatchStepSequenceId(Integer.valueOf(getData().getCorpusBatchSteps().size()));
		aggregateMBStep.setCorpusBatchStepId("CoreNLPMB");
		aggregateMBStep.setCorpusBatchStepClass("io.outofprintmagazine.corpus.batch.impl.CoreNLPMyersBriggsScores");
		getData().getCorpusBatchSteps().add(aggregateMBStep);		
		
		CorpusBatchStepModel word2vecStep = new CorpusBatchStepModel();
		word2vecStep.setCorpusBatchId(getData().getCorpusBatchId());
		word2vecStep.setCorpusBatchStepSequenceId(Integer.valueOf(getData().getCorpusBatchSteps().size()));
		word2vecStep.setCorpusBatchStepId("DocumentWord2Vec");
		word2vecStep.setCorpusBatchStepClass("io.outofprintmagazine.corpus.batch.impl.DocumentWord2Vec");
		getData().getCorpusBatchSteps().add(word2vecStep);
		
		CorpusBatchStepModel corpusWord2vecStep = new CorpusBatchStepModel();
		corpusWord2vecStep.setCorpusBatchId(getData().getCorpusBatchId());
		corpusWord2vecStep.setCorpusBatchStepSequenceId(Integer.valueOf(getData().getCorpusBatchSteps().size()));
		corpusWord2vecStep.setCorpusBatchStepId("CorpusWord2Vec");
		corpusWord2vecStep.setCorpusBatchStepClass("io.outofprintmagazine.corpus.batch.impl.CorpusWord2Vec");
		getData().getCorpusBatchSteps().add(corpusWord2vecStep);
	}
	
	public static CorpusBatch buildFromStagingBatch(String corpusName, String batchName) throws Exception {
		return buildFromStagingBatch(corpusName, batchName, "io.outofprintmagazine.corpus.storage.file.FileCorpora");
	}
	
	public static CorpusBatch buildFromStagingBatch(String corpusName, String batchName, String batchStorageClass) throws Exception {
    	IBatchStorage batchStorage = (IBatchStorage) Class.forName(batchStorageClass).getConstructor().newInstance();
    	CorpusBatch corpusBatch = new CorpusBatch();
    	corpusBatch.setBatchStorage(batchStorage);
    	ObjectMapper mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
    	corpusBatch.setData(
    			mapper.treeToValue(
    					batchStorage.getStagingBatch(corpusName, batchName), 
    					CorpusBatchModel.class
    			)
    	);
    	return corpusBatch;
	}
	
	public static CorpusBatch buildFromFile(String fileName) throws Exception {
    	CorpusBatch corpusBatch = new CorpusBatch();
    	ObjectMapper mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
    	corpusBatch.setData(
    			mapper.readValue(
    					new File(fileName), 
    					CorpusBatchModel.class
    			)
    	);
    	return corpusBatch;
	}
	
	public static CorpusBatch buildFromJson(String corpusName, JsonNode data) throws Exception {
    	CorpusBatch corpusBatch = new CorpusBatch();
    	ObjectMapper mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
    	corpusBatch.setData(
    			mapper.treeToValue(
    					data, 
    					CorpusBatchModel.class
    			)
    	);
    	return corpusBatch;
	}
	
	public static CorpusBatch buildFromString(String corpusName, String data) throws Exception {
    	CorpusBatch corpusBatch = new CorpusBatch();
    	ObjectMapper mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
    	corpusBatch.setData(
    			mapper.readValue(
    					data, 
    					CorpusBatchModel.class
    			)
    	);
    	return corpusBatch;
	}
	
	public void run() throws Exception {
		List<CorpusBatchStepModel> sortedSteps = new ArrayList<CorpusBatchStepModel>(getData().getCorpusBatchSteps());
		getLogger().debug(sortedSteps.size());
		Collections.sort(sortedSteps);
		ObjectMapper mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
		ICorpusBatchStep previousBatchStep = null;
		ICorpusBatchStep currentBatchStep = null;
    	for (CorpusBatchStepModel corpusBatchStepModel : sortedSteps) {

    		currentBatchStep = (ICorpusBatchStep) Class.forName(corpusBatchStepModel.getCorpusBatchStepClass()).getConstructor().newInstance();
    		currentBatchStep.setData(corpusBatchStepModel);
    		currentBatchStep.setStorage(getScratchStorage());
    		currentBatchStep.setParameterStore(getParameterStore());
    		currentBatchStep.getData().setCorpusId(getData().getCorpusId());
    		currentBatchStep.getData().setCorpusBatchId(getData().getCorpusBatchId());
    		getLogger().debug(getData().getCorpusId() + " " + getData().getCorpusBatchId() + " " + currentBatchStep.getData().getCorpusBatchStepId());
    		if (previousBatchStep != null) {
    	    	currentBatchStep.run(previousBatchStep.getData().getOutput());
    		}
    		else {
    			currentBatchStep.run(currentBatchStep.getData().getInput().deepCopy());
    		}
    		getBatchStorage().storeStagingBatchString(getData().getCorpusId(), getData().getCorpusBatchId(), mapper.writeValueAsString(getData()));
    		previousBatchStep = currentBatchStep;
    	}
    	//save incrementally
    	//getBatchStorage().storeStagingBatchString(getData().getCorpusId(), getData().getCorpusBatchId(), mapper.writeValueAsString(getData()));
	}
	
	public void runStep(String stepId) throws Exception {
		List<CorpusBatchStepModel> sortedSteps = new ArrayList<CorpusBatchStepModel>(getData().getCorpusBatchSteps());
		Collections.sort(sortedSteps);
		ObjectMapper mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
		ICorpusBatchStep previousBatchStep = null;
		ICorpusBatchStep currentBatchStep = null;
    	for (CorpusBatchStepModel corpusBatchStepModel : sortedSteps) {
    		previousBatchStep = currentBatchStep;
    		currentBatchStep = (ICorpusBatchStep) Class.forName(corpusBatchStepModel.getCorpusBatchStepClass()).getConstructor().newInstance();
    		currentBatchStep.setData(corpusBatchStepModel);
    		if (corpusBatchStepModel.getCorpusBatchStepId().equals(stepId)) {
	    		if (previousBatchStep != null) {
	    			logger.debug(getData().getCorpusId() + " " + getData().getCorpusBatchId() + " " + currentBatchStep.getData().getCorpusBatchStepId());
	    	    	currentBatchStep.run(previousBatchStep.getData().getOutput());
	    		}
	    		else {
	    			currentBatchStep.run(currentBatchStep.getData().getInput());
	    		}
    		}
    	}
    	getBatchStorage().storeStagingBatchString(getData().getCorpusId(), getData().getCorpusBatchId(), mapper.writeValueAsString(getData()));
	}

}