Splunk Search

How to join a lookup value for comparison?

nahfam
Path Finder

The query below works, but I need to add a lookup value 'interval' to compare against the 'hours since last seen' values.
The lookup I'm adding is sourcetype_interval.csv, which has two columns (sourcetype and interval).

| tstats latest(_indextime) as Latest where index=* by host sourcetype index 
| `remove_blacklisted_servers()`
| search NOT [inputlookup sourcetype_blacklist.csv | table sourcetype]
| eval current=now() 
| eval Minimum_Age=round(((current-Latest)/60)/60,2) 

| rangemap field=Minimum_Age default=Critical Normal=0-0.5 Elevated=0.5-2 Warning=2-3
| eval stIDX=tostring(index) + " -- " + tostring(sourcetype)
| stats values(stIDX) as "Index -- Sourcetype" list(Latest) as "Latest Event" list(Minimum_Age) as Minimum_Age list(range) as Threshold by host 
| convert ctime("Latest Event") timeformat="%Y/%m/%d %H:%M"
| eventstats avg(Minimum_Age) as average by host 
| eval average=round(average,2) 
| eval threshold_filter=mvfilter(NOT match(Threshold,"Normal") AND NOT match(Threshold,"Warning") AND NOT match(Threshold,"Elevated"))

| rename Minimum_Age as "Hours Since Last Seen" average as "Avg Hours Since Last Seen"
| fields - "Avg Hours Since Last Seen" threshold_filter

===================================================================================================

Here is how I was trying to do it, but to no avail. Thanks

| tstats latest(_indextime) as Latest where index=* by host sourcetype index 
| `remove_blacklisted_servers()`
| search NOT [inputlookup sourcetype_blacklist.csv | table sourcetype]
| join sourcetype [|inputlookup sourcetype_interval.csv | table sourcetype interval]
| eval interval=round(interval/60/60,2) 
| eval current=now() 
| eval Minimum_Age=round(((current-Latest)/60)/60,2) 

| rangemap field=Minimum_Age default=Critical Normal=0-0.5 Elevated=0.5-2 Warning=2-3
| eval stIDX=tostring(index) + " -- " + tostring(sourcetype)
| stats values(stIDX) as Index--Sourcetype list(Latest) as "Latest Event" list(Minimum_Age) as Minimum_Age list(range) as Threshold by host 

| convert ctime("Latest Event") timeformat="%Y/%m/%d %H:%M"
| eventstats avg(Minimum_Age) as average by host 
| eval average=round(average,2) 
| eval threshold_filter=mvfilter(NOT match(Threshold,"Normal") AND NOT match(Threshold,"Warning") AND NOT match(Threshold,"Elevated"))
| where average > interval
| rename Minimum_Age as "Hours Since Last Seen" average as "Avg Hours Since Last Seen"
| fields - "Avg Hours Since Last Seen" threshold_filter

aberkow
Builder

This query might work (I'll suggest a slight build later on), but your biggest issue is that you aren't passing "interval" through the stats command on line 11. Since stats is a transforming command, Splunk has no knowledge of the field "interval" after that point. To debug cases like this in future, go back through your search line by line to find where "interval" was lost. Here are some docs on transforming commands: https://docs.splunk.com/Splexicon:Transformingcommand. Adding values(interval) to the stats command should fix your immediate issue.
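
A minimal sketch of why the field disappears, using generated rows instead of real data (the host, sourcetype, and numeric values are made up for illustration). After stats, only the aggregated fields and the "by" fields remain, so a later reference to interval finds nothing:

```spl
| makeresults count=3
| streamstats count as n
| eval host="web01", sourcetype="st".n, interval=n*3600, Minimum_Age=n
| stats list(Minimum_Age) as Minimum_Age by host
| eval has_interval=if(isnull(interval), "no", "yes")
```

Here has_interval should come back as "no". Adding values(interval) as interval to the stats line keeps the field available to later commands.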

My slight build: when using lookups, you can use the lookup command instead of join with inputlookup to join the lookup values more implicitly. Example syntax:

{code}
| tstats latest(_indextime) as Latest where index=* by host sourcetype index
| `remove_blacklisted_servers()`
| search NOT [inputlookup sourcetype_blacklist.csv | table sourcetype]
| lookup sourcetype_interval.csv sourcetype OUTPUT interval
...
{code}

You can take a look at the function here: https://docs.splunk.com/Documentation/Splunk/8.0.0/SearchReference/Lookup.
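
As a rough sketch of the comparison the question asks for, the lookup can be applied and compared row by row before any stats aggregation, while each row still holds a single sourcetype. This assumes sourcetype_interval.csv is available as a lookup table file and that interval is stored in seconds (as the division by 60/60 in the question suggests). The field names interval_hours and hours_since are made up for this example:

```spl
| tstats latest(_indextime) as Latest where index=* by host sourcetype index
| lookup sourcetype_interval.csv sourcetype OUTPUT interval
| eval interval_hours=round(interval/3600, 2)
| eval hours_since=round((now()-Latest)/3600, 2)
| where hours_since > interval_hours
```

Unlike join, which defaults to an inner join, lookup leaves rows without a match in place, just with no interval value. In this sketch the final where then drops those rows, because a comparison against a missing value is not true. A coalesce() with a default interval would keep them in play.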

Hope this helps!

nahfam
Path Finder

****ANSWER UPDATE****

I ended up with two versions, both listed below.

*** The first is a multi-value comparison of sourcetype interval vs. time last seen, with a percentage-of-change column ***

| tstats latest(_indextime) as Latest where index=* by host sourcetype index 
| `remove_blacklisted_servers()` 
| search NOT 
    [ inputlookup sourcetype_blacklist.csv 
    | table sourcetype] 
| lookup sourcetype_interval.csv sourcetype OUTPUT interval as intervals 
| eval intervals=round(intervals/60/60,2) 
| eval intervals=coalesce(intervals,0)

| eval current=now() 
| eval Minimum_Age=round(((current-Latest)/60)/60,2) 
| eval perc_change=((Minimum_Age-intervals)/Minimum_Age*100)
| rangemap field=Minimum_Age default=Critical Normal=0-0.5 Elevated=0.5-2 Warning=2-3 
| eval stIDX=tostring(index) + " -- " + tostring(sourcetype) 
| eval stINT=tostring(sourcetype) + " -- " + tostring(intervals) 
| eval stLast=tostring(sourcetype) + " -- " + tostring(Minimum_Age) 
| eval pcChange=tostring(sourcetype) + " -- " + tostring(perc_change)
| stats values(stIDX) as Index--Sourcetype list(Latest) as "Latest Event" list(Minimum_Age) as Minimum_Age list(range) as Threshold list(stINT) as Sourcetype--Interval list(stLast) as Sourcetype--HoursSinceLast list(pcChange) as Sourcetype--PercChange by host 

| convert ctime("Latest Event") timeformat="%Y/%m/%d %H:%M" 
| eventstats avg(Minimum_Age) as average by host 
| eval average=round(average,2) 
| rename Minimum_Age as "Hours Since Last Seen" average as "Avg Hours Since Last Seen" 
| sort "Latest Event" 
| fields - "Avg Hours Since Last Seen" 
| table host "Latest Event" Threshold Sourcetype--Interval Sourcetype--HoursSinceLast Sourcetype--PercChange

The second uses a single-value table (which makes it easier to compare lastSeen > interval) and also has a percentage-of-change field to filter on.

| tstats latest(_indextime) as Latest where index=* by host sourcetype index 
| `remove_blacklisted_servers()` 
| search NOT 
    [ inputlookup sourcetype_blacklist.csv 
    | table sourcetype] 
| lookup sourcetype_interval.csv sourcetype OUTPUT interval 
| eval interval=round(interval/60/60,2) 
| eval current=now() 
| eval Minimum_Age=round(((current-Latest)/60)/60,2) 
| rangemap field=Minimum_Age default=Critical Normal=0-0.5 Elevated=0.5-2 Warning=2-3 
| eval stIDX=tostring(index) + " -- " + tostring(sourcetype) 
| stats values(stIDX) as Index--Sourcetype list(Latest) as "Latest Event" list(Minimum_Age) as Minimum_Age list(range) as Threshold values(interval) as interval by host sourcetype 
| convert ctime("Latest Event") timeformat="%Y/%m/%d %H:%M" 
| eventstats avg(Minimum_Age) as average by host 
| eval average=round(average,2) 
| where Minimum_Age > interval
| eval threshold_filter=mvfilter(NOT match(Threshold,"Normal") AND NOT match(Threshold,"Warning") AND NOT match(Threshold,"Elevated"))
| rename Minimum_Age as hours_since average as "Avg Hours Since Last Seen" interval as ST_Interval 
| where threshold_filter = "Critical"
| sort - hours_since
| fields - "Avg Hours Since Last Seen"
| eval perc_change=((hours_since-ST_Interval)/hours_since*100)
| where perc_change > 90
| table host Index--Sourcetype "Latest Event" hours_since Threshold ST_Interval perc_change
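
To make the last two filters concrete, here is a small worked example of the perc_change formula on one generated row (the values 12 and 3 are made up):

```spl
| makeresults
| eval hours_since=12, ST_Interval=3
| eval perc_change=((hours_since-ST_Interval)/hours_since*100)
```

That gives (12-3)/12*100 = 75, so this row would not pass where perc_change > 90. Rearranging the formula, perc_change > 90 holds only when hours_since is more than ten times ST_Interval (for positive values).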

woodcock
Esteemed Legend

Just use the lookup file as a lookup like this:

| tstats latest(_indextime) as Latest where index=* by host sourcetype index 
| `remove_blacklisted_servers()`
| search NOT [inputlookup sourcetype_blacklist.csv | table sourcetype]

| lookup sourcetype_interval.csv sourcetype OUTPUT interval

| eval interval=round(interval/60/60,2) 
| eval current=now() 
| eval Minimum_Age=round(((current-Latest)/60)/60,2) 

| rangemap field=Minimum_Age default=Critical Normal=0-0.5 Elevated=0.5-2 Warning=2-3
| eval stIDX=tostring(index) + " -- " + tostring(sourcetype)
| stats values(stIDX) as Index--Sourcetype list(Latest) as "Latest Event" list(Minimum_Age) as Minimum_Age list(range) as Threshold by host 

| convert ctime("Latest Event") timeformat="%Y/%m/%d %H:%M"
| eventstats avg(Minimum_Age) as average by host 
| eval average=round(average,2) 
| eval threshold_filter=mvfilter(NOT match(Threshold,"Normal") AND NOT match(Threshold,"Warning") AND NOT match(Threshold,"Elevated"))
| where average > interval
| rename Minimum_Age as "Hours Since Last Seen" average as "Avg Hours Since Last Seen"
| fields - "Avg Hours Since Last Seen" threshold_filter

nahfam
Path Finder

Thanks aberkow!

I've implemented some changes and I am now getting an 'interval' column with values. However, the values don't seem to line up with their corresponding sourcetypes. Not sure why. Any ideas?

| tstats latest(_indextime) as Latest where index=* by host sourcetype index  
| `remove_blacklisted_servers()` 
| search NOT [inputlookup sourcetype_blacklist.csv | table sourcetype] 
| lookup sourcetype_interval.csv sourcetype OUTPUT interval as intervals 
| eval intervals=round(intervals/60/60,2) 
| eval current=now()  
| eval Minimum_Age=round(((current-Latest)/60)/60,2) 
| rangemap field=Minimum_Age default=Critical Normal=0-0.5 Elevated=0.5-2 Warning=2-3 
| eval stIDX=tostring(index) + " -- " + tostring(sourcetype) 
| stats values(stIDX) as Index--Sourcetype list(Latest) as "Latest Event" list(Minimum_Age) as Minimum_Age list(range) as Threshold list(intervals) as lintervals by host 
| convert ctime("Latest Event") timeformat="%Y/%m/%d %H:%M" 
| eventstats avg(Minimum_Age) as average by host  
| eval average=round(average,2)  

| rename Minimum_Age as "Hours Since Last Seen" average as "Avg Hours Since Last Seen" lintervals as ST_Interval 

| fields - "Avg Hours Since Last Seen" 

aberkow
Builder

Unfortunately I don't see a magic answer from a quick scan of the search, especially without knowing the contents of the csv. My suggestion is to run the search in one tab with only the first 3 lines and in another tab with the fourth line included, basically taking a diff of just that line. Like I said above, the best way to debug is definitely to go line by line. If you're getting a sourcetype that matches something in that csv but a different interval than the one corresponding to it, I'd be surprised.
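
One way to do that diff in a single search, as a sketch: stop right after the lookup line and summarize which sourcetypes picked up an interval. The matched field is a made-up name, and the macro and blacklist lines are left out here only to keep the example short:

```spl
| tstats latest(_indextime) as Latest where index=* by host sourcetype index
| lookup sourcetype_interval.csv sourcetype OUTPUT interval
| stats count values(interval) as interval by sourcetype
| eval matched=if(isnull(interval), "no", "yes")
| sort matched sourcetype
```

Sourcetypes with matched="no" have no row in the csv (or the name differs, for example in case or whitespace), and any sourcetype showing more than one interval points at duplicate rows.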

The top answer I can think of: you might have multiple intervals for a sourcetype, and one of your later commands (if you go line by line checking the output) might be removing anything after the first one. I.e. OUTPUT returns 3 values, and round() might only return the first.
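
A quick check for that first possibility, assuming the csv has the two columns described in the question (sourcetype and interval):

```spl
| inputlookup sourcetype_interval.csv
| stats count values(interval) as intervals by sourcetype
| where count > 1
```

Any row returned is a sourcetype that appears more than once in the file, in which case the lookup can hand back a multivalue interval.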

After that, I'd make sure you're hitting the right csv (do you have two? are they permissioned correctly?). If you still can't figure it out, perhaps post some scrubbed data for us to work with: the results after line 3, the csv, and the results after line 4.


nahfam
Path Finder

OK, so I'm pretty sure it's because I'm doing a stats list(interval) by host, so it's simply listing the intervals, whereas I should be doing a stats values(interval) by sourcetype. However, I'm not sure how to add that in. Again, thanks for your help.
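
A small generated example of this kind of misalignment (all values made up). values() returns unique values in sorted order, while list() keeps them in the order the rows arrive, so two multivalue columns built with different functions need not line up:

```spl
| makeresults count=3
| streamstats count as n
| eval host="web01"
| eval sourcetype=case(n=1, "zeta", n=2, "alpha", n=3, "mid")
| eval interval=n*3600
| stats values(sourcetype) as st_values list(sourcetype) as st_list list(interval) as interval_list by host
```

Here st_values should come out as alpha, mid, zeta, while interval_list follows the row order (the intervals for zeta, alpha, mid). Reading the two columns side by side would pair alpha with zeta's interval.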


aberkow
Builder

Solid debugging! I think you're right. My answer depends on how you want that information passed through. Do you want every host to have every interval? Every host, sourcetype combination?

You can try something like eventstats to pass through all values of interval for each sourcetype on each host, like below:

Use eventstats values(interval) as intervals by sourcetype, and then pass values(intervals) through by host in your later stats line. This only works if you want all intervals for all sourcetypes on a host.

You could try splitting your stats clause, doing ...values(interval) as vinterval by host, sourcetype or by host, interval. I think playing around with the stats line is probably your best bet, and potentially adding another stats line after it, further aggregating after you've initially passed through values.
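
A sketch of that two-step idea on generated rows (all values and the pair field name are made up): aggregate by host and sourcetype first so each row still has one interval, glue the values you want to keep together into one string, and only then roll up by host.

```spl
| makeresults count=3
| streamstats count as n
| eval host="web01"
| eval sourcetype=case(n=1, "zeta", n=2, "alpha", n=3, "mid")
| eval interval=n, Minimum_Age=n*2
| stats max(Minimum_Age) as Minimum_Age values(interval) as interval by host sourcetype
| eval pair=sourcetype." -- interval ".interval." -- hours ".Minimum_Age
| stats values(pair) as "Sourcetype -- Interval -- Hours" by host
```

Because each string carries its own sourcetype, the order in which the final stats lists them no longer matters.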

Unfortunately these questions are also hard to answer without data to work with, since I don't know the exact output at any state. Hope this helps though! (You can also consider opening another question if none of these suggestions resolve your issue)


nahfam
Path Finder

Thanks for your help. I played around with what you suggested and went with the two versions posted in my answer update. Will accept your answer. Thanks again!


nahfam
Path Finder

You are a cool dude. Thanks for the feedback. Perhaps it is an issue of multiple intervals, or permissions related. I'll check it out. thanks!


aberkow
Builder

Aw thanks. I appreciate the upvote on the answer! You can also accept it as the question asker
