Hello Members,
I am trying to use Splunk Connect for Kafka to get JSON messages from a few topics into Splunk (http://docs.splunk.com/Documentation/KafkaConnect/1.0.0/User/About).
I configured connect-distributed.properties and also configured Splunk per the documentation to consume the messages. So far I see correct output from GET requests such as
http://localhost:8083/connector-plugins and http://localhost:8083/connectors (which lists all the connector names), and I am able to submit the task with the following POST request:
{
  "name": "splunk-kafka-poc-11",
  "config": {
    "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
    "tasks.max": "1",
    "topics": "TIMELOGS",
    "splunk.hec.uri": "https://localhost:8088",
    "splunk.hec.token": "Kafka-Token",
    "splunk.hec.ack.enabled": "TRUE",
    "splunk.hec.ack.poll.interval": "20",
    "splunk.hec.ack.poll.threads": "2",
    "splunk.hec.event.timeout": "120",
    "splunk.hec.raw": "false",
    "splunk.hec.json.event.enrichment": "org=fin,bu=south-east-us",
    "splunk.hec.ssl.validate.certs": "TRUE"
  }
}
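For reference, submitting that config to the Kafka Connect REST API can be sketched as below. This is a hedged sketch, not the connector's own tooling: the host/port (localhost:8083), connector name, token, and HEC URI are simply the values already posted in this thread.

```python
import json

# Connector config as posted above; the token and HEC URI are
# placeholders taken from this thread, not working credentials.
config = {
    "name": "splunk-kafka-poc-11",
    "config": {
        "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
        "tasks.max": "1",
        "topics": "TIMELOGS",
        "splunk.hec.uri": "https://localhost:8088",
        "splunk.hec.token": "Kafka-Token",
        "splunk.hec.ack.enabled": "true",
        "splunk.hec.raw": "false",
    },
}

payload = json.dumps(config)
# POST this payload to the Connect worker, e.g.:
#   curl -X POST -H "Content-Type: application/json" \
#        -d "$payload" http://localhost:8083/connectors
print(payload)
```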
For the above POST request I get the following response:
{
  "name": "splunk-kafka-poc-11",
  "config": {
    "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
    "tasks.max": "1",
    "topics": "TIMELOGS",
    "splunk.hec.uri": "https://localhost:8088",
    "splunk.hec.token": "Kafka-Token",
    "splunk.hec.ack.enabled": "TRUE",
    "splunk.hec.ack.poll.interval": "20",
    "splunk.hec.ack.poll.threads": "2",
    "splunk.hec.event.timeout": "120",
    "splunk.hec.raw": "false",
    "splunk.hec.json.event.enrichment": "org=fin,bu=south-east-us",
    "splunk.hec.ssl.validate.certs": "TRUE",
    "name": "splunk-kafka-poc-11"
  },
  "tasks": []
}
But I am getting errors in the Kafka console for this task, which suggests there are some issues.
My Kafka topics contain JSON messages, so in connect-distributed.properties should I use the JsonConverter as below:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
OR
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
Since I was not sure, I tried both sets of values, but I get errors in both scenarios.
However, I saw in the documentation to use the StringConverter only. Could you please confirm which converter should be used?
When I use the StringConverter I get the following error:
[2018-05-25 16:34:27,360] ERROR [pool-1-thread-3] failed to send batch (com.splunk.kafka.connect.SplunkSinkTask)
com.splunk.hecclient.HecException: All channels have back pressure
at com.splunk.hecclient.LoadBalancer.send(LoadBalancer.java:62)
at com.splunk.hecclient.Hec.send(Hec.java:94)
at com.splunk.kafka.connect.SplunkSinkTask.send(SplunkSinkTask.java:189)
at com.splunk.kafka.connect.SplunkSinkTask.handleFailedBatches(SplunkSinkTask.java:124)
at com.splunk.kafka.connect.SplunkSinkTask.put(SplunkSinkTask.java:59)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
[2018-05-25 16:34:27,360] INFO [pool-1-thread-3] handled 2 failed batches with 702 events (com.splunk.kafka.connect.SplunkSinkTask)
When I use the JsonConverter I get the following error:
[2018-05-31 15:45:40,514] ERROR [pool-1-thread-24] Task splunk-kafka-poc-11-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'B': was expecting ('true', 'false' or 'null')
at [Source: [B@79fc4672; line: 1, column: 4]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'B': was expecting ('true', 'false' or 'null')
at [Source: [B@79fc4672; line: 1, column: 4]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772)
at com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeArray(JsonNodeDeserializer.java:269)
at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:71)
at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:15)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3798)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2404)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:303)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
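One likely reading of the `Unrecognized token 'B'` error: the record bytes are not JSON at all. `[B@79fc4672` in the stack trace is how Java prints a raw `byte[]` reference, which suggests the producer wrote something that is not valid JSON to the topic. The sketch below (Python, purely to illustrate the parser's behavior, not the connector's code) shows that a JSON deserializer accepts real JSON bytes but rejects that kind of payload at the first bad token, just as JsonConverter does above.

```python
import json

# Valid JSON bytes parse fine with a JSON deserializer:
good = b'{"event": "login", "user": "alice"}'
assert json.loads(good)["event"] == "login"

# A Java byte[] rendered via toString() looks like "[B@79fc4672".
# The leading '[' opens a JSON array, then 'B' is an invalid token,
# so the parser fails immediately -- the same failure mode as
# JsonConverter in the stack trace above.
bad = b"[B@79fc4672"
try:
    json.loads(bad)
    parsed = True
except json.JSONDecodeError:
    parsed = False
```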
Could you please advise how to get the JSON messages into Splunk?
Please let me know if you need any other information.
I've used this config before:
connector.class=com.splunk.kafka.connect.SplunkSinkConnector
key.converter.schemas.enable=false
topics=payments
name=SplunkTest2
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
splunk.hec.token=92933e00-0adf-424d-ace4-96e07f47bba9
splunk.hec.uri=http://somehost.compute.amazonaws.com:8088
I highly recommend using Lenses.io to manage Kafka and Kafka Connect. See this short video tutorial: https://www.youtube.com/watch?v=cnKHhE8ApPA&
Hi,
it looks like you found a solution:
https://github.com/splunk/kafka-connect-splunk/issues/151
@Ken Chen said:
Do you have a valid CA-signed certificate on your Splunk localhost? If not, disable cert validation (set splunk.hec.ssl.validate.certs to false) in the config and retry the StringConverter scenario.
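Applied to the connector config posted earlier in this thread (the key name `splunk.hec.ssl.validate.certs` is the one already used in that config), the suggested change would be:

```json
"splunk.hec.ssl.validate.certs": "false"
```

This only skips certificate validation for the HEC endpoint; it is a workaround for a self-signed or missing certificate, not something to leave enabled in production.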
HTH,
Holger
Any update on this?