How can I persist MassTransit saga state data with Entity Framework Core?

0

I have a working state machine using MassTransit; however, I would like to persist the saga state using EF Core. I followed the guide on MassTransit's website, but it doesn't work: it generates the table, but the table is never populated with data when the state machine is used.

This is my code in startup.cs:

// Startup wiring for the order saga: creates the state machine, registers MassTransit
// with a saga repository, and binds the saga to a RabbitMQ receive endpoint.
var saga = new OrderStateMachine();

// NOTE(review): this in-memory repository instance is the one handed to the receive
// endpoint below, so that endpoint does not go through the Entity Framework repository
// registered inside AddMassTransit.
var repo = new InMemorySagaRepository<OrderStateData>();

services.TryAddSingleton(KebabCaseEndpointNameFormatter.Instance);

services.AddMassTransit(config =>
{
    
    // NOTE(review): a repository for OrderStateData is configured twice — here against an
    // existing OrderContext, and again on AddSagaStateMachine below with AddDbContext.
    config.AddSagaRepository<OrderStateData>()
        .EntityFrameworkRepository(r =>
        {
            r.ExistingDbContext<OrderContext>();
        });
    config.AddSagaStateMachine<OrderStateMachine, OrderStateData>()
        .EntityFrameworkRepository(r =>
        {
            r.ConcurrencyMode = ConcurrencyMode.Pessimistic; // or use Optimistic, which requires RowVersion

            // Registers OrderContext against MySQL; migrations live in the executing assembly
            // and their history is tracked in a table named after the context.
            r.AddDbContext<DbContext, OrderContext>((provider,builder) =>
            {
                builder.UseMySql(connString, ServerVersion.AutoDetect(connString) , m =>
                {
                    m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
                    m.MigrationsHistoryTable($"__{nameof(OrderContext)}");
                });
            });
        });
  
    config.UsingRabbitMq((ctx, cfg) =>
    {
        cfg.Host(EnvironmentVariables.RabbitMqConnectionString);

        // NOTE(review): the queue name is the literal text "Constants.OrderBus1", not the
        // value of a constant — confirm this is intended.
        cfg.ReceiveEndpoint("Constants.OrderBus1", c =>
        {
            // Uses the manually created state machine and in-memory repository declared above.
            c.StateMachineSaga(saga, repo);
           
        });
    });
   
});
services.AddMassTransitHostedService();

And this is my EF core DBContext:

/// <summary>
/// EF Core context for the order service. Exposes the <see cref="Orders"/> set and
/// supplies the saga class map used to persist <c>OrderStateData</c>.
/// </summary>
public class OrderContext : SagaDbContext
{
    /// <summary>Orders stored by this context.</summary>
    public DbSet<Order> Orders { get; set; }

    /// <summary>Creates the context with the supplied EF Core options.</summary>
    /// <param name="options">Options (provider, connection) for this context.</param>
    public OrderContext(DbContextOptions<OrderContext> options)
        : base(options)
    {
    }

    /// <summary>
    /// Builds the EF Core model. Any additional entity configuration belongs after the base call.
    /// </summary>
    /// <param name="modelBuilder">Builder used to construct the model for this context.</param>
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        // MassTransit's SagaDbContext applies the maps returned by Configurations in its own
        // OnModelCreating; an override that skips the base call leaves OrderStateMap unapplied.
        base.OnModelCreating(modelBuilder);
    }

    /// <summary>Saga class maps contributed to the model: the mapping for the order saga state.</summary>
    protected override IEnumerable<ISagaClassMap> Configurations
    {
        get { yield return new OrderStateMap(); }
    }
}

And lastly, my OrderStateMap:

 /// <summary>
 /// Saga class map describing how <c>OrderStateData</c> is mapped by EF Core.
 /// </summary>
 public class OrderStateMap : SagaClassMap<OrderStateData>
 {
     /// <summary>
     /// Declares the mapped properties of the saga state; the current state is capped at 64 characters.
     /// </summary>
     /// <param name="entity">Entity builder for the saga state type.</param>
     /// <param name="model">Model builder of the owning context (not used here).</param>
     protected override void Configure(EntityTypeBuilder<OrderStateData> entity, ModelBuilder model)
     {
         // State name column with an explicit length limit.
         entity.Property(state => state.CurrentState)
             .HasMaxLength(64);

         // Remaining saga properties are mapped without extra configuration.
         entity.Property(state => state.OrderCreationDateTime);
         entity.Property(state => state.OrderId);
         entity.Property(state => state.PackagesId);
         entity.Property(state => state.PaymentMethod);
         entity.Property(state => state.CheckoutSessionId);
         entity.Property(state => state.OrderCancelDateTime);
         entity.Property(state => state.OrderCollectedDateTime);
     }
 }

I think it has to do with me defining `var repo = new InMemorySagaRepository<OrderStateData>();`, but I don't know what to replace that with.

EDIT:

After applying a fix I got the following error:

fail: Microsoft.EntityFrameworkCore.Database.Command[20102]
      Failed executing DbCommand (270ms) [Parameters=[p0='?' (DbType = Guid)], CommandType='Text', CommandTimeout='30']
      SELECT `o`.`CorrelationId`, `o`.`CheckoutSessionId`, `o`.`CurrentState`, `o`.`OrderCancelDateTime`, `o`.`OrderCollectedDateTime`, `o`.`OrderCreationDateTime`, `o`.`OrderId`, `o`.`PackagesId`, `o`.`PaymentMethod`
      FROM (
          SELECT * FROM dbo.OrderStateData WITH (UPDLOCK, ROWLOCK) WHERE CorrelationId = @p0
      ) AS `o`
      LIMIT 2
fail: Microsoft.EntityFrameworkCore.Query[10100]
      An exception occurred while iterating over the results of a query for context type 'OrderService.DAL.OrderContext'.
      MySqlConnector.MySqlException (0x80004005): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '(UPDLOCK, ROWLOCK) WHERE CorrelationId = '1198e061-d2d4-463f-9db6-117598
' at line 3
       ---> MySqlConnector.MySqlException (0x80004005): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '(UPDLOCK, ROWLOCK) WHERE CorrelationId = '1198e061-d2d4-463f-9db6-
' at line 3c'
         at MySqlConnector.Core.ServerSession.ReceiveReplyAsyncAwaited(ValueTask`1 task) in /_/src/MySqlConnector/Core/ServerSession.cs:line 815
         at MySqlConnector.Core.ResultSet.ReadResultSetHeaderAsync(IOBehavior ioBehavior) in /_/src/MySqlConnector/Core/ResultSet.cs:line 49
         at MySqlConnector.MySqlDataReader.ActivateResultSet(CancellationToken cancellationToken) in /_/src/MySqlConnector/MySqlDataReader.cs:line 131
         at MySqlConnector.MySqlDataReader.CreateAsync(CommandListPosition commandListPosition, ICommandPayloadCreator payloadCreator, IDictionary`2 cachedProcedures, IMySqlCommand command, CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken
cancellationToken) in /_/src/MySqlConnector/MySqlDataReader.cs:line 436
         at MySqlConnector.Core.CommandExecutor.ExecuteReaderAsync(IReadOnlyList`1 commands, ICommandPayloadCreator payloadCreator, CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken) in /_/src/MySqlConnector/Core/CommandEx
ecutor.cs:line 60
         at MySqlConnector.MySqlCommand.ExecuteReaderAsync(CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken) in /_/src/MySqlConnector/MySqlCommand.cs:line 310
         at MySqlConnector.MySqlCommand.ExecuteDbDataReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken) in /_/src/MySqlConnector/MySqlCommand.cs:line 304
         at Microsoft.EntityFrameworkCore.Storage.RelationalCommand.ExecuteReaderAsync(RelationalCommandParameterObject parameterObject, CancellationToken cancellationToken)
         at Microsoft.EntityFrameworkCore.Storage.RelationalCommand.ExecuteReaderAsync(RelationalCommandParameterObject parameterObject, CancellationToken cancellationToken)
         at Microsoft.EntityFrameworkCore.Query.Internal.SingleQueryingEnumerable`1.AsyncEnumerator.InitializeReaderAsync(DbContext _, Boolean result, CancellationToken cancellationToken)
         at Pomelo.EntityFrameworkCore.MySql.Storage.Internal.MySqlExecutionStrategy.ExecuteAsync[TState,TResult](TState state, Func`4 operation, Func`4 verifySucceeded, CancellationToken cancellationToken)
         at Microsoft.EntityFrameworkCore.Query.Internal.SingleQueryingEnumerable`1.AsyncEnumerator.MoveNextAsync()
      MySqlConnector.MySqlException (0x80004005): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '(UPDLOCK, ROWLOCK) WHERE CorrelationId = '1198e061-d2d4-463f-9db6-117598
' at line 3
       ---> MySqlConnector.MySqlException (0x80004005): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '(UPDLOCK, ROWLOCK) WHERE CorrelationId = '1198e061-d2d4-463f-9db6-
' at line 3c'
         at MySqlConnector.Core.ServerSession.ReceiveReplyAsyncAwaited(ValueTask`1 task) in /_/src/MySqlConnector/Core/ServerSession.cs:line 815
         at MySqlConnector.Core.ResultSet.ReadResultSetHeaderAsync(IOBehavior ioBehavior) in /_/src/MySqlConnector/Core/ResultSet.cs:line 49
         at MySqlConnector.MySqlDataReader.ActivateResultSet(CancellationToken cancellationToken) in /_/src/MySqlConnector/MySqlDataReader.cs:line 131
         at MySqlConnector.MySqlDataReader.CreateAsync(CommandListPosition commandListPosition, ICommandPayloadCreator payloadCreator, IDictionary`2 cachedProcedures, IMySqlCommand command, CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken
cancellationToken) in /_/src/MySqlConnector/MySqlDataReader.cs:line 436
         at MySqlConnector.Core.CommandExecutor.ExecuteReaderAsync(IReadOnlyList`1 commands, ICommandPayloadCreator payloadCreator, CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken) in /_/src/MySqlConnector/Core/CommandEx
ecutor.cs:line 60
         at MySqlConnector.MySqlCommand.ExecuteReaderAsync(CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken) in /_/src/MySqlConnector/MySqlCommand.cs:line 310
         at MySqlConnector.MySqlCommand.ExecuteDbDataReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken) in /_/src/MySqlConnector/MySqlCommand.cs:line 304
         at Microsoft.EntityFrameworkCore.Storage.RelationalCommand.ExecuteReaderAsync(RelationalCommandParameterObject parameterObject, CancellationToken cancellationToken)
         at Microsoft.EntityFrameworkCore.Storage.RelationalCommand.ExecuteReaderAsync(RelationalCommandParameterObject parameterObject, CancellationToken cancellationToken)
         at Microsoft.EntityFrameworkCore.Query.Internal.SingleQueryingEnumerable`1.AsyncEnumerator.InitializeReaderAsync(DbContext _, Boolean result, CancellationToken cancellationToken)
         at Pomelo.EntityFrameworkCore.MySql.Storage.Internal.MySqlExecutionStrategy.ExecuteAsync[TState,TResult](TState state, Func`4 operation, Func`4 verifySucceeded, CancellationToken cancellationToken)
         at Microsoft.EntityFrameworkCore.Query.Internal.SingleQueryingEnumerable`1.AsyncEnumerator.MoveNextAsync()

I tried adding a new DB migration, but that did not fix the issue.

c#
mysql
entity-framework
masstransit
asked on Stack Overflow Jan 27, 2021 by Joep Van Diessen • edited Jan 28, 2021 by Joep Van Diessen

1 Answer

0

The correct version of your first configuration script is shown below. You had a lot of duplicate types defined, and you weren't configuring the receive endpoint correctly.

// Registers MassTransit with the order saga persisted through Entity Framework Core on MySQL,
// and lets the container configure the saga on the RabbitMQ receive endpoint.
services.AddMassTransit(bus =>
{
    bus.SetKebabCaseEndpointNameFormatter();

    var sagaRegistration = bus.AddSagaStateMachine<OrderStateMachine, OrderStateData>();
    sagaRegistration.EntityFrameworkRepository(efRepository =>
    {
        efRepository.ConcurrencyMode = ConcurrencyMode.Pessimistic;

        // Pessimistic mode issues a lock statement when loading the saga; a MySQL-specific
        // provider has to be supplied (you will need to create this class yourself).
        efRepository.LockStatementProvider = new MySqlLockStatementProvider();

        efRepository.AddDbContext<DbContext, OrderContext>((serviceProvider, optionsBuilder) =>
        {
            var serverVersion = ServerVersion.AutoDetect(connString);

            optionsBuilder.UseMySql(connString, serverVersion, mySql =>
            {
                // Migrations live in this assembly; history is tracked per context.
                var migrationsAssemblyName = Assembly.GetExecutingAssembly().GetName().Name;
                mySql.MigrationsAssembly(migrationsAssemblyName);
                mySql.MigrationsHistoryTable($"__{nameof(OrderContext)}");
            });
        });
    });

    bus.UsingRabbitMq((registrationContext, rabbit) =>
    {
        rabbit.Host(EnvironmentVariables.RabbitMqConnectionString);

        // The saga and its repository are resolved from the registration above.
        rabbit.ReceiveEndpoint("Constants.OrderBus1", endpoint =>
        {
            endpoint.ConfigureSaga<OrderStateData>(registrationContext);
        });
    });
});
services.AddMassTransitHostedService();
answered on Stack Overflow Jan 28, 2021 by Chris Patterson • edited Jan 28, 2021 by Chris Patterson

User contributions licensed under CC BY-SA 3.0