Hi NICOLASH_A
Coincidentally, we are also currently developing a .NET interface to Avaya Oceana Analytics (Kafka).
I threw a small code sample together below. As everything is buried in classes etc. in our current application, this might not work 'out of the box' but I think it should get you started.
Here's our config setup.
var l_KafkaConfig =
(await File.ReadAllLinesAsync("MyConfig.properties"))
.Where(line => !line.StartsWith("#") && !string.IsNullOrWhiteSpace(line) )
.ToDictionary(
line => line.Substring(0, line.IndexOf('=')),
line => line.Substring(line.IndexOf('=') + 1));
ClientConfig l_ClientConfig = new ClientConfig(l_KafkaConfig);
Note: as working with certificate verification can be a hassle to get right, you can disable it during development by setting:
l_ClientConfig.EnableSslCertificateVerification = false;
Don't use it in production though.
Our config file
bootstrap.servers=<your_ingress_server>:32090
enable.auto.commit=true
session.timeout.ms=15000
client.id=1
group.id=2
ssl.ca.location=./ssl/root-cert.pem
security.protocol=ssl
Note:
The following config doesn't work for the .NET library.
ssl.truststore.location=./ssl/truststore.jks
ssl.truststore.password=Testing
As truststores are a Java thing.
Note2: I haven't had the chance to look into every line in detail, as we are still in development. but this seems to work.
Note3: Place the root-cert.pem in a newly created 'ssl' folder in the root of the application executable.
To subscribe to a measurestream:
const string TOPIC_SUBSCRIPTION_REQUEST = "realtimesubscriptionrequest";
int l_SendKey = 0;
var l_Producer = new ProducerBuilder<int, string>(a_Config).Build();
string l_JsonMessage = "<My subscription message>"; // Have a look at the API documentation linked below for examples here.
CancellationTokenSource l_CTS = new CancellationTokenSource();
var l_ProdResponse = await l_Producer.ProduceAsync(TOPIC_SUBSCRIPTION_REQUEST,
new Message<int, string> { Key = l_SendKey++, Value = l_JSonMessage },
l_CTS.Token
);
And consume:
const string TOPIC_SUBSCRIPTION_RESPONSE = "realtimesubscriptionresponse";
var l_Consumer = new ConsumerBuilder<Ignore, string>(l_Config).Build();
l_Consumer.Subscribe(TOPIC_SUBSCRIPTION_RESPONSE);
CancellationTokenSource l_CTS2 = new CancellationTokenSource ();
var l_ConsResult = a_Consumer.Consume(l_CTS2.Token);
Note. We've setup the consumer, and producer in different threads, making sure the subscriber is subscribed before we send a message.
Note2: Avaya also provided a sample client using Java. I noticed many of the Java configs don't transfer 1-on-1 over to the Confluent.Kafka .NET library
API documentation & Sample client
For completeness: the API document also describes a method on how to extract the needed root-perm.pem file.