Read messages from ActiveMQ Topic

ManishaAgrawal
Explorer

Hi,

I am trying to configure the JMS Modular App to read messages from an ActiveMQ topic. ActiveMQ runs locally on the machine, and a Java program uses JNDI to push messages into a topic called MyTopic. ActiveMQ is working fine: each time I run the code, the message count on the topic increases (verified in the ActiveMQ Web Console).

The problem is that I am still unable to index the messages in Splunk.
Please let me know where I am going wrong.

The configuration in the UI is as follows:

Initialisation Mode : jndi
JMS Connection Factory JNDI Name : connectionFactory
JNDI Initial Context Factory Name : org.apache.activemq.jndi.ActiveMQInitialContextFactory
JNDI Provider URL : tcp://localhost:61616
(I also tried file:/home/user/MQJNDI/; the jndi.properties file is placed at /home/user/MQJNDI/.)

Index message property values , Index message property values and Index message property values are checked.

Sourcetype : mqTopic (manual)

After saving the changes, I push messages to the topic again, but nothing gets indexed.

jndi.properties is as below:

# START SNIPPET: jndi

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory

# use the following property to configure the default connector
java.naming.provider.url = tcp://localhost:61616

# use the following property to specify the JNDI name the connection factory
# should appear as.
connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactory

# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
queue.MyQueue = MyQueue

# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
topic.MyTopic = MyTopic

# END SNIPPET: jndi
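
JNDI names are case-sensitive, and the names in a jndi.properties file are only visible to a process that actually loads that file. A minimal sketch, using only the JDK, that lists the names a file like the one above would register, so they can be compared with what is typed into the Splunk UI. The class name JndiNames and its helper methods are hypothetical and are not part of ActiveMQ or the Splunk app; the sample text in main is a shortened stand-in for the real file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: lists the JNDI names declared in jndi.properties-style text.
final class JndiNames {

    // Parse jndi.properties-style text into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("Could not parse properties text", e);
        }
        return props;
    }

    // Split the comma-separated connectionFactoryNames value into trimmed names.
    static List<String> connectionFactoryNames(Properties props) {
        List<String> names = new ArrayList<>();
        for (String name : props.getProperty("connectionFactoryNames", "").split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                names.add(trimmed);
            }
        }
        return names;
    }

    // Collect the [jndiName] part of every topic.[jndiName] key.
    static List<String> topicNames(Properties props) {
        List<String> names = new ArrayList<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith("topic.")) {
                names.add(key.substring("topic.".length()));
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Shortened sample text; replace with the contents of the real file.
        Properties props = parse(String.join("\n",
                "connectionFactoryNames = connectionFactory, queueConnectionFactory",
                "topic.MyTopic = MyTopic"));
        System.out.println("Connection factories: " + connectionFactoryNames(props));
        System.out.println("Topics: " + topicNames(props));
    }
}
```

A name such as ConnectionFactory (capital C) would not appear in this list, which is worth checking against the connection factory name configured for the input.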

Java code that writes messages to the topic
Arguments: MyTopic 100

package com.humana.splunk.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A simple polymorphic JMS producer which can work with Queues or Topics which
* uses JNDI to lookup the JMS connection factory and destination
*
*
*/
public final class SimpleProducer {

private static final Logger LOG = LoggerFactory.getLogger(SimpleProducer.class);

private SimpleProducer() {
}

/**
 * @param args the destination name to send to and optionally, the number of
 *                messages to send
 */
public static void main(String[] args) {
    Context jndiContext = null;
    ConnectionFactory connectionFactory = null;
    Connection connection = null;
    Session session = null;
    Destination destination = null;
    MessageProducer producer = null;
    String destinationName = null;
    final int numMsgs;

    if ((args.length < 1) || (args.length > 2)) {
        LOG.info("Usage: java SimpleProducer <destination-name> [<number-of-messages>]");
        // Print the argument count; args[0] does not exist when no arguments are given
        System.out.println("args count : " + args.length);
        System.exit(1);
    }
    destinationName = args[0];
    LOG.info("Destination name is " + destinationName);
    if (args.length == 2) {
        numMsgs = Integer.parseInt(args[1]);
    } else {
        numMsgs = 1;
    }

    /*
     * Create a JNDI API InitialContext object
     */
    try {
        jndiContext = new InitialContext();
    } catch (NamingException e) {
        LOG.info("Could not create JNDI API context: " + e.toString());
        System.exit(1);
    }

    /*
     * Look up connection factory and destination.
     */
    try {
        connectionFactory = (ConnectionFactory)jndiContext.lookup("topicConnectionFactory");
        destination = (Destination)jndiContext.lookup(destinationName);
    } catch (NamingException e) {
        LOG.info("JNDI API lookup failed: " + e);
        System.exit(1);
    }

    /*
     * Create connection. Create session from connection; false means
     * session is not transacted. Create sender and text message. Send
     * messages, varying text slightly. Send end-of-messages message.
     * Finally, close connection.
     */
    try {
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage();
        for (int i = 0; i < numMsgs; i++) {
            message.setText("This is message " + (i + 1));
            LOG.info("Sending message: " + message.getText());
            producer.send(message);
        }

        /*
         * Send a non-text control message indicating end of messages.
         */
        producer.send(session.createMessage());
    } catch (JMSException e) {
        LOG.info("Exception occurred: " + e);
    } finally {
        if (connection != null) {
            try {
                connection.close();
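            } catch (JMSException e) {
                // Nothing more to do at shutdown; log the failure instead of hiding it
                LOG.info("Failed to close connection: " + e);
            }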
        }
    }
}

}

Could you please help me identify and resolve the issue?

Thanks in advance.


Damien_Dallimor
Ultra Champion

Here is an example stanza that works in my ActiveMQ environment:

[jms://topic/dynamicTopics/splunktopic]
durable = 1
host = some-host
index = main
index_message_header = 1
index_message_properties = 1
init_mode = jndi
jms_connection_factory_name = ConnectionFactory
jndi_initialcontext_factory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
jndi_provider_url = tcp://ubuntu-personal:61616
sourcetype = jms
disabled = 1
browse_mode = stats
browse_queue_only = 0
strip_newlines = 0

Furthermore, do you have any error messages? Search for "index=_internal ExecProcessor error jms.py".
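
To see which JNDI names resolve when only the initial context factory and provider URL are supplied, as in the stanza above, the lookups can be tried outside Splunk. This is a rough sketch under stated assumptions, not how the Splunk app itself is implemented: the class JndiLookupCheck and its methods are hypothetical, the provider URL tcp://localhost:61616 is taken from the question, and a real lookup only succeeds with the ActiveMQ client jar on the classpath. ActiveMQ's initial context factory registers ConnectionFactory as a default name and resolves dynamicTopics/<name> on demand, so those are the names tried here alongside the lowercase connectionFactory from the question.

```java
import java.util.Hashtable;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

// Hypothetical helper: tries JNDI lookups with an explicitly supplied environment.
final class JndiLookupCheck {

    // Build the JNDI environment from an initial context factory class and provider URL.
    static Hashtable<String, String> environment(String factoryClass, String providerUrl) {
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, factoryClass);
        env.put(Context.PROVIDER_URL, providerUrl);
        return env;
    }

    // Attempt a lookup and describe the outcome; naming failures are reported, not thrown.
    static String describeLookup(Hashtable<String, String> env, String name) {
        try {
            Context ctx = new InitialContext(env);
            try {
                Object found = ctx.lookup(name);
                return name + " -> " + found.getClass().getName();
            } finally {
                ctx.close();
            }
        } catch (NamingException e) {
            return name + " -> lookup failed: " + e;
        }
    }

    public static void main(String[] args) {
        // Assumed values: factory class from the stanza, broker URL from the question.
        Hashtable<String, String> env = environment(
                "org.apache.activemq.jndi.ActiveMQInitialContextFactory",
                "tcp://localhost:61616");
        String[] names = {"ConnectionFactory", "connectionFactory", "dynamicTopics/MyTopic"};
        for (String name : names) {
            System.out.println(describeLookup(env, name));
        }
    }
}
```

Note that a jndi.properties file on the classpath of this program is also read by InitialContext, so run it without that file to approximate a process that was given only the factory class and provider URL.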
