Save entities in batch to Database


I am creating a web API that stores log messages in a database. The requirements specify that my API needs to be able to store 900 messages per second, in parallel. I have created the following simple repository method to insert entities:

public async Task<int> InsertListAsync(IEnumerable<LogMessage> logMessages)
{
    await _context.LogMessages.AddRangeAsync(logMessages);
    return await _context.SaveChangesAsync();
}

This is my controller code:

public async Task<IActionResult> LogMessage([FromBody] IEnumerable<LogMessageDTO> logMessageModels)
{
    try
    {
        var logMessages = logMessageModels.Select(x => LogMessageDTOMapper.Map(x));
        await _repo.InsertListAsync(logMessages);
        return new OkResult();
    }
    catch (Exception e)
    {
        _logger.LogError(1000,e,"Error while processing LogMessages");
        return BadRequest($"Error while processing LogMessages:{e.Message}{Environment.NewLine}{e.StackTrace}");
    }
}

This works fine for a relatively small number of clients sending requests at the same time (at most 90). However, I have written the following integration test (the GetStringContentForPostRequest function basically creates a given number of randomly generated LogMessages, in this case 1; a sketch of this helper follows the test):

[Fact]
public void Test_MultipleRequests()
{
    // Arrange
    var client = _factory.CreateClient();
    //Act
    var responses = new List<Task<HttpResponseMessage>>();
    for (int i = 0; i < 900; i++)
    {
        responses.Add(client.PostAsync("/logs", GetStringContentForPostRequest(1)));
    }
    Task.WaitAll(responses.ToArray());
    // Assert
    foreach (var item in responses)
    {
        item.Result.EnsureSuccessStatusCode(); // Status Code 200-299
    }
}
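
For reference, the GetStringContentForPostRequest helper is not shown here; a minimal sketch of what it could look like, assuming LogMessageDTO has a single Message property and that System.Text.Json is used for serialization:

// Sketch of the test helper (assumptions: LogMessageDTO.Message exists, JSON body).
// Requires System.Linq, System.Net.Http, System.Text and System.Text.Json.
private StringContent GetStringContentForPostRequest(int amount)
{
    var messages = Enumerable.Range(0, amount)
        .Select(_ => new LogMessageDTO { Message = $"Test message {Guid.NewGuid()}" })
        .ToList();

    // The controller binds IEnumerable<LogMessageDTO> from the JSON body.
    return new StringContent(
        JsonSerializer.Serialize(messages),
        Encoding.UTF8,
        "application/json");
}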

This test sends 900 requests at once, each containing one LogMessage. The test fails, however, with the following error message:

System.InvalidOperationException: An exception has been raised that is likely due to a transient failure.
 ---> Npgsql.PostgresException (0x80004005): 53300: sorry, too many clients already

One solution could be to raise the number of allowed connections on my Postgres database server, but is there an easy way to first collect all received log messages over a period of time and save them to the database all at once?

c#
postgresql
asp.net-core-mvc
ef-core-3.0
asked on Stack Overflow May 3, 2020 by wserr • edited May 7, 2020 by wserr

1 Answer


I solved it by using a ConcurrentQueue as a buffer for my LogMessages. Instead of saving them to the database directly in my controller, I add them to a buffer that is injected into my API as a singleton.

public class Buffer : IBuffer
{
    private readonly ConcurrentQueue<LogMessageDTO> _logMessageDTOs;

    public Buffer()
    {
        _logMessageDTOs = new ConcurrentQueue<LogMessageDTO>();
    }

    public ConcurrentQueue<LogMessageDTO> LogMessageDTOs()
    {
        return _logMessageDTOs;
    }
}
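
The buffer is exposed through an interface and registered as a singleton, so the controller and the background service share the same queue instance. A minimal sketch of the interface and the registration in Startup.ConfigureServices (the interface shape is my assumption; only LogMessageDTOs() is used):

// Assumed shape of the interface; only LogMessageDTOs() is used by the controller
// and the timed hosted service.
public interface IBuffer
{
    ConcurrentQueue<LogMessageDTO> LogMessageDTOs();
}

// In Startup.ConfigureServices: one shared buffer instance for the whole application.
services.AddSingleton<IBuffer, Buffer>();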

So my controller action now looks like this:

public async Task<IActionResult> LogMessage([FromBody] IEnumerable<LogMessageDTO> logMessageModels)
{
    try
    {
        await Task.Run(() =>
        {
            foreach (var item in logMessageModels)
            {
                _buffer.LogMessageDTOs().Enqueue(item);
            }
        });
        return new OkResult();
    }
    catch (Exception e)
    {
        _logger.LogError(1000, e, "Error while processing LogMessages");
        return BadRequest($"Error while processing LogMessages:{e.Message}{Environment.NewLine}{e.StackTrace}");
    }
}

This produces faster response times for the consumer calling the API, since the request returns as soon as the messages have been enqueued.

This ConcurrentQueue is then emptied by a TimedHostService, which I read more about on this Microsoft docs page. What this timed hosted service essentially does is empty the ConcurrentQueue and save the items to the DB. This is the DoWork method of my TimedHostService (a sketch of the surrounding class follows after it):

private async void DoWork(object state)
{
    _logger.LogInformation("Timed background service is working");
    var logMessageDTOs = _buffer.LogMessageDTOs();
    using (var scope = _services.CreateScope())
    {
        var repository =
            scope.ServiceProvider
                .GetRequiredService<ILogMessageRepository>();
        var mappedLogMessages = new List<LogMessage>();
        while (logMessageDTOs.TryDequeue(out LogMessageDTO logMessageDTO))
        {
            mappedLogMessages.Add(LogMessageDTOMapper.Map(logMessageDTO,_logMessagePrefix));
        }
        await repository.InsertListAsync(mappedLogMessages);
    }
    _logger.LogInformation("Timed background service has stopped working");
}
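
The class around DoWork is essentially the timed background service from that Microsoft docs page, with DoWork used as the Timer callback. A sketch under those assumptions (the 5-second flush interval and the configuration key for _logMessagePrefix are my own choices):

// Sketch of the hosted service driving DoWork; follows the timed background
// service pattern from the Microsoft docs. Interval and config key are assumptions.
public class TimedHostService : IHostedService, IDisposable
{
    private readonly IServiceProvider _services;
    private readonly IBuffer _buffer;
    private readonly ILogger<TimedHostService> _logger;
    private readonly string _logMessagePrefix;
    private Timer _timer;

    public TimedHostService(IServiceProvider services, IBuffer buffer,
        ILogger<TimedHostService> logger, IConfiguration configuration)
    {
        _services = services;
        _buffer = buffer;
        _logger = logger;
        _logMessagePrefix = configuration["LogMessagePrefix"]; // assumed config key
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        // Flush the buffer immediately, then every 5 seconds.
        _timer = new Timer(DoWork, null, TimeSpan.Zero, TimeSpan.FromSeconds(5));
        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _timer?.Change(Timeout.Infinite, 0);
        return Task.CompletedTask;
    }

    public void Dispose() => _timer?.Dispose();

    // DoWork(object state) from above goes here.
}

The service itself is registered in ConfigureServices with services.AddHostedService<TimedHostService>().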

This results in far fewer connections to the DB, since a new database connection is no longer opened for every controller action; the messages are now inserted in batches by a single background worker.

answered on Stack Overflow May 7, 2020 by wserr
