This commit is contained in:
huanghongxun
2020-02-05 14:36:44 +08:00
parent 5a8446524a
commit bdee33456b
2 changed files with 78 additions and 3 deletions

View File

@@ -87,7 +87,12 @@ public final class AsyncTaskExecutor extends TaskExecutor {
@Override
public synchronized void cancel() {
// AsyncTaskExecutor does not support cancellation.
if (future == null) {
throw new IllegalStateException("Cannot cancel a not started TaskExecutor");
}
cancelled.set(true);
future.cancel(true);
}
private CompletableFuture<Exception> executeTasks(Collection<Task<?>> tasks) {

View File

@@ -26,6 +26,9 @@ import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -89,20 +92,87 @@ public class TaskTest {
}
@Test
public void testCancellation() {
Task<?> task = Task.runAsync(() -> Thread.sleep(200));
public void testCancellation() throws InterruptedException {
AtomicBoolean flag = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
Task<?> task = Task.runAsync(() -> {
latch.countDown();
Thread.sleep(200);
// default executor cannot interrupt task.
flag.getAndSet(true);
}).thenRunAsync(() -> {
System.out.println("No way!");
Thread.sleep(200);
Assert.fail("Cannot reach here");
});
TaskExecutor executor = task.executor();
Lang.thread(() -> {
try {
latch.await();
System.out.println("Main thread start waiting");
Thread.sleep(100);
System.out.println("Cancel");
executor.cancel();
} catch (InterruptedException e) {
Assume.assumeNoException(e);
}
});
System.out.println("Start");
Assert.assertFalse("Task should fail because we have cancelled it", executor.test());
Thread.sleep(3000);
Assert.assertNull("CancellationException should not be recorded.", executor.getException());
Assert.assertNull("CancellationException should not be recorded.", task.getException());
Assert.assertTrue("Thread.sleep cannot be interrupted", flag.get());
}
@Test
public void testCompletableFutureCancellation() throws Throwable {
AtomicBoolean flag = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
CompletableFuture<?> task = CompletableFuture.runAsync(() -> {
latch.countDown();
System.out.println("Sleep");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
// default executor cannot interrupt task.
flag.getAndSet(true);
System.out.println("End");
}).thenComposeAsync(non -> {
System.out.println("compose");
return CompletableFuture.allOf(CompletableFuture.runAsync(() -> {
System.out.println("No way!");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
Assert.fail("Cannot reach here");
}));
});
Lang.thread(() -> {
try {
latch.await();
System.out.println("Main thread start waiting");
Thread.sleep(100);
System.out.println("Cancel");
task.cancel(true);
} catch (InterruptedException e) {
Assume.assumeNoException(e);
}
});
System.out.println("Start");
try {
task.get();
} catch (CancellationException e) {
System.out.println("Successfully cancelled");
}
//Assert.assertFalse("Task should fail because we have cancelled it", );
Thread.sleep(4000);
//Assert.assertNull("CancellationException should not be recorded.", executor.getException());
//Assert.assertTrue("Thread.sleep cannot be interrupted", flag.get());
}
public void testRejectedExecutionException() {