CorpusBatchStep.java

/*******************************************************************************
   * Copyright (C) 2020 Ram Sadasiv
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 ******************************************************************************/
package io.outofprintmagazine.corpus.batch;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.UUID;

import javax.activation.MimetypesFileTypeMap;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tika.mime.MimeType;
import org.apache.tika.mime.MimeTypeException;
import org.apache.tika.mime.MimeTypes;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.parser.Parser;
import org.jsoup.select.Elements;

import com.fasterxml.jackson.core.JsonGenerator.Feature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import edu.stanford.nlp.util.StringUtils;
import io.outofprintmagazine.corpus.batch.model.CorpusBatchStepModel;
import io.outofprintmagazine.corpus.storage.IScratchStorage;
import io.outofprintmagazine.corpus.storage.s3.S3ScratchStorage;
import io.outofprintmagazine.util.IParameterStore;
import io.outofprintmagazine.util.ParameterStorePropertiesFile;

public abstract class CorpusBatchStep implements ICorpusBatchStep {

	@SuppressWarnings("unused")
	private static final Logger logger = LogManager.getLogger(CorpusBatchStep.class);

	@SuppressWarnings("unused")
	private Logger getLogger() {
		return logger;
	}

	protected List<String> dictionaryPOS = Arrays.asList(
			"CC",
			"DT",
			"EX",
			"IN",
			"JJ",
			"JJR",
			"JJS",
			"MD",
			"NN",
			"NNS",
			"PRP",
			"PRP$",
			"RB",
			"RBR",
			"RBS",
			"RP",
			"TO",
			"UH",
			"VB",
			"VBD",
			"VBG",
			"VBN",
			"VBP",
			"VBZ",
			"WDT",
			"WP",
			"WP$",
			"WRB"
	);

	public CorpusBatchStep() {
		super();
	}

	private CorpusBatchStepModel data;

	private ObjectMapper mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT).configure(Feature.WRITE_BIGDECIMAL_AS_PLAIN, true);

	protected ObjectMapper getMapper() {
		return mapper;
	}

	private IScratchStorage storage = null;

	private IParameterStore parameterStore = null;

	protected IParameterStore getParameterStore() throws IOException {
		if (parameterStore == null) {
			parameterStore = new ParameterStorePropertiesFile("data", "oopcorenlp.properties");
		}
		return parameterStore;
	}

	/* (non-Javadoc)
	 * @see io.outofprintmagazine.corpus.batch.ICorpusBatchStep#setParameterStore(io.outofprintmagazine.util.IParameterStore)
	 */
	@Override
	public void setParameterStore(IParameterStore parameterStore) {
		this.parameterStore = parameterStore;
	}

	/* (non-Javadoc)
	 * @see io.outofprintmagazine.corpus.batch.ICorpusBatchStep#setStorage(io.outofprintmagazine.corpus.storage.IScratchStorage)
	 */
	@Override
	public void setStorage(IScratchStorage storage) {
		this.storage = storage;
	}

	protected IScratchStorage getStorage() throws IOException {
		if (storage == null) {
			storage = new S3ScratchStorage();
		}
		return storage;
	}

	private SimpleDateFormat fmt = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss z");

	protected SimpleDateFormat getDateFormat() {
		return fmt;
	}

	/* (non-Javadoc)
	 * @see io.outofprintmagazine.corpus.batch.ICorpusBatchStep#getData()
	 */
	@Override
	public CorpusBatchStepModel getData() {
		return data;
	}

	/* (non-Javadoc)
	 * @see io.outofprintmagazine.corpus.batch.ICorpusBatchStep#setData(io.outofprintmagazine.corpus.batch.model.CorpusBatchStepModel)
	 */
	@Override
	public void setData(CorpusBatchStepModel data) {
		this.data = data;
		if (data.getProperties() != null) {
			this.data.setProperties(data.getProperties().deepCopy());
		}
		else {
			this.data.setProperties(getMapper().createObjectNode());
		}
		ObjectNode defaultProperties = getDefaultProperties();
		if (defaultProperties != null) {
			Iterator<Entry<String,JsonNode>> defaultPropsIter = defaultProperties.fields();
			while (defaultPropsIter.hasNext()) {
				Entry<String,JsonNode> defaultProp = defaultPropsIter.next();
				if (!this.data.getProperties().hasNonNull(defaultProp.getKey())) {
					this.data.getProperties().set(defaultProp.getKey(), defaultProp.getValue());
				}
			}
		}
	}

	/* (non-Javadoc)
	 * @see io.outofprintmagazine.corpus.batch.ICorpusBatchStep#getDefaultProperties()
	 */
	@Override
	public ObjectNode getDefaultProperties() {
		return null;
	}

	protected void copyInputToOutput(ObjectNode inputStepItem, ObjectNode outputStepItem) throws IOException {
		ObjectReader objectReader = mapper.readerForUpdating(outputStepItem);
		objectReader.readValue(inputStepItem);
	}

	protected ObjectNode copyInputToOutput(ObjectNode inputStepItem) throws IOException {
		ObjectNode outputStepItem = mapper.createObjectNode();
		copyInputToOutput(inputStepItem, outputStepItem);
		return outputStepItem;

	}

	/*
	 * for each ObjectNode inputItem in input
	 * if inputItem not in Input
	 * try {
	 * 	getOutput().addAll(runOne(inputItem))
	 * 	getInput().add(inputItem)
	 * }
	 * return getOutput()
	 */

	/* (non-Javadoc)
	 * @see io.outofprintmagazine.corpus.batch.ICorpusBatchStep#run(com.fasterxml.jackson.databind.node.ArrayNode)
	 */
	@Override
	public ArrayNode run(ArrayNode input) {
		int count = 0;
		for (JsonNode inputItem : input) {
			//only process masInput inputs
			if (getData().getProperties().has("maxInput") && getData().getProperties().get("maxInput").asInt() < count ) {
				break;
			}
			count++;

			//is this input already in the input array?
			boolean foundInputItem = false;
			for (JsonNode existingInputItem : getData().getInput()) {
				if (existingInputItem.equals(inputItem)) {
					foundInputItem = true;
					break;
				}
			}

			//if it is not in the input array, add it
			if (!foundInputItem) {
				getData().getInput().add(inputItem);
			}

			//if it was not in the input array, or if noCache=true, process it
			if (
				!foundInputItem
				||
				(getData().getProperties().has("noCache") && getData().getProperties().get("noCache").asBoolean())
			) {
				try {
					ArrayNode generatedOutput = runOne((ObjectNode)inputItem);

					//is this output already in the output array?
					for (JsonNode generatedOutputItem : generatedOutput) {
						boolean foundOutputItem = false;
						for (JsonNode existingOutputItem : getData().getOutput()) {
							if (existingOutputItem.equals(generatedOutputItem)) {
								foundOutputItem = true;
								break;
							}
						}

						//if it was not in the output array, add it
						if (!foundOutputItem) {
							getData().getOutput().add(generatedOutputItem);
						}
					}
				}
				catch (Throwable t) {
					t.printStackTrace();
					getLogger().error(t);
				}
			}
		}
		return getData().getOutput();
	}

	/* (non-Javadoc)
	 * @see io.outofprintmagazine.corpus.batch.ICorpusBatchStep#runOne(com.fasterxml.jackson.databind.node.ObjectNode)
	 */
	@Override
	public abstract ArrayNode runOne(ObjectNode input) throws Exception;

//	protected String getText(Document doc) {
//		StringBuffer buf = new StringBuffer();
//		Elements paragraphs = doc.select(getData().getProperties().get("oop_Text").asText());
//		for (Element paragraph : paragraphs) {
//			buf.append(paragraph.wholeText().trim());
//			buf.append('\n');
//			buf.append('\n');
//		}
//		return buf.toString();
//	}

	protected String getText(Element element) {
		StringBuffer buf = new StringBuffer();
		Elements paragraphs = element.select(getData().getProperties().get("oop_Text").asText());
		for (Element paragraph : paragraphs) {
			buf.append(Parser.unescapeEntities(paragraph.wholeText(), false).trim());
			buf.append('\n');
			buf.append('\n');
		}
		return buf.toString();
	}

	protected String getText(ObjectNode outputStepItem) {
		return outputStepItem.get(
				"oop_Text"
		).asText();
	}

	protected String getTextWithSelector(Element element, String selector) {
		try {
			return
					StringUtils.toAscii(
							StringUtils.normalize(
									Parser.unescapeEntities(
											element.select(
													selector
											).text(),
											true
									).trim()
							)
					);
		}
		catch (Exception e) {
			getLogger().error(e);
		}
		return "";
	}

	protected String getAuthor(Document doc) {
		return getTextWithSelector(
				doc,
				getData().getProperties().get("esnlc_AuthorAnnotation").asText()
		);
	}

	protected String getAuthor(ObjectNode outputStepItem) {
		if (outputStepItem.has("esnlc_AuthorAnnotation")) {
			return outputStepItem.get("esnlc_AuthorAnnotation").asText();
		}
		else {
			return "Anonymous";
		}
	}

	protected void setAuthor(String author, ObjectNode outputStepItem) {
		outputStepItem.put(
				"esnlc_AuthorAnnotation",
				author
		);
	}

	protected void setAuthor(Document doc, ObjectNode outputStepItem) {
		setAuthor(getAuthor(doc), outputStepItem);
	}

	protected String getTitle(Document doc) {
		return getTextWithSelector(
				doc,
				getData().getProperties().get("esnlc_DocTitleAnnotation").asText()
		);
	}

	protected String getTitle(ObjectNode outputStepItem) {
		return outputStepItem.get(
				"esnlc_DocTitleAnnotation"
		).asText();
	}

	protected void setTitle(String title, ObjectNode outputStepItem) {
		outputStepItem.put(
				"esnlc_DocTitleAnnotation",
				title
		);
	}

	protected void setTitle(Document doc, ObjectNode outputStepItem) {
		setTitle(getTitle(doc), outputStepItem);
	}

	protected String getThumbnail(Document doc) {
		return getTextWithSelector(
				doc,
				getData().getProperties().get("oop_DocThumbnail").asText()
		);
	}

	protected void setThumbnail(String thumbnail, ObjectNode outputStepItem) {
		outputStepItem.put(
				"oop_DocThumbnail",
				thumbnail
		);
	}

	protected void setThumbnail(Document doc, ObjectNode outputStepItem) {
		setThumbnail(getThumbnail(doc), outputStepItem);
	}

	protected String getDate(Document doc) {
		return
				doc.selectFirst(
						getData().getProperties().get("esnlc_DocDateAnnotation").asText()
				).ownText();
	}

	protected String getDate(ObjectNode outputStepItem) {
		return outputStepItem.get(
				"esnlc_DocDateAnnotation"
		).asText();
	}

	protected void setDate(String date, ObjectNode outputStepItem) {
		outputStepItem.put(
				"esnlc_DocDateAnnotation",
				date
		);
	}

	protected void setDate(Document doc, ObjectNode outputStepItem) {
		setDate(getDate(doc), outputStepItem);
	}

	protected void setDate(ObjectNode outputStepItem) {
		setDate(getDateFormat().format(new Date(System.currentTimeMillis())), outputStepItem);
	}

	protected void setLink(String link, ObjectNode outputStepItem) {
		outputStepItem.put(
				"link",
				link
		);
	}

	protected String getLink(ObjectNode outputStepItem) {
		return outputStepItem.get("link").asText();
	}

	protected void setStorageLink(String storage, ObjectNode outputStepItem) {
		outputStepItem.put(
				"stagingLinkStorage",
				storage
		);
	}

	protected void setDocID(ObjectNode outputStepItem, String docID) {
		outputStepItem.put("esnlc_DocIDAnnotation", docID);
	}

	protected String getDocID(ObjectNode outputStepItem) {
		return outputStepItem.get("esnlc_DocIDAnnotation").asText();
	}

	protected String getStorageLink(ObjectNode outputStepItem) {
		return outputStepItem.get("stagingLinkStorage").asText();
	}

//	protected ObjectNode getPlainTextStorageProperties() {
//		ObjectNode storageProperties = mapper.createObjectNode();
//		//"Sun, 16 Feb 2020 23:17:38 GMT"
//		storageProperties.put("Content-Type", "text/plain");
//		storageProperties.put("mimeType", "text/plain");
//		storageProperties.put("charset",  StandardCharsets.UTF_8.name());
//		storageProperties.put("Date", fmt.format(new Date(System.currentTimeMillis())));
//
//		return storageProperties;
//	}

	protected Document getJsoupDocumentFromStorage(ObjectNode inputStepItem) throws Exception {
		InputStream in = null;
		try {
			in = getStorage().getScratchFileStream(
					getData().getCorpusId(),
					getStorageLink(inputStepItem)
			);

			return Jsoup.parse(
					in,
					StandardCharsets.UTF_8.name(),
					inputStepItem.get("link").asText()
			);
		}
		finally {
			if (in != null) {
				in.close();
				in = null;
			}
		}
	}

	protected Document getJsoupDocumentFromStorageNormalized(ObjectNode inputStepItem) throws Exception {
		return Jsoup.parse(
				getTextDocumentFromStorage(inputStepItem).replace("<br/>", "</p><p>"),
				inputStepItem.get("link").asText()
		);
	}

	protected String getTextDocumentFromStorage(ObjectNode inputStepItem) throws Exception {
		return getStorage().getScratchFileString(
				getData().getCorpusId(),
				getStorageLink(inputStepItem)
		);
	}

	protected String getTextDocumentFromStorage(ObjectNode inputStepItem, String property) throws Exception {
		return getStorage().getScratchFileString(
				getData().getCorpusId(),
				inputStepItem.get(property).asText()
		);
	}

	protected JsonNode getJsonNodeFromStorage(ObjectNode inputStepItem) throws Exception {
		return getMapper().readTree(
				getStorage().getScratchFileStream(
						getData().getCorpusId(),
						getStorageLink(inputStepItem)
				)
		);
	}

	protected JsonNode getJsonNodeFromStorage(ObjectNode inputStepItem, String property) throws Exception {
		return getMapper().readTree(
				getStorage().getScratchFileStream(
						getData().getCorpusId(),
						inputStepItem.get(property).asText()
				)
		);
	}

//	protected JsonNode getStagedStorageProperties(ObjectNode inputStepItem) throws Exception {
//		return getMapper().readTree(
//				storage.getScratchFilePropertiesStream(
//						getData().getCorpusId(),
//						getStorageLink(inputStepItem)
//				)
//		);
//	}

	protected String getOutputScratchFilePathFromInput(ObjectNode inputStepItem, String extension) throws Exception {
		String fileName = UUID.randomUUID().toString();
		if (inputStepItem.has("esnlc_DocIDAnnotation")) {
			fileName = inputStepItem.get("esnlc_DocIDAnnotation").asText();
		}
		else if (inputStepItem.has("stagingLinkStorage")) {
			fileName =
					getStorage().trimFileExtension(
							getStorage().getFileNameFromPath(
									getStorageLink(inputStepItem)
							)
					);
		}
		else if (inputStepItem.has("link")) {
			fileName = URLEncoder.encode(inputStepItem.get("link").asText(), StandardCharsets.UTF_8.name());
		}
		return getOutputScratchFilePath(fileName, extension);

	}

	protected String getOutputScratchFilePath(String fileName) throws Exception {
		return getStorage().getScratchFilePath(
				getData().getCorpusBatchId(),
				getData().getCorpusBatchStepId(),
				fileName
		);
	}

	protected String getOutputScratchFilePath(String fileName, String extension) throws Exception {
		return getStorage().getScratchFilePath(
				getData().getCorpusBatchId(),
				getData().getCorpusBatchStepId(),
				String.format("%s.%s",
						fileName,
						extension
				)
		);
	}

	protected String getMimeTypeFromExtension(String extension) {
		File file = new File("tmp."+extension);
		MimetypesFileTypeMap fileTypeMap = new MimetypesFileTypeMap(this.getClass().getClassLoader().getResourceAsStream("mime.types"));
		return fileTypeMap.getContentType(file.getName());
	}

	protected String getExtensionFromMimeType(String mimeType) throws MimeTypeException {
		MimeTypes allTypes = MimeTypes.getDefaultMimeTypes();
		MimeType tmp = allTypes.forName(mimeType);
		return tmp.getExtension();
	}

	protected boolean isDictionaryWord(String pos) {
		if (pos.equals("NNP") || pos.equals("NNPS")) {
			return true;
		}
		else {
			return dictionaryPOS.contains(pos);
		}
	}

}