Splunk Search

Extracting time-based information from multiple joins

targoyal
New Member

Hi,

I am trying to join results returned from one index, with different filters, against each other, but I am not getting the correct information and need help sorting it out.

So, I have an index that contains events such as Logon, Logoff, LogonReject, InformationFlow, etc. There is no particular sequence in which these events happen, i.e. a logged-in user can log in again without logging off.

Now I want to join these events so that, for a particular user, they appear in the same row as follows:
Time of InformationFlow, Time of Logoff, Time of Logon, Time of Logon Reject.

I want only the first Logoff, Logon, and LogonReject event after every InformationFlow event.
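
For example (with made-up times), if a user had an InformationFlow at 10:00, a LogonReject at 10:05, a Logon at 10:10, another LogonReject at 10:20, and a Logoff at 10:30, I would want a single row showing 10:00, 10:30, 10:10, and 10:05 for that user.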

My query looks something like below:

index="xyZ" source=events.log ("Event=InformationFlow") | table UserId, EventTime as InformationFlowTime

| join UserId type=left usetime=true earlier=false max=0 [search index="xyZ" source=events.log ("Event=Logoff") | rename EventTime as LogoffTime | table UserId, LogoffTime]

| join UserId type=left usetime=true earlier=false max=0 [search index="xyZ" source=events.log ("Event=Logon") | rename EventTime as LogonTime | table UserId, LogonTime]

| join UserId type=left usetime=true earlier=false max=0 [search index="xyZ" source=events.log ("Event=LogonReject") | rename EventTime as LogonReject | table UserId, LogonReject]

| table UserId, InformationFlowTime, LogoffTime, LogonTime, LogonReject


Now the problem is that the query above gives me all possible combinations of InformationFlowTime with LogonTime, LogoffTime, etc., whereas I want only the first LogonTime, LogoffTime, and LogonReject after each InformationFlow event.
If I remove max=0 from the query, it just gives me the most recent LogonTime, LogoffTime, etc., because I am using earlier=false in the join.

Can someone tell me how I can join the data in the order in which the events appear? I am running out of ideas here.

Thanks,
TG

1 Solution

DalJeanis
SplunkTrust

I find it very annoying that the first two pages in the documentation for join are not about how to use join, but a description of all the different things you can use instead of join.

There's a reason.

When approaching data in Splunk, you need to start with the events, not the relations between the events.

THROWING THEM ALL IN THE POT

You want to link each InformationFlow event with certain other events. So, let's start by throwing all those events in a pot...

index="xyZ" source=events.log ("Event=InformationFlow" OR "Event=Logoff" OR 
                               "Event=Logon" OR "Event=LogonReject" ) 
| rename COMMENT as "limit to the fields we want, and set the time fields based on event"
| fields UserId, Event, EventTime
| eval timetype=Event."Time"
| eval {timetype}=EventTime
| rename COMMENT as "now we have all the events present, and the times set"

That curly-braces thing, {xxx}=, is just a compact way of writing the following chunk of code, which is how we usually do it, because pulling data from different kinds of records isn't normally such a simple pattern. However, you have a very simple fact pattern, which makes the elegant little snippet above possible. You might prefer the following, just because it is more "normal" to read....

| rename COMMENT as "this does the same thing as the above stuff, a different way."
| rename COMMENT as "limit to the fields we want, and set the time fields based on event"
| fields UserId, Event, EventTime
| eval InformationFlowTime=if(Event="InformationFlow",EventTime,null())
| eval LogoffTime=if(Event="Logoff",EventTime,null())
| eval LogonTime=if(Event="Logon",EventTime,null())
| eval LogonRejectTime=if(Event="LogonReject",EventTime,null())

Now that they are all in the pot, let's stir, shall we?

STIRRING UNTIL THEY'RE SOUP

We need to figure out which of the various records are relevant. You have said that you only want the first records of each kind AFTER the InformationFlow record. So, let's sort the records into order, copy the latest InformationFlowTime down onto all the following records, and then finally pick the lowest time for each of the relevant types of record.

Some might be null; for example, if there are no LogonReject records between two InformationFlow records, then the LogonRejectTime for that InformationFlow record will be null.

| rename COMMENT as "now we have all the events present, and the times set"
| sort 0 UserId, EventTime
| streamstats last(InformationFlowTime) as PriorInf by UserId
| eventstats min(LogoffTime) as minLogoff, min(LogonTime) as minLogon, min(LogonRejectTime) as minReject by UserId PriorInf

| rename COMMENT as "and finally, kill everything that isn't an information flow record"
| search Event="InformationFlow"
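
If you then want one row per InformationFlow event laid out like the table in your query, a minimal finishing step (not in the pipeline above; it assumes the field names produced above, so adjust to taste) would be something like this:

| rename COMMENT as "optional: lay out one row per InformationFlow event, like the table in the question"
| table UserId, InformationFlowTime, minLogoff, minLogon, minReject
| rename minLogoff as LogoffTime, minLogon as LogonTime, minReject as LogonRejectTime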

MORE COMPLICATED STEWS

Now, if an InformationFlow record was something that happened while someone was logged on, and you needed the immediately prior Logon and the immediately following Logoff, then you'd do the above procedure once in each direction for the relevant types of records: sort -> streamstats -> eventstats -> (sort or reverse) -> streamstats -> eventstats.
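
Here is a minimal sketch of that two-pass idea. It is not from the question itself; it assumes the UserId/Event/EventTime/LogonTime/LogoffTime fields set up earlier, it is simplified to use only streamstats, and PriorLogon/NextLogoff are just illustrative field names.

| rename COMMENT as "forward pass: carry the most recent Logon time down onto later events"
| sort 0 UserId, EventTime
| streamstats last(LogonTime) as PriorLogon by UserId
| rename COMMENT as "backward pass: carry the next Logoff time back up onto earlier events"
| sort 0 UserId, -EventTime
| streamstats last(LogoffTime) as NextLogoff by UserId
| rename COMMENT as "keep only the InformationFlow rows, each now tagged with its surrounding Logon and Logoff"
| search Event="InformationFlow"
| sort 0 UserId, EventTime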

You could also do intermediate evals and streamstats or eventstats processing if you only wanted events that occur in a certain order... for instance, if you wanted LogonRejects only before the first successful Logon after each InformationFlow, and Logoffs only after that first successful Logon.

| rename COMMENT as "now we have all the events present, and the times set"
| sort 0 UserId, EventTime
| streamstats last(InformationFlowTime) as PriorInf by UserId
| eventstats min(LogonTime) as minLogon, min(LogonRejectTime) as minReject by UserId PriorInf
| eval minReject = if (minReject > minLogon,null(),minReject)
| search Event!="LogonReject"
| eval LogoffTime=if(LogoffTime<minLogon,null(),LogoffTime)
| eventstats min(LogoffTime) as minLogoff by UserId PriorInf

| rename COMMENT as "and finally, kill everything that isn't an information flow record"
| search Event="InformationFlow"

There are various other patterns that are quite useful, but the above will help you with your current use case.

Happy splunking!


woodcock
Esteemed Legend

s/annoying/highly agreeable/g

DalJeanis
SplunkTrust

@woodcock - (snort) - They annoy me every time I'm looking up a point of join syntax, because I know what verb I need to use and all that relative stuff should be in a linked annex or something.

But, more and more, when I post here to explain that, no, join is not your linkage verb of first resort, I have to sigh at the appropriateness of it all.
