HttpDownload.java
/*******************************************************************************
* Copyright (C) 2020 Ram Sadasiv
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
******************************************************************************/
package io.outofprintmagazine.corpus.batch.impl;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.ServiceUnavailableRetryStrategy;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.client.LaxRedirectStrategy;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.outofprintmagazine.corpus.batch.CorpusBatchStep;
import io.outofprintmagazine.corpus.batch.ICorpusBatchStep;
public class HttpDownload extends CorpusBatchStep implements ICorpusBatchStep {
private static final Logger logger = LogManager.getLogger(HttpDownload.class);
@SuppressWarnings("unused")
private Logger getLogger() {
return logger;
}
public HttpDownload() {
super();
}
@Override
public ObjectNode getDefaultProperties() {
ObjectNode properties = getMapper().createObjectNode();
properties.put("user-agent", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1");
properties.put("accept-language", "en-US;q=0.9,en;q=0.8");
properties.put("sleepMillis", "1000");
properties.put("backoffSeconds", "10");
return properties;
}
@Override
public ArrayNode runOne(ObjectNode inputStepItem) throws Exception {
int sleepMillis = getData().getProperties().get("sleepMillis").asInt();
Thread.sleep(ThreadLocalRandom.current().nextInt(sleepMillis, sleepMillis*2));
ArrayNode retval = getMapper().createArrayNode();
ObjectNode outputStepItem = getMapper().createObjectNode();
ObjectReader objectReader = getMapper().readerForUpdating(outputStepItem);
objectReader.readValue(inputStepItem);
ObjectNode storageProperties = getMapper().createObjectNode();
getLogger().info(inputStepItem.get("link").asText());
String linkContent = httpDownload(
inputStepItem.get("link").asText(),
storageProperties
);
if (storageProperties.has("Last-Modified")) {
setDate(
storageProperties.get("Last-Modified").asText(
getDateFormat().format(
new Date(System.currentTimeMillis())
)
),
outputStepItem
);
}
try {
setStorageLink(
getStorage().storeScratchFileString(
getData().getCorpusId(),
getOutputScratchFilePathFromInput(
inputStepItem,
getExtensionFromMimeType(storageProperties.get("mimeType").asText("html"))
),
linkContent
),
outputStepItem
);
}
catch (IOException ioe) {
setStorageLink(
getStorage().storeScratchFileString(
getData().getCorpusId(),
getStorage().getScratchFilePath(
getData().getCorpusBatchId(),
String.format(
"%s-%s",
getData().getCorpusBatchStepSequenceId().toString(),
getData().getCorpusBatchStepId()
),
String.format("%s.%s",
DigestUtils.md5Hex(inputStepItem.get("link").asText()).toUpperCase(),
"html"
)
),
linkContent
),
outputStepItem
);
}
retval.add(outputStepItem);
return retval;
}
protected String httpDownload(String url, ObjectNode properties) throws IOException {
String responseBody = null;
CloseableHttpClient httpclient = getHttpClient();
try {
HttpGet http = new HttpGet(url);
http.setHeader("user-agent", getData().getProperties().get("user-agent").asText());
http.setHeader("accept-language", getData().getProperties().get("accept-language").asText());
responseBody = httpclient.execute(
http,
new PropertiesResponseHandler(properties)
);
}
finally {
httpclient.close();
}
return responseBody;
}
protected CloseableHttpClient getHttpClient() {
return HttpClients.custom()
.setServiceUnavailableRetryStrategy(
new ServiceUnavailableRetryStrategy() {
public boolean retryRequest(
final HttpResponse response, final int executionCount, final HttpContext context) {
int statusCode = response.getStatusLine().getStatusCode();
return (statusCode == 503 || statusCode == 500 || statusCode == 429) && executionCount < 5;
}
public long getRetryInterval() {
return getData().getProperties().get("backoffSeconds").asLong();
}
})
.setRedirectStrategy(new LaxRedirectStrategy())
.build();
}
class PropertiesResponseHandler implements ResponseHandler<String> {
ObjectNode properties;
public PropertiesResponseHandler(ObjectNode properties) {
super();
this.properties = properties;
}
public String handleResponse(HttpResponse response) throws ClientProtocolException, IOException {
int status = response.getStatusLine().getStatusCode();
if (status >= 200 && status < 300) {
HttpEntity entity = response.getEntity();
for (Header header : response.getAllHeaders()) {
if (!properties.has(header.getName())) {
properties.put(header.getName(), header.getValue());
}
else {
JsonNode existingNode = properties.get(header.getName());
if (existingNode.isArray()) {
ArrayNode aExistingNode = (ArrayNode) existingNode;
aExistingNode.add(header.getValue());
}
else {
java.lang.String existingValue = existingNode.asText();
ArrayNode aExistingNode = properties.putArray(header.getName());
aExistingNode.add(existingValue);
aExistingNode.add(header.getValue());
}
}
}
ContentType contentType = ContentType.getOrDefault(entity);
if (contentType != null && contentType.getMimeType() != null) {
properties.put("mimeType", contentType.getMimeType());
}
if (contentType != null && contentType.getCharset() != null) {
properties.put("charset", contentType.getCharset().name());
}
else {
properties.put("charset", StandardCharsets.UTF_8.name());
}
return EntityUtils.toString(entity, properties.get("charset").asText(StandardCharsets.UTF_8.name()));
}
else {
throw new ClientProtocolException("Unexpected response status: " + status);
}
}
}
}