I have a working state machine using MassTransit; however, I would like to persist the saga state using EF Core. I followed the guide on MassTransit's website, but it doesn't work: it generates the table, but the table is never populated with data when the state machine is used.
This is my code in startup.cs:
// State machine instance and in-memory repository created by hand.
// Both are passed directly to the receive endpoint at the bottom of this snippet.
var saga = new OrderStateMachine();
var repo = new InMemorySagaRepository<OrderStateData>();
services.TryAddSingleton(KebabCaseEndpointNameFormatter.Instance);
services.AddMassTransit(config =>
{
// First registration for OrderStateData: an Entity Framework repository
// that reuses an already-registered OrderContext.
config.AddSagaRepository<OrderStateData>()
.EntityFrameworkRepository(r =>
{
r.ExistingDbContext<OrderContext>();
});
// Second registration for the same saga type: the state machine together with
// an Entity Framework repository that configures its own OrderContext for MySQL.
config.AddSagaStateMachine<OrderStateMachine, OrderStateData>()
.EntityFrameworkRepository(r =>
{
r.ConcurrencyMode = ConcurrencyMode.Pessimistic; // or use Optimistic, which requires RowVersion
r.AddDbContext<DbContext, OrderContext>((provider,builder) =>
{
builder.UseMySql(connString, ServerVersion.AutoDetect(connString) , m =>
{
// Migrations live in the executing assembly; the history table is named after the context.
m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
m.MigrationsHistoryTable($"__{nameof(OrderContext)}");
});
});
});
config.UsingRabbitMq((ctx, cfg) =>
{
cfg.Host(EnvironmentVariables.RabbitMqConnectionString);
// NOTE(review): the endpoint name is the string literal "Constants.OrderBus1",
// not the value of a constant - confirm this is intended.
cfg.ReceiveEndpoint("Constants.OrderBus1", c =>
{
// NOTE(review): the endpoint is wired to the hand-made `saga` and the in-memory `repo`,
// not to the Entity Framework repository registered above. Presumably this is why
// no rows are written to the saga table - confirm.
c.StateMachineSaga(saga, repo);
});
});
});
// Registers the hosted service for the bus configured above.
services.AddMassTransitHostedService();
And this is my EF Core DbContext:
/// <summary>
/// EF Core context for the order service. Exposes the <see cref="Orders"/> set and
/// supplies the saga mapping (<c>OrderStateMap</c>) through <see cref="Configurations"/>.
/// </summary>
public class OrderContext : SagaDbContext
{
    /// <summary>Orders stored alongside the saga state.</summary>
    public DbSet<Order> Orders { get; set; }

    /// <summary>Creates the context with the options supplied by dependency injection.</summary>
    /// <param name="options">EF Core options (provider, connection string, migrations settings).</param>
    public OrderContext(DbContextOptions<OrderContext> options)
        : base(options)
    {
    }

    /// <summary>Builds the EF Core model for this context.</summary>
    /// <param name="modelBuilder">The builder used to construct the model.</param>
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        // SagaDbContext applies every map returned by Configurations inside its own
        // OnModelCreating. An empty override without this call would skip the
        // OrderStateMap mapping entirely, so the base call must stay.
        base.OnModelCreating(modelBuilder);
    }

    /// <summary>Saga class maps that the base context applies to the model.</summary>
    protected override IEnumerable<ISagaClassMap> Configurations
    {
        get { yield return new OrderStateMap(); }
    }
}
And lastly, my OrderStateMap:
/// <summary>
/// Entity mapping for <c>OrderStateData</c>, the saga instance persisted by the state machine.
/// </summary>
public class OrderStateMap : SagaClassMap<OrderStateData>
{
    /// <summary>
    /// Registers the saga's properties with the model.
    /// </summary>
    /// <param name="entity">Builder for the <c>OrderStateData</c> entity.</param>
    /// <param name="model">The model builder the entity belongs to (not used here).</param>
    protected override void Configure(EntityTypeBuilder<OrderStateData> entity, ModelBuilder model)
    {
        // The current state name is capped at 64 characters.
        entity.Property(saga => saga.CurrentState).HasMaxLength(64);

        // Order details carried by the saga.
        entity.Property(saga => saga.OrderCreationDateTime);
        entity.Property(saga => saga.OrderId);
        entity.Property(saga => saga.PackagesId);
        entity.Property(saga => saga.PaymentMethod);
        entity.Property(saga => saga.CheckoutSessionId);
        entity.Property(saga => saga.OrderCancelDateTime);
        entity.Property(saga => saga.OrderCollectedDateTime);
    }
}
I think it has to do with me defining `var repo = new InMemorySagaRepository<OrderStateData>();`, but I don't know what to replace that with.
EDIT:
After applying a fix I got the following error:
fail: Microsoft.EntityFrameworkCore.Database.Command[20102]
Failed executing DbCommand (270ms) [Parameters=[p0='?' (DbType = Guid)], CommandType='Text', CommandTimeout='30']
SELECT `o`.`CorrelationId`, `o`.`CheckoutSessionId`, `o`.`CurrentState`, `o`.`OrderCancelDateTime`, `o`.`OrderCollectedDateTime`, `o`.`OrderCreationDateTime`, `o`.`OrderId`, `o`.`PackagesId`, `o`.`PaymentMethod`
FROM (
SELECT * FROM dbo.OrderStateData WITH (UPDLOCK, ROWLOCK) WHERE CorrelationId = @p0
) AS `o`
LIMIT 2
fail: Microsoft.EntityFrameworkCore.Query[10100]
An exception occurred while iterating over the results of a query for context type 'OrderService.DAL.OrderContext'.
MySqlConnector.MySqlException (0x80004005): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '(UPDLOCK, ROWLOCK) WHERE CorrelationId = '1198e061-d2d4-463f-9db6-117598
' at line 3
---> MySqlConnector.MySqlException (0x80004005): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '(UPDLOCK, ROWLOCK) WHERE CorrelationId = '1198e061-d2d4-463f-9db6-
' at line 3c'
at MySqlConnector.Core.ServerSession.ReceiveReplyAsyncAwaited(ValueTask`1 task) in /_/src/MySqlConnector/Core/ServerSession.cs:line 815
at MySqlConnector.Core.ResultSet.ReadResultSetHeaderAsync(IOBehavior ioBehavior) in /_/src/MySqlConnector/Core/ResultSet.cs:line 49
at MySqlConnector.MySqlDataReader.ActivateResultSet(CancellationToken cancellationToken) in /_/src/MySqlConnector/MySqlDataReader.cs:line 131
at MySqlConnector.MySqlDataReader.CreateAsync(CommandListPosition commandListPosition, ICommandPayloadCreator payloadCreator, IDictionary`2 cachedProcedures, IMySqlCommand command, CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken
cancellationToken) in /_/src/MySqlConnector/MySqlDataReader.cs:line 436
at MySqlConnector.Core.CommandExecutor.ExecuteReaderAsync(IReadOnlyList`1 commands, ICommandPayloadCreator payloadCreator, CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken) in /_/src/MySqlConnector/Core/CommandEx
ecutor.cs:line 60
at MySqlConnector.MySqlCommand.ExecuteReaderAsync(CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken) in /_/src/MySqlConnector/MySqlCommand.cs:line 310
at MySqlConnector.MySqlCommand.ExecuteDbDataReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken) in /_/src/MySqlConnector/MySqlCommand.cs:line 304
at Microsoft.EntityFrameworkCore.Storage.RelationalCommand.ExecuteReaderAsync(RelationalCommandParameterObject parameterObject, CancellationToken cancellationToken)
at Microsoft.EntityFrameworkCore.Storage.RelationalCommand.ExecuteReaderAsync(RelationalCommandParameterObject parameterObject, CancellationToken cancellationToken)
at Microsoft.EntityFrameworkCore.Query.Internal.SingleQueryingEnumerable`1.AsyncEnumerator.InitializeReaderAsync(DbContext _, Boolean result, CancellationToken cancellationToken)
at Pomelo.EntityFrameworkCore.MySql.Storage.Internal.MySqlExecutionStrategy.ExecuteAsync[TState,TResult](TState state, Func`4 operation, Func`4 verifySucceeded, CancellationToken cancellationToken)
at Microsoft.EntityFrameworkCore.Query.Internal.SingleQueryingEnumerable`1.AsyncEnumerator.MoveNextAsync()
MySqlConnector.MySqlException (0x80004005): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '(UPDLOCK, ROWLOCK) WHERE CorrelationId = '1198e061-d2d4-463f-9db6-117598
' at line 3
---> MySqlConnector.MySqlException (0x80004005): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '(UPDLOCK, ROWLOCK) WHERE CorrelationId = '1198e061-d2d4-463f-9db6-
' at line 3c'
at MySqlConnector.Core.ServerSession.ReceiveReplyAsyncAwaited(ValueTask`1 task) in /_/src/MySqlConnector/Core/ServerSession.cs:line 815
at MySqlConnector.Core.ResultSet.ReadResultSetHeaderAsync(IOBehavior ioBehavior) in /_/src/MySqlConnector/Core/ResultSet.cs:line 49
at MySqlConnector.MySqlDataReader.ActivateResultSet(CancellationToken cancellationToken) in /_/src/MySqlConnector/MySqlDataReader.cs:line 131
at MySqlConnector.MySqlDataReader.CreateAsync(CommandListPosition commandListPosition, ICommandPayloadCreator payloadCreator, IDictionary`2 cachedProcedures, IMySqlCommand command, CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken
cancellationToken) in /_/src/MySqlConnector/MySqlDataReader.cs:line 436
at MySqlConnector.Core.CommandExecutor.ExecuteReaderAsync(IReadOnlyList`1 commands, ICommandPayloadCreator payloadCreator, CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken) in /_/src/MySqlConnector/Core/CommandEx
ecutor.cs:line 60
at MySqlConnector.MySqlCommand.ExecuteReaderAsync(CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken) in /_/src/MySqlConnector/MySqlCommand.cs:line 310
at MySqlConnector.MySqlCommand.ExecuteDbDataReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken) in /_/src/MySqlConnector/MySqlCommand.cs:line 304
at Microsoft.EntityFrameworkCore.Storage.RelationalCommand.ExecuteReaderAsync(RelationalCommandParameterObject parameterObject, CancellationToken cancellationToken)
at Microsoft.EntityFrameworkCore.Storage.RelationalCommand.ExecuteReaderAsync(RelationalCommandParameterObject parameterObject, CancellationToken cancellationToken)
at Microsoft.EntityFrameworkCore.Query.Internal.SingleQueryingEnumerable`1.AsyncEnumerator.InitializeReaderAsync(DbContext _, Boolean result, CancellationToken cancellationToken)
at Pomelo.EntityFrameworkCore.MySql.Storage.Internal.MySqlExecutionStrategy.ExecuteAsync[TState,TResult](TState state, Func`4 operation, Func`4 verifySucceeded, CancellationToken cancellationToken)
at Microsoft.EntityFrameworkCore.Query.Internal.SingleQueryingEnumerable`1.AsyncEnumerator.MoveNextAsync()
I tried adding a new database migration, but that did not fix the issue.
The correct version of your first configuration script is shown below. You had a lot of duplicate types defined, and you weren't configuring the receive endpoint correctly.
// Registers MassTransit with a single saga registration backed by Entity Framework Core
// on MySQL, and a RabbitMQ receive endpoint that resolves the saga from the container.
services.AddMassTransit(bus =>
{
    bus.SetKebabCaseEndpointNameFormatter();

    bus.AddSagaStateMachine<OrderStateMachine, OrderStateData>()
        .EntityFrameworkRepository(repository =>
        {
            repository.ConcurrencyMode = ConcurrencyMode.Pessimistic;

            // MySqlLockStatementProvider is not defined in this snippet; it has to be
            // created separately (presumably so the pessimistic lock query uses MySQL syntax).
            repository.LockStatementProvider = new MySqlLockStatementProvider();

            repository.AddDbContext<DbContext, OrderContext>((serviceProvider, options) =>
            {
                options.UseMySql(connString, ServerVersion.AutoDetect(connString), mySql =>
                {
                    // Migrations live in the executing assembly; the history table is named after the context.
                    mySql.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
                    mySql.MigrationsHistoryTable($"__{nameof(OrderContext)}");
                });
            });
        });

    bus.UsingRabbitMq((context, rabbit) =>
    {
        rabbit.Host(EnvironmentVariables.RabbitMqConnectionString);

        rabbit.ReceiveEndpoint("Constants.OrderBus1", endpoint =>
        {
            // Configure the saga from the registration context rather than from
            // hand-constructed state machine and repository instances.
            endpoint.ConfigureSaga<OrderStateData>(context);
        });
    });
});

services.AddMassTransitHostedService();
User contributions licensed under CC BY-SA 3.0