In VB, for error catching there is
Public Sub MySub()
On Error GoTo Errr
'do stuff
Errr:
'handle error
Resume Next
End Sub
which uses the magnificent Resume Next command.
In Java, you have a try catch block
try
{
//some code
}
catch (Exception e)
{
//handle error
}
which seems to be equivalent to the VB error catching, but specifically without the Resume Next option, so Java just quits the entire code block after the error, instead of trying to run the rest of the code after the error. Is there any way to get the power of Resume Next in Java? Thanks!
asked Apr 19, 2013 at 0:56
0
Just put the code that you want to run regardless of any error after the catch block.
try {
// stuff that could cause error
} catch(Exception e) {
// handle error
}
// do stuff
If you’re going to throw an exception from the catch block but you still want the «do stuff» code to run, you can put it in a finally block like this:
try {
// stuff that could cause error
} catch(Exception e) {
// throw exception here
} finally {
// do stuff that will run even when the exception is thrown
}
answered Apr 19, 2013 at 1:05
clavclav
4,16130 silver badges42 bronze badges
3
There is no equivalent in Java, to VB resume statements; in VB depending on the error case, you can choose to resume at a particular label within your code, in order to re-run the code after fixing the error, similar to a goto statement; this is not possible in java, except when you’re inside a loop, then you can use the continue to a defined label block.
answered Aug 18, 2014 at 16:39
We’ve already seen how we can handle an error in the observer. However, by that time, we are practically outside of the monad. There can be many kinds of errors and not every error is worth pushing all the way to the top. In standard Java, you can catch an exception at any level and decide if you want to handle it there or throw it further. Similarly in Rx, you can define behaviour based on errors without terminating the observable and forcing the observer to deal with everything.
Resume
onErrorReturn
The onErrorReturn
operator allows you to ignore an error and emit one final value before terminating (successfully this time).
In the next example, we will convert an error into a normal value to be printed:
Observable<String> values = Observable.create(o -> { o.onNext("Rx"); o.onNext("is"); o.onError(new Exception("adjective unknown")); }); values .onErrorReturn(e -> "Error: " + e.getMessage()) .subscribe(v -> System.out.println(v));
Output
Rx
is
Error: adjective unknown
onErrorResumeNext
The onErrorResumeNext
allows you to resume a failed sequence with another sequence. The error will not appear in the resulting observable.
public final Observable<T> onErrorResumeNext( Observable<? extends T> resumeSequence) public final Observable<T> onErrorResumeNext( Func1<java.lang.Throwable,? extends Observable<? extends T>> resumeFunction)
The first overload uses the same followup observable in every case. The second overload allows you to decide what the resume sequence should be based on the error that occurred.
Observable<Integer> values = Observable.create(o -> { o.onNext(1); o.onNext(2); o.onError(new Exception("Oops")); }); values .onErrorResumeNext(Observable.just(Integer.MAX_VALUE)) .subscribe(new PrintSubscriber("with onError: "));
Output
with onError: 1
with onError: 2
with onError: 2147483647
with onError: Completed
There’s nothing stopping the resumeSequence from failing as well. In fact, if you wanted to change the type of the error, you can return an observable that fails immediately. In standard Java, components may decide they can’t handle an error and that they should re-throw it. In such cases, it is common wrap a new exception around the original error, thus providing additional context. You can do the same in Rx:
.onErrorResumeNext(e -> Observable.error(new UnsupportedOperationException(e)))
Now the sequence still fails, but you’ve wrapped the original error in a new error.
onExceptionResumeNext
onExceptionResumeNext
only has one difference to onErrorResumeNext
: it only catches errors that are Exception
s.
Observable<String> values = Observable.create(o -> { o.onNext("Rx"); o.onNext("is"); //o.onError(new Throwable() {}); // this won't be caught o.onError(new Exception()); // this will be caught }); values .onExceptionResumeNext(Observable.just("hard")) .subscribe(v -> System.out.println(v));
Retry
If the error is non-deterministic, it may make sense to retry. retry
re-subscribes to the source and emits everything again from the start.
public final Observable<T> retry() public final Observable<T> retry(long count)
If the error doesn’t go away, retry()
will lock us in an infinite loop of retries. The second overload limits the number of retries. If errors persist and the sequence fails n times, retry(n)
will fail too. Let’s see this in an example
Random random = new Random(); Observable<Integer> values = Observable.create(o -> { o.onNext(random.nextInt() % 20); o.onNext(random.nextInt() % 20); o.onError(new Exception()); }); values .retry(1) .subscribe(v -> System.out.println(v));
Output
0
13
9
15
java.lang.Exception
Here we’ve specified that we want to retry once. Our observable fails after two values, then tries again, fails again. The second time it fails the exception is allowed pass through.
In this example, we have done something naughty: we have made our subscription stateful to demonstrate that the observable is restarted from the source: it produced different values the second time around. retry
does not cache any elements like replay
, nor would it make sense to do so. Retrying makes sense only if there are side effects, or if the observable is hot.
retryWhen
retry
will restart the subscription as soon as the failure happens. If we need more control over this, we can use retryWhen
.
public final Observable<T> retryWhen( Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler)
The argument to retryWhen
is a function that takes an observable and returns another. The input observable emits all the errors that retryWhen
encounters. The resulting observable signals when to retry:
- if it emits a value,
retryWhen
will retry, - if it terminates with error,
retryWhen
will emit the error and not retry. - if it terminates successfully,
retryWhen
will terminate successfully.
Note that the type of the signaling observable and the actual values emitted don’t matter. The values are discarded and the observable is only used for timing.
In the next example, we will construct a retrying policy where we wait 100ms before retrying.
Observable<Integer> source = Observable.create(o -> { o.onNext(1); o.onNext(2); o.onError(new Exception("Failed")); }); source.retryWhen((o) -> o .take(2) .delay(100, TimeUnit.MILLISECONDS)) .timeInterval() .subscribe( System.out::println, System.out::println);
Output
TimeInterval [intervalInMilliseconds=21, value=1]
TimeInterval [intervalInMilliseconds=0, value=2]
TimeInterval [intervalInMilliseconds=104, value=1]
TimeInterval [intervalInMilliseconds=0, value=2]
TimeInterval [intervalInMilliseconds=103, value=1]
TimeInterval [intervalInMilliseconds=0, value=2]
Our source observable emits 2 values and immediately fails. When that happens, the observable of failures inside retryWhen
emits the error. We delay that emission by 100ms and send it back to signal a retry. take(2)
guarantees that our signaling observable will terminate after we receive two errors. retryWhen
sees the termination and doesn’t retry after the second failures.
using
The using
operator is for creating observables from resources that need to be managed. It guarantees that your resources will be managed regardless of when and how subscriptions are terminated. If you were to just use create
, you would have to do the managing in the traditional Java paradigm and inject it into Rx. using
is a more natural way of managing resources in Rx.
public static final <T,Resource> Observable<T> using( Func0<Resource> resourceFactory, Func1<? super Resource,? extends Observable<? extends T>> observableFactory, Action1<? super Resource> disposeAction)
When a new subscription begins, resourceFactory
leases the necessary resource. observableFactory
uses that resource to produce items. When the resource is no longer needed, it is disposed of with the disposeAction
. The dispose action is executed regardless of the way the subscription terminates (successfully or with a failure).
In the next example, we pretend that a string
is a resource that needs managing.
Observable<Character> values = Observable.using( () -> { String resource = "MyResource"; System.out.println("Leased: " + resource); return resource; }, (resource) -> { return Observable.create(o -> { for (Character c : resource.toCharArray()) o.onNext(c); o.onCompleted(); }); }, (resource) -> System.out.println("Disposed: " + resource)); values .subscribe( v -> System.out.println(v), e -> System.out.println(e));
Output
Leased: MyResource
M
y
R
e
s
o
u
r
c
e
Disposed: MyResource
When we subscribe to values
, the resource factory function is called which returns "MyResource"
. That string is used to produce an observable which emits all of the characters in the string. Once the subscription ends, the resource is disposed of. A String
doesn’t need any more managing than what the garbage collector will do. Resources may actually need such managing, e.g., database connections, opened files etc.
It is important to note here that we are responsible for terminating the observable, just like we were when using the create
method. With create
, terminating is a matter of semantics. With using
, not terminating defeats the point of using it in the first place. Only upon termination the resources will be released. If we had not called o.onCompleted()
, the sequence would be assumed to be still active and needing its resources.
Continue reading
Previous | Next |
---|---|
Leaving the monad | Combining sequences |
This seems one of the hot searches for Reactor, at least when I type onErrorContinue in Google, onErrorResume would pop up beside it. Let me paste my testing code with some of my interpretations below.
Base Function
This is a simple function to multiple 5 consecutive numbers each by 2, then summing them up, with an exception thrown for i==2:
public static void main(String... args) {
Flux.range(1,5)
.doOnNext(i -> System.out.println("input=" + i))
.map(i -> i == 2 ? i / 0 : i)
.map(i -> i * 2)
.reduce((i,j) -> i+j)
.doOnNext(i -> System.out.println("sum=" + i))
.block();
}
obviously, the output:
input=1
input=2
Exception in thread "main" java.lang.ArithmeticException: / by zero
First, the code:
public static void main(String... args) {
Flux.range(1,5)
.doOnNext(i -> System.out.println("input=" + i))
.map(i -> i == 2 ? i / 0 : i)
.map(i -> i * 2)
.onErrorResume(err -> {
log.info("onErrorResume");
return Flux.empty();
})
.reduce((i,j) -> i+j)
.doOnNext(i -> System.out.println("sum=" + i))
.block();
}
Then, the output:
input=1
input=2
17:40:47.828 [main] INFO com.example.demo.config.TestRunner - onErrorResume
sum=2
No surprises. As stated in the doc, onErrorResume
replaces the Flux with what is returned, so nothing after and including 2 will be processed. The only thing worth mentioning is that the onErrorResume()
doesn’t have to be immediately after the error to catch it. This had me thinking because in the docs, only onErrorContinue()
had the word upstream highlighted but apparently it has some other meaning (as you see below).
Just onErrorContinue()
Code:
public static void main(String... args) {
Flux.range(1,5)
.doOnNext(i -> System.out.println("input=" + i))
.map(i -> i == 2 ? i / 0 : i)
.map(i -> i * 2)
.onErrorContinue((err, i) -> {log.info("onErrorContinue={}", i);})
.reduce((i,j) -> i+j)
.doOnNext(i -> System.out.println("sum=" + i))
.block();
}
and the output:
input=1
input=2
17:43:10.656 [main] INFO com.example.demo.config.TestRunner - onErrorContinue=2
input=3
input=4
input=5
sum=26
Again, no surprises. onErrorContinue
would drop the error element 2, then continue with 3 to 5.
🤩 Our Amazing Sponsors 👇
onErrorResume() then onErrorContinue()
Now this is where it becomes interesting. Have a guess: would the onErrorResume()
take the error or onErrorContinue()
?
public static void main(String... args) {
Flux.range(1,5)
.doOnNext(i -> System.out.println("input=" + i))
.map(i -> i == 2 ? i / 0 : i)
.map(i -> i * 2)
.onErrorResume(err -> {
log.info("onErrorResume");
return Flux.empty();
})
.onErrorContinue((err, i) -> {log.info("onErrorContinue={}", i);})
.reduce((i,j) -> i+j)
.doOnNext(i -> System.out.println("sum=" + i))
.block();
}
Now, the output:
input=1
input=2
17:47:05.789 [main] INFO com.example.demo.config.TestRunner - onErrorContinue=2
input=3
input=4
input=5
sum=26
Unexpected or not? The onErrorContinue()
would actually take the error before orErrorResume()
could ever get their hands on it. This may look obvious when both error handlers are in the same function, but when you only have onErrorResume()
in your function and that some caller actually has onErrorContinue()
, it may not be obvious why your onErrorResume()
was not called at the first place.
Mimic onErrorContinue() using onErrorResume()
Some posts suggest that we get rid of onErrorContinue()
at all and just use onErrorResume()
in all scenarios. But the above already shows that they yield different results. So how is this done?
public static void main(String... args) {
Flux.range(1,5)
.doOnNext(i -> System.out.println("input=" + i))
.flatMap(i -> Mono.just(i)
.map(j -> j == 2 ? j / 0 : j)
.map(j -> j * 2)
.onErrorResume(err -> {
System.out.println("onErrorResume");
return Mono.empty();
})
)
.reduce((i,j) -> i+j)
.doOnNext(i -> System.out.println("sum=" + i))
.block();
}
So the essence is to wrap the operations which might throw an error in a flatMap
or concatMap
, and use onErrorResume()
on it. This way, it yields the same result:
input=1
input=2
onErrorResume
input=3
input=4
input=5
sum=26
Mimic onErrorContinue() using onErrorResume() with downstream onErrorContinue()
Sometimes, onErrorContinue()
is put in the caller and you have no control over it. But you still want your onErrorResume()
. What shall you do?
public static void main(String... args) {
Flux.range(1,5)
.doOnNext(i -> System.out.println("input=" + i))
.flatMap(i -> Mono.just(i)
.map(j -> j == 2 ? j / 0 : j)
.map(j -> j * 2)
.onErrorResume(err -> {
System.out.println("onErrorResume");
return Mono.empty();
})
.onErrorStop()
)
.onErrorContinue((err, i) -> {log.info("onErrorContinue={}", i);})
.reduce((i,j) -> i+j)
.doOnNext(i -> System.out.println("sum=" + i))
.block();
}
The secret is to add onErrorStop()
in the end of the onErrorResume()
block — this would block the onErrorContinue()
so that it would not take up the error before onErrorResume()
does. Try removing onErrorStop()
and you will see onErrorContinue()
pop up as before.
Hope this clears the void. Happy coding!
Extended Reading
If you want to know more, Here’s my new article about the limitations of onErrorContinue()
and onErrorResume()
Reading Time: 4 minutes
If errors and failures are passed to the right component, which can handle them as notifications, the application can become more fault-tolerant or resilient. So if we build our system to be event-driven, we can more easily achieve scalability and failure tolerance, and a scalable, decoupled, and error-proof application is fast and responsive to users.”
Nickolay Tsvetinov
Reactive in layman’s language says how quickly the client handles the streaming data sent by the server. According to the Java community, it refers to asynchronous I/O and non-blocking processes. It works on the event-driven system to achieve responsiveness to users.
Reactive Stream API is a product of collaboration between Kaazing, Netflix, Pivotal, Red Hat, Twitter, Typesafe, and many others. It provides a powerful mechanism to address many problem statements in event-driven systems. It provides complete abstraction so that developers can focus only on their business logic.
A quick walk-through of error handling approaches using Spring Boot.
1. Simply Catch the error (onErrorResume)
The following code has a Flux named “stringFlux” that emits “RunTimeException” after creating a flux of three elements. In case of error, control jumps to “onErrorResume” block of code analogous to catch in java exception handling. We are using StepVerifier to subscribe to the events and verify event flow.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void fluxErrorHandling(){ | |
Flux<String> stringFlux = Flux.just(«a»,»b»,»c») | |
.concatWith(Flux.error(new RuntimeException(«Exception Occurred»))) | |
.concatWith(Flux.just(«D»)) | |
.onErrorResume((e) -> { // on error this block gets executed – we are returning a flux on error value | |
System.out.println(e); | |
return Flux.just(«default»); | |
}); | |
StepVerifier.create(stringFlux.log()) | |
.expectSubscription() | |
.expectNext(«a»,»b»,»c») | |
.expectNext(«default») | |
.expectError(RuntimeException.class) | |
.verify(); | |
} |
Execution of the above code snippet gives:-
java.lang.AssertionError: expectation “expectError(Class)” failed (expected: onError(RuntimeException); actual: onNext(default))
This happens because as soon as an exception occurs control jumps to onErrrorResume block where it prints the exception and returns a flux with default as a value. To correct above exception refer to the below-defined test case:-
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void fluxErrorHandling(){ | |
Flux<String> stringFlux = Flux.just(«a»,»b»,»c») | |
.concatWith(Flux.error(new RuntimeException(«Exception Occurred»))) | |
.concatWith(Flux.just(«D»)) | |
.onErrorResume((e) -> { // on error this block gets executed – we are returning a flux on error value | |
System.out.println(e); | |
return Flux.just(«default»); | |
}); | |
StepVerifier.create(stringFlux.log()) | |
.expectSubscription() | |
.expectNext(«a»,»b»,»c») | |
.expectNext(«default») | |
.verifyComplete(); | |
} |
Event Sequence :-
15:12:52.127 [main] INFO reactor.Flux.OnErrorResume.1 - onSubscribe(FluxOnErrorResume.ResumeSubscriber)
15:12:52.135 [main] INFO reactor.Flux.OnErrorResume.1 - request(unbounded)
15:12:52.137 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(a)
15:12:52.137 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(b)
15:12:52.138 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(c)
java.lang.RuntimeException: Exception Occurred
15:12:52.139 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(default)
15:12:52.140 [main] INFO reactor.Flux.OnErrorResume.1 - onComplete()
Note: We are using verifyComplete() instead of verify() in updated test case.
This is because the onComplete() event terminates the error sequence. When there is no onComplete() event, it simply means some error occurred and the event sequence did not complete. In the above example, since we are returning a flux in error handling code, the event stream completes with onComplete().
Now, let’s have a look at other ways of error handling in reactive streams.
2. Catch the error and return static value using onErrorReturn()
This is a classic technique where we detect the error and return a static value on an error as our handling mechanism.In Contrast to above technique, onErrorReturn() returns a static value instead of Flux.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void fluxErrorHandling_onErrorReturn(){ | |
Flux<String> stringFlux = Flux.just(«a»,»b»,»c») | |
.concatWith(Flux.error(new RuntimeException(«Exception Occurred»))) | |
.concatWith(Flux.just(«D»)) | |
.onErrorReturn(«default»); // here returning a simple string on any errors | |
StepVerifier.create(stringFlux.log()) | |
.expectSubscription() | |
.expectNext(«a»,»b»,»c») | |
.expectNext(«default») | |
.verifyComplete(); | |
} |
Event Sequence :-
15:13:48.969 [main] INFO reactor.Flux.OnErrorResume.1 - onSubscribe(FluxOnErrorResume.ResumeSubscriber)
15:13:48.974 [main] INFO reactor.Flux.OnErrorResume.1 - request(unbounded)
15:13:48.976 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(a)
15:13:48.976 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(b)
15:13:48.976 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(c)
15:13:48.978 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(default)
15:13:48.983 [main] INFO reactor.Flux.OnErrorResume.1 - onComplete()
3. Catch the error and translate it to a custom exception (onErrorMap)
Nothing fancy about the below piece of code. This is the most common methodology for handling errors inside the map(). The handling logic here is responsible for translating the error into meaningful business exceptions.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void fluxErrorHandling_onErrorMap(){ | |
Flux<String> stringFlux = Flux.just(«a»,»b»,»c») | |
.concatWith(Flux.error(new RuntimeException(«Exception Occurred»))) | |
.concatWith(Flux.just(«D»)) | |
.onErrorMap((e) -> new CustomException(e)); // here returning a simple string on any errors | |
StepVerifier.create(stringFlux.log()) | |
.expectSubscription() | |
.expectNext(«a»,»b»,»c») | |
.expectError(CustomException.class) | |
.verify(); | |
} |
Event Sequence is as follows:-
15:15:51.883 [main] INFO reactor.Flux.OnErrorResume.1 - onSubscribe(FluxOnErrorResume.ResumeSubscriber)
15:15:51.890 [main] INFO reactor.Flux.OnErrorResume.1 - request(unbounded)
15:15:51.892 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(a)
15:15:51.892 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(b)
15:15:51.892 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(c)
15:15:51.935 [main] ERROR reactor.Flux.OnErrorResume.1 - onError(CustomException: Exception Occurred)
15:15:51.937 [main] ERROR reactor.Flux.OnErrorResume.1 -
CustomException: Exception Occurred
4. Catch the error and retry the same stream for a specific number of times (retry)
Retry mechanism is required when you lose connection with the source emitting the data and you would like to retry establishing the connection to the same source or a different one.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void fluxErrorHandling_withRetry(){ | |
Flux<String> stringFlux = Flux.just(«a»,»b»,»c») | |
.concatWith(Flux.error(new RuntimeException(«Exception Occurred»))) | |
.concatWith(Flux.just(«D»)) | |
.onErrorMap((e) -> new CustomException(e)) | |
.retry(2); | |
// P.s. Retry produces same stream again | |
StepVerifier.create(stringFlux.log()) | |
.expectSubscription() | |
.expectNext(«a»,»b»,»c») | |
.expectNext(«a»,»b»,»c») | |
.expectNext(«a»,»b»,»c») | |
.expectError(CustomException.class) | |
.verify(); | |
} |
Event Sequence displayed below clearly depicts 2 retries that happened with onNext Events in repetition.
Event Sequence :-
15:16:48.154 [main] INFO reactor.Flux.Retry.1 - onSubscribe(FluxRetry.RetrySubscriber)
15:16:48.160 [main] INFO reactor.Flux.Retry.1 - request(unbounded)
15:16:48.163 [main] INFO reactor.Flux.Retry.1 - onNext(a)
15:16:48.163 [main] INFO reactor.Flux.Retry.1 - onNext(b)
15:16:48.164 [main] INFO reactor.Flux.Retry.1 - onNext(c)
15:16:48.215 [main] INFO reactor.Flux.Retry.1 - onNext(a)
15:16:48.215 [main] INFO reactor.Flux.Retry.1 - onNext(b)
15:16:48.215 [main] INFO reactor.Flux.Retry.1 - onNext(c)
15:16:48.216 [main] INFO reactor.Flux.Retry.1 - onNext(a)
15:16:48.216 [main] INFO reactor.Flux.Retry.1 - onNext(b)
15:16:48.216 [main] INFO reactor.Flux.Retry.1 - onNext(c)
15:16:48.217 [main] ERROR reactor.Flux.Retry.1 - onError(CustomException: Exception Occurred)
15:16:48.219 [main] ERROR reactor.Flux.Retry.1 -
CustomException: Exception Occurred
5. Catch the error using back off along with retry (retryBackoff)
Before attempting a retry, when you would like to wait for a specific duration.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void fluxErrorHandling_withRetryBackoff(){ | |
Flux<String> stringFlux = Flux.just(«a»,»b»,»c») | |
.concatWith(Flux.error(new RuntimeException(«Exception Occurred»))) | |
.concatWith(Flux.just(«D»)) | |
.onErrorMap((e) -> new CustomException(e)) | |
.retryBackoff(2, Duration.ofSeconds(5));// when you want to perform a backoff before retry | |
StepVerifier.create(stringFlux.log()) | |
.expectSubscription() | |
.expectNext(«a»,»b»,»c») | |
.expectNext(«a»,»b»,»c») | |
.expectNext(«a»,»b»,»c») | |
.expectError(IllegalStateException.class) | |
.verify(); | |
} |
Event Sequence:-
Event Sequence shows log message and we can see error signal is in play.
15:18:11.551 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
15:18:11.630 [main] INFO reactor.Flux.RetryWhen.1 - onSubscribe(SerializedSubscriber)
15:18:11.636 [main] INFO reactor.Flux.RetryWhen.1 - request(unbounded)
15:18:11.650 [main] INFO reactor.Flux.RetryWhen.1 - onNext(a)
15:18:11.651 [main] INFO reactor.Flux.RetryWhen.1 - onNext(b)
15:18:11.651 [main] INFO reactor.Flux.RetryWhen.1 - onNext(c)
15:18:19.067 [parallel-1] INFO reactor.Flux.RetryWhen.1 - onNext(a)
15:18:19.067 [parallel-1] INFO reactor.Flux.RetryWhen.1 - onNext(b)
15:18:19.067 [parallel-1] INFO reactor.Flux.RetryWhen.1 - onNext(c)
15:18:28.744 [parallel-2] INFO reactor.Flux.RetryWhen.1 - onNext(a)
15:18:28.744 [parallel-2] INFO reactor.Flux.RetryWhen.1 - onNext(b)
15:18:28.744 [parallel-2] INFO reactor.Flux.RetryWhen.1 - onNext(c)
15:18:28.750 [parallel-2] ERROR reactor.Flux.RetryWhen.1 - onError(java.lang.IllegalStateException: Retries exhausted: 2/2)
15:18:28.753 [parallel-2] ERROR reactor.Flux.RetryWhen.1 -
java.lang.IllegalStateException: Retries exhausted: 2/2
Observe the event stream above, you obtain IllegalStateException after all the retries are exhausted.
References
- My motivation for writing this blog comes from this course –https://www.udemy.com/course/build-reactive-restful-apis-using-spring-boot-webflux/learn/lecture/12962854#overview
- Operators
- Error Handling
- Catch
recover from an onError notification by continuing the sequence without error
The Catch operator intercepts an
There are several variants of the Catch operator, and a
In some ReactiveX implementations, there is an operator called something likeonError
notification from the source Observable and, instead of passing it through to any
observers, replaces it with some other item or sequence of items, potentially allowing
the resulting Observable to terminate normally or not to terminate at all.
variety of names used by different ReactiveX implementations to describe this operation,
as you can see in the sections below.
“OnErrorResumeNext” that behaves like a Catch
variant: specifically reacting to an onError
notification from the source
Observable. In others, there is an operator with that name that behaves more like a
Concat variant: performing the concatenation operation
regardless of whether the source Observable terminates normally or with an error. This is
unfortunate and confusing, but something we have to live with.
See Also
- Concat
- Retry
- Introduction to Rx: Catch
Language-Specific Information:
RxClojure catch*
RxClojure implements this operator as
You may replace the first function parameter (the predicate that evaluates the exception)
is equivalent to:
catch*
. This operator takes two arguments,
both of which are functions of your choosing that take the exception raised by
onError
as their single parameters. The first function is a predicate. If it
returns false
, catch*
passes the onError
notification
unchanged to its observers. If it returns true
, however, catch*
swallows the error, calls the second function (which returns an Observable), and passes along
the emissions and notifications from this new Observable to its observers.
with a class object representing a variety of exception. If you do this, catch*
will treat it as equivalent to predicate that performs an instance?
check to see
if the exception from the onError
notification is an instance of the class
object. In other words:
Sample Code
(->> my-observable
(catch* IllegalArgumentException
(fn [e] (rx/return 1)))
)
(->> my-observable
(catch* (fn [e] (-> instance? IllegalArgumentException e))
(fn [e] (rx/return 1)))
)
RxCpp
RxCpp does not implement the Catch operator.
RxGroovy onErrorResumeNext onErrorReturn onExceptionResumeNext
RxGroovy implements the Catch operator in the same way as
does RxJava. There are three distinct operators that provide this functionality:
onErrorReturn
- instructs an Observable to emit a particular item when it encounters an error, and then terminate normally
onErrorResumeNext
- instructs an Observable to begin emitting a second Observable sequence if it encounters an error
onExceptionResumeNext
- instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)
onErrorReturn
The onErrorReturn
method returns an Observable that mirrors the behavior of the
source Observable, unless that Observable invokes onError
in which case, rather
than propagating that error to the observer, onErrorReturn
will instead emit a
specified item and invoke the observer’s onCompleted
method, as shown in
the following sample code:
Sample Code
def myObservable = Observable.create({ aSubscriber ->
if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('Four');
if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('Three');
if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('Two');
if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('One');
if(false == aSubscriber.isUnsubscribed()) aSubscriber.onError();
});
myObservable.onErrorReturn({ return('Blastoff!'); }).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
Four
Three
Two
One
Blastoff!
Sequence complete
onErrorReturn(Func1)
onErrorResumeNext
The onErrorResumeNext
method returns an Observable that mirrors the behavior of
the source Observable, unless that Observable invokes onError
in which case,
rather than propagating that error to the observer, onErrorResumeNext
will
instead begin mirroring a second, backup Observable, as shown in the following sample code:
Sample Code
def myObservable = Observable.create({ aSubscriber ->
if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('Three');
if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('Two');
if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('One');
if(false == aSubscriber.isUnsubscribed()) aSubscriber.onError();
});
def myFallback = Observable.create({ aSubscriber ->
if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('0');
if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('1');
if(false == aSubscriber.isUnsubscribed()) aSubscriber.onNext('2');
if(false == aSubscriber.isUnsubscribed()) aSubscriber.onCompleted();
});
myObservable.onErrorResumeNext(myFallback).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
Three
Two
One
0
1
2
Sequence complete
onErrorResumeNext(Func1)
onErrorResumeNext(Observable)
onExceptionResumeNext
Much like onErrorResumeNext
method, this returns an Observable that mirrors the
behavior of the source Observable, unless that Observable invokes onError
in
which case, if the Throwable passed to onError
is an Exception, rather than
propagating that Exception to the observer, onExceptionResumeNext
will instead
begin mirroring a second, backup Observable. If the Throwable is not an Exception, the
Observable returned by onExceptionResumeNext
will propagate it to its
observer’s onError
method and will not invoke its backup Observable.
onExceptionResumeNext(Observable)
RxJava 1․x onErrorResumeNext onErrorReturn onExceptionResumeNext
RxJava implements the Catch operator with three distinct
operators:
onErrorReturn
- instructs an Observable to emit a particular item when it encounters an error, and then terminate normally
onErrorResumeNext
- instructs an Observable to begin emitting a second Observable sequence if it encounters an error
onExceptionResumeNext
- instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)
onErrorReturn
The onErrorReturn
method returns an Observable that mirrors the behavior of the
source Observable, unless that Observable invokes onError
in which case, rather
than propagating that error to the observer, onErrorReturn
will instead emit a
specified item and invoke the observer’s onCompleted
method.
onErrorReturn(Func1)
onErrorResumeNext
The onErrorResumeNext
method returns an Observable that mirrors the behavior of
the source Observable, unless that Observable invokes onError
in which case,
rather than propagating that error to the observer, onErrorResumeNext
will
instead begin mirroring a second, backup Observable.
onErrorResumeNext(Func1)
onErrorResumeNext(Observable)
onExceptionResumeNext
Much like onErrorResumeNext
method, this returns an Observable that mirrors the
behavior of the source Observable, unless that Observable invokes onError
in
which case, if the Throwable passed to onError
is an Exception, rather than
propagating that Exception to the observer, onExceptionResumeNext
will instead
begin mirroring a second, backup Observable. If the Throwable is not an Exception, the
Observable returned by onExceptionResumeNext
will propagate it to its
observer’s onError
method and will not invoke its backup Observable.
onExceptionResumeNext(Observable)
RxJS catch onErrorResumeNext
RxJS implements the Catch operator with two distinct
operators:
catch
- instructs an Observable to begin emitting a second Observable sequence if it encounters an error
onErrorResumeNext
- instructs an Observable to begin emitting a second Observable sequence if it encounters an error or if the source Observable terminates normally
catch
catch
is found in the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
onErrorResumeNext
This implementation borrows the confusing nomenclature from Rx.NET, in which
onErrorResumeNext
switches to a back-up Observable both on an error and
on a normal, error-free termination of the source Observable.
onErrorResumeNext
is found in the following distributions:
rx.js
rx.compat.js
RxKotlin onErrorResumeNext onErrorReturn onExceptionResumeNext
RxKotlin implements the Catch operator in the same way as
does RxJava. There are three distinct operators that provide this functionality:
onErrorReturn
- instructs an Observable to emit a particular item when it encounters an error, and then terminate normally
onErrorResumeNext
- instructs an Observable to begin emitting a second Observable sequence if it encounters an error
onExceptionResumeNext
- instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)
onErrorReturn
The onErrorReturn
method returns an Observable that mirrors the behavior of the
source Observable, unless that Observable invokes onError
in which case, rather
than propagating that error to the observer, onErrorReturn
will instead emit a
specified item and invoke the observer’s onCompleted
method.
onErrorResumeNext
The onErrorResumeNext
method returns an Observable that mirrors the behavior of
the source Observable, unless that Observable invokes onError
in which case,
rather than propagating that error to the observer, onErrorResumeNext
will
instead begin mirroring a second, backup Observable.
onExceptionResumeNext
Much like onErrorResumeNext
method, this returns an Observable that mirrors the
behavior of the source Observable, unless that Observable invokes onError
in
which case, if the Throwable passed to onError
is an Exception, rather than
propagating that Exception to the observer, onExceptionResumeNext
will instead
begin mirroring a second, backup Observable. If the Throwable is not an Exception, the
Observable returned by onExceptionResumeNext
will propagate it to its
observer’s onError
method and will not invoke its backup Observable.
RxNET Catch OnErrorResumeNext
Rx.NET implements the Catch operator with two distinct
operators:
Catch
- instructs an Observable to begin emitting a second Observable sequence if it encounters an error
OnErrorResumeNext
- instructs an Observable to begin emitting a second Observable sequence if it encounters an error or if the source Observable terminates normally
Catch
The Catch
operator has a variant that allows you to specify which sort of
Exception you want to catch. If you use that variant of the operator, any other Exceptions
will be passed through to the observer as if the Catch
operator had not been
applied.
OnErrorResumeNext
This implementation introduces a confusing nomenclature, in which in spite of its name
OnErrorResumeNext
switches to a back-up Observable both on an error and
on a normal, error-free termination of the source Observable. It is therefore more like a
concatenation operator.
RxPHP catch
RxPHP implements this operator as catch
.
Continues an observable sequence that is terminated by an exception with the next observable sequence.
Sample Code
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/catch/catch.php $obs2 = RxObservable::of(42); $source = RxObservable::error(new Exception('Some error')) ->catch(function (Throwable $e, RxObservable $sourceObs) use ($obs2) { return $obs2; }); $subscription = $source->subscribe($stdoutObserver);
Next value: 42 Complete!
RxPY catch_exception on_error_resume_next
RxPY implements the Catch operator with two distinct
operators:
catch_exception
- instructs an Observable, if it encounters an error, to begin emitting items from a set of other Observables, one Observable at a time, until one of those Observables terminates successfully
on_error_resume_next
- instructs an Observable to concatenate items emitted by a set of other Observables, one Observable at a time, regardless of whether the source Observable or any subsequent Observable terminates with an error
catch_exception
You may pass catch_exception
a set of back-up Observables either as individual
function parameters or as a single array of Observables. If it encounters an
onError
notification from the source Observable, it will subscribe to and begin
mirroring the first of these back-up Observables. If this back-up Observable itself
issues an onError
notification, catch_exception
will swallow it
and switch over to the next back-up Observable. If any of these Observables issues an
onCompleted
notification, catch_exception
will pass this along
and will stop.
on_error_resume_next
You may pass on_error_resume_next
a set of back-up Observables either as
individual function parameters, as a single array of Observables, or as a factory function
that generates Observables. When the source Observable terminates, whether normally or with
an error, on_error_resume_next
will subscribe to and begin mirroring the first
of these back-up Observables, and then will recursively continue this concatenation process
for each additional Observable until there are no more Observables to mirror, at which time
it will pass on the onError
or onCompleted
notification from the
last of these Observables.
Rxrb on_error_resume_next rescue_error
Rx.rb implements the Catch operator with two distinct
operators:
rescue_error
- instructs an Observable to begin emitting items from another Observable, or from an Observable returned from an action, if it encounters an error
on_error_resume_next
- instructs an Observable to concatenate items emitted by another Observable to the sequence emitted by the source Observable, regardless of whether the source Observable terminates normally or with an error
rescue_error
You may pass rescue_error
either an Observable or a factory action that
generates an Observable.
on_error_resume_next
In Rx.rb, on_error_resume_next
inherits the misleading nomenclature from Rx.NET
in that it concatenates the second Observable sequence to the source sequence whether that
source sequence terminates normally or with an error.
RxScala onErrorFlatMap onErrorResumeNext onErrorReturn onExceptionResumeNext
Rx.rb implements the Catch operator with four distinct
operators:
onErrorFlatMap
- replaces all
onError
notifications from a misbehaving Observable into the emissions from a secondary Observable onErrorResumeNext
- instructs an Observable to begin emitting a second Observable sequence if it encounters an error
onErrorReturn
- instructs an Observable to emit a particular item when it encounters an error, and then terminate normally
onExceptionResumeNext
- instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)
onErrorFlatMap
Because
Note that you should apply
Note that onErrorFlatMap
handles a special case: a source Observable that is noncompliant
with the Observable contract in such a way that it may interleave
onError
notifications with its emissions without terminating. This operator allows you to
replace those onError
notifications with the emissions of an Observable of your choosing
without unsubscribing from the source, so that any future items emitted from the source will
be passed along to observers as though the sequence had not been interrupted with an
onError
notification.
onErrorFlatMap
is designed to work with pathological source Observables
that do not terminate after issuing an error, it is mostly useful in debugging/testing
scenarios.
onErrorFlatMap
directly to the pathological source
Observable, and not to that Observable after it has been modified by additional operators,
as such operators may effectively renormalize the source Observable by unsubscribing from it
immediately after it issues an error. Above, for example, is an illustration showing how
onErrorFlatMap
will respond to two error-generating Observables that have been
merged by the Merge operator:
onErrorFlatMap
will not react to both errors generated by both
Observables, but only to the single error passed along by merge
.
onErrorResumeNext
The onErrorResumeNext
method returns an Observable that mirrors the behavior of
the source Observable, unless that Observable invokes onError
in which case,
rather than propagating that error to the observer, onErrorResumeNext
will
instead begin mirroring a second, backup Observable.
onErrorReturn
The onErrorReturn
method returns an Observable that mirrors the behavior of the
source Observable, unless that Observable invokes onError
in which case, rather
than propagating that error to the observer, onErrorReturn
will instead emit a
specified item and invoke the observer’s onCompleted
method.
onExceptionResumeNext
Much like onErrorResumeNext
method, this returns an Observable that mirrors the
behavior of the source Observable, unless that Observable invokes onError
in
which case, if the Throwable passed to onError
is an Exception, rather than
propagating that Exception to the observer, onExceptionResumeNext
will instead
begin mirroring a second, backup Observable. If the Throwable is not an Exception, the
Observable returned by onExceptionResumeNext
will propagate it to its
observer’s onError
method and will not invoke its backup Observable.
Best Java code snippets using reactor.core.publisher.Mono.onErrorResume (Showing top 20 results out of 666)
@Override public RouterFunctions.Builder onError(Predicate<? super Throwable> predicate, BiFunction<? super Throwable, ServerRequest, Mono<ServerResponse>> responseProvider) { Assert.notNull(predicate, "Predicate must not be null"); Assert.notNull(responseProvider, "ResponseProvider must not be null"); return filter((request, next) -> next.handle(request) .onErrorResume(predicate, t -> responseProvider.apply(t, request))); }
@Override public <T extends Throwable> RouterFunctions.Builder onError(Class<T> exceptionType, BiFunction<? super T, ServerRequest, Mono<ServerResponse>> responseProvider) { Assert.notNull(exceptionType, "ExceptionType must not be null"); Assert.notNull(responseProvider, "ResponseProvider must not be null"); return filter((request, next) -> next.handle(request) .onErrorResume(exceptionType, t -> responseProvider.apply(t, request))); }
@SuppressWarnings("unchecked") private <T> Mono<T> drainBody(ClientResponse response, Throwable ex) { return (Mono<T>) response.bodyToMono(Void.class) .onErrorResume(ex2 -> Mono.empty()).thenReturn(ex); }
@Nullable private String formatBody(@Nullable MediaType contentType, Mono<byte[]> body) { return body .map(bytes -> { if (contentType == null) { return bytes.length + " bytes of content (unknown content-type)."; } Charset charset = contentType.getCharset(); if (charset != null) { return new String(bytes, charset); } if (PRINTABLE_MEDIA_TYPES.stream().anyMatch(contentType::isCompatibleWith)) { return new String(bytes, StandardCharsets.UTF_8); } return bytes.length + " bytes of content."; }) .defaultIfEmpty("No content") .onErrorResume(ex -> Mono.just("Failed to obtain content: " + ex.getMessage())) .block(this.timeout); }
@Override public Mono<MatchResult> matches(ServerWebExchange exchange) { return this.bearerTokenConverter.convert(exchange) .flatMap(this::nullAuthentication) .onErrorResume(e -> notMatch()); }
@Override public Mono<Void> handle(ServerWebExchange exchange) { Mono<Void> completion; try { completion = super.handle(exchange); } catch (Throwable ex) { completion = Mono.error(ex); } for (WebExceptionHandler handler : this.exceptionHandlers) { completion = completion.onErrorResume(ex -> handler.handle(exchange, ex)); } return completion; }
private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) { return getResultHandler(result).handleResult(exchange, result) .onErrorResume(ex -> result.applyExceptionHandler(ex).flatMap(exceptionResult -> getResultHandler(exceptionResult).handleResult(exchange, exceptionResult))); }
@Override public Mono<Void> notify(InstanceEvent event) { return Flux.fromIterable(delegates).flatMap(d -> d.notify(event).onErrorResume(error -> { log.warn("Unexpected exception while triggering notifications. Notification might not be sent.", error); return Mono.empty(); })).then(); } }
private <T extends Publisher<?>> T handleBody(ClientResponse response, T bodyPublisher, Function<Mono<? extends Throwable>, T> errorFunction) { if (HttpStatus.resolve(response.rawStatusCode()) != null) { for (StatusHandler handler : this.statusHandlers) { if (handler.test(response.statusCode())) { HttpRequest request = this.requestSupplier.get(); Mono<? extends Throwable> exMono = handler.apply(response, request); exMono = exMono.flatMap(ex -> drainBody(response, ex)); exMono = exMono.onErrorResume(ex -> drainBody(response, ex)); return errorFunction.apply(exMono); } } return bodyPublisher; } else { return errorFunction.apply(createResponseException(response, this.requestSupplier.get())); } }
@Override public ListenableFuture<Void> shutdown() { if (this.stopping) { SettableListenableFuture<Void> future = new SettableListenableFuture<>(); future.set(null); return future; } this.stopping = true; Mono<Void> result; if (this.channelGroup != null) { result = FutureMono.from(this.channelGroup.close()); if (this.loopResources != null) { result = result.onErrorResume(ex -> Mono.empty()).then(this.loopResources.disposeLater()); } if (this.poolResources != null) { result = result.onErrorResume(ex -> Mono.empty()).then(this.poolResources.disposeLater()); } result = result.onErrorResume(ex -> Mono.empty()).then(stopScheduler()); } else { result = stopScheduler(); } return new MonoToListenableFutureAdapter<>(result); }
protected Mono<Instance> doUpdateStatus(Instance instance) { if (!instance.isRegistered()) { return Mono.empty(); } log.debug("Update status for {}", instance); return instanceWebClient.instance(instance) .get() .uri(Endpoint.HEALTH) .exchange() .log(log.getName(), Level.FINEST) .flatMap(this::convertStatusInfo) .doOnError(ex -> logError(instance, ex)) .onErrorResume(this::handleError) .map(instance::withStatusInfo); }
@Override public Mono<Void> handle(ServerWebExchange exchange) { Mono<Void> completion; try { completion = super.handle(exchange); } catch (Throwable ex) { completion = Mono.error(ex); } for (WebExceptionHandler handler : this.exceptionHandlers) { completion = completion.onErrorResume(ex -> handler.handle(exchange, ex)); } return completion; }
@Override public Mono<Void> doNotify(InstanceEvent event, Instance instance) { return delegate.notify(event).onErrorResume(error -> Mono.empty()).then(Mono.fromRunnable(() -> { if (shouldEndReminder(event)) { reminders.remove(event.getInstance()); } else if (shouldStartReminder(event)) { reminders.putIfAbsent(event.getInstance(), new Reminder(event)); } })); }
protected Mono<Void> recomputeSnapshot(InstanceId instanceId) { return this.getEventStore() .find(instanceId) .collectList() .map(events -> Instance.create(instanceId).apply(events)) .doOnNext(instance -> snapshots.put(instance.getId(), instance)) .then() .onErrorResume(ex2 -> { log.error( "Error while recomputing snapshot. Event history for instance {} may be wrong,", instanceId, ex2 ); return Mono.empty(); }); } }
private Mono<Void> authenticate(ServerWebExchange exchange, WebFilterChain chain, Authentication token) { WebFilterExchange webFilterExchange = new WebFilterExchange(exchange, chain); return this.authenticationManager.authenticate(token) .switchIfEmpty(Mono.defer(() -> Mono.error(new IllegalStateException("No provider found for " + token.getClass())))) .flatMap(authentication -> onAuthenticationSuccess(authentication, webFilterExchange)) .onErrorResume(AuthenticationException.class, e -> this.authenticationFailureHandler .onAuthenticationFailure(webFilterExchange, e)); }
protected Mono<Void> updateSnapshot(InstanceEvent event) { return Mono.<Void>fromRunnable(() -> snapshots.compute(event.getInstance(), (key, old) -> { Instance instance = old != null ? old : Instance.create(key); return instance.apply(event); })).onErrorResume(ex -> { log.warn( "Error while updating the snapshot with event {}. Recomputing instance snapshot from event history.", event, ex ); return recomputeSnapshot(event.getInstance()); }); }
@Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { return this.authorizationRequestResolver.resolve(exchange) .switchIfEmpty(chain.filter(exchange).then(Mono.empty())) .onErrorResume(ClientAuthorizationRequiredException.class, e -> { return this.requestCache.saveRequest(exchange) .then(this.authorizationRequestResolver.resolve(exchange, e.getClientRegistrationId())); }) .flatMap(clientRegistration -> sendRedirectForAuthorization(exchange, clientRegistration)); }
@Test public void writeWithError() throws Exception { TestServerHttpResponse response = new TestServerHttpResponse(); response.getHeaders().setContentLength(12); IllegalStateException error = new IllegalStateException("boo"); response.writeWith(Flux.error(error)).onErrorResume(ex -> Mono.empty()).block(); assertFalse(response.statusCodeWritten); assertFalse(response.headersWritten); assertFalse(response.cookiesWritten); assertFalse(response.getHeaders().containsKey(HttpHeaders.CONTENT_LENGTH)); assertTrue(response.body.isEmpty()); }
@Override public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) { HandlerMethod handlerMethod = (HandlerMethod) handler; Assert.state(this.methodResolver != null && this.modelInitializer != null, "Not initialized"); InitBinderBindingContext bindingContext = new InitBinderBindingContext( getWebBindingInitializer(), this.methodResolver.getInitBinderMethods(handlerMethod)); InvocableHandlerMethod invocableMethod = this.methodResolver.getRequestMappingMethod(handlerMethod); Function<Throwable, Mono<HandlerResult>> exceptionHandler = ex -> handleException(ex, handlerMethod, bindingContext, exchange); return this.modelInitializer .initModel(handlerMethod, bindingContext, exchange) .then(Mono.defer(() -> invocableMethod.invoke(exchange, bindingContext))) .doOnNext(result -> result.setExceptionHandler(exceptionHandler)) .doOnNext(result -> bindingContext.saveModel()) .onErrorResume(exceptionHandler); }
@Override public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) { if (this.forwardedHeaderTransformer != null) { request = this.forwardedHeaderTransformer.apply(request); } ServerWebExchange exchange = createExchange(request, response); LogFormatUtils.traceDebug(logger, traceOn -> exchange.getLogPrefix() + formatRequest(exchange.getRequest()) + (traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : "")); return getDelegate().handle(exchange) .doOnSuccess(aVoid -> logResponse(exchange)) .onErrorResume(ex -> handleUnresolvedError(exchange, ex)) .then(Mono.defer(response::setComplete)); }