Splunk Search

Multiple correlation of different eventtypes.

NicolayCSPI
Engager

Hello everybody,

I am in the process of building a use case, which consists of 5 real-time alerts. In order to make the logic simpler, cleaner and more readable, I have created 4 eventtypes (EventA, EventB, EventC and EevntD), all belong to the same sourcetype and represent the 4 type of events that the scoped processes (the ones that we want to monitor) can generate.

There are 5 scenarios that must be alerted in real time:
(1) A process generates EventA, EventB and EventC within a period of 30 seconds.
(2) A process generates EventA, EventB and EventD within a period of 30 seconds.
(3) A process generates EventA, EventC and EventD within a period of 30 seconds.
(4) A process generates EventB, EventC and EventD within a period of 30 seconds.
(5) A process generates EventA, EventB, EventC and EventD within a period of 30 seconds.
The order of occurrence is not important. All the eventtypes must have the same process identifier (ProcessID).

I have created some logic for that but is failing. For instance, the search that I have written for the last and more important scenario is the following.
eventtype=EventA OR eventtype=EventB | transaction ProcessID| append [search eventtype=EventC] | transaction ProcessID | append [search eventtype=EventD] | transaction ProcessID

This search works if the process generates only the 4 eventtypes, but fails if more than one event of each eventtype is generated. For instance, if several EventA and several EventB are generated by the same process, this search stacks all of them and produces a result joining all of them. I would like to know if there is another way to correlate this situation and\or how can I get rid of the redundant events.

For the scenarios 1 to 4, I need to make sure in each scenario that the not included event (e.g. Scenario 1: EventD) is not generated.

I would really appreciate any kind of support. Thank you very much in advance.

Kind regards,
Nicolay

0 Karma
1 Solution

DalJeanis
SplunkTrust
SplunkTrust

All of these scenarios can be simplified to "if at least three different ones of the four event types are generated, alert me".
Once you get it to that simple statement, you basically just have to use stats to roll together all the events with values(eventtype) as eventtype and see if there are three or more different values. The trick is to figure out how to roll 30 seconds, and only 30 seconds, together.

Try this -

eventtype=EventA OR eventtype=EventB OR Eventtype=EventC OR Eventtype=EventD
| rename COMMENT as "The above gets all the relevant events"

| rename COMMENT as "bin them at 30 seconds and spread them across the next 30.  If three occur within 30 seconds, they must all end up with the same bin one way or another."
| bin _time as time1 span=30s
| eval time1=mvappend(time1,time1+30)
| mvexpand time1

| rename COMMENT as "stats them by time bin and process"
| rename COMMENT as "we keep the entire lists of times and eventtypes for further review"
| eval timedisp=strftime(_time,"%Y-%m-%d %H:%M:%S.%3Q")
| eval timesevents=timedisp."=>".eventtype
| stats min(_time) as mintime, max(_time) as maxtime, range(_time) as timerange,
    values(timeevents) as timesevents, values(eventtype) as eventtype by ProcessID time1
| where mvcount(eventtype)>2 
| eval timerange=maxtime-mintime

The above isn't perfect, but it will cheaply eliminate vast swathes of stuff you don't want. This may occasionally result in multiple records for a single ProcessID. Test the following to see which one you prefer...

| rename COMMENT as "this one throws away the later record, since the earlier already matched"
| dedup ProcessID eventtype

or

| rename COMMENT as "this one rolls ALL records for the same process together, no matter how far apart"
| stats min(mintime) as mintime, max(maxtime) as maxtime, values(time1) as time1,
    values(timesevents) as timesevents, values(eventtype) as eventtype by ProcessID
| eval timerange=maxtime-mintime

Given your business requirements, I didn't see a principled way to be absolutely sure that we got every positive case if we eliminated potentially negative ones where the different events occurred within, say, 45-60 seconds. It can probably be done, perhaps by taking that last stats version of the dedup code, mvexpanding the timesevents into separate events, then sorting, setting a flag for each eventtype and using streamstats with timewindow=30s to mark every record that happens within 30 seconds after each eventtype.

However, that's a lot of fiddly nonsense, since your 30 seconds is not a hard sciencey number anyway, it's a human-selected suspiciously round number.

I'm sure you can use the basic code above and it will meet your real business need admirably with less CPU. After you've given it a test fly, you can change the span to 10 or 15 or 20 if you get too many false positives or if the mood strikes you. Warning - Make sure the number divides evenly into 60 or 600 or 3600 or you will regret it.

View solution in original post

0 Karma

DalJeanis
SplunkTrust
SplunkTrust

All of these scenarios can be simplified to "if at least three different ones of the four event types are generated, alert me".
Once you get it to that simple statement, you basically just have to use stats to roll together all the events with values(eventtype) as eventtype and see if there are three or more different values. The trick is to figure out how to roll 30 seconds, and only 30 seconds, together.

Try this -

eventtype=EventA OR eventtype=EventB OR Eventtype=EventC OR Eventtype=EventD
| rename COMMENT as "The above gets all the relevant events"

| rename COMMENT as "bin them at 30 seconds and spread them across the next 30.  If three occur within 30 seconds, they must all end up with the same bin one way or another."
| bin _time as time1 span=30s
| eval time1=mvappend(time1,time1+30)
| mvexpand time1

| rename COMMENT as "stats them by time bin and process"
| rename COMMENT as "we keep the entire lists of times and eventtypes for further review"
| eval timedisp=strftime(_time,"%Y-%m-%d %H:%M:%S.%3Q")
| eval timesevents=timedisp."=>".eventtype
| stats min(_time) as mintime, max(_time) as maxtime, range(_time) as timerange,
    values(timeevents) as timesevents, values(eventtype) as eventtype by ProcessID time1
| where mvcount(eventtype)>2 
| eval timerange=maxtime-mintime

The above isn't perfect, but it will cheaply eliminate vast swathes of stuff you don't want. This may occasionally result in multiple records for a single ProcessID. Test the following to see which one you prefer...

| rename COMMENT as "this one throws away the later record, since the earlier already matched"
| dedup ProcessID eventtype

or

| rename COMMENT as "this one rolls ALL records for the same process together, no matter how far apart"
| stats min(mintime) as mintime, max(maxtime) as maxtime, values(time1) as time1,
    values(timesevents) as timesevents, values(eventtype) as eventtype by ProcessID
| eval timerange=maxtime-mintime

Given your business requirements, I didn't see a principled way to be absolutely sure that we got every positive case if we eliminated potentially negative ones where the different events occurred within, say, 45-60 seconds. It can probably be done, perhaps by taking that last stats version of the dedup code, mvexpanding the timesevents into separate events, then sorting, setting a flag for each eventtype and using streamstats with timewindow=30s to mark every record that happens within 30 seconds after each eventtype.

However, that's a lot of fiddly nonsense, since your 30 seconds is not a hard sciencey number anyway, it's a human-selected suspiciously round number.

I'm sure you can use the basic code above and it will meet your real business need admirably with less CPU. After you've given it a test fly, you can change the span to 10 or 15 or 20 if you get too many false positives or if the mood strikes you. Warning - Make sure the number divides evenly into 60 or 600 or 3600 or you will regret it.

0 Karma

NicolayCSPI
Engager

Many thanks DalJeanis.
I am currently testing your proposal. I will come back to you as soon as I have some results.
Indeed the timeframe was selected base on the one picked for the same use case in a different platform. The three/four events are generated in less than 10 seconds, but considering possible delays or small amendments in the received time due to the ingestion into Splunk and making sure that the whole 10 seconds are covered, a timeframe of 30 seconds was taken. It's like a good compromise and it worked in the other platform.
Thanks a lot for guiding me in a different direction.
Kind regards,
Nicolay

DalJeanis
SplunkTrust
SplunkTrust

Must have lost my reply.
@NicolayCSPI - Great! Should work fine for you, and you can try times from 30s down to about 10s and see which ones are more efficient.

If they usually are within 10 seconds, then you can also try adding 10-12 seconds and then bin by 30s, which should result in about 70% of the hits being rolled into a single bin. Test because YMMV.

0 Karma
Get Updates on the Splunk Community!

Routing logs with Splunk OTel Collector for Kubernetes

The Splunk Distribution of the OpenTelemetry (OTel) Collector is a product that provides a way to ingest ...

Welcome to the Splunk Community!

(view in My Videos) We're so glad you're here! The Splunk Community is place to connect, learn, give back, and ...

Tech Talk | Elevating Digital Service Excellence: The Synergy of Splunk RUM & APM

Elevating Digital Service Excellence: The Synergy of Real User Monitoring and Application Performance ...