fix(task): wrong cancellation implementation. Closes #1035.

This commit is contained in:
huanghongxun
2021-09-17 17:35:24 +08:00
parent e10e1e0613
commit 390f6e016d
6 changed files with 38 additions and 22 deletions

View File

@@ -95,7 +95,6 @@ public final class AsyncTaskExecutor extends TaskExecutor {
}
cancelled.set(true);
future.cancel(true);
}
private CompletableFuture<?> executeTasksExceptionally(Task<?> parentTask, Collection<Task<?>> tasks) {
@@ -229,13 +228,15 @@ public final class AsyncTaskExecutor extends TaskExecutor {
.thenComposeAsync(dependentsException -> {
boolean isDependentsSucceeded = dependentsException == null;
if (!isDependentsSucceeded && task.isRelyingOnDependents()) {
task.setException(dependentsException);
rethrow(dependentsException);
}
if (isDependentsSucceeded)
if (isDependentsSucceeded) {
task.setDependentsSucceeded();
} else {
task.setException(dependentsException);
if (task.isRelyingOnDependents()) {
rethrow(dependentsException);
}
}
return CompletableFuture.runAsync(wrap(() -> {
task.setState(Task.TaskState.RUNNING);
@@ -263,10 +264,12 @@ public final class AsyncTaskExecutor extends TaskExecutor {
.thenApplyAsync(dependenciesException -> {
boolean isDependenciesSucceeded = dependenciesException == null;
if (!isDependenciesSucceeded && task.isRelyingOnDependencies()) {
if (!isDependenciesSucceeded) {
Logging.LOG.severe("Subtasks failed for " + task.getName());
task.setException(dependenciesException);
rethrow(dependenciesException);
if (task.isRelyingOnDependencies()) {
rethrow(dependenciesException);
}
}
checkCancellation();
@@ -285,23 +288,20 @@ public final class AsyncTaskExecutor extends TaskExecutor {
.exceptionally(throwable -> {
Throwable resolved = resolveException(throwable);
if (resolved instanceof Exception) {
Exception e = (Exception) resolved;
if (e instanceof InterruptedException || e instanceof CancellationException) {
task.setException(null);
Exception e = convertInterruptedException((Exception) resolved);
task.setException(e);
exception = e;
if (e instanceof CancellationException) {
if (task.getSignificance().shouldLog()) {
Logging.LOG.log(Level.FINE, "Task aborted: " + task.getName());
}
task.onDone().fireEvent(new TaskEvent(this, task, true));
taskListeners.forEach(it -> it.onFailed(task, e));
} else {
task.setException(e);
exception = e;
if (task.getSignificance().shouldLog()) {
Logging.LOG.log(Level.FINE, "Task failed: " + task.getName(), e);
}
task.onDone().fireEvent(new TaskEvent(this, task, true));
taskListeners.forEach(it -> it.onFailed(task, e));
}
task.onDone().fireEvent(new TaskEvent(this, task, true));
taskListeners.forEach(it -> it.onFailed(task, e));
task.setState(Task.TaskState.FAILED);
}
@@ -331,6 +331,14 @@ public final class AsyncTaskExecutor extends TaskExecutor {
}
}
private static Exception convertInterruptedException(Exception e) {
if (e instanceof InterruptedException) {
return new CancellationException(e.getMessage());
} else {
return e;
}
}
private static Thread.UncaughtExceptionHandler uncaughtExceptionHandler = null;
public static void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {

View File

@@ -665,14 +665,14 @@ public abstract class Task<T> {
@Override
public void execute() throws Exception {
if (isDependentsSucceeded() != (Task.this.getException() == null))
throw new AssertionError("When dependents succeeded, Task.exception must be nonnull.");
throw new AssertionError("When whenComplete succeeded, Task.exception must be null.");
action.execute(Task.this.getException());
if (!isDependentsSucceeded()) {
setSignificance(TaskSignificance.MINOR);
if (Task.this.getException() == null)
throw new CancellationException();
throw new AssertionError("When failed, exception cannot be null");
else
throw Task.this.getException();
}