I have a background task that is polling an SQL Server database every 200ms.
The code looks like this:
    listener = await Task.Factory.StartNew(async () =>
    {
        try
        {
            while (true)
            {
                topToken.ThrowIfCancellationRequested();
                try
                {
                    using (var dbConnection = new SqlConnection(ConnectionString))
                    using (var command = new SqlCommand("marc.GetEvents", dbConnection))
                    {
                        await command.Connection.OpenAsync().ConfigureAwait(false);
                        command.CommandType = CommandType.StoredProcedure;
                        command.Parameters.AddWithValue("@fromId", lastEventId);
                        using (var reader = await command.ExecuteReaderAsync(topToken).ConfigureAwait(false))
                        {
                            int received = lastEventId;
                            while (await reader.ReadAsync(topToken).ConfigureAwait(false))
                            {
                                /// do stuff...
                            }
                            lastEventId = received;
                        }
                    }
                    await Task.Delay(PollIntervalMilliseconds, topToken).ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    throw;
                }
                catch (Exception ex)
                {
                    if (ex is SqlException && topToken.IsCancellationRequested)
                    {
                        throw new OperationCanceledException("Operation cancelled by user", ex);
                    }
                    logger.Warn(ex, $"Exception on polling Codeks db. Waiting {delayOnSqlError}ms..."); // this is hit
                    _OnReaderEvent.OnError(ex);
                    await Task.Delay(delayOnSqlError, topToken).ConfigureAwait(false); // probably not executed
                }
            }
        }
        catch (OperationCanceledException)
        {
            logger.Info("Listening task ended. Service is stopping?");
        }
        catch (Exception ex)
        {
            logger.Error(ex, "General exception"); // falling here
        }
    }, TaskCreationOptions.LongRunning).ConfigureAwait(false);
Today I got a report that this task ended prematurely. According to the logs, the inner catch (Exception ex) block is hit and reports a SqlException:
2018-08-01 17:42:08.6348|Warn|Exception on polling Codeks db. Waiting 5000ms... System.Data.SqlClient.SqlException (0x80131904): Transaction (Process ID 53) was deadlocked on lock | communication buffer resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
But instead of delaying, it falls out of the loop immediately into the outer catch with the very same exception:
2018-08-01 17:42:08.6488|Error|Jantar.CodeksConnector|General exception System.Data.SqlClient.SqlException (0x80131904): Transaction (Process ID 53) was deadlocked on lock | communication buffer resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
   at System.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection, Action`1 wrapCloseInAction)
   at System.Data.SqlClient.SqlInternalConnection.OnError(SqlException exception, Boolean breakConnection, Action`1 wrapCloseInAction)
   at System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj, Boolean callerHasConnectionLock, Boolean asyncClose)
   at System.Data.SqlClient.TdsParser.TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj, Boolean& dataReady)
   at System.Data.SqlClient.SqlDataReader.TryHasMoreRows(Boolean& moreRows)
   at System.Data.SqlClient.SqlDataReader.TryReadInternal(Boolean setTimeout, Boolean& more)
   at System.Data.SqlClient.SqlDataReader.<>c__DisplayClass189_0.b__0(Task t)
   at System.Data.SqlClient.SqlDataReader.InvokeRetryable[T](Func`2 moreFunc, TaskCompletionSource`1 source, IDisposable objectToDispose)
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
   at Jantar.CodeksConnector.<b__18_0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Reactive.PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception)
   at System.Reactive.Stubs.<>c.<.cctor>b__2_1(Exception ex)
   at System.Reactive.AnonymousSafeObserver`1.OnError(Exception error)
   at System.Reactive.Linq.ObservableImpl.SelectMany`2._.OnError(Exception error)
   at System.Reactive.Linq.ObservableImpl.Where`1._.OnError(Exception error)
   at System.Reactive.Linq.ObservableImpl.AsObservable`1._.OnError(Exception error)
   at System.Reactive.Observer`1.OnError(Exception error)
   at System.Reactive.Subjects.Subject`1.OnError(Exception error)
   at Jantar.CodeksConnector.<b__18_0>d.MoveNext()
I am out of ideas...
[Update: 08.03]
@sellotape pointed me in the right direction. According to the updated second log entry, the stack trace makes it clear that the exception is rethrown by Subject<T>.OnError(ex) (a call I have since removed, because it was a bug). It was a double bug, as the subscriber has no error handler. I was unaware that in this case the exception is rethrown: that happens only if there is a subscriber, and the exception is swallowed if there is none.
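The rethrow can be reproduced in isolation. Below is a minimal sketch, assuming the System.Reactive package is referenced; the class name SubjectOnErrorDemo and the helper OnErrorThrows are made up for illustration and are not part of the original service. It calls Subject<int>.OnError in three situations: no subscriber, a subscriber with only an onNext callback, and a subscriber that also supplies an onError callback.

```csharp
using System;
using System.Reactive.Subjects;

public static class SubjectOnErrorDemo
{
    // Hypothetical helper: returns true if subject.OnError(...) threw
    // the exception back into the caller.
    public static bool OnErrorThrows(Action<Subject<int>> subscribe)
    {
        var subject = new Subject<int>();
        subscribe(subject);
        try
        {
            subject.OnError(new InvalidOperationException("simulated deadlock"));
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    public static void Main()
    {
        // Nobody is subscribed: the error has no observer to reach.
        bool noSubscriber = OnErrorThrows(s => { });

        // Subscribe(onNext) only: Rx falls back to a default onError that rethrows.
        bool onNextOnly = OnErrorThrows(s => s.Subscribe(x => { }));

        // Subscribe(onNext, onError): the handler receives the exception instead.
        bool withHandler = OnErrorThrows(s => s.Subscribe(x => { }, ex => { }));

        Console.WriteLine($"no subscriber:     {noSubscriber}"); // False
        Console.WriteLine($"onNext only:       {onNextOnly}");   // True
        Console.WriteLine($"onNext + onError:  {withHandler}");  // False
    }
}
```

The second case matches the stack trace above: the frame System.Reactive.Stubs...Rethrow is the default error handler that Subscribe(onNext) installs.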
While this isn't a direct answer to your question, you've tagged it as "System.Reactive", so I thought I might show you (roughly) what an Rx solution to your code looks like. Keep in mind that I couldn't accurately provide code for /// do stuff..., so I made it up.
Here's the Rx:
    IObservable<string> query =
        from t in Observable.Interval(TimeSpan.FromMilliseconds(PollIntervalMilliseconds))
        from x in Observable.Using(
            () => new SqlConnection(ConnectionString),
            dbConnection =>
                Observable.Using(
                    () =>
                    {
                        var c = new SqlCommand("marc.GetEvents", dbConnection);
                        c.CommandType = CommandType.StoredProcedure;
                        c.Parameters.AddWithValue("@fromId", lastEventId);
                        return c;
                    },
                    command =>
                        from o in Observable.FromAsync(() => command.Connection.OpenAsync())
                        from reader in Observable.FromAsync(() => command.ExecuteReaderAsync(topToken))
                        let received = lastEventId
                        // ReadAsync is called once here, so only the first row of each poll is projected.
                        from r in Observable.FromAsync(() => reader.ReadAsync(topToken))
                        select reader.GetFieldValue<string>(0)))
        select x;
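One thing to bear in mind with a query like this: an observable sequence ends at the first OnError, so a single SQL error would stop the polling unless something resubscribes. The Retry operator is one way to do that. Here is a minimal sketch, assuming the System.Reactive package is referenced; RetryDemo and FetchAsync are hypothetical stand-ins for the database call and do not appear in the original code.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class RetryDemo
{
    public static int Attempts;

    // Hypothetical stand-in for the database call: fails on the first attempt only.
    private static Task<string> FetchAsync()
    {
        Attempts++;
        return Attempts == 1
            ? Task.FromException<string>(new InvalidOperationException("simulated deadlock"))
            : Task.FromResult("event");
    }

    public static string Run()
    {
        Attempts = 0;
        return Observable
            .FromAsync(() => FetchAsync()) // invokes FetchAsync on every subscription
            .Retry(3)                      // resubscribes after an error, up to 3 subscriptions in total
            .Wait();                       // blocks until completion and returns the last value
    }

    public static void Main()
    {
        Console.WriteLine(Run());    // event
        Console.WriteLine(Attempts); // 2
    }
}
```

In the query above, the equivalent would probably be to apply Retry (or Catch, with a delay) to the per-poll part, so that one failed poll does not tear down the Interval. Treat that as a suggestion to test rather than a drop-in fix.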