Splunk Search

Join two searches based on a condition

rakes568
Explorer

We have two kinds of logs for our system.
The first one logs all user sessions with the user name, src IP, dst IP, and login/logout times:
Jun 22 10:11:00 : UserSession: User=user1, SRC=192.168.0.1, Login=2017-06-22 10:00:00, Logout=2017-06-22 10:10:00
Jun 22 10:21:00 : UserSession: User=user2, SRC=192.168.0.2, Login=2017-06-22 10:11:00, Logout=2017-06-22 10:20:00
Jun 22 10:31:00 : UserSession: User=user3, SRC=192.168.0.1, Login=2017-06-22 10:21:00, Logout=2017-06-22 10:30:00
Jun 22 10:41:00 : UserSession: User=user1, SRC=192.168.0.1, Login=2017-06-22 10:31:00, Logout=2017-06-22 10:40:00
Jun 22 10:51:00 : UserSession: User=user1, SRC=192.168.0.4, Login=2017-06-22 10:41:00, Logout=2017-06-22 10:50:00

The SRC IP above comes from a pool and can be reassigned to another user if nobody else is using it at the time.

The other log is from IPTable and, let's say, logs the src and dst IP of each TCP connection for all users between their login and logout times:
1. Jun 22 10:04:00 Iptable: SRC=192.168.0.1 DST=10.0.0.10
2. Jun 22 10:06:00 Iptable: SRC=192.168.0.1 DST=10.0.0.10
3. Jun 22 10:09:00 Iptable: SRC=192.168.0.1 DST=10.0.0.11
4. Jun 22 10:15:00 Iptable: SRC=192.168.0.2 DST=10.0.0.12
5. Jun 22 10:25:00 Iptable: SRC=192.168.0.1 DST=10.0.0.13
6. Jun 22 10:35:00 Iptable: SRC=192.168.0.1 DST=10.0.0.13
7. Jun 22 10:44:00 Iptable: SRC=192.168.0.4 DST=10.0.0.10
8. Jun 22 10:45:00 Iptable: SRC=192.168.0.4 DST=10.0.0.14

Now I want to correlate these two logs on SRC and get a table with the number of src-to-dst IP connections for each user. Note that a connection belongs to a user only if it falls between that user's login and logout times.

So the query should be something like:
join UserSession Iptable where UserSession.SRC == Iptable.SRC and (Iptable._time > Login and Iptable._time < Logout), and then calculate Num_connections for each unique set of User, SRC, DST.
Finally, I should get connection stats like this:

User SRC DST Num_connections

user1 192.168.0.1 10.0.0.10 3 (IPTable logs 1,2 and 7 connections belong here)
user1 192.168.0.1 10.0.0.11 1 (IPTable log 3)
user1 192.168.0.1 10.0.0.14 1 (IPTable log 8)
user2 192.168.0.2 10.0.0.12 2 (IPTable logs 4,5)
user3 192.168.0.1 10.0.0.13 1 (IPTable log 6)

I have been scratching my head over this for the last few days, and I couldn't find any documentation on joining based on a specific condition. Any help is appreciated.

0 Karma

woodcock
Esteemed Legend

Maybe like this:

(index=YouShouldAlwaysSpecifyAnIndex sourcetype=AndAlsoAlwaysSourcetype UserSession stuff here) OR (index=SecondIndexHere sourcetype=SecondSourcetypeHere IPtable stuff here)
| eval SRC=coalesce('UserSession.SRC', 'Iptable.SRC')
| reverse
| streamstats last(User) AS lastUser BY SRC
| search sourcetype=SecondSourcetypeHere
| rename Iptable.DST AS DST, lastUser AS User
| stats count AS Num_connections BY User SRC DST

This pulls the User value from the UserSession set and merges it into the associated IPtable set and then discards the UserSession events. Give it a try and it should do what you need even though we are not explicitly coding in the "time" conditions (that should be done by the reverse and streamstats part).
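
If you would rather check the login/logout window explicitly than rely on event order alone, a variation along these lines may work. It is only a sketch under several assumptions: the index and sourcetype names are placeholders, the fields are assumed to be extracted simply as User, SRC, DST, Login and Logout, and each UserSession event is assumed to be timestamped after the connections it covers (as in the sample logs), so the events are processed newest first and without reverse.

```spl
(index=FirstIndexHere sourcetype=FirstSourcetypeHere) OR (index=SecondIndexHere sourcetype=SecondSourcetypeHere)
| sort 0 -_time
| streamstats last(User) AS lastUser last(Login) AS lastLogin last(Logout) AS lastLogout BY SRC
| search sourcetype=SecondSourcetypeHere
| where _time>=strptime(lastLogin, "%Y-%m-%d %H:%M:%S") AND _time<strptime(lastLogout, "%Y-%m-%d %H:%M:%S")
| stats count AS Num_connections BY lastUser SRC DST
| rename lastUser AS User
```

The where clause drops any IPtable event that falls outside the most recently seen session for its SRC, so a connection is not credited to a user whose session did not cover it.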

0 Karma

woodcock
Esteemed Legend

Hang on; I am updating my answer...

0 Karma

woodcock
Esteemed Legend

Updated now.

0 Karma

somesoni2
SplunkTrust

Try this (it won't be efficient, because the join pulls in every IPTable record for each SRC before filtering):

your first search get user sessions
| join max=0 SRC [search your second search to get IPTable data | rename _time as iptabletime ]
| rename COMMENT as "The join above returns every IPTable record for that SRC, so now apply a filter to keep only the relevant rows"
| where  (iptabletime >=strptime(Login,"%Y-%m-%d %H:%M:%S")  AND iptabletime <strptime(Logout,"%Y-%m-%d %H:%M:%S"))
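
To get from the filtered join to the per-user connection table, a stats step can be added at the end. A rough sketch, with the two base searches left as placeholders in angle brackets and assuming the fields are extracted as User, SRC, DST, Login and Logout:

```spl
<your first search to get user sessions>
| join type=inner max=0 SRC
    [ search <your second search to get IPTable data>
      | rename _time AS iptabletime
      | fields SRC DST iptabletime ]
| where iptabletime>=strptime(Login, "%Y-%m-%d %H:%M:%S") AND iptabletime<strptime(Logout, "%Y-%m-%d %H:%M:%S")
| stats count AS Num_connections BY User SRC DST
```

Because max=0 keeps every matching IPTable row for each session, the subsearch can grow large. It is still subject to the usual subsearch limits, so results may be truncated on busy systems.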

rakes568
Explorer

Splunk has had the join command for a long time. The problem is that searches can be joined only on a field, whereas I want to pass a condition to it. Please read the complete question. I can clarify it further if you want.

0 Karma

skalliger
SplunkTrust

Hi,

I hope you're at 6.6 already because Splunk introduced the join command: http://docs.splunk.com/Documentation/Splunk/6.6.1/SearchReference/Join

It does much the same thing as a join in SQL, where you join on a specified field.

Skalli

0 Karma