Splunk Search

Join closest event from different logs

IVV
Path Finder

Hello everyone!
The problem: I want to identify users who use SSH with login other than their own. I have two logs:
1) SSH log like
timestamp="1111120" yyy.yyy.yyy.yyy sshd[1737]: Accepted publickey for user1 from xxx.xxx.xxx.xxx port 41902 ssh2
2) Firewall log like
timestamp="1111111" login="user1" assigned_ip="xxx.xxx.xxx.xxx"
Now it works using the query below

 sourcetype="sshd" "Accepted"
| rex field=_raw  " (?<srv_ip>\S+) sshd" 
| lookup dnslookup clientip AS srv_ip OUTPUT clienthost as  fqdn 
| rex field=_raw  "for (?<local_user>\S+) from" 
| rex field=_raw  "from (?<src_ip>\S+)"
| join type=inner src_ip [search sourcetype="firewall_logs"  
|  rename assigned_ip as src_ip  
|  stats max(_time) as fwtimestamp  by login,src_ip  
| convert timeformat="%m/%d/%Y %H:%M:%S" ctime(fwtimestamp) as fwtime  
| sort -fwtimestamp]  
| convert timeformat="%m/%d/%Y %H:%M:%S" ctime(accesstimestamp) as accesstime
| fields - accesstimestamp  fwtimestamp 
| where fwtime<accesstime 
| where local_user!="root" 
| where login!=local_user

But it misses a lot of events because of using 'stats max'. So I need to compare ssh event to firewall event by closest same IP lease.
Example:
time=5 ip=ip1 login=man1
time=7 ip=ip2 login=man2
time=9 ip=ip2 login=man2
time=10 ssh_ip=ip_ssh ssh_login=evil ip=ip2
time=15 ip=ip2 login=man2
Result should be something like this:
fw_time=9 ssh_time=10 ip=ip2 login=man2 ssh_ip=ip_ssh ssh_login=evil

How can I do it?
Thank you.

Tags (2)
0 Karma

anuremanan88
Explorer

Did you get this working? I would like to know how it works

0 Karma

woodcock
Esteemed Legend

Your problems are surely due to the limits on the number of events returned by subsearches. You can do the join without join (and thus without the subsearch and its lmits) and I strongly encourage you to do so. This search will do the join and enhance event data with the field I think you need:

(sourcetype="sshd" "Accepted") OR (sourcetype="firewall_logs") | rex " (?<srv_ip>\S+) sshd.*? for (?<local_user>\S+) from (?<src_ip>\S+)" | lookup dnslookup clientip AS srv_ip OUTPUT clienthost AS fqdn | rename assigned_ip as src_ip | eventstats dc(sourcetype) AS sourcetypes by src_ip | search sourcetypes>=2 | eventstats max(_time) AS maxTimeBySTandLogin BY sourcetype, login

At this point you only have events which share the same value of src_ip (events with any particular src_ip that do not show up in each sourcetype are filtered out) and you have enhanced each event to gain a field maxTimeBySTandLogin which contains the highest/latest/most-recent time among groups of events which share the same sourcetype and login. In other words, within each sourcetype dataset, each event possesses its own copy the (shared) max(_time) which was assessed by comparing all events with the same login. From here, you should be able to modify until you get exactly what you would like. If you can't get there on your own, do explain exactly what the post-join logic is supposed to do (it is not at clear to me because some of your explanation diverges from itself).

0 Karma

IVV
Path Finder

In the first part of the original query max() function was used only to limit the number of events before joining. It is not the best solution because we can miss some events, but for the sake of simplicity let us not consider ones.

0 Karma

IVV
Path Finder

Thank you for your answer. I'll try to be more accurate. I have two types of logs:
1) SSH access log (sourcetype="sshd"). Examples:
May 12 23:59:59 SRV1_IP sshd[11720]: Accepted publickey for SSH_LOGIN1 from SSH_CLIENT1_IP port 65093 ssh2

May 12 23:59:59 SRV2_IP sshd[4441]: Accepted publickey for SSH_LOGIN2 from SSH_CLIENT2_IP port 53704 ssh2
You can get needed fields using | rex " (?<srv_ip>\S+) sshd.*? for (?<local_user>\S+) from (?<src_ip>\S+)" as you did.
2) Firewall logs (sourcetype="firewall_logs"). Examples:
timestamp="1431464386" event="ADD (post-auth)" login="LOGIN1" assigned_ip="CLIENT1_IP"
timestamp="1431464446" event="ADD (post-auth)" login="LOGIN2" assigned_ip="CLIENT2_IP"

Users should use their own logins for ssh authentication. But they can use other ones. In such cases they can use other laptop (with IP for different login), or maybe they are hacked. In both cases login!=ssh_login, but client_ip==ssh_client_ip. So I want to know and alert about such cases. The algorithm is:
1) Get SSH events where we can see SSH_CLIENT_IP, SRV_IP and SSH_LOGIN. It is events with accepting connections.
2) Then we need to know what LOGIN SSH_CLIENT_IP has. That's why we need to merge every event from clause 1 with firewall event, where SSH_EVENTTIME > FW_EVENTTIME for paticular SSH_CLIEN_IP (which equal to CLIENT_IP from fw logs) and FW_EVENTTIME is the nearest to SSH_EVENTTIME.
3) After merging we can get events where SSH_LOGIN!=LOGIN.

0 Karma

IVV
Path Finder

I can use usetime=true earlier=true to make timebased join. But when I do something like:
sourcetype="sshd" "Accepted"| rex field=_raw " (?<srv_ip>\S+) sshd" | lookup dnslookup clientip AS srv_ip OUTPUT clienthost as fqdn | rex field=_raw "for (?<local_user>\S+) from" | rex field=_raw "from (?<src_ip>\S+)" | stats max(_time) as accesstimestamp by src_ip,fqdn,local_user | join type=inner usetime=true earlier=true src_ip [search sourcetype="firewall_logs" | rename assigned_ip as src_ip | eval fwtimestamp=_time | table fwtimestamp,login,src_ip | convert timeformat="%m/%d/%Y %H:%M:%S" ctime(fwtimestamp) as fwtime | sort -fwtimestamp] | convert timeformat="%m/%d/%Y %H:%M:%S" ctime(accesstimestamp) as accesstime| fields - accesstimestamp fwtimestamp | where local_user!="root" | where login!=local_user
I can see results where fwtime>accesstime. But it shouldn't be. What's the reason?

To make it more lightweight. I use the following query (similar example):
sourcetype="web_access" | stats max(_time) as accesstime by src_ip,web_login | join type=inner usetime=true earlier=true [ search sourcetype="fw_logs" | eval fwtime=_time | table fwtime,login,src_ip] | where web_login!=login
and I can see results where fwtime>accesstime. That's weird.

Maybe stats have no information about time?

0 Karma
Get Updates on the Splunk Community!

.conf24 | Registration Open!

Hello, hello! I come bearing good news: Registration for .conf24 is now open!   conf is Splunk’s rad annual ...

ICYMI - Check out the latest releases of Splunk Edge Processor

Splunk is pleased to announce the latest enhancements to Splunk Edge Processor.  HEC Receiver authorization ...

Introducing the 2024 SplunkTrust!

Hello, Splunk Community! We are beyond thrilled to announce our newest group of SplunkTrust members!  The ...