重构 FetchTask 以使用 HttpClient 和虚拟线程 (#4546)
This commit is contained in:
@@ -34,7 +34,7 @@ import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URLConnection;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.*;
|
||||
@@ -244,7 +244,7 @@ public class Skin {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Context getContext(URLConnection connection, boolean checkETag, String bmclapiHash) throws IOException {
|
||||
protected Context getContext(HttpResponse<?> response, boolean checkETag, String bmclapiHash) throws IOException {
|
||||
return new Context() {
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
|
||||
@@ -260,7 +260,7 @@ public class Skin {
|
||||
setResult(new ByteArrayInputStream(baos.toByteArray()));
|
||||
|
||||
if (checkETag) {
|
||||
repository.cacheBytes(connection, baos.toByteArray());
|
||||
repository.cacheBytes(response, baos.toByteArray());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -24,7 +24,7 @@ import org.jetbrains.annotations.NotNull;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URLConnection;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.*;
|
||||
@@ -45,6 +45,9 @@ public final class CacheFileTask extends FetchTask<Path> {
|
||||
public CacheFileTask(@NotNull URI uri) {
|
||||
super(List.of(uri));
|
||||
setName(uri.toString());
|
||||
|
||||
if (!NetworkUtils.isHttpUri(uri))
|
||||
throw new IllegalArgumentException(uri.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -69,8 +72,9 @@ public final class CacheFileTask extends FetchTask<Path> {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Context getContext(URLConnection connection, boolean checkETag, String bmclapiHash) throws IOException {
|
||||
protected Context getContext(HttpResponse<?> response, boolean checkETag, String bmclapiHash) throws IOException {
|
||||
assert checkETag;
|
||||
assert response != null;
|
||||
|
||||
Path temp = Files.createTempFile("hmcl-download-", null);
|
||||
OutputStream fileOutput = Files.newOutputStream(temp);
|
||||
@@ -99,7 +103,7 @@ public final class CacheFileTask extends FetchTask<Path> {
|
||||
}
|
||||
|
||||
try {
|
||||
setResult(repository.cacheRemoteFile(connection, temp));
|
||||
setResult(repository.cacheRemoteFile(response, temp));
|
||||
} finally {
|
||||
try {
|
||||
Files.deleteIfExists(temp);
|
||||
|
||||
@@ -19,31 +19,42 @@ package org.jackhuang.hmcl.task;
|
||||
|
||||
import org.jackhuang.hmcl.event.Event;
|
||||
import org.jackhuang.hmcl.event.EventBus;
|
||||
import org.jackhuang.hmcl.util.CacheRepository;
|
||||
import org.jackhuang.hmcl.util.DigestUtils;
|
||||
import org.jackhuang.hmcl.util.StringUtils;
|
||||
import org.jackhuang.hmcl.util.ToStringBuilder;
|
||||
import org.jackhuang.hmcl.util.*;
|
||||
import org.jackhuang.hmcl.util.io.ContentEncoding;
|
||||
import org.jackhuang.hmcl.util.io.IOUtils;
|
||||
import org.jackhuang.hmcl.util.io.NetworkUtils;
|
||||
import org.jackhuang.hmcl.util.io.ResponseCodeException;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
import java.io.*;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URI;
|
||||
import java.net.URLConnection;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static org.jackhuang.hmcl.util.Lang.threadPool;
|
||||
import static org.jackhuang.hmcl.util.logging.Logger.LOG;
|
||||
|
||||
public abstract class FetchTask<T> extends Task<T> {
|
||||
private static final HttpClient HTTP_CLIENT;
|
||||
|
||||
static {
|
||||
boolean useHttp2 = !"false".equalsIgnoreCase(System.getProperty("hmcl.http2"));
|
||||
|
||||
HTTP_CLIENT = HttpClient.newBuilder()
|
||||
.connectTimeout(Duration.ofMillis(NetworkUtils.TIME_OUT))
|
||||
.version(useHttp2 ? HttpClient.Version.HTTP_2 : HttpClient.Version.HTTP_1_1)
|
||||
.build();
|
||||
}
|
||||
|
||||
protected static final int DEFAULT_RETRY = 3;
|
||||
|
||||
protected final List<URI> uris;
|
||||
@@ -58,7 +69,7 @@ public abstract class FetchTask<T> extends Task<T> {
|
||||
if (this.uris.isEmpty())
|
||||
throw new IllegalArgumentException("At least one URL is required");
|
||||
|
||||
setExecutor(download());
|
||||
setExecutor(DOWNLOAD_EXECUTOR);
|
||||
}
|
||||
|
||||
public void setRetry(int retry) {
|
||||
@@ -79,12 +90,14 @@ public abstract class FetchTask<T> extends Task<T> {
|
||||
|
||||
protected abstract EnumCheckETag shouldCheckETag();
|
||||
|
||||
protected abstract Context getContext(URLConnection connection, boolean checkETag, String bmclapiHash) throws IOException;
|
||||
private Context getContext() throws IOException {
|
||||
return getContext(null, false, null);
|
||||
}
|
||||
|
||||
protected abstract Context getContext(@Nullable HttpResponse<?> response, boolean checkETag, String bmclapiHash) throws IOException;
|
||||
|
||||
@Override
|
||||
public void execute() throws Exception {
|
||||
Exception exception = null;
|
||||
URI failedURI = null;
|
||||
boolean checkETag;
|
||||
switch (shouldCheckETag()) {
|
||||
case CHECK_E_TAG:
|
||||
@@ -97,163 +110,247 @@ public abstract class FetchTask<T> extends Task<T> {
|
||||
return;
|
||||
}
|
||||
|
||||
int repeat = 0;
|
||||
download:
|
||||
for (URI uri : uris) {
|
||||
if (checkETag) {
|
||||
// Handle cache
|
||||
ArrayList<DownloadException> exceptions = null;
|
||||
|
||||
if (SEMAPHORE != null)
|
||||
SEMAPHORE.acquire();
|
||||
try {
|
||||
for (URI uri : uris) {
|
||||
try {
|
||||
Path cache = repository.getCachedRemoteFile(uri, true);
|
||||
useCachedResult(cache);
|
||||
LOG.info("Using cached file for " + NetworkUtils.dropQuery(uri));
|
||||
if (NetworkUtils.isHttpUri(uri))
|
||||
downloadHttp(uri, checkETag);
|
||||
else
|
||||
downloadNotHttp(uri);
|
||||
return;
|
||||
} catch (IOException ignored) {
|
||||
} catch (DownloadException e) {
|
||||
if (exceptions == null)
|
||||
exceptions = new ArrayList<>();
|
||||
exceptions.add(e);
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException ignored) {
|
||||
// Cancelled
|
||||
} finally {
|
||||
if (SEMAPHORE != null)
|
||||
SEMAPHORE.release();
|
||||
}
|
||||
|
||||
for (int retryTime = 0; retryTime < retry; retryTime++) {
|
||||
if (isCancelled()) {
|
||||
break download;
|
||||
if (exceptions != null) {
|
||||
DownloadException last = exceptions.remove(exceptions.size() - 1);
|
||||
for (DownloadException exception : exceptions) {
|
||||
last.addSuppressed(exception);
|
||||
}
|
||||
throw last;
|
||||
}
|
||||
}
|
||||
|
||||
private void download(Context context,
|
||||
InputStream inputStream,
|
||||
long contentLength,
|
||||
ContentEncoding contentEncoding) throws IOException, InterruptedException {
|
||||
try (var ignored = context;
|
||||
var counter = new CounterInputStream(inputStream);
|
||||
var input = contentEncoding.wrap(counter)) {
|
||||
long lastDownloaded = 0L;
|
||||
byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
|
||||
while (true) {
|
||||
if (isCancelled()) break;
|
||||
|
||||
int len = input.read(buffer);
|
||||
if (len == -1) break;
|
||||
|
||||
context.write(buffer, 0, len);
|
||||
|
||||
if (contentLength >= 0) {
|
||||
// Update progress information per second
|
||||
updateProgress(counter.downloaded, contentLength);
|
||||
}
|
||||
|
||||
List<URI> redirects = null;
|
||||
String bmclapiHash = null;
|
||||
try {
|
||||
beforeDownload(uri);
|
||||
updateProgress(0);
|
||||
updateDownloadSpeed(counter.downloaded - lastDownloaded);
|
||||
lastDownloaded = counter.downloaded;
|
||||
}
|
||||
|
||||
URLConnection conn = NetworkUtils.createConnection(uri);
|
||||
if (isCancelled())
|
||||
throw new InterruptedException();
|
||||
|
||||
if (conn instanceof HttpURLConnection httpConnection) {
|
||||
httpConnection.setRequestProperty("Accept-Encoding", "gzip");
|
||||
updateDownloadSpeed(counter.downloaded - lastDownloaded);
|
||||
|
||||
if (checkETag) repository.injectConnection(httpConnection);
|
||||
Map<String, List<String>> requestProperties = httpConnection.getRequestProperties();
|
||||
if (contentLength >= 0 && counter.downloaded != contentLength)
|
||||
throw new IOException("Unexpected file size: " + counter.downloaded + ", expected: " + contentLength);
|
||||
|
||||
bmclapiHash = httpConnection.getHeaderField("x-bmclapi-hash");
|
||||
if (DigestUtils.isSha1Digest(bmclapiHash)) {
|
||||
Optional<Path> cache = repository.checkExistentFile(null, "SHA-1", bmclapiHash);
|
||||
if (cache.isPresent()) {
|
||||
useCachedResult(cache.get());
|
||||
LOG.info("Using cached file for " + NetworkUtils.dropQuery(uri));
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
bmclapiHash = null;
|
||||
}
|
||||
context.withResult(true);
|
||||
}
|
||||
}
|
||||
|
||||
while (true) {
|
||||
int code = httpConnection.getResponseCode();
|
||||
if (code >= 300 && code <= 308 && code != 306 && code != 304) {
|
||||
if (redirects == null) {
|
||||
redirects = new ArrayList<>();
|
||||
} else if (redirects.size() >= 20) {
|
||||
httpConnection.disconnect();
|
||||
throw new IOException("Too much redirects");
|
||||
}
|
||||
|
||||
String location = httpConnection.getHeaderField("Location");
|
||||
|
||||
httpConnection.disconnect();
|
||||
if (StringUtils.isBlank(location))
|
||||
throw new IOException("Redirected to an empty location");
|
||||
|
||||
URI target = NetworkUtils.toURI(httpConnection.getURL())
|
||||
.resolve(NetworkUtils.toURI(location));
|
||||
redirects.add(target);
|
||||
|
||||
if (!NetworkUtils.isHttpUri(target))
|
||||
throw new IOException("Redirected to not http URI: " + target);
|
||||
|
||||
HttpURLConnection redirected = NetworkUtils.createHttpConnection(target);
|
||||
redirected.setUseCaches(checkETag);
|
||||
requestProperties
|
||||
.forEach((key, values) ->
|
||||
values.forEach(element ->
|
||||
redirected.addRequestProperty(key, element)));
|
||||
httpConnection = redirected;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
conn = httpConnection;
|
||||
int responseCode = ((HttpURLConnection) conn).getResponseCode();
|
||||
|
||||
if (responseCode == HttpURLConnection.HTTP_NOT_MODIFIED) {
|
||||
// Handle cache
|
||||
try {
|
||||
Path cache = repository.getCachedRemoteFile(NetworkUtils.toURI(conn.getURL()), false);
|
||||
useCachedResult(cache);
|
||||
LOG.info("Using cached file for " + NetworkUtils.dropQuery(uri));
|
||||
return;
|
||||
} catch (CacheRepository.CacheExpiredException e) {
|
||||
LOG.info("Cache expired for " + NetworkUtils.dropQuery(uri));
|
||||
} catch (IOException e) {
|
||||
LOG.warning("Unable to use cached file, redownload " + NetworkUtils.dropQuery(uri), e);
|
||||
repository.removeRemoteEntry(conn.getURL().toURI());
|
||||
// Now we must reconnect the server since 304 may result in empty content,
|
||||
// if we want to redownload the file, we must reconnect the server without etag settings.
|
||||
retryTime--;
|
||||
continue;
|
||||
}
|
||||
} else if (responseCode / 100 == 4) {
|
||||
throw new FileNotFoundException(uri.toString());
|
||||
} else if (responseCode / 100 != 2) {
|
||||
throw new ResponseCodeException(uri, responseCode);
|
||||
}
|
||||
}
|
||||
|
||||
long contentLength = conn.getContentLengthLong();
|
||||
var encoding = ContentEncoding.fromConnection(conn);
|
||||
try (var context = getContext(conn, checkETag, bmclapiHash);
|
||||
var counter = new CounterInputStream(conn.getInputStream());
|
||||
var input = encoding.wrap(counter)) {
|
||||
long lastDownloaded = 0L;
|
||||
byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
|
||||
while (true) {
|
||||
if (isCancelled()) break;
|
||||
|
||||
int len = input.read(buffer);
|
||||
if (len == -1) break;
|
||||
|
||||
context.write(buffer, 0, len);
|
||||
|
||||
if (contentLength >= 0) {
|
||||
// Update progress information per second
|
||||
updateProgress(counter.downloaded, contentLength);
|
||||
}
|
||||
|
||||
updateDownloadSpeed(counter.downloaded - lastDownloaded);
|
||||
lastDownloaded = counter.downloaded;
|
||||
}
|
||||
|
||||
if (isCancelled()) break download;
|
||||
|
||||
updateDownloadSpeed(counter.downloaded - lastDownloaded);
|
||||
|
||||
if (contentLength >= 0 && counter.downloaded != contentLength)
|
||||
throw new IOException("Unexpected file size: " + counter.downloaded + ", expected: " + contentLength);
|
||||
|
||||
context.withResult(true);
|
||||
}
|
||||
|
||||
return;
|
||||
} catch (FileNotFoundException ex) {
|
||||
failedURI = uri;
|
||||
exception = ex;
|
||||
LOG.warning("Failed to download " + uri + ", not found" + (redirects == null ? "" : ", redirects: " + redirects), ex);
|
||||
|
||||
break; // we will not try this URL again
|
||||
} catch (IOException ex) {
|
||||
failedURI = uri;
|
||||
exception = ex;
|
||||
LOG.warning("Failed to download " + uri + ", repeat times: " + (++repeat) + (redirects == null ? "" : ", redirects: " + redirects), ex);
|
||||
}
|
||||
private void downloadHttp(URI uri, boolean checkETag) throws DownloadException, InterruptedException {
|
||||
if (checkETag) {
|
||||
// Handle cache
|
||||
try {
|
||||
Path cache = repository.getCachedRemoteFile(uri, true);
|
||||
useCachedResult(cache);
|
||||
LOG.info("Using cached file for " + NetworkUtils.dropQuery(uri));
|
||||
return;
|
||||
} catch (IOException ignored) {
|
||||
}
|
||||
}
|
||||
|
||||
if (exception != null)
|
||||
throw new DownloadException(failedURI, exception);
|
||||
ArrayList<IOException> exceptions = null;
|
||||
|
||||
for (int retryTime = 0; retryTime < retry; retryTime++) {
|
||||
if (isCancelled()) {
|
||||
throw new InterruptedException();
|
||||
}
|
||||
|
||||
List<URI> redirects = null;
|
||||
try {
|
||||
beforeDownload(uri);
|
||||
updateProgress(0);
|
||||
|
||||
HttpResponse<InputStream> response;
|
||||
String bmclapiHash;
|
||||
|
||||
URI currentURI = uri;
|
||||
|
||||
LinkedHashMap<String, String> headers = new LinkedHashMap<>();
|
||||
headers.put("accept-encoding", "gzip");
|
||||
if (checkETag)
|
||||
headers.putAll(repository.injectConnection(uri));
|
||||
|
||||
do {
|
||||
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder(currentURI);
|
||||
requestBuilder.timeout(Duration.ofMillis(NetworkUtils.TIME_OUT));
|
||||
headers.forEach(requestBuilder::header);
|
||||
response = HTTP_CLIENT.send(requestBuilder.build(), BODY_HANDLER);
|
||||
|
||||
bmclapiHash = response.headers().firstValue("x-bmclapi-hash").orElse(null);
|
||||
if (DigestUtils.isSha1Digest(bmclapiHash)) {
|
||||
Optional<Path> cache = repository.checkExistentFile(null, "SHA-1", bmclapiHash);
|
||||
if (cache.isPresent()) {
|
||||
useCachedResult(cache.get());
|
||||
LOG.info("Using cached file for " + NetworkUtils.dropQuery(uri));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
int code = response.statusCode();
|
||||
if (code >= 300 && code <= 308 && code != 306 && code != 304) {
|
||||
if (redirects == null) {
|
||||
redirects = new ArrayList<>();
|
||||
} else if (redirects.size() >= 20) {
|
||||
throw new IOException("Too much redirects");
|
||||
}
|
||||
|
||||
String location = response.headers().firstValue("Location").orElse(null);
|
||||
if (StringUtils.isBlank(location))
|
||||
throw new IOException("Redirected to an empty location");
|
||||
|
||||
URI target = currentURI.resolve(NetworkUtils.encodeLocation(location));
|
||||
redirects.add(target);
|
||||
|
||||
if (!NetworkUtils.isHttpUri(target))
|
||||
throw new IOException("Redirected to not http URI: " + target);
|
||||
|
||||
currentURI = target;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} while (true);
|
||||
|
||||
int responseCode = response.statusCode();
|
||||
if (responseCode == HttpURLConnection.HTTP_NOT_MODIFIED) {
|
||||
// Handle cache
|
||||
try {
|
||||
Path cache = repository.getCachedRemoteFile(currentURI, false);
|
||||
useCachedResult(cache);
|
||||
LOG.info("Using cached file for " + NetworkUtils.dropQuery(uri));
|
||||
return;
|
||||
} catch (CacheRepository.CacheExpiredException e) {
|
||||
LOG.info("Cache expired for " + NetworkUtils.dropQuery(uri));
|
||||
} catch (IOException e) {
|
||||
LOG.warning("Unable to use cached file, redownload " + NetworkUtils.dropQuery(uri), e);
|
||||
repository.removeRemoteEntry(currentURI);
|
||||
// Now we must reconnect the server since 304 may result in empty content,
|
||||
// if we want to redownload the file, we must reconnect the server without etag settings.
|
||||
retryTime--;
|
||||
continue;
|
||||
}
|
||||
} else if (responseCode / 100 == 4) {
|
||||
throw new FileNotFoundException(uri.toString());
|
||||
} else if (responseCode / 100 != 2) {
|
||||
throw new ResponseCodeException(uri, responseCode);
|
||||
}
|
||||
|
||||
long contentLength = response.headers().firstValueAsLong("content-length").orElse(-1L);
|
||||
var contentEncoding = ContentEncoding.fromResponse(response);
|
||||
|
||||
download(getContext(response, checkETag, bmclapiHash),
|
||||
response.body(),
|
||||
contentLength,
|
||||
contentEncoding);
|
||||
return;
|
||||
} catch (FileNotFoundException ex) {
|
||||
LOG.warning("Failed to download " + uri + ", not found" + (redirects == null ? "" : ", redirects: " + redirects), ex);
|
||||
throw toDownloadException(uri, ex, exceptions); // we will not try this URL again
|
||||
} catch (IOException ex) {
|
||||
if (exceptions == null)
|
||||
exceptions = new ArrayList<>();
|
||||
|
||||
exceptions.add(ex);
|
||||
|
||||
LOG.warning("Failed to download " + uri + ", repeat times: " + retryTime + (redirects == null ? "" : ", redirects: " + redirects), ex);
|
||||
}
|
||||
}
|
||||
|
||||
throw toDownloadException(uri, null, exceptions);
|
||||
}
|
||||
|
||||
private void downloadNotHttp(URI uri) throws DownloadException, InterruptedException {
|
||||
ArrayList<IOException> exceptions = null;
|
||||
for (int retryTime = 0; retryTime < retry; retryTime++) {
|
||||
if (isCancelled()) {
|
||||
throw new InterruptedException();
|
||||
}
|
||||
|
||||
try {
|
||||
beforeDownload(uri);
|
||||
updateProgress(0);
|
||||
|
||||
URLConnection conn = NetworkUtils.createConnection(uri);
|
||||
download(getContext(),
|
||||
conn.getInputStream(),
|
||||
conn.getContentLengthLong(),
|
||||
ContentEncoding.fromConnection(conn));
|
||||
return;
|
||||
} catch (FileNotFoundException ex) {
|
||||
LOG.warning("Failed to download " + uri + ", not found", ex);
|
||||
|
||||
throw toDownloadException(uri, ex, exceptions); // we will not try this URL again
|
||||
} catch (IOException ex) {
|
||||
if (exceptions == null)
|
||||
exceptions = new ArrayList<>();
|
||||
|
||||
exceptions.add(ex);
|
||||
LOG.warning("Failed to download " + uri + ", repeat times: " + retryTime, ex);
|
||||
}
|
||||
}
|
||||
|
||||
throw toDownloadException(uri, null, exceptions);
|
||||
}
|
||||
|
||||
private static DownloadException toDownloadException(URI uri, @Nullable IOException last, @Nullable ArrayList<IOException> exceptions) {
|
||||
if (exceptions == null || exceptions.isEmpty()) {
|
||||
return new DownloadException(uri, last != null
|
||||
? last
|
||||
: new IOException("No exceptions"));
|
||||
} else {
|
||||
if (last == null)
|
||||
last = exceptions.remove(exceptions.size() - 1);
|
||||
|
||||
for (IOException e : exceptions) {
|
||||
last.addSuppressed(e);
|
||||
}
|
||||
return new DownloadException(uri, last);
|
||||
}
|
||||
}
|
||||
|
||||
private static final Timer timer = new Timer("DownloadSpeedRecorder", true);
|
||||
@@ -341,48 +438,74 @@ public abstract class FetchTask<T> extends Task<T> {
|
||||
CACHED
|
||||
}
|
||||
|
||||
private static final HttpResponse.BodyHandler<InputStream> BODY_HANDLER = responseInfo -> {
|
||||
if (responseInfo.statusCode() / 100 == 2)
|
||||
return HttpResponse.BodySubscribers.ofInputStream();
|
||||
else
|
||||
return HttpResponse.BodySubscribers.replacing(null);
|
||||
};
|
||||
|
||||
public static int DEFAULT_CONCURRENCY = Math.min(Runtime.getRuntime().availableProcessors() * 4, 64);
|
||||
private static int downloadExecutorConcurrency = DEFAULT_CONCURRENCY;
|
||||
private static volatile ThreadPoolExecutor DOWNLOAD_EXECUTOR;
|
||||
|
||||
/**
|
||||
* Get singleton instance of the thread pool for file downloading.
|
||||
*
|
||||
* @return Thread pool for FetchTask
|
||||
*/
|
||||
protected static ExecutorService download() {
|
||||
if (DOWNLOAD_EXECUTOR == null) {
|
||||
synchronized (Schedulers.class) {
|
||||
if (DOWNLOAD_EXECUTOR == null) {
|
||||
DOWNLOAD_EXECUTOR = threadPool("Download", true, downloadExecutorConcurrency, 10, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
// For Java 21 or later, DOWNLOAD_EXECUTOR dispatches tasks to virtual threads, and concurrency is controlled by SEMAPHORE.
|
||||
// For versions earlier than Java 21, DOWNLOAD_EXECUTOR is a ThreadPoolExecutor, SEMAPHORE is null, and concurrency is controlled by the thread pool size.
|
||||
|
||||
private static final ExecutorService DOWNLOAD_EXECUTOR;
|
||||
private static final @Nullable Semaphore SEMAPHORE;
|
||||
|
||||
static {
|
||||
ExecutorService executorService = Schedulers.newVirtualThreadPerTaskExecutor("Download");
|
||||
if (executorService != null) {
|
||||
DOWNLOAD_EXECUTOR = executorService;
|
||||
SEMAPHORE = new Semaphore(DEFAULT_CONCURRENCY);
|
||||
} else {
|
||||
DOWNLOAD_EXECUTOR = threadPool("Download", true, downloadExecutorConcurrency, 10, TimeUnit.SECONDS);
|
||||
SEMAPHORE = null;
|
||||
}
|
||||
|
||||
return DOWNLOAD_EXECUTOR;
|
||||
}
|
||||
|
||||
@FXThread
|
||||
public static void setDownloadExecutorConcurrency(int concurrency) {
|
||||
concurrency = Math.max(concurrency, 1);
|
||||
synchronized (Schedulers.class) {
|
||||
downloadExecutorConcurrency = concurrency;
|
||||
|
||||
ThreadPoolExecutor downloadExecutor = DOWNLOAD_EXECUTOR;
|
||||
if (downloadExecutor != null) {
|
||||
if (downloadExecutor.getMaximumPoolSize() <= concurrency) {
|
||||
downloadExecutor.setMaximumPoolSize(concurrency);
|
||||
downloadExecutor.setCorePoolSize(concurrency);
|
||||
} else {
|
||||
downloadExecutor.setCorePoolSize(concurrency);
|
||||
downloadExecutor.setMaximumPoolSize(concurrency);
|
||||
int prevDownloadExecutorConcurrency = downloadExecutorConcurrency;
|
||||
int change = concurrency - prevDownloadExecutorConcurrency;
|
||||
if (change == 0)
|
||||
return;
|
||||
|
||||
downloadExecutorConcurrency = concurrency;
|
||||
if (SEMAPHORE != null) {
|
||||
if (change > 0) {
|
||||
SEMAPHORE.release(change);
|
||||
} else {
|
||||
int permits = -change;
|
||||
if (!SEMAPHORE.tryAcquire(permits)) {
|
||||
Schedulers.io().execute(() -> {
|
||||
try {
|
||||
for (int i = 0; i < permits; i++) {
|
||||
SEMAPHORE.acquire();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new AssertionError("Unreachable", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
} else {
|
||||
var downloadExecutor = (ThreadPoolExecutor) DOWNLOAD_EXECUTOR;
|
||||
|
||||
if (downloadExecutor.getMaximumPoolSize() <= concurrency) {
|
||||
downloadExecutor.setMaximumPoolSize(concurrency);
|
||||
downloadExecutor.setCorePoolSize(concurrency);
|
||||
} else {
|
||||
downloadExecutor.setCorePoolSize(concurrency);
|
||||
downloadExecutor.setMaximumPoolSize(concurrency);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static int getDownloadExecutorConcurrency() {
|
||||
synchronized (Schedulers.class) {
|
||||
return downloadExecutorConcurrency;
|
||||
}
|
||||
return downloadExecutorConcurrency;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ import org.jackhuang.hmcl.util.io.NetworkUtils;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URLConnection;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.nio.file.FileSystem;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
@@ -185,7 +185,7 @@ public class FileDownloadTask extends FetchTask<Void> {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Context getContext(URLConnection connection, boolean checkETag, String bmclapiHash) throws IOException {
|
||||
protected Context getContext(HttpResponse<?> response, boolean checkETag, String bmclapiHash) throws IOException {
|
||||
Path temp = Files.createTempFile(null, null);
|
||||
|
||||
String algorithm;
|
||||
@@ -260,7 +260,7 @@ public class FileDownloadTask extends FetchTask<Void> {
|
||||
}
|
||||
|
||||
if (checkETag) {
|
||||
repository.cacheRemoteFile(connection, file);
|
||||
repository.cacheRemoteFile(response, file);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -24,7 +24,9 @@ import org.jackhuang.hmcl.util.io.NetworkUtils;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URLConnection;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
@@ -59,9 +61,11 @@ public final class GetTask extends FetchTask<String> {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Context getContext(URLConnection connection, boolean checkETag, String bmclapiHash) {
|
||||
int length = connection.getContentLength();
|
||||
final var baos = new ByteArrayOutputStream(length <= 0 ? 8192 : length);
|
||||
protected Context getContext(HttpResponse<?> response, boolean checkETag, String bmclapiHash) {
|
||||
long length = -1;
|
||||
if (response != null)
|
||||
length = response.headers().firstValueAsLong("content-length").orElse(-1L);
|
||||
final var baos = new ByteArrayOutputStream(length <= 0 ? 8192 : (int) length);
|
||||
|
||||
return new Context() {
|
||||
@Override
|
||||
@@ -73,11 +77,15 @@ public final class GetTask extends FetchTask<String> {
|
||||
public void close() throws IOException {
|
||||
if (!isSuccess()) return;
|
||||
|
||||
String result = baos.toString(NetworkUtils.getCharsetFromContentType(connection.getContentType()));
|
||||
Charset charset = StandardCharsets.UTF_8;
|
||||
if (response != null)
|
||||
charset = NetworkUtils.getCharsetFromContentType(response.headers().firstValue("content-type").orElse(null));
|
||||
|
||||
String result = baos.toString(charset);
|
||||
setResult(result);
|
||||
|
||||
if (checkETag) {
|
||||
repository.cacheText(connection, result);
|
||||
repository.cacheText(response, result);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -23,12 +23,13 @@ import org.jackhuang.hmcl.util.function.ExceptionalSupplier;
|
||||
import org.jackhuang.hmcl.util.gson.JsonUtils;
|
||||
import org.jackhuang.hmcl.util.io.FileUtils;
|
||||
import org.jackhuang.hmcl.util.io.NetworkUtils;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
import java.io.*;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URI;
|
||||
import java.net.URLConnection;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.nio.channels.Channels;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.channels.FileLock;
|
||||
@@ -184,15 +185,35 @@ public class CacheRepository {
|
||||
}
|
||||
}
|
||||
|
||||
public void injectConnection(HttpURLConnection conn) {
|
||||
conn.setUseCaches(true);
|
||||
|
||||
URI uri;
|
||||
public @NotNull Map<String, String> injectConnection(URI uri) {
|
||||
try {
|
||||
uri = NetworkUtils.dropQuery(NetworkUtils.toURI(conn.getURL()));
|
||||
uri = NetworkUtils.dropQuery(uri);
|
||||
} catch (IllegalArgumentException e) {
|
||||
return Map.of();
|
||||
}
|
||||
|
||||
ETagItem eTagItem;
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
eTagItem = index.get(uri);
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
if (eTagItem == null) return Map.of();
|
||||
if (eTagItem.eTag != null)
|
||||
return Map.of("if-none-match", eTagItem.eTag);
|
||||
// if (eTagItem.getRemoteLastModified() != null)
|
||||
// conn.setRequestProperty("If-Modified-Since", eTagItem.getRemoteLastModified());
|
||||
return Map.of();
|
||||
}
|
||||
|
||||
public void injectConnection(URI uri, HttpRequest.Builder requestBuilder) {
|
||||
try {
|
||||
uri = NetworkUtils.dropQuery(uri);
|
||||
} catch (IllegalArgumentException e) {
|
||||
return;
|
||||
}
|
||||
|
||||
ETagItem eTagItem;
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
@@ -202,25 +223,25 @@ public class CacheRepository {
|
||||
}
|
||||
if (eTagItem == null) return;
|
||||
if (eTagItem.eTag != null)
|
||||
conn.setRequestProperty("If-None-Match", eTagItem.eTag);
|
||||
requestBuilder.header("if-none-match", eTagItem.eTag);
|
||||
// if (eTagItem.getRemoteLastModified() != null)
|
||||
// conn.setRequestProperty("If-Modified-Since", eTagItem.getRemoteLastModified());
|
||||
}
|
||||
|
||||
public Path cacheRemoteFile(URLConnection connection, Path downloaded) throws IOException {
|
||||
return cacheData(connection, () -> {
|
||||
public Path cacheRemoteFile(HttpResponse<?> response, Path downloaded) throws IOException {
|
||||
return cacheData(response, () -> {
|
||||
String hash = DigestUtils.digestToString(SHA1, downloaded);
|
||||
Path cached = cacheFile(downloaded, SHA1, hash);
|
||||
return new CacheResult(hash, cached);
|
||||
});
|
||||
}
|
||||
|
||||
public Path cacheText(URLConnection connection, String text) throws IOException {
|
||||
return cacheBytes(connection, text.getBytes(UTF_8));
|
||||
public Path cacheText(HttpResponse<?> response, String text) throws IOException {
|
||||
return cacheBytes(response, text.getBytes(UTF_8));
|
||||
}
|
||||
|
||||
public Path cacheBytes(URLConnection connection, byte[] bytes) throws IOException {
|
||||
return cacheData(connection, () -> {
|
||||
public Path cacheBytes(HttpResponse<?> response, byte[] bytes) throws IOException {
|
||||
return cacheData(response, () -> {
|
||||
String hash = DigestUtils.digestToString(SHA1, bytes);
|
||||
Path cached = getFile(SHA1, hash);
|
||||
Files.createDirectories(cached.getParent());
|
||||
@@ -231,20 +252,15 @@ public class CacheRepository {
|
||||
|
||||
private static final Pattern MAX_AGE = Pattern.compile("(s-maxage|max-age)=(?<time>[0-9]+)");
|
||||
|
||||
private Path cacheData(URLConnection connection, ExceptionalSupplier<CacheResult, IOException> cacheSupplier) throws IOException {
|
||||
String eTag = connection.getHeaderField("ETag");
|
||||
private Path cacheData(HttpResponse<?> response, ExceptionalSupplier<CacheResult, IOException> cacheSupplier) throws IOException {
|
||||
String eTag = response.headers().firstValue("etag").orElse(null);
|
||||
if (StringUtils.isBlank(eTag)) return null;
|
||||
URI uri;
|
||||
try {
|
||||
uri = NetworkUtils.dropQuery(NetworkUtils.toURI(connection.getURL()));
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
URI uri = NetworkUtils.dropQuery(response.uri());
|
||||
long expires = 0L;
|
||||
|
||||
expires:
|
||||
try {
|
||||
String cacheControl = connection.getHeaderField("Cache-Control");
|
||||
String cacheControl = response.headers().firstValue("cache-control").orElse(null);
|
||||
if (StringUtils.isNotBlank(cacheControl)) {
|
||||
if (cacheControl.contains("no-store"))
|
||||
return null;
|
||||
@@ -257,7 +273,7 @@ public class CacheRepository {
|
||||
}
|
||||
}
|
||||
|
||||
String expiresHeader = connection.getHeaderField("Expires");
|
||||
String expiresHeader = response.headers().firstValue("expires").orElse(null);
|
||||
if (StringUtils.isNotBlank(expiresHeader)) {
|
||||
expires = ZonedDateTime.parse(expiresHeader.trim(), DateTimeFormatter.RFC_1123_DATE_TIME)
|
||||
.toInstant().toEpochMilli();
|
||||
@@ -266,7 +282,7 @@ public class CacheRepository {
|
||||
LOG.warning("Failed to parse expires time", e);
|
||||
}
|
||||
|
||||
String lastModified = connection.getHeaderField("Last-Modified");
|
||||
String lastModified = response.headers().firstValue("last-modified").orElse(null);
|
||||
|
||||
CacheResult cacheResult = cacheSupplier.get();
|
||||
ETagItem eTagItem = new ETagItem(uri.toString(),
|
||||
|
||||
@@ -22,6 +22,7 @@ import org.jetbrains.annotations.NotNull;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URLConnection;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
|
||||
/**
|
||||
@@ -52,5 +53,16 @@ public enum ContentEncoding {
|
||||
}
|
||||
}
|
||||
|
||||
public static @NotNull ContentEncoding fromResponse(HttpResponse<?> connection) throws IOException {
|
||||
String encoding = connection.headers().firstValue("content-encoding").orElse("");
|
||||
if (encoding.isEmpty() || "identity".equals(encoding)) {
|
||||
return IDENTITY;
|
||||
} else if ("gzip".equalsIgnoreCase(encoding)) {
|
||||
return GZIP;
|
||||
} else {
|
||||
throw new IOException("Unsupported content encoding: " + encoding);
|
||||
}
|
||||
}
|
||||
|
||||
public abstract InputStream wrap(InputStream inputStream) throws IOException;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user