We have two kinds of logs for our system:
The first one logs all the user sessions with user name, src ip, dst ip, and login/logout time.
Jun 22 10:11:00 : UserSession: User=user1, SRC=192.168.0.1, Login=2017-06-22 10:00:00, Logout=2017-06-22 10:10:00
Jun 22 10:21:00 : UserSession: User=user2, SRC=192.168.0.2, Login=2017-06-22 10:11:00, Logout=2017-06-22 10:20:00
Jun 22 10:31:00 : UserSession: User=user3, SRC=192.168.0.1, Login=2017-06-22 10:21:00, Logout=2017-06-22 10:30:00
Jun 22 10:41:00 : UserSession: User=user1, SRC=192.168.0.1, Login=2017-06-22 10:31:00, Logout=2017-06-22 10:40:00
Jun 22 10:51:00 : UserSession: User=user1, SRC=192.168.0.4, Login=2017-06-22 10:41:00, Logout=2017-06-22 10:50:00
SRC IP above comes from a pool, and can be reassigned to another user, if it's not being used by anyone else at the time.
The other log is from IPTable and, let's say, it logs the src and dst ip of each tcp connection for all users between their login/logout times.
1. Jun 22 10:04:00 Iptable: SRC=192.168.0.1 DST=10.0.0.10
2. Jun 22 10:06:00 Iptable: SRC=192.168.0.1 DST=10.0.0.10
3. Jun 22 10:09:00 Iptable: SRC=192.168.0.1 DST=10.0.0.11
4. Jun 22 10:15:00 Iptable: SRC=192.168.0.2 DST=10.0.0.12
5. Jun 22 10:25:00 Iptable: SRC=192.168.0.1 DST=10.0.0.13
6. Jun 22 10:35:00 Iptable: SRC=192.168.0.1 DST=10.0.0.13
7. Jun 22 10:44:00 Iptable: SRC=192.168.0.4 DST=10.0.0.10
8. Jun 22 10:45:00 Iptable: SRC=192.168.0.4 DST=10.0.0.14
Now I want to correlate these two logs on SRC, and get a table with number of src to dst ip connections for each user. Note that each connection is valid only between login and logout time.
So query should be something like:
join UserSession Iptable where UserSession.SRC == Iptable.SRC and (Iptable._time>Login and Iptable._time<Logout) and then calculate Num_connections as unique set of User,SRC,DST.
Finally, I should get connection stats like this:
user1 192.168.0.1 10.0.0.10 3 (IPTable logs 1,2 and 7 connections belong here)
user1 192.168.0.1 10.0.0.11 1 (IPTable log 3)
user1 192.168.0.1 10.0.0.14 1 (IPTable log 8)
user2 192.168.0.2 10.0.0.12 2 (IPTable logs 4,5)
user3 192.168.0.1 10.0.0.13 1 (IPTable log 6)
I have been scratching my head over this for the last few days, and I couldn't find any docs on joining based on a specific condition. Any help is appreciated.
Maybe like this:
(index=YouShouldAlwaysSpecifyAnIndex sourcetype=AndAlsoAlwaysSourcetype UserSession stuff here) OR (index=SecondIndexHere sourcetype=SecondSourcetypeHere IPtable stuff here)
| eval SRC=coalesce('UserSession.SRC', 'Iptable.SRC')
| reverse
| streamstats last(User) AS lastUser BY SRC
| search sourcetype=SecondSourcetypeHere
| rename Iptable.DST AS DST
| stats count AS Num_connections BY lastUser SRC DST
This pulls the User value from the UserSession set and merges it into the associated IPtable set and then discards the UserSession events. Give it a try and it should do what you need even though we are not explicitly coding in the "time" conditions (that should be done by the reverse and streamstats part).
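To make the carry-forward idea concrete outside Splunk, here is a minimal Python sketch of what reverse plus streamstats last(User) BY SRC is meant to do: walk the events in time order and remember the most recent user seen for each SRC. It assumes the session events are ordered by their Login time (not by the time the log line was written, which in the sample is after Logout), and the name carry_forward is purely illustrative. Note that this approach never looks at Logout, so a connection seen after a logout but before the next login on the same SRC would still be credited to the previous user.

```python
from collections import Counter

# (login time, user, src) taken from the UserSession sample rows.
# HH:MM strings on the same day sort chronologically.
logins = [
    ("10:00", "user1", "192.168.0.1"),
    ("10:11", "user2", "192.168.0.2"),
    ("10:21", "user3", "192.168.0.1"),
    ("10:31", "user1", "192.168.0.1"),
    ("10:41", "user1", "192.168.0.4"),
]

# (time, src, dst) taken from the Iptable sample rows 1 to 8.
connections = [
    ("10:04", "192.168.0.1", "10.0.0.10"),
    ("10:06", "192.168.0.1", "10.0.0.10"),
    ("10:09", "192.168.0.1", "10.0.0.11"),
    ("10:15", "192.168.0.2", "10.0.0.12"),
    ("10:25", "192.168.0.1", "10.0.0.13"),
    ("10:35", "192.168.0.1", "10.0.0.13"),
    ("10:44", "192.168.0.4", "10.0.0.10"),
    ("10:45", "192.168.0.4", "10.0.0.14"),
]


def carry_forward(logins, connections):
    """Credit each connection to the last user who logged in on its SRC."""
    # kind 0 = login, kind 1 = connection; logins sort first on a time tie.
    events = [(t, 0, src, user) for t, user, src in logins]
    events += [(t, 1, src, dst) for t, src, dst in connections]

    last_user = {}      # SRC -> most recent user seen on that SRC
    counts = Counter()  # (user, SRC, DST) -> number of connections
    for _, kind, src, value in sorted(events):
        if kind == 0:
            last_user[src] = value
        elif src in last_user:
            counts[(last_user[src], src, value)] += 1
    return counts


counts = carry_forward(logins, connections)
for (user, src, dst), n in sorted(counts.items()):
    print(user, src, dst, n)
```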
Hang on; I am updating my answer...
Updated now.
Try this (won't be efficient)
your first search get user sessions
| join max=0 SRC [search your second search to get IPTable data | rename _time as iptabletime ]
| rename COMMENT as "Above join will get all records for that SRC in the main search so you'll now apply filter to keep relevant rows"
| where (iptabletime >=strptime(Login,"%Y-%m-%d %H:%M:%S") AND iptabletime <strptime(Logout,"%Y-%m-%d %H:%M:%S"))
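For anyone who wants to check the logic independently of Splunk, the join-then-filter idea above can be sketched in plain Python: pair every session with every connection on the same SRC, then keep only the pairs whose connection time falls in the half-open window Login <= time < Logout (the same bounds as the where clause). The function name correlate and the tuple layouts are assumptions made for this illustration; the rows are the sample data from the question.

```python
from collections import Counter
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"


def ts(text):
    """Parse a timestamp in the Login/Logout format used by the sample logs."""
    return datetime.strptime(text, FMT)


# (user, src, login, logout) from the UserSession sample rows.
sessions = [
    ("user1", "192.168.0.1", ts("2017-06-22 10:00:00"), ts("2017-06-22 10:10:00")),
    ("user2", "192.168.0.2", ts("2017-06-22 10:11:00"), ts("2017-06-22 10:20:00")),
    ("user3", "192.168.0.1", ts("2017-06-22 10:21:00"), ts("2017-06-22 10:30:00")),
    ("user1", "192.168.0.1", ts("2017-06-22 10:31:00"), ts("2017-06-22 10:40:00")),
    ("user1", "192.168.0.4", ts("2017-06-22 10:41:00"), ts("2017-06-22 10:50:00")),
]

# (time, src, dst) from the Iptable sample rows 1 to 8.
connections = [
    (ts("2017-06-22 10:04:00"), "192.168.0.1", "10.0.0.10"),
    (ts("2017-06-22 10:06:00"), "192.168.0.1", "10.0.0.10"),
    (ts("2017-06-22 10:09:00"), "192.168.0.1", "10.0.0.11"),
    (ts("2017-06-22 10:15:00"), "192.168.0.2", "10.0.0.12"),
    (ts("2017-06-22 10:25:00"), "192.168.0.1", "10.0.0.13"),
    (ts("2017-06-22 10:35:00"), "192.168.0.1", "10.0.0.13"),
    (ts("2017-06-22 10:44:00"), "192.168.0.4", "10.0.0.10"),
    (ts("2017-06-22 10:45:00"), "192.168.0.4", "10.0.0.14"),
]


def correlate(sessions, connections):
    """Join on SRC, then keep pairs where login <= connection time < logout."""
    counts = Counter()
    for user, session_src, login, logout in sessions:
        for when, conn_src, dst in connections:
            if session_src == conn_src and login <= when < logout:
                counts[(user, session_src, dst)] += 1
    return counts


counts = correlate(sessions, connections)
for (user, src, dst), n in sorted(counts.items()):
    print(user, src, dst, n)
```

On the sample rows, IPTable logs 1 and 2 land on user1 / 192.168.0.1 / 10.0.0.10, and log 5 lands on user3 because 10:25 falls inside user3's session. Like the join, this compares every session with every connection, so it is fine for a sanity check but not efficient on large data.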
Splunk has had the join function for a long time. The problem is that searches can be joined only on a field, but I want to pass a condition to it. Please read the complete question. I can clarify the question more if you want.
Hi,
I hope you're at 6.6 already because Splunk introduced the join command: http://docs.splunk.com/Documentation/Splunk/6.6.1/SearchReference/Join
It does much the same thing as a join does in SQL, where you can join on a specified field.
Skalli