Splunk Search

How to find high-frequency behavioral events using Splunk?

bestSplunker
Contributor

There are many accounts with different roles that often use the backend management system to query user information. Now, I need to use Splunk to search for accounts that frequently query user information.

Example events are as follows:
 
`_time=2022-12-01T10:00:01.000Z, account_id=1, query user infomation.
_time=2022-12-01T10:00:02.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:00:03.000Z, account_id=1, query user infomation.
_time=2022-12-01T10:00:07.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:00:09.000Z, account_id=1, query user infomation.
_time=2022-12-01T10:00:11.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:00:12.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:00:13.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:00:14.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:00:22.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:01:27.000Z, account_id=3, query user infomation.
_time=2022-12-01T10:00:27.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:00:30.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:00:33.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:00:34.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:00:36.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:01:37.000Z, account_id=3, query user infomation.
_time=2022-12-01T10:01:39.000Z, account_id=1, query user infomation.
_time=2022-12-01T10:01:45.000Z, account_id=3, query user infomation.
_time=2022-12-01T10:01:47.000Z, account_id=3, query user infomation.
_time=2022-12-01T10:01:55.000Z, account_id=3, query user infomation.
_time=2022-12-01T10:01:59.000Z, account_id=3, query user infomation.`
 

We can obtain the average time frequency of queries by calculating the sum of time intervals between each query for each account, and then dividing it by the number of queries.

account_id =1 ,account 1 has queried 4 times and the total time interval is 2+6+30=38 seconds,so the average query time frequency is 38 seconds/3 times = 12.66 seconds/times.
 
account_id =2 ,account 1 has queried 12 times and the total time interval is 4+3+1+1+1+8+5+3+1+1+2=38 seconds,so the average query time frequency is 30 seconds/11 times = 2.72 seconds/times.
 
account_id =3 ,account 1 has queried 6 times and the total time interval is 10+8+2+8+4=32 seconds,so the average query time frequency is 32 seconds/5 times = 6.4 seconds/times.
 
 

 now, I want to find accounts with query interval below 5 seconds. By manual calculation, we can see that the average query interval time for account_id=2 is 2.72s, so it may have exhibited abnormal behavior.It's possible that account 2 used an automation tool to crawl user information in the backend, given its short query intervals.

so  how to use SPL statements to search for abnormal accounts with an average query interval of less than 5 seconds, and to calculate the total number of queries and the average interval for each account?"

 
 
 
 
 
 
Labels (1)
0 Karma

bowesmana
SplunkTrust
SplunkTrust

NB: There are a couple of mistakes in your interval calculations, e.g. account 1 is 2 + 6 + 90, not 30. Anyway, you can simply use streamstats to get the gap and then average that out, i.e.

... your_search ...
| streamstats c window=2 global=f range(_time) as gap by account_id
| where c=2
| stats avg(gap) as avg_frequency by account_id
| where avg_frequency<5

streamstats will count the number (c) of events seen in it's gap calulation and then take the range of time values to create a field 'gap' by the account id.

where c=2 is done to remove the first event for each account id, as the gap will always be 0, so we want to use a gap count rather than event count to calculate average

Then just use stats/where to filter out those less than 5.

You could of course use stdev and calculate outliers from the norm rather than using a fixed 5 second gap as that would be more flexible if traffic changes.

Please avoid using transaction - it is not intended for this purpose and has memory limitations that can cause it just to ignore certain events where you detect transactions over a long period of time.

0 Karma

bowesmana
SplunkTrust
SplunkTrust

@bestSplunker 

You can see a working example with your data by copying/pasting this to your search window.

| makeresults
| eval data=split(replace("_time=2022-12-01T10:00:01.000Z, account_id=1, query user infomation.
_time=2022-12-01T10:00:02.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:00:03.000Z, account_id=1, query user infomation.
_time=2022-12-01T10:00:07.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:00:09.000Z, account_id=1, query user infomation.
_time=2022-12-01T10:00:11.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:00:12.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:00:13.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:00:14.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:00:22.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:01:27.000Z, account_id=3, query user infomation.
_time=2022-12-01T10:00:27.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:00:30.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:00:33.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:00:34.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:00:36.000Z, account_id=2, query user infomation.
_time=2022-12-01T10:01:37.000Z, account_id=3, query user infomation.
_time=2022-12-01T10:01:39.000Z, account_id=1, query user infomation.
_time=2022-12-01T10:01:45.000Z, account_id=3, query user infomation.
_time=2022-12-01T10:01:47.000Z, account_id=3, query user infomation.
_time=2022-12-01T10:01:55.000Z, account_id=3, query user infomation.
_time=2022-12-01T10:01:59.000Z, account_id=3, query user infomation.", "\n", "###"), "###")
| mvexpand data
| rex field=data "_time=(?<t>\d+-\d+-\d+T\d+:\d+:\d+\.\d+Z), account_id=(?<account_id>\d+),"
| eval _time=strptime(t, "%FT%T.%QZ")
| table _time account_id
``` Above is just your example data setup ```

``` Use streamstats to calculate the event count and gap for each account ```
| streamstats c window=2 global=f range(_time) as gap by account_id
``` Remove the first event, so it doesn't get used in the gap average calculation ```
| where c=2
``` Now calculate the average and total span and gap count ```
| stats sum(gap) as span count as gap_count avg(gap) as avg_frequency by account_id
| where avg_frequency<5

 

0 Karma

lim2
Communicator

Sorry for the delay, I found this after searching a for a question which you posted & also answered. I edited 2022-12 to 2023-12 and oneshot the test.csv with header timestamp,account_id,desc. Please try following:

earliest=9/1/2023:0:0:0 index=test source=/tmp/test.csv
|transaction account_id
| rename linecount as totalqueries
| table account_id timestamp duration totalqueries
| eval frequency=duration/totalqueries
| where frequency < 5

 

0 Karma

lim2
Communicator

output of the queryoutput of the query

0 Karma
Get Updates on the Splunk Community!

Stay Connected: Your Guide to May Tech Talks, Office Hours, and Webinars!

Take a look below to explore our upcoming Community Office Hours, Tech Talks, and Webinars this month. This ...

They're back! Join the SplunkTrust and MVP at .conf24

With our highly anticipated annual conference, .conf, comes the fez-wearers you can trust! The SplunkTrust, as ...

Enterprise Security Content Update (ESCU) | New Releases

Last month, the Splunk Threat Research Team had two releases of new security content via the Enterprise ...