Splunk Search

How do you group two queries from different sourcetypes and events?

Branden
Builder

Hello. I am having trouble with a complicated query. Here's what I'm trying to do:

We have events from IIS w3svc1 logs which include requestor IPs. For example:

2018-09-03 15:47:10 yy.yy.yy.yy GET //Catalog/ year=2018-2019 80 - xx.xx.xx.xx WordPress/4.9.6;+https://somewhere.com https://somewhere.com/Public/Catalog/?year=2018-2019 200 0 0 62

We also have standard WinEventLog:Application events that include the username that authenticated but do not include the requestor IP address.

When the WinEventLog:Application event has MESSAGE="Login success", I want to correlate that event with the w3svc1 access log event at the exact same timestamp.

In other words, I want to determine which user successfully logged in at a specific time by referencing two separate logs. I can create the two separate queries, but I'm unsure how to connect them. With the help of DalJeanis below, I have the following so far:

(index=wineventlog host=somehost* USER=johndoe MESSAGE="Login success") OR (index=ag host=somehost* user="-" NOT requestor_ip=1.2.3.4 NOT requestor_ip=5.6.7.8 NOT username=*bot*) 
| fields USER, requestor_ip 
| eval new_time=if(match(sourcetype,"wineventlog"), _time-10,_time) 
| sort 0 new_time 
| streamstats time_window=30s last(USER) as USER 
| where match(sourcetype,"w3svc1")

I see where this is going... but I'm having trouble with streamstats complaining:

Error in 'streamstats' command: time_window can only be used on input that is sorted in time order (both ascending and descending order are ok)

I think the issue is that I'm having trouble with the match function in the eval. I'm not sure what was meant in the comment by "Type A record" and "Type B record". My guess was that it's a way to distinguish between the two searches, so I used their sourcetypes for comparison. Maybe that's the wrong approach?

To work around the error, I tried this:

(index=wineventlog host=somehost* USER=johndoe MESSAGE="Login success") OR (index=ag host=somehost* user="-" NOT requestor_ip=1.2.3.4 NOT requestor_ip=5.6.7.8 NOT username=*bot*) 
    | transaction requestor_ip maxspan=30s
    | fields USER, requestor_ip 
    | eval new_time=if(match(sourcetype,"wineventlog"), _time-10,_time) 
    | sort 0 new_time 
    | streamstats last(USER) as USER 
    | where match(sourcetype,"w3svc1")

The problem is it's returning over 5000 results for the past 24 hours. It should only be returning maybe 2 or 3. It's as if the USER=johndoe constraint isn't working... Upon closer inspection, it looks like it's not even performing search A...

Thanks!

0 Karma

DalJeanis
SplunkTrust

1) I'm assuming that yy.yy.yy.yy and xx.xx.xx.xx are IP addresses. If you substitute 1.2.3.4 or 123.123.123.123, it would be more obvious.

2) In my experience, there is no such thing as an "exact same timestamp" between different logs. If, in your exact use case, the logs really get 100% the same timestamp, then you can just do a join on _time between the two types of events and you are done. More likely, you are going to need to identify which one usually logs first, and by how many seconds, and use that order to help define the match logic.

3) This use case is typically a job for streamstats, plus a little time manipulation. Let's suppose that log type A is usually first, by as much as 45 seconds, but may be delayed up to 5 seconds after log B. Here's the pseudocode to match up the records.

 (search for event A) OR
 (search for event B)
 | fields ... AUserName BSrc_IP   list all the fields you need from either record
 | eval new_time=if(this is a type A record, _time - 10, _time)
 | sort 0 new_time
 | streamstats time_window=50s last(AUserName) as AUserName
 | where (record is type B)

So, we copy the required type A data over to the following type B record, and throw away the type A records.

If you have a significant volume of these, you may get a situation where several logons occur at roughly the same time. For that, you might want to change last(AUserName) to list(AUserName), acknowledging that it could be any of them.
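
For reference, here is a rough sketch of how the pseudocode above might map onto the searches posted in this thread. It is untested and rests on a few assumptions: the two record types are told apart by the index names from the original searches (wineventlog and ag), the 10-second shift and 30-second window are placeholders to tune against your own data, and login_user is just an illustrative field name. It overwrites _time instead of creating new_time, since streamstats time_window checks the order of _time itself, and it keeps index in the fields list because fields would otherwise drop it before the eval and where can use it.

```
(index=wineventlog host=somehost* USER=johndoe MESSAGE="Login success") OR (index=ag host=somehost* user="-" NOT requestor_ip=1.2.3.4 NOT requestor_ip=5.6.7.8 NOT username=*bot*)
| fields _time, index, USER, requestor_ip
| eval _time=if(index=="wineventlog", _time-10, _time)
| sort 0 _time
| streamstats time_window=30s last(USER) as login_user
| where index=="ag" AND isnotnull(login_user)
```

The final where keeps only the access log events that picked up a user name from a login event inside the window. If several logins can land in the same window, swapping last(USER) for list(USER) would show every candidate instead of only the most recent one.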

Branden
Builder

Thank you for your detailed response. I'm going to experiment with streamstats and see where I end up. I'll let you know how I make out!

kmaron
Motivator

If you provide your two queries it will be easier to help you combine them.

0 Karma

Branden
Builder

Done, thank you!

0 Karma

shiv1593
Communicator

Hey Branden,

Could you please tell us what you did to get it running normally?

Thank you

0 Karma