fix: unable to cancel tasks

This commit is contained in:
huanghongxun
2020-02-02 23:34:42 +08:00
parent 214ff9de94
commit 1d579c52eb
4 changed files with 38 additions and 11 deletions

View File

@@ -79,7 +79,7 @@ public final class GameAssetDownloadTask extends Task<Void> {
int progress = 0;
if (index != null)
for (AssetObject assetObject : index.getObjects().values()) {
if (Thread.interrupted())
if (isCancelled())
throw new InterruptedException();
File file = dependencyManager.getGameRepository().getAssetObject(version.getId(), assetIndexInfo.getId(), assetObject);

View File

@@ -17,11 +17,13 @@
*/
package org.jackhuang.hmcl.task;
import org.jackhuang.hmcl.event.EventManager;
import org.jackhuang.hmcl.event.FailedEvent;
import org.jackhuang.hmcl.util.CacheRepository;
import org.jackhuang.hmcl.util.Logging;
import org.jackhuang.hmcl.util.io.*;
import org.jackhuang.hmcl.util.io.ChecksumMismatchException;
import org.jackhuang.hmcl.util.io.FileUtils;
import org.jackhuang.hmcl.util.io.IOUtils;
import org.jackhuang.hmcl.util.io.NetworkUtils;
import org.jackhuang.hmcl.util.io.ResponseCodeException;
import java.io.File;
import java.io.IOException;
@@ -208,8 +210,7 @@ public class FileDownloadTask extends Task<Void> {
for (int repeat = 0; repeat < retry; repeat++) {
URL url = urls.get(repeat % urls.size());
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
if (isCancelled()) {
break;
}
@@ -253,8 +254,7 @@ public class FileDownloadTask extends Task<Void> {
long lastTime = System.currentTimeMillis();
byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
while (true) {
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
if (isCancelled()) {
break;
}
@@ -283,9 +283,8 @@ public class FileDownloadTask extends Task<Void> {
closeFiles();
// Restore temp file to original name.
if (Thread.interrupted()) {
if (isCancelled()) {
temp.toFile().delete();
Thread.currentThread().interrupt();
break;
} else {
Files.deleteIfExists(file.toPath());

View File

@@ -38,6 +38,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
/**
@@ -62,6 +63,22 @@ public abstract class Task<T> {
this.significance = significance;
}
// cancel
private Supplier<Boolean> cancelled;
final void setCancelled(Supplier<Boolean> cancelled) {
this.cancelled = cancelled;
}
protected final boolean isCancelled() {
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
return true;
}
return cancelled != null ? cancelled.get() : false;
}
// state
private TaskState state = TaskState.READY;

View File

@@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
@@ -41,6 +42,7 @@ public final class TaskExecutor {
private Exception exception;
private final AtomicInteger totTask = new AtomicInteger(0);
private CompletableFuture<Boolean> future;
private final AtomicBoolean cancelled = new AtomicBoolean(false);
public TaskExecutor(Task<?> task) {
this.firstTask = task;
@@ -106,9 +108,14 @@ public final class TaskExecutor {
throw new IllegalStateException("Cannot cancel a not started TaskExecutor");
}
cancelled.set(true);
future.cancel(true);
}
public boolean isCancelled() {
return cancelled.get();
}
private CompletableFuture<Exception> executeTasks(Collection<Task<?>> tasks) {
if (tasks == null || tasks.isEmpty())
return CompletableFuture.completedFuture(null);
@@ -125,6 +132,9 @@ public final class TaskExecutor {
.thenApplyAsync(unused -> (Exception) null)
.exceptionally(throwable -> {
Throwable resolved = resolveException(throwable);
if (resolved instanceof CancellationException) {
throw (CancellationException)resolved;
}
if (resolved instanceof Exception) {
return (Exception) resolved;
} else {
@@ -137,6 +147,7 @@ public final class TaskExecutor {
private CompletableFuture<?> executeTask(Task<?> task) {
return CompletableFuture.completedFuture(null)
.thenComposeAsync(unused -> {
task.setCancelled(this::isCancelled);
task.setState(Task.TaskState.READY);
if (task.getSignificance().shouldLog())
@@ -207,7 +218,7 @@ public final class TaskExecutor {
Throwable resolved = resolveException(throwable);
if (resolved instanceof Exception) {
Exception e = (Exception) resolved;
if (e instanceof InterruptedException) {
if (e instanceof InterruptedException || e instanceof CancellationException) {
task.setException(e);
if (task.getSignificance().shouldLog()) {
Logging.LOG.log(Level.FINE, "Task aborted: " + task.getName());