Splunk Search

How to run a two-step search (probably using map)

scott_cultuream
New Member

I have a requirement to get the count of events in the past 6 weeks, grouped by week. The query looks like this:

| pivot Application_User_Events Report_Views dc(user_email) AS "users" SPLITROW _time AS _time PERIOD week SPLITROW account_subdomain AS account_subdomain

Then for each week that is generated, I want to look back 30 days and count a number of other events that occurred. So, for example, if one of the resulting rows was

2017-04-09  foo  12

then I'd want to add an extra column that counted the number of events between 30 days prior to 2017-04-09 and 2017-04-09. The resulting row would be:

2017-04-09  foo  12  90

I've been toying with map and appendcols, as well as earliest and latest, but I feel like I'm fumbling in the dark. Any help would be appreciated.
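
Conceptually, I'm imagining something like the sketch below, where map feeds each weekly row's values into a templated subsearch via $field$ tokens (the index name my_other_events is just a stand-in for the events I actually need to count):

| pivot Application_User_Events Report_Views dc(user_email) AS "users" SPLITROW _time AS _time PERIOD week SPLITROW account_subdomain AS account_subdomain
| eval week_end=_time, lookback_start=relative_time(_time, "-30d")
| map maxsearches=10 search="search index=my_other_events account_subdomain=$account_subdomain$ earliest=$lookback_start$ latest=$week_end$
  | stats count AS events_30d
  | eval week_end=$week_end$, account_subdomain=\"$account_subdomain$\""

But I'm not sure that's the right shape, which is part of why I'm asking.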


woodcock
Esteemed Legend

This was one of the hardest ones I ever did! This will do it. The first stage collapses your TimePicker range into a single row of week0...week5 tokens (plus a 30-days-earlier counterpart for each), so the map only has to run once with those tokens substituted in:

| gentimes start=-40
| rename COMMENT AS "BE SURE TO RUN FOR 'Last 80(ish) days` to cover 6 weeks + 30 days farther back"
| rename starttime AS week 
| fields week 
| bin span=1w week 
| dedup week 
| tail 6 
| eval week_less_30_days = week - (30*24*60*60) 
| stats values(*) AS *
| eval week0 = mvindex(week, 0)
| eval week1 = mvindex(week, 1)
| eval week2 = mvindex(week, 2)
| eval week3 = mvindex(week, 3)
| eval week4 = mvindex(week, 4)
| eval week5 = mvindex(week, 5)
| eval week0_less_30_days = mvindex(week_less_30_days, 0)
| eval week1_less_30_days = mvindex(week_less_30_days, 1)
| eval week2_less_30_days = mvindex(week_less_30_days, 2)
| eval week3_less_30_days = mvindex(week_less_30_days, 3)
| eval week4_less_30_days = mvindex(week_less_30_days, 4)
| eval week5_less_30_days = mvindex(week_less_30_days, 5)
| fields - week week_less_30_days
| rename COMMENT AS "You can think of the above as setting a group of initial variables based on the TimePicker value"

| map search="|pivot Application_User_Events Report_Views dc(user_email) AS users SPLITROW _time AS _time PERIOD day SPLITROW account_subdomain AS account_subdomain

| rename COMMENT AS \"Your fields at this point should look be: '_time account_subdomain_1 account_subdomain_2 ... account_subdomain_Z'\"
| rename COMMENT AS \"If your fields are: '_time account_subdomain users', then remove the next 'untable' line\"
| untable _time account_subdomain users

| eval week=_time
| bin span=1w week

| eval daysOf30 = \",\"
| eval daysOf30 = if((_time >= $week0_less_30_days$ AND _time <= $week0$), daysOf30 . \",week0\", daysOf30)
| eval daysOf30 = if((_time >= $week1_less_30_days$ AND _time <= $week1$), daysOf30 . \",week1\", daysOf30)
| eval daysOf30 = if((_time >= $week2_less_30_days$ AND _time <= $week2$), daysOf30 . \",week2\", daysOf30)
| eval daysOf30 = if((_time >= $week3_less_30_days$ AND _time <= $week3$), daysOf30 . \",week3\", daysOf30)
| eval daysOf30 = if((_time >= $week4_less_30_days$ AND _time <= $week4$), daysOf30 . \",week4\", daysOf30)
| eval daysOf30 = if((_time >= $week5_less_30_days$ AND _time <= $week5$), daysOf30 . \",week5\", daysOf30)
| makemv delim=\",\" daysOf30
"

| multireport [
| stats sum(users) AS users BY week account_subdomain
| rename week AS _time
| tail 6
| streamstats count AS week
| eval week = "week" . (6-week)
][
| stats sum(users) AS users BY account_subdomain daysOf30
| rename week AS _time, daysOf30 AS week, users AS users30
]

| stats first(_time) AS _time values(*) AS * BY week account_subdomain
| table _time week account_subdomain users users30

micahkemp
Champion

I think you should try to do everything in one search if at all possible, and in this case I think it is. Here is an attempt at a similar type of counting:

index=_internal 
| timechart span=1d count
| streamstats window=30 current=false sum(count) AS last_30_count
| timechart span=1w sum(count) AS week_count first(last_30_count) AS last_30_count

scott_cultuream
New Member

Thanks. I think you're on the right track by using streamstats to get the rolling count. The problem is I need to count distinct, not just count, and that always makes things just a little harder.

I did find this really useful thread (https://answers.splunk.com/answers/91676/rolling-distinct-counts.html), which uses values() and streamstats to do the rolling distinct count. It sort of works for me, but I don't understand it 100%, so I'm not totally sure it's doing what I think.
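
As I understand it, the pattern from that thread is roughly this (sketched against index=_internal like your example, and assuming a clientip field as the thing to count distinctly; in my case it would be user_email):

index=_internal sourcetype=splunkd_access
| timechart span=1d values(clientip) AS daily_values
| streamstats window=30 current=false dc(daily_values) AS last_30_dc

The values() collects each day's distinct values into a multivalue field, and streamstats dc() then counts the distinct values across the lists in the 30-day window.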


scott_cultuream
New Member

I think I more or less had this right before, but was tripped up by map's default maxsearches limit (10 searches), which was truncating my results. However, this seems like further evidence that I'm doing this wrong, as getting the full results would require nearly 10,000 searches.

Here's what I have:

| tstats dc(All_Application_Events.user_email) AS "report_viewers"
  FROM datamodel=Application_User_Events
  WHERE nodename=All_Application_Events.Authenticated_Events.Report_Views 
  BY All_Application_Events.account_subdomain, _time span=1w
| eval earliest_time=relative_time(_time,"-30d"), latest_time=relative_time(_time,"-1d")
| rename All_Application_Events.account_subdomain AS account_subdomain
| map maxsearches=100 search="| tstats dc(id) AS response_count 
  FROM datamodel=Application_User_Events
  WHERE
    nodename=All_Application_Events.Capture_Events.Response_Views AND
    earliest>=$earliest_time$ AND 
    latest<=$latest_time$ AND 
    All_Application_Events.account_subdomain=$account_subdomain$
  | eval start_time=$earliest_time$, end_time=$latest_time$, report_viewers=$report_viewers$, account_subdomain=\"$account_subdomain$\"
  | fields start_time, end_time, response_count, report_viewers, account_subdomain"

Thanks.


scott_cultuream
New Member

Found another answer (https://answers.splunk.com/answers/91676/rolling-distinct-counts.html) that seems really helpful. I've modified it to group by week and then look back over the previous 4 weeks for a rolling distinct count. I think this may be the answer, but I'm not 100% sure yet. One thing I would love to do is get rid of the join, but two searches is better than 10,000!

| tstats values(All_Application_Events.params.id) AS respondents_list
  FROM datamodel=Application_User_Events
  WHERE nodename=All_Application_Events.Capture_Events.Response_Views 
  BY All_Application_Events.account_subdomain, _time span=1w
| rename All_Application_Events.account_subdomain AS account_subdomain
| streamstats global=false window=4 dc(respondents_list) AS respondents_count BY account_subdomain
| fields - respondents_list
| join account_subdomain,_time type=outer [| tstats dc(All_Application_Events.user_email) AS report_viewers
  FROM datamodel=Application_User_Events
  WHERE nodename=All_Application_Events.Authenticated_Events.Report_Views 
  BY All_Application_Events.account_subdomain, _time span=1w
  | rename All_Application_Events.account_subdomain AS account_subdomain
  | eval event="Report View"]
| eval ratio=report_viewers/respondents_count
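
If I wanted to drop the join, one idiom I might try (untested sketch) is to append the second tstats and recombine the rows with stats, since both searches group by the same account_subdomain and weekly _time:

| tstats values(All_Application_Events.params.id) AS respondents_list
  FROM datamodel=Application_User_Events
  WHERE nodename=All_Application_Events.Capture_Events.Response_Views 
  BY All_Application_Events.account_subdomain, _time span=1w
| rename All_Application_Events.account_subdomain AS account_subdomain
| streamstats global=false window=4 dc(respondents_list) AS respondents_count BY account_subdomain
| fields - respondents_list
| append [| tstats dc(All_Application_Events.user_email) AS report_viewers
  FROM datamodel=Application_User_Events
  WHERE nodename=All_Application_Events.Authenticated_Events.Report_Views 
  BY All_Application_Events.account_subdomain, _time span=1w
  | rename All_Application_Events.account_subdomain AS account_subdomain]
| stats values(respondents_count) AS respondents_count values(report_viewers) AS report_viewers BY account_subdomain, _time
| eval ratio=report_viewers/respondents_count

It's still two tstats passes over the data model, but append plus stats is the usual way to merge two result sets on shared keys without join.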