This repository was archived by the owner on Jun 16, 2021. It is now read-only.
File tree Expand file tree Collapse file tree 2 files changed +20
-12
lines changed
tests/OpenMessage.Tests/Pipelines Expand file tree Collapse file tree 2 files changed +20
-12
lines changed Original file line number Diff line number Diff line change @@ -78,14 +78,22 @@ public async Task<List<SqsMessage<T>>> ConsumeAsync(CancellationToken cancellati
78
78
if ( _acknowledgementAction is null )
79
79
Throw . Exception ( "Acknowledgement action cannot be null for SQS message" ) ;
80
80
81
- result . Add ( new SqsMessage < T > ( _acknowledgementAction )
81
+ try
82
+ {
83
+ result . Add ( new SqsMessage < T > ( _acknowledgementAction )
84
+ {
85
+ Id = message . MessageId ,
86
+ Properties = properties ,
87
+ ReceiptHandle = message . ReceiptHandle ,
88
+ QueueUrl = _currentConsumerOptions . QueueUrl ,
89
+ Value = _deserializationProvider . From < T > ( message . Body , contentType , messageType ?? string . Empty )
90
+ } ) ;
91
+ }
92
+ catch ( Exception e )
82
93
{
83
- Id = message . MessageId ,
84
- Properties = properties ,
85
- ReceiptHandle = message . ReceiptHandle ,
86
- QueueUrl = _currentConsumerOptions . QueueUrl ,
87
- Value = _deserializationProvider . From < T > ( message . Body , contentType , messageType ?? string . Empty )
88
- } ) ;
94
+ // Swallow deserialization exception to prevent blocking the pipeline and processing of subsequent messages.
95
+ _logger . LogError ( e , $ "Error deserializing message body. { e . Message } . { nameof ( message . MessageId ) } :{ message . MessageId } . { nameof ( message . MD5OfBody ) } :{ message . MD5OfBody } ") ;
96
+ }
89
97
}
90
98
91
99
return result ;
Original file line number Diff line number Diff line change @@ -31,9 +31,9 @@ await Task.WhenAll(Enumerable.Range(0, _batchSize - 1)
31
31
32
32
Assert . Equal ( 1 , _history . Count ) ;
33
33
34
- Assert . Equal ( _batchSize - 1 , _history . Single ( )
35
- . Count ) ;
36
- Assert . True ( stopwatch . Elapsed >= _timeout ) ;
34
+ Assert . Equal ( _batchSize - 1 , _history . Single ( ) . Count ) ;
35
+
36
+ Assert . True ( stopwatch . Elapsed . Add ( TimeSpan . FromMilliseconds ( 5 ) ) >= _timeout ) ;
37
37
}
38
38
39
39
[ Fact ]
@@ -49,8 +49,8 @@ await Task.WhenAll(Enumerable.Range(0, _batchSize)
49
49
50
50
Assert . Equal ( 1 , _history . Count ) ;
51
51
52
- Assert . Equal ( _batchSize , _history . Single ( )
53
- . Count ) ;
52
+ Assert . Equal ( _batchSize , _history . Single ( ) . Count ) ;
53
+
54
54
Assert . True ( stopwatch . Elapsed < _timeout ) ;
55
55
}
56
56
You can’t perform that action at this time.
0 commit comments