.NET application connecting to Kafka on Analytics
NICOLASH_A



Joined: 03/01/2019 04:20:37
Messages: 6
Offline

Hello Everybody,


We are developing a custom application using the Confluent .NET library. The application is meant to be a Kafka consumer that subscribes to real-time agent state events. We are having difficulty connecting to the Kafka server on Analytics over TLS.
Does anyone know which ConsumerConfig parameters must be set in order to subscribe?

var tConsumerConfig = new ConsumerConfig
{
    BootstrapServers = pCApplicationParametersManager.PKafkaBroker + ":" +
                       pCApplicationParametersManager.PKafkaTLSPort.ToString(),
    SecurityProtocol = SecurityProtocol.Ssl,
    SslCertificateLocation = @"E:\Certificates\a_lab_kafka.cer.pem",
    SslKeyLocation = @"E:\Certificates\a_lab_kafka_key.pem",
    SslKeyPassword = @"myPassword",
    GroupId = "groupid",
    Debug = "security"
};


Do you have a .NET example to share ?

Thank you very much for your help.

Regards.
Shahin



dineshud.avaya.com



Joined: 11/03/2020 12:42:33
Messages: 9
Offline

We don't have a .NET sample. Can you share the error you are seeing?

Thank You,
Dinesh

This message was edited 1 time. Last update was at 24/11/2020 04:01:00

NICOLASH_A



Joined: 03/01/2019 04:20:37
Messages: 6
Offline

Hello,


Thank you for your reply.
Here is the exception:

2020-11-25 13:43:27,422 [1] ERROR Exception Soruce => Confluent.Kafka
2020-11-25 13:43:27,805 [1] ERROR Exception Message => ssl.certificate.location failed: .\ssl\ssl_rsa.c:708: error:140DC009:SSL routines:SSL_CTX_use_certificate_chain_file:PEM lib
2020-11-25 13:43:28,365 [1] ERROR Exception StackTrace at Confluent.Kafka.Impl.SafeKafkaHandle.Create(RdKafkaType type, IntPtr config, IClient owner)
at Confluent.Kafka.Consumer`2..ctor(ConsumerBuilder`2 builder)
at Confluent.Kafka.ConsumerBuilder`2.Build()
at AgentStatePublisher.CKafkaConsumer.SubscribeAsync(String PTopic, Action`1 message) in E:\Development\AgentStatePublisher\CKafkaConsumer.cs:line 60
at AgentStatePublisher.CKafkaManager.Start() in E:\Development\AgentStatePublisher\CKafkaManager.cs:line 79
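
A note on the error above: the "PEM lib" reason reported by SSL_CTX_use_certificate_chain_file generally means that OpenSSL could not parse the file given in ssl.certificate.location as PEM, for example because a DER-encoded .cer file was simply renamed to .pem. Below is a minimal sketch of a sanity check, not a full validation. The helper name LooksLikePemCertificate is hypothetical, and it only looks for the textual PEM markers.

```csharp
using System;
using System.IO;

// Returns true when the text contains a PEM certificate header followed by a footer.
// This is only a textual sanity check; it does not validate the certificate itself.
static bool LooksLikePemCertificate(string text)
{
    const string header = "-----BEGIN CERTIFICATE-----";
    const string footer = "-----END CERTIFICATE-----";
    int start = text.IndexOf(header, StringComparison.Ordinal);
    int end = text.IndexOf(footer, StringComparison.Ordinal);
    return start >= 0 && end > start;
}

// In-memory example with a placeholder body (not a real certificate).
string pem = "-----BEGIN CERTIFICATE-----\nAAAA\n-----END CERTIFICATE-----\n";
Console.WriteLine(LooksLikePemCertificate(pem));

// Path taken from the post above; the check is skipped if the file is absent.
string path = @"E:\Certificates\a_lab_kafka.cer.pem";
if (File.Exists(path))
{
    Console.WriteLine(LooksLikePemCertificate(File.ReadAllText(path)));
}
```

If the check fails on the real file, converting the certificate to PEM (for instance with "openssl x509 -inform der -in cert.cer -out cert.pem", where the file names are placeholders) may be worth trying.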
NICOLASH_A



Joined: 03/01/2019 04:20:37
Messages: 6
Offline

Also, it would be great to have an example even if it is not in .NET.
Regards.
VincentTDSE



Joined: 09/04/2019 11:15:54
Messages: 1
Offline

Hi NICOLASH_A

Coincidentally, we are also currently developing a .NET interface to Avaya Oceana Analytics (Kafka).

I threw a small code sample together below. As everything is buried in classes etc. in our current application, this might not work 'out of the box' but I think it should get you started.

Here's our config setup.
// Read key=value pairs from the properties file, skipping comments and blank lines.
var l_KafkaConfig =
    (await File.ReadAllLinesAsync("MyConfig.properties"))
    .Where(line => !line.StartsWith("#") && !string.IsNullOrWhiteSpace(line))
    .ToDictionary(
        line => line.Substring(0, line.IndexOf('=')),
        line => line.Substring(line.IndexOf('=') + 1));

// Hand the dictionary to Confluent.Kafka.
ClientConfig l_ClientConfig = new ClientConfig(l_KafkaConfig);

Note: Certificate verification can be a hassle to get right, so you can disable it during development by setting:
l_ClientConfig.EnableSslCertificateVerification = false;
Don't do this in production, though.

Our config file:
bootstrap.servers=<your_ingress_server>:32090
enable.auto.commit=true
session.timeout.ms=15000
client.id=1
group.id=2
ssl.ca.location=./ssl/root-cert.pem
security.protocol=ssl

Note: The following config doesn't work for the .NET library, as truststores are a Java thing:
  • ssl.truststore.location=./ssl/truststore.jks
  • ssl.truststore.password=Testing

Note2: I haven't had the chance to look into every line in detail, as we are still in development, but this seems to work.
Note3: Place the root-cert.pem in a newly created 'ssl' folder in the root of the application executable.
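
For anyone starting from a Java-style properties file, here is a minimal sketch of a slightly more defensive loader. It assumes you want to skip lines without an '=' and drop the ssl.truststore.* keys mentioned above before handing the result to ClientConfig. The function name ParseProperties is hypothetical, and the sample lines simply mirror the config file in this post.

```csharp
using System;
using System.Collections.Generic;

// Parses key=value lines, ignoring comments, blank lines, lines without '=',
// and the Java-only ssl.truststore.* keys.
static Dictionary<string, string> ParseProperties(IEnumerable<string> lines)
{
    var result = new Dictionary<string, string>();
    foreach (var raw in lines)
    {
        var line = raw.Trim();
        if (line.Length == 0 || line[0] == '#') continue;

        int idx = line.IndexOf('=');
        if (idx <= 0) continue; // no key, or no '=' at all

        var key = line.Substring(0, idx).Trim();
        var value = line.Substring(idx + 1).Trim();
        if (key.StartsWith("ssl.truststore.", StringComparison.Ordinal)) continue;

        result[key] = value;
    }
    return result;
}

var sample = new[]
{
    "# sample lines mirroring the config file above",
    "bootstrap.servers=<your_ingress_server>:32090",
    "security.protocol=ssl",
    "ssl.ca.location=./ssl/root-cert.pem",
    "ssl.truststore.location=./ssl/truststore.jks"
};

var config = ParseProperties(sample);
foreach (var entry in config)
{
    Console.WriteLine($"{entry.Key} = {entry.Value}");
}
```

The resulting dictionary can be passed to new ClientConfig(...) in the same way as l_KafkaConfig above.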

To subscribe to a measures stream:
const string TOPIC_SUBSCRIPTION_REQUEST = "realtimesubscriptionrequest";
int l_SendKey = 0;
// Build the producer from the ClientConfig created above.
var l_Producer = new ProducerBuilder<int, string>(l_ClientConfig).Build();
string l_JsonMessage = "<My subscription message>"; // Have a look at the API documentation linked below for examples here.
CancellationTokenSource l_CTS = new CancellationTokenSource();
var l_ProdResponse = await l_Producer.ProduceAsync(
    TOPIC_SUBSCRIPTION_REQUEST,
    new Message<int, string> { Key = l_SendKey++, Value = l_JsonMessage },
    l_CTS.Token);
    
    


And consume:

const string TOPIC_SUBSCRIPTION_RESPONSE = "realtimesubscriptionresponse";
// Build the consumer from the same ClientConfig (it must contain group.id).
var l_Consumer = new ConsumerBuilder<Ignore, string>(l_ClientConfig).Build();

l_Consumer.Subscribe(TOPIC_SUBSCRIPTION_RESPONSE);
CancellationTokenSource l_CTS2 = new CancellationTokenSource();
var l_ConsResult = l_Consumer.Consume(l_CTS2.Token);
    


Note: We've set up the consumer and the producer in different threads, making sure the consumer is subscribed before we send a message.

Note2: Avaya also provides a sample client in Java. I noticed that many of the Java configs don't transfer one-to-one to the Confluent.Kafka .NET library.

API documentation & Sample client

For completeness: the API document also describes how to extract the needed root-cert.pem file.
NICOLASH_A



Joined: 03/01/2019 04:20:37
Messages: 6
Offline

Hi Vincent,

Thank you very much for your response.

We can now connect to Kafka and produce messages on the realtimesubscriptionrequest topic.

Kafka responds with a failure message on realtimesubscriptionresponse saying that the user cannot be authenticated.


Here are our producer configuration and the JSON message we publish on realtimesubscriptionrequest:

var configProducer = new ProducerConfig
{
    BootstrapServers = brokerList,
    ClientId = Dns.GetHostName(),
    StatisticsIntervalMs = 5000,
    SecurityProtocol = SecurityProtocol.Ssl,
    SslCaLocation = @"E:\Certificates\AC-RACINE-NF-rootCA.pem",
    Debug = "security"
};



var realtimesubscriptionrequestTopic = new Dictionary<string, string>
{
    {"userName", "agentstatepublisher"},
    {"password", "Mypassword"},
    {"subscriptionRequestId", "SUB_1232"},
    {"request", "SUBSCRIBE"},
    {"measuresStream", "AGENTMEASURES"},
    {"version", "4.1"}
};
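
For readers following along, the dictionary above still has to be turned into a JSON string before it is produced. Here is a minimal sketch of that step, assuming System.Text.Json is available (.NET Core 3.0 or later). The helper name BuildSubscriptionRequest is hypothetical, and the field names and values are copied from the post above.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Builds the subscription request payload as a JSON string.
static string BuildSubscriptionRequest(
    string userName, string password, string requestId, string measuresStream)
{
    var payload = new Dictionary<string, string>
    {
        ["userName"] = userName,
        ["password"] = password,
        ["subscriptionRequestId"] = requestId,
        ["request"] = "SUBSCRIBE",
        ["measuresStream"] = measuresStream,
        ["version"] = "4.1"
    };
    return JsonSerializer.Serialize(payload);
}

string json = BuildSubscriptionRequest(
    "agentstatepublisher", "Mypassword", "SUB_1232", "AGENTMEASURES");
Console.WriteLine(json);
```

The resulting string would then be used as the Value of the Message passed to ProduceAsync.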


Here is the message we get from Kafka on the realtimesubscriptionresponse topic:


{"subscriptionRequestId":"SUB_1232","userName":"agentstatepublisher","measuresStream":"AGENTMEASURES","topic":null,"result":"FAILURE","reason":"The subscription request 'user' could not be authenticated","version":"4.1","subscribedToStreams":[]},"key":null,"keyMeta":null,"valueMeta":{},"valueDecoder":"Json","keyDecoder":null}



The subscription request 'user' could not be authenticated

However, our agentstatepublisher user is declared in ACM as Manager and as Analytics and Oceana supervisor. The user is associated with agent groups.

Any ideas?

    Thank you very much in advance.





     
     