Splunk Search

Tracking Field Changes in Events

PavelP
Motivator

Hello,

I'm looking for your insights on pinpointing changes in fields over time. The events are structured with a timestamp, an ID, and various fields. I'm seeking advice on constructing a dynamic timeline that identifies which values changed and in which fields. Example events below:

10:20:30 25/Jan/2024 id=1 a=1534 b=253 c=384 ...
10:20:56 25/Jan/2024 id=1 a=1534 b=253 c=385 ...
10:20:56 25/Jan/2024 id=2 a="something" b=253 c=385 ...
10:21:35 25/Jan/2024 id=2 a="something" b=253 c=385 ...
10:22:56 25/Jan/2024 id=2 a="xyz" b="-" c=385 ...

Desired result format:

10:20:56 25/Jan/2024 id=1 changed field "c"
10:22:56 25/Jan/2024 id=2 changed field "a", changed field "b"

My pseudo SPL to find changed events:

... | streamstats reset_on_change=true dc(*) AS * by id | foreach * [ ??? ]

With hundreds of fields per event, I'm seeking an efficient method, possibly a combination of streamstats, foreach, transaction, or stats.
Insights appreciated.

0 Karma
1 Solution

yuanliu
SplunkTrust

Because your desired result is an aggregation, stats is the tool of choice.

 

| stats max(_time) as _time values(*) as * by id
| foreach *
    [eval changed = mvappend(changed, if(mvcount(<<FIELD>>) > 1, "changed field \"<<FIELD>>\"", null()))]
| table _time changed
| eval changed = mvjoin(changed, ", ")

 

Your sample events give

_time                 changed
2024-01-25 10:20:56   changed field "c"
2024-01-25 10:22:56   changed field "a", changed field "b"

Here is an emulation you can play with and compare with real data:


| makeresults
| eval data =split("10:20:30 25/Jan/2024 id=1 a=1534 b=253 c=384 ...
10:20:56 25/Jan/2024 id=1 a=1534 b=253 c=385 ...
10:20:56 25/Jan/2024 id=2 a=something b=253 c=385 ...
10:21:35 25/Jan/2024 id=2 a=something b=253 c=385 ...
10:22:56 25/Jan/2024 id=2 a=xyz b=- c=385 ...", "
")
| mvexpand data
| rename data as _raw
| extract
| rex "(?<_time>\S+ \S+)"
| eval _time = strptime(_time, "%H:%M:%S %d/%b/%Y")
``` data emulation above ```

 

PickleRick
SplunkTrust

Sorry, but it doesn't work.

| makeresults
| eval data =split("10:20:30 25/Jan/2024 id=1 a=1534 b=253 c=384 ...
10:20:56 25/Jan/2024 id=1 a=1534 b=253 c=385 ...
10:20:56 25/Jan/2024 id=2 a=something b=253 c=385 ...
10:21:35 25/Jan/2024 id=2 a=something b=253 c=385 ...
10:21:36 25/Jan/2024 id=2 a=something2 b=11 c=12 ...
10:22:56 25/Jan/2024 id=2 a=xyz b=- c=385 ...", "
")
| mvexpand data
| rename data as _raw
| extract
| rex "(?<_time>\S+ \S+)"
| eval _time = strptime(_time, "%H:%M:%S %d/%b/%Y")
| stats max(_time) as _time values(*) as * by id
| foreach *
[eval changed = mvappend(changed, if(mvcount(<<FIELD>>) > 1, "changed field \"<<FIELD>>\"", null()))]
| table _time changed
| eval changed = mvjoin(changed, ", ")

It outputs

_time                 changed
2024-01-25 10:20:56   changed field "c"
2024-01-25 10:22:56   changed field "a", changed field "b", changed field "c"

Which is definitely not what happened in data.

Firstly, we don't know which id we're talking about. Secondly, there could have been no change at 10:20:56, since that's our first data point. And no change is reported at all at 10:21:36...

"Normal" stats is _not_ the way to go to find moments of change. It can tell you whether a field changed at all throughout the sample, but not when it changed. You need to use streamstats (or autoregress and clever sorting) to get the value from the previous event so that you have something to compare with. Otherwise you have only the overall aggregation, not "running changes".

0 Karma

PavelP
Motivator

Yes, the _time is not the time of change; I noticed that too. But overall the code does report a summary of all changes per id, once id is added to the table:

| table _time id changed

The first data point is at 10:20:30, so the reported change at 10:20:56 is correct.

I would be very interested in a solution involving "running changes". BTW never heard about the "autoregress" command, thx!

0 Karma

PickleRick
SplunkTrust
0 Karma

PavelP
Motivator

great! thank you

0 Karma

PickleRick
SplunkTrust

If you want to track changes per id, you can do something like

| streamstats window=1 current=f values(*) as previous_* by id

This will give you the value from the previous event under a "previous_" prefix. Then you can try to use foreach to fiddle with that.
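
A minimal sketch of that idea on the sample events from the question, reusing the data emulation posted earlier in the thread. It makes a few assumptions that are not in the original posts: events are sorted oldest first before streamstats runs, only the fields a, b and c are compared (listed explicitly so that foreach does not also iterate over the previous_* fields), and global=f is added on the understanding that, per the streamstats documentation, it gives each id its own one-event window instead of one window shared across all ids.

```spl
| makeresults
| eval data = split("10:20:30 25/Jan/2024 id=1 a=1534 b=253 c=384 ...
10:20:56 25/Jan/2024 id=1 a=1534 b=253 c=385 ...
10:20:56 25/Jan/2024 id=2 a=something b=253 c=385 ...
10:21:35 25/Jan/2024 id=2 a=something b=253 c=385 ...
10:22:56 25/Jan/2024 id=2 a=xyz b=- c=385 ...", "
")
| mvexpand data
| rename data as _raw
| extract
| rex "(?<_time>\S+ \S+)"
| eval _time = strptime(_time, "%H:%M:%S %d/%b/%Y")
| sort 0 _time
| streamstats window=1 current=f global=f values(*) as previous_* by id
| foreach a b c
    [ eval changed = mvappend(changed, if(isnotnull('previous_<<FIELD>>') AND '<<FIELD>>' != 'previous_<<FIELD>>', "changed field \"<<FIELD>>\"", null())) ]
| where isnotnull(changed)
| eval changed = mvjoin(changed, ", ")
| table _time id changed
```

The isnotnull() guard skips the first event of each id, which has nothing to compare against, and the where clause keeps only events in which at least one field differs from the previous event of the same id. Because each row is a single event, _time here is the time of the change itself rather than the latest event per id.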

0 Karma

PavelP
Motivator

Any ideas how I can use foreach to "collect" all changes (using mvappend)? My current attempt works only if I restrict foreach to one specific field (e.g. "a"), and even then it shows just one change per id:

| foreach a [ eval changed = if ( previous_<<FIELD>> != <<FIELD>> , mvappend(changed, "<<FIELD>>") , 0) ] | search changed!=0 | stats values(changed) values(id) by _time

0 Karma

PickleRick
SplunkTrust

Look at this run-anywhere example (yes, it's ugly, but it seems to work).

| makeresults count=100 
| eval val1=random() % 4, val2=random()%3
| streamstats window=1 current=f values(val*) as previous_val*
| eval changed=""
| foreach *
[ eval changed=mvappend(changed,if(like("<<FIELD>>","previous_%") OR '<<FIELD>>'='previous_<<FIELD>>',null(),"Field: <<FIELD>> oldval: ".'previous_<<FIELD>>'." newval: ".'<<FIELD>>'))]

EDIT: Added "by id" to streamstats because I'd forgotten it before.

EDIT2: Removed "by id"; it made sense in the context of the original question but not for this mockup data.

0 Karma

PavelP
Motivator

something is missing here, there aren't any values in the column "changed" 

0 Karma

PickleRick
SplunkTrust

Try now. I "fixed" it but I overthought it so I "fixed it back".

0 Karma

PavelP
Motivator

that looks great, thank you PickleRick!

I modified it to "| foreach val1 val2" to avoid "Failed to parse templatized search for field 'previous_val1'" error.
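
With hundreds of fields, listing every name in foreach is impractical. One possible alternative, sketched here on the same mockup data, is to iterate over the previous_* fields instead and use the <<MATCHSTR>> token of foreach, which holds the part of the field name matched by the wildcard (here, the name of the current field). This is a sketch under the assumption that every previous_* field has a same-named current field; it has not been run against the real data.

```spl
| makeresults count=100
| eval val1 = random() % 4, val2 = random() % 3
| streamstats window=1 current=f values(val*) as previous_val*
| foreach previous_*
    [ eval changed = mvappend(changed, if('<<MATCHSTR>>' != '<<FIELD>>', "Field: <<MATCHSTR>> oldval: " . '<<FIELD>>' . " newval: " . '<<MATCHSTR>>', null())) ]
| table val1 previous_val1 val2 previous_val2 changed
```

Inside the template, <<FIELD>> expands to previous_val1 or previous_val2 and <<MATCHSTR>> to val1 or val2, so only fields that actually exist are referenced. For the first event the previous_* values are null, the comparison is not true, and nothing is appended.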

I need some time to adapt it to my real search, I'll get back 

0 Karma