I have a helper class wrapped around NEST to interact with Elasticsearch. The Bulk method shown further down takes a BlockingCollection (passed as an IEnumerable<IndexDocument>) and is used to perform bulk operations.
An IndexDocument looks like:
public class IndexDocument
{
    public string ID { get; set; }        // ID of the record in Elasticsearch
    public IndexFile File { get; set; }   // Underlying file that this record came from
    public string Index { get; set; }     // Index that should be posted to
    public string Document { get; set; }  // JSON of the actual record
}
A bulk operation is preferred because a single request can target one or many indexes, so I've merged the data for all indexes into a single operation for performance. The problem is that NEST doesn't like raw JSON; for raw JSON, the low-level client is typically what you want. Rather than reinvent the wheel, though, I'd like to use NEST's own BulkAll operation:
public static void Bulk(IEnumerable<IndexDocument> documents)
{
    var request = new BulkAllRequest<IndexDocument>(documents);
    Client.BulkAll(documents, func =>
    {
        return func.Index(null).BufferToBulk((descriptor, buffer) =>
        {
            foreach (var document in buffer)
            {
                descriptor.Index<Dictionary<string, object>>(operation =>
                {
                    var product = JsonConvert.DeserializeObject<Dictionary<string, object>>(document.Document);
                    return operation.Index(document.Index)
                        .Document(product)
                        .Id(document.ID);
                });
            }
        })
        .BackOffTime("10s")
        .Size(1) // if I can't get one to work....
        .RefreshOnCompleted()
        .MaxDegreeOfParallelism(10)
        .BackOffRetries(2);
    }).Wait(TimeSpan.FromMinutes(5), next =>
    {
    });
}
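For comparison, the low-level route mentioned above means building the bulk body by hand: one action line plus one source line per document. The following is only a minimal sketch of that step, not NEST code. BulkBody, BuildLines and Compact are hypothetical names, and tuples stand in for IndexDocument so the snippet is self-contained. It assumes Json.NET is available and that each document is a valid JSON object.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public static class BulkBody
{
    // Hypothetical helper (not part of NEST): returns two single-line JSON
    // strings per document, the "index" action line followed by the source line.
    public static List<string> BuildLines(
        IEnumerable<(string Index, string Id, string Json)> documents)
    {
        var lines = new List<string>();
        foreach (var (index, id, json) in documents)
        {
            // Action line, e.g. {"index":{"_index":"trade_1","_id":"cpn_1"}}
            lines.Add(JsonConvert.SerializeObject(
                new { index = new { _index = index, _id = id } }));

            // Source line: the bulk format requires one line per document,
            // so any pretty-printing is stripped.
            lines.Add(Compact(json));
        }
        return lines;
    }

    // Re-serializes JSON without formatting. DateParseHandling.None keeps
    // date-like strings as plain strings instead of converting them to DateTime.
    private static string Compact(string json)
    {
        using (var reader = new JsonTextReader(new StringReader(json))
               { DateParseHandling = DateParseHandling.None })
        {
            return JToken.Load(reader).ToString(Formatting.None);
        }
    }
}
```

The resulting lines could then be handed to the low-level client, for example through PostData.MultiJson and Client.LowLevel.Bulk<StringResponse>; treat those call names as something to verify against your Elasticsearch.Net version. Retries, back-off and batching, which BulkAll provides, would have to be written by hand on this route.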
The problem I'm having is: how do I convert raw JSON into an object that NEST will accept? My raw JSON consists of:
These fields are represented as a Dictionary<string, object> today. This data is managed outside of our development group, so I cannot modify it, and I cannot create a concrete class for it because the fields are dynamic. I created a concrete class on top of Dictionary, but I got this error message:
Elasticsearch.Net.UnexpectedElasticsearchClientException
HResult=0x80131500
Message=GenericArguments[0], 'Newtonsoft.Json.Linq.JContainer', on 'Elasticsearch.Net.Utf8Json.Formatters.NonGenericListFormatter`1[T]' violates the constraint of type parameter 'T'.
Source=Nest
StackTrace:
at Nest.BlockingSubscribeExtensions.WaitOnObservable[TObservable,TObserve,TObserver](TObservable observable, TimeSpan maximumRunTime, Func`3 factory)
at Nest.BlockingSubscribeExtensions.Wait[T](BulkAllObservable`1 observable, TimeSpan maximumRunTime, Action`1 onNext)
at Ced.Search.Services.Indexing.Helpers.ElasticHelper.Bulk2(IEnumerable`1 documents) in
[STACK]
Inner Exception 1:
TypeLoadException: GenericArguments[0], 'Newtonsoft.Json.Linq.JContainer', on 'Elasticsearch.Net.Utf8Json.Formatters.NonGenericListFormatter`1[T]' violates the constraint of type parameter 'T'.
I tried to use a Newtonsoft JObject instead, but it posted a weird, invalid value:
[Request]
{"index":{"_id":"cpn_0007473CPN46","_index":"trade_1"}}
[[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[[]]]] // WTF IS THIS!!!!
[Response]
{"took":1,"errors":true,"items":[{"index":{"_index":"trade_1","_type":"_doc","_id":"cpn_0007473CPN46","status":400,"error":{"type":"mapper_parsing_exception","reason":"failed to parse","caused_by":{"type":"not_x_content_exception","reason":"not_x_content_exception: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"}}}}]}
That exception makes sense. It was sent junk.
What else can I try?
...I found the answer. I stumbled across the custom JSON serializer functionality described here:
https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/custom-serialization.html
The root of the problem is that the built-in serializer doesn't know how to serialize a JObject, or a generic dictionary that contains Json.NET types (note the JContainer in the error above). My original ElasticClient looked like:
private static ElasticClient Client = new ElasticClient(ServiceConfiguration.Instance.ElasticSearch.Uri);
Now it looks like:
private static ElasticClient Client = null;

static ElasticHelper()
{
    var pool = new SingleNodeConnectionPool(ServiceConfiguration.Instance.ElasticSearch.Uri);
    var connectionSettings = new ConnectionSettings(pool, sourceSerializer: (builtin, settings) =>
    {
        return new JsonNetSerializer(builtin, settings, () =>
        {
            return new JsonSerializerSettings { };
        },
        resolver => resolver.NamingStrategy = new DefaultNamingStrategy());
    });
    Client = new ElasticClient(connectionSettings);
}
Now my bulk operations serialize as expected. Be sure to add the NEST.JsonNetSerializer NuGet package to your project and import its namespace:
using Nest.JsonNetSerializer;
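To see where the Json.NET types come from: deserializing into a Dictionary<string, object> leaves nested objects and arrays as JObject and JArray values. The sketch below shows that, along with one possible alternative to swapping the serializer: converting the tree into plain dictionaries and lists first. PlainJson and ToPlain are hypothetical names, and whether the built-in serializer then handles the result for your data is an assumption I have not verified against NEST here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public static class PlainJson
{
    // Hypothetical helper: recursively replaces JObject/JArray/JValue with
    // Dictionary<string, object>, List<object> and primitive values.
    public static object ToPlain(JToken token)
    {
        switch (token)
        {
            case JObject obj:
                return obj.Properties()
                          .ToDictionary(p => p.Name, p => ToPlain(p.Value));
            case JArray array:
                return array.Select(item => ToPlain(item)).ToList();
            case JValue value:
                return value.Value; // string, long, double, bool, null, ...
            default:
                throw new NotSupportedException(token.Type.ToString());
        }
    }
}

public static class Demo
{
    public static void Main()
    {
        const string json = "{\"id\":7,\"tags\":[\"a\",\"b\"],\"owner\":{\"name\":\"x\"}}";

        // Nested values stay Json.NET types after a plain DeserializeObject call.
        var viaJsonConvert = JsonConvert.DeserializeObject<Dictionary<string, object>>(json);
        Console.WriteLine(viaJsonConvert["owner"].GetType().Name); // JObject
        Console.WriteLine(viaJsonConvert["tags"].GetType().Name);  // JArray

        // After conversion, only BCL collections and primitives remain.
        var plain = (Dictionary<string, object>)PlainJson.ToPlain(JToken.Parse(json));
        Console.WriteLine(plain["owner"] is Dictionary<string, object>); // True
        Console.WriteLine(plain["tags"] is List<object>);                // True
    }
}
```

Note that JToken.Parse converts date-like strings to DateTime by default, so date fields may need DateParseHandling.None if their exact text matters. The custom serializer approach above avoids this extra conversion pass entirely.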