How to get around ForEach's inability to contain non-streaming commands?

dmoore44
New Member

With help from the Splunk Machine Learning Toolkit, I've constructed a query that detects numeric outliers; in this case, outliers in the sum of outbound bytes from a server in 10-minute chunks:

index="proxies" earliest=-7d@d (c_ip="1.2.3.4")
| timechart span=10m sum(bytes_out) as bytesout by c_ip
| streamstats window=200 current=true median("1.2.3.4") as median
| eval absDev=(abs('1.2.3.4'-median))
| streamstats window=200 current=true median(absDev) as medianAbsDev
| eval lowerBound=(median-medianAbsDev*exact(20)), upperBound=(median+medianAbsDev*exact(20))
| eval isOutlier=if('1.2.3.4' < lowerBound OR '1.2.3.4' > upperBound, 1, 0)
| where isOutlier=1

Now, what I would like to do is iterate over a group of servers like this:

index="proxies" earliest=-7d@d [|inputlookup lu_inventory where function="web_server" | table ip | rename ip as c_ip | format]
| foreach c_ip [| timechart span=10m sum(bytes_out) as bytesout by '<<FIELD>>'
| streamstats window=200 current=true median('<<FIELD>>') as median
| eval absDev=(abs('<<FIELD>>'-median))
| streamstats window=200 current=true median(absDev) as medianAbsDev
| eval lowerBound=(median-medianAbsDev*exact(20)), upperBound=(median+medianAbsDev*exact(20))
| eval isOutlier=if('<<FIELD>>' < lowerBound OR '<<FIELD>>' > upperBound, 1, 0)
| where isOutlier=1]

But the problem is that the foreach command cannot contain non-streaming commands.

So, is there a way to programmatically iterate over a list of IPs and find the outliers?

aljohnson_splun
Splunk Employee

This is actually a challenge question in the labs from our Splunk for Analytics and Data Science class!

There is a simpler solution, IMO: use stats and split by _time after using bin. What we're basically doing is NOT splitting the data out into columns (via timechart), and instead keeping things "stats-like" and row-centric for as long as possible. This lets each eval work on a full column across all groups. Here is an example:

index=_internal component=* 
| bin _time span=10m 
| makecontinuous _time
| stats count as some_aggregation by component _time 
| streamstats window=10 current=true median(some_aggregation) as median by component
| eval absDev=(abs(some_aggregation-median)) 
| streamstats window=10 current=true median(absDev) as medianAbsDev by component
| eval lowerBound=(median-medianAbsDev*exact(20)), upperBound=(median+medianAbsDev*exact(20)) 
| eval isOutlier=if(some_aggregation < lowerBound OR some_aggregation > upperBound, 1, 0) 
| where isOutlier=1

The important parts:

| bin _time span=10m
| makecontinuous _time
| stats count as some_aggregation by component _time

^ Here we are manually doing the bucketing that timechart would otherwise do for us, then splitting by component while keeping things row-based.

| streamstats window=10 current=true median(some_aggregation) as median by component
| ...
| streamstats window=10 current=true median(absDev) as medianAbsDev by component

^ Once again, make sure you split by component whenever you're doing any stats-like command.
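
To make the "split by component" point concrete, here is a rough Python model of what the two streamstats/eval pairs compute on row-based data. It is only a sketch, not how Splunk implements anything: flag_outliers and its arguments are made-up names, and statistics.median averages the two middle values of an even-sized window, which may not match Splunk's median exactly.

```python
from collections import defaultdict, deque
from statistics import median

def flag_outliers(rows, window=10, k=20):
    """rows: (time, group, value) tuples, in time order within each group.

    Keeps one sliding window per group (current row included) and flags rows
    whose value falls outside median +/- k * median absolute deviation.
    """
    values = defaultdict(lambda: deque(maxlen=window))  # last `window` values per group
    devs = defaultdict(lambda: deque(maxlen=window))    # last `window` abs deviations per group
    flagged = []
    for t, group, value in rows:
        values[group].append(value)
        med = median(values[group])
        abs_dev = abs(value - med)
        devs[group].append(abs_dev)
        mad = median(devs[group])
        lower, upper = med - k * mad, med + k * mad
        if value < lower or value > upper:
            flagged.append((t, group, value))
    return flagged

# Group "a" is steady at 10 and then spikes; group "b" is steady at 1000.
rows = [(t, "a", 10) for t in range(9)] + [(9, "a", 500)]
rows += [(t, "b", 1000) for t in range(10)]
print(flag_outliers(rows))
```

Because each group has its own window, the steady 1000s of group "b" never influence the median for group "a"; only the spike in "a" is reported.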

In the end, you can reshape the results however you like for visualization (e.g. xyseries + untable), though it takes a little massaging. More likely, if you're doing this over a large number of IPs, you're trying to alert rather than visualize.

aljohnson_splun
Splunk Employee

Oh haha I realized this is almost identical to @acharlieh 's answer - oops!

martin_mueller
SplunkTrust

Almost nothing here should actually need foreach.

Here's your search, plus some commentary:

     index="proxies" earliest=-7d@d [|inputlookup lu_inventory where function="web_server" | table ip | rename ip as c_ip | format]
     | foreach c_ip [| timechart span=10m sum(bytes_out) as bytesout by '<<FIELD>>'

foreach c_ip runs exactly once: for the field named c_ip, not for each of its values. Removing the foreach and inserting c_ip everywhere would do what you wrote... but achieve nothing, because c_ip stops being a field once timechart has transformed your events. Here you have reason #1 why foreach doesn't do transforming commands: if you transformed your events n times, once for each matching field, the result wouldn't be what you expected.

Instead, just use timechart limit=1000 span=10m sum(bytes_out) by c_ip | fillnull, giving you a field for each IP value out of the top 1000 - and filling in empty values with zeroes, as the sum of nothing is null, not zero.
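
A small Python sketch of why the fillnull matters. It mimics, under simplifying assumptions, a table with one row per 10-minute bucket and one column per IP; bucketed_sums, fill_null and the sample events are invented for illustration and are not Splunk APIs.

```python
SPAN = 600  # bucket width in seconds (10 minutes)

def bucketed_sums(events, ips, start, end):
    """events: (epoch_seconds, ip, bytes_out). One row per bucket, one column per IP.

    A bucket in which an IP has no events gets None, not 0.
    """
    sums = {}
    for ts, ip, nbytes in events:
        bucket = ts - ts % SPAN
        row = sums.setdefault(bucket, {})
        row[ip] = row.get(ip, 0) + nbytes
    return {
        bucket: {ip: sums.get(bucket, {}).get(ip) for ip in ips}
        for bucket in range(start, end, SPAN)
    }

def fill_null(table, value=0):
    """Replace every None cell with `value`, like a bare fillnull would."""
    return {
        bucket: {ip: value if v is None else v for ip, v in row.items()}
        for bucket, row in table.items()
    }

events = [(0, "1.2.3.4", 100), (30, "1.2.3.4", 50), (1200, "5.6.7.8", 7)]
table = bucketed_sums(events, ["1.2.3.4", "5.6.7.8"], start=0, end=1800)
print(table[600])             # both cells are None: nothing was summed
print(fill_null(table)[600])  # both cells are 0 after filling
```

Without the fill, the empty cells would simply be absent from any later median, so quiet periods would not pull the running median down the way real zeroes do.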

     | streamstats window=200 current=true median('<<FIELD>>') as median

streamstats already has a sort-of foreach built in: use streamstats window=200 current=true median(*.*.*.*) as median_*.*.*.*, assuming IPv4.

     | eval absDev=(abs('<<FIELD>>'-median))

THIS actually is a case for foreach, because you want to do the maths for all IPs: foreach median_* [eval absDev_<<MATCHSTR>> = abs('<<MATCHSTR>>'-'<<FIELD>>')]
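
For anyone unsure what the two tokens expand to, here is a toy Python model of the template substitution. It is a sketch only: expand_foreach is a made-up helper, it handles a single * wildcard, and it leaves <<MATCHSTR>> alone when the pattern has no wildcard, because that case is not modeled here.

```python
def expand_foreach(field_names, pattern, template):
    """Return the template expanded once per field NAME that matches pattern.

    <<FIELD>> becomes the whole field name; <<MATCHSTR>> becomes the part of
    the name that the * wildcard matched.
    """
    prefix, star, suffix = pattern.partition("*")
    expanded = []
    for field in field_names:
        if star:
            matches = (
                len(field) >= len(prefix) + len(suffix)
                and field.startswith(prefix)
                and field.endswith(suffix)
            )
            matchstr = field[len(prefix):len(field) - len(suffix)] if matches else None
        else:
            matches = field == pattern
            matchstr = None  # not modeled for patterns without a wildcard
        if not matches:
            continue
        text = template.replace("<<FIELD>>", field)
        if matchstr is not None:
            text = text.replace("<<MATCHSTR>>", matchstr)
        expanded.append(text)
    return expanded

fields = ["_time", "1.2.3.4", "5.6.7.8", "median_1.2.3.4", "median_5.6.7.8"]
for line in expand_foreach(fields, "median_*", "absDev_<<MATCHSTR>> = abs('<<MATCHSTR>>'-'<<FIELD>>')"):
    print(line)

# A pattern without a wildcard matches one field name, so it expands once.
print(expand_foreach(["_time", "c_ip", "bytes_out"], "c_ip", "'<<FIELD>>'"))
```

The last call is the "runs exactly once" situation from the original search: the loop is over field names, never over the values of c_ip.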

     | streamstats window=200 current=true median(absDev) as medianAbsDev

See above: streamstats window=200 current=true median(absDev_*) as medianAbsDev_*

     | eval lowerBound=(median-medianAbsDev*exact(20)), upperBound=(median+medianAbsDev*exact(20))
     | eval isOutlier=if('<<FIELD>>' < lowerBound OR '<<FIELD>>' > upperBound, 1, 0)

These again do call for foreach, plus a little magic: foreach median_* [eval lowerBound_<<MATCHSTR>> = ('<<FIELD>>' - 'medianAbsDev_<<MATCHSTR>>'*exact(20)), upperBound_<<MATCHSTR>> = ('<<FIELD>>' + 'medianAbsDev_<<MATCHSTR>>'*exact(20)), outliers = if('<<MATCHSTR>>' < 'lowerBound_<<MATCHSTR>>' OR '<<MATCHSTR>>' > 'upperBound_<<MATCHSTR>>', mvappend(outliers, "<<MATCHSTR>>"), outliers)]

     | where isOutlier=1]

This now becomes where mvcount(outliers) > 0

In its entirety:

  index="proxies" earliest=-7d@d [|inputlookup lu_inventory where function="web_server" | table ip | rename ip as c_ip | format]
| timechart limit=1000 span=10m sum(bytes_out) by c_ip
| fillnull
| streamstats window=200 current=true median(*.*.*.*) as median_*.*.*.*
| foreach median_* [eval absDev_<<MATCHSTR>> = abs('<<MATCHSTR>>'-'<<FIELD>>')]
| streamstats window=200 current=true median(absDev_*) as medianAbsDev_*
| foreach median_* [eval lowerBound_<<MATCHSTR>> = ('<<FIELD>>' - 'medianAbsDev_<<MATCHSTR>>'*exact(20)), upperBound_<<MATCHSTR>> = ('<<FIELD>>' + 'medianAbsDev_<<MATCHSTR>>'*exact(20)), outliers = if('<<MATCHSTR>>' < 'lowerBound_<<MATCHSTR>>' OR '<<MATCHSTR>>' > 'upperBound_<<MATCHSTR>>', mvappend(outliers, "<<MATCHSTR>>"), outliers)]
| where mvcount(outliers) > 0 | fields outliers

PS: Entirely untested, just typed into answers - should work though, bar possibly typos...
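
As a sanity check of the last foreach, here is roughly what it does to a single result row, written as a Python sketch. collect_outliers and the sample row are hypothetical; the row is assumed to already carry the median_* and medianAbsDev_* columns from the two streamstats calls.

```python
def collect_outliers(row, k=20):
    """Return the IPs in this row whose value lies outside median +/- k * MAD."""
    outliers = []
    for field, med in row.items():
        # "medianAbsDev_..." does not start with "median_", so only the
        # median columns are visited here, as with foreach median_*.
        if not field.startswith("median_"):
            continue
        ip = field[len("median_"):]
        mad = row["medianAbsDev_" + ip]
        lower, upper = med - k * mad, med + k * mad
        if row[ip] < lower or row[ip] > upper:
            outliers.append(ip)
    return outliers

row = {
    "1.2.3.4": 900, "median_1.2.3.4": 100, "medianAbsDev_1.2.3.4": 5,
    "5.6.7.8": 120, "median_5.6.7.8": 100, "medianAbsDev_5.6.7.8": 5,
}
print(collect_outliers(row))
```

Rows for which the list comes back empty are the ones the final where clause would drop.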

acharlieh
Influencer

foreach is not needed in this case. You would use foreach when you need to do something to each distinct field, but in your case you have a bunch of values of the same field. Your search is rather easy to translate to work across multiple IP addresses once you know three things: timechart creates a separate field per value of the by-clause field; untable can take that tabular format back out to a statistical format, so that each event gives the bytes for one IP in one 10-minute window (it is the opposite of the xyseries command); and streamstats has a global parameter that, when set to false, keeps a separate window of 200 events (200 ten-minute buckets, roughly 33 hours) for each IP instead of one window across all events:

 index="proxies" earliest=-7d@d 
    [| inputlookup lu_inventory where function="web_server" | rename ip as c_ip | fields c_ip]
|timechart limit=0 span=10m sum(bytes_out) by c_ip
|fillnull
|untable _time ip bytesout
|streamstats window=200 current=true global=false median(bytesout) as median by ip
|eval absDev=(abs(bytesout-median))
|streamstats window=200 current=true global=false median(absDev) as medianAbsDev by ip
|eval lowerBound=(median-medianAbsDev*exact(20)), upperBound=(median+medianAbsDev*exact(20))
|eval isOutlier=if(bytesout < lowerBound OR bytesout > upperBound, 1, 0)
|where isOutlier=1 

The reason to do timechart ... | untable ... is that this way you keep the 10-minute spans where a particular IP has 0 bytes. Another option would be bin _time span=10m | stats sum(bytes_out) as bytesout by c_ip _time, but this would not include the 10-minute spans where an IP address has no bytes out, and would thus throw off the running median. There's another command you would need to include to fill those time periods, but offhand I'm not sure what it is.
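
Here is a short Python sketch of both points: what the untable step does to the shape of the data, and how dropping the empty spans shifts a median. The untable function below is a stand-in written for this example, not Splunk's implementation, and the numbers are made up.

```python
from statistics import median

def untable(wide_rows, key="_time"):
    """Turn one row per time bucket (a column per IP) into (time, ip, value) rows."""
    long_rows = []
    for row in wide_rows:
        for field, value in row.items():
            if field != key:
                long_rows.append((row[key], field, value))
    return long_rows

# Shape after timechart + fillnull: every bucket has a cell for every IP.
wide = [
    {"_time": 0, "1.2.3.4": 100, "5.6.7.8": 0},
    {"_time": 600, "1.2.3.4": 0, "5.6.7.8": 0},
    {"_time": 1200, "1.2.3.4": 0, "5.6.7.8": 7},
]
long_rows = untable(wide)

with_zeros = [v for _, ip, v in long_rows if ip == "5.6.7.8"]
without_zeros = [v for v in with_zeros if v != 0]  # what a plain stats by IP and time would leave
print(median(with_zeros), median(without_zeros))
```

With the quiet buckets kept, the median for 5.6.7.8 is 0; with them missing it is 7, so the same burst of traffic would be judged against a very different baseline.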

acharlieh
Influencer

On the Splunk Usergroup Slack Chat (Need an invite? https://splk.it/slack ), Martin pointed out that a sum of nothing would be null, not zero :), so I'm quickly adding an appropriate fillnull in here.

martin_mueller
SplunkTrust

You've got makecontinuous on the tip of your tongue, but it doesn't support splitting by a field.
