Author Message
NICOLASH_A
Joined: Jan 3, 2019
Messages: 6
Offline
Hello Everybody,


We are developing a custom application using Confluent .NET library. This application is supposed to be a Kafka consumer subscribing on real-time agent states events. We have difficulties connecting to Kafka server on Analytics using TLS protocol.
Does anyone know which must be set on ConsumerConfig parameter be subscribing ?

var tConsumerConfig = new ConsumerConfig
{
BootstrapServers = pCApplicationParametersManager.PKafkaBroker + ":" +
pCApplicationParametersManager.PKafkaTLSPort.ToString(),
SecurityProtocol = SecurityProtocol.Ssl,
SslCertificateLocation = @"E:\Certificates\a_lab_kafka.cer.pem",
SslKeyLocation = @"E:\Certificates\a_lab_kafka_key.pem",
SslKeyPassword = @"myPassword",
GroupId = "groupid",
Debug = "security"
};


Do you have a .NET example to share ?

Thank you very much for your help.

Regards.
Shahin



Dinesh012024
Joined: Dec 18, 2015
Messages: 25
Offline
We don't have a .net sample. Can you share what is the error you are seeing?

Thank You,
Dinesh
NICOLASH_A
Joined: Jan 3, 2019
Messages: 6
Offline
Hello,


Thank you your reply.
Here is the exception

2020-11-25 13:43:27,422 [1] ERROR Exception Soruce => Confluent.Kafka
2020-11-25 13:43:27,805 [1] ERROR Exception Message => ssl.certificate.location failed: .\ssl\ssl_rsa.c:708: error:140DC009:SSL routines:SSL_CTX_use_certificate_chain_file:PEM lib
2020-11-25 13:43:28,365 [1] ERROR Exception StackTrace at Confluent.Kafka.Impl.SafeKafkaHandle.Create(RdKafkaType type, IntPtr config, IClient owner)
at Confluent.Kafka.Consumer`2..ctor(ConsumerBuilder`2 builder)
at Confluent.Kafka.ConsumerBuilder`2.Build()
at AgentStatePublisher.CKafkaConsumer.SubscribeAsync(String PTopic, Action`1 message) in E:\Development\AgentStatePublisher\CKafkaConsumer.cs:line 60
at AgentStatePublisher.CKafkaManager.Start() in E:\Development\AgentStatePublisher\CKafkaManager.cs:line 79
NICOLASH_A
Joined: Jan 3, 2019
Messages: 6
Offline
Also, it would be great to have an example even it is not in .NET.
Regards.
VincentTDSE
Joined: Apr 9, 2019
Messages: 1
Offline
Hi NICOLASH_A

Coincidentally, we are also currently developing a .NET interface to Avaya Oceana Analytics (Kafka).

I threw a small code sample together below. As everything is buried in classes etc. in our current application, this might not work 'out of the box' but I think it should get you started.

Here's our config setup.

var l_KafkaConfig =
(await File.ReadAllLinesAsync("MyConfig.properties"))
.Where(line => !line.StartsWith("#") && !string.IsNullOrWhiteSpace(line) )
.ToDictionary(
line => line.Substring(0, line.IndexOf('=')),
line => line.Substring(line.IndexOf('=') + 1));

ClientConfig l_ClientConfig = new ClientConfig(l_KafkaConfig);

Note: as working with certificate verification can be a hassle to get right, you can disable it during development by setting:
l_ClientConfig.EnableSslCertificateVerification = false;
Don't use it in production though.
Our config file

bootstrap.servers=<your_ingress_server>:32090
enable.auto.commit=true
session.timeout.ms=15000
client.id=1
group.id=2
ssl.ca.location=./ssl/root-cert.pem
security.protocol=ssl

Note:
The following config doesn't work for the .NET library.
  • ssl.truststore.location=./ssl/truststore.jks

  • ssl.truststore.password=Testing

  • As truststores are a Java thing.
    Note2: I haven't had the chance to look into every line in detail, as we are still in development. but this seems to work.
    Note3: Place the root-cert.pem in a newly created 'ssl' folder in the root of the application executable.

    To subscribe to a measurestream:

    const string TOPIC_SUBSCRIPTION_REQUEST = "realtimesubscriptionrequest";
    int l_SendKey = 0;
    var l_Producer = new ProducerBuilder<int, string>(a_Config).Build();
    string l_JsonMessage = "<My subscription message>"; // Have a look at the API documentation linked below for examples here.
    CancellationTokenSource l_CTS = new CancellationTokenSource();
    var l_ProdResponse = await l_Producer.ProduceAsync(TOPIC_SUBSCRIPTION_REQUEST,
    new Message<int, string> { Key = l_SendKey++, Value = l_JSonMessage },
    l_CTS.Token
    );



    And consume:


    const string TOPIC_SUBSCRIPTION_RESPONSE = "realtimesubscriptionresponse";
    var l_Consumer = new ConsumerBuilder<Ignore, string>(l_Config).Build();

    l_Consumer.Subscribe(TOPIC_SUBSCRIPTION_RESPONSE);
    CancellationTokenSource l_CTS2 = new CancellationTokenSource ();
    var l_ConsResult = a_Consumer.Consume(l_CTS2.Token);


    Note. We've setup the consumer, and producer in different threads, making sure the subscriber is subscribed before we send a message.

    Note2: Avaya also provided a sample client using Java. I noticed many of the Java configs don't transfer 1-on-1 over to the Confluent.Kafka .NET library

    API documentation & Sample client

    For completeness: the API document also describes a method on how to extract the needed root-perm.pem file.
    NICOLASH_A
    Joined: Jan 3, 2019
    Messages: 6
    Offline
    Hi Vincent,



    Thank you very much for your response.

    We can now connect to Kafka and produce message on realtimesubscriptionrequest topic.

    Kafka responds back by a failure message on realtimesubscriptionresponse telling that the user cannot be authenticated.


    Here is the JSON message we publish on realtimesubscriptionrequest

    var configProducer = new ProducerConfig
    {
    BootstrapServers = brokerList,
    ClientId = Dns.GetHostName(),
    StatisticsIntervalMs = 5000,
    SecurityProtocol = SecurityProtocol.Ssl,
    SslCaLocation = @"E:\Certificates\AC-RACINE-NF-rootCA.pem",
    Debug = "security"
    };



    var realtimesubscriptionrequestTopic = new Dictionary<string, string>
    {
    {"userName","agentstatepublisher"},
    {"password","Mypassword"},
    {"subscriptionRequestId","SUB_1232"},
    {"request","SUBSCRIBE"},
    {"measuresStream","AGENTMEASURES"},
    {"version","4.1"}
    };


    Here is the message we get from Kafka on realtimesubscriptionresponse topic.


    {"subscriptionRequestId":"SUB_1232","userName":"agentstatepublisher","measuresStream":"AGENTMEASURES","topic":null,"result":"FAILURE","reason":"The subscription request 'user' could not be authenticated","version":"4.1","subscribedToStreams":[]},"key":null,"keyMeta":null,"valueMeta":{},"valueDecoder":"Json","keyDecoder":null}



    The subscription request 'user' could not be authenticated

    However, our agentstatepublisher user is declared in ACM as Manager and Analytics and Oceana supervisor. The user are associated to agent groups.

    Any idea.

    Thank you very much in advance.





    Go to:   
    Mobile view