hello, I have a subset of results from a search. I know that if I have clientIP=x.x.x.x, the connection is proxied and I need to run another search against the outer proxy logs to retrieve the real IP address of the client; this can be achieved with a separate search on another sourcetype at the same exact timestamp.
what I need to do, if possible, is to rewrite the content of the clientIP value with a value retrieved by a subsearch, passing it the time range and the username field.
something like this (though this, as written, cannot work):
sourcetype=foo
| eval clientIP=if(clientIP == "x.x.x.x", ([ sourcetype=proxy earliest=$earliest$ latest=$latest$ user=$username$ |fields realIP|rename realIP AS search]), clientIP)
the hard part (for me) is passing the field values to the subsearch.
I need to present the data as a single result, thus I need to do all the transforms inside that one search.
thanks in advance
Thank you both for your exhaustive and helpful answers.
After a lot of trial-and-error effort, I ended up piping search results to a custom lookup script.
The script runs a search by itself, but only when needed: if the IP is NOT proxied it leaves the field as-is, spitting the same value back out and doing nothing else, to conserve resources.
The script works by being passed the old SourceIP value (the proxied one - 1.2.3.4 in the script below), a timestamp (to be matched against the other log), and a discriminator (in my case, since this is a POP3 authentication, the customer username).
The _time value needs to be rewritten, so do an eval (see TS below).
Please note that you also need to pass an empty field to be populated by the output; you can 'construct' this field in the search with an eval.
yoursearch | eval TS=_time | lookup RealPopIP TS Account SourceIP OUTPUT RealIP
This took me a lot of time to figure out; without it, the lookup doesn't work.
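Putting the pieces together (and creating the empty output field up-front with an eval, as described above), the complete search would look something like this:

```
yoursearch
| eval TS=_time
| eval RealIP=""
| lookup RealPopIP TS Account SourceIP OUTPUT RealIP
```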
The transforms.conf stanza needed:
[RealPopIP]
external_cmd = RealIP.py TS Account SourceIP RealIP
fields_list = TS,Account,SourceIP,RealIP
For the sake of completeness: the script works by being passed (via STDIN) a partially filled CSV file from the Splunk server, and spitting a filled-in CSV file back out on STDOUT. You also need to pass the field names as arguments (see the transform stanza above).
The script is not polished enough to take as-is, but maybe it can be useful to others 🙂
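To illustrate the CSV contract without needing a Splunk server, here is a minimal, self-contained sketch of what the external lookup protocol boils down to. Everything here is hypothetical (the `fill_lookup_csv` and `resolve` names, and the `10.0.0.7` stand-in for the value a real search would return); it only shows the shape of the data the script receives and must emit:

```python
import csv
import io

def fill_lookup_csv(csv_text, fill):
    """Simulate the external-lookup contract: Splunk hands the script a
    partially filled CSV; the script must echo it back with the output
    column populated. `fill` maps a row dict to the output value."""
    reader = csv.DictReader(io.StringIO(csv_text))
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=reader.fieldnames)
    writer.writeheader()
    for row in reader:
        row['RealIP'] = fill(row)
        writer.writerow(row)
    return out.getvalue()

def resolve(row):
    # rewrite only the known proxied address; pass everything else through
    if row['SourceIP'] == '1.2.3.4':
        return '10.0.0.7'   # stand-in for the value a real search would return
    return row['SourceIP']

filled = fill_lookup_csv(
    "TS,Account,SourceIP,RealIP\n"
    "100,alice,1.2.3.4,\n"
    "101,bob,5.6.7.8,\n",
    resolve)
```

The real script below does the same thing, except that `fill` is a dispatched Splunk search instead of a local function.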
#!/usr/bin/env python
#
# External lookup script to change specific field values,
# e.g. rewrite an IP if it is proxied.
# Leaves the value as-is if it doesn't match.
#
# 21.09.2011
#
import csv, sys, time
import splunk.auth, splunk.search

USER_NAME = 'admin'
PASSWORD = 'password'

def main():
    r = csv.reader(sys.stdin)
    if len(sys.argv) != 5:
        # wrong number of arguments: echo the input back unchanged
        for l in r:
            csv.writer(sys.stdout).writerow(l)
        sys.exit(0)

    # read command-line arguments (the field names, in transform order)
    timestampf = sys.argv[1]
    userf = sys.argv[2]
    sourceipf = sys.argv[3]
    realipf = sys.argv[4]

    w = None
    header = []
    first = True

    # authenticate to the Splunk server
    key = splunk.auth.getSessionKey(USER_NAME, PASSWORD)

    for line in r:
        if first:
            # the first row is the CSV header: echo it and set up the writer
            header = line
            csv.writer(sys.stdout).writerow(header)
            w = csv.DictWriter(sys.stdout, header)
            first = False
            continue

        # read one result row into a dict keyed by field name
        result = {}
        i = 0
        while i < len(header):
            if i < len(line):
                result[header[i]] = line[i]
            else:
                result[header[i]] = ''
            i += 1

        if len(result[timestampf]) and len(result[userf]) and len(result[sourceipf]):
            if result[sourceipf] == "1.2.3.4":
                # proxied IP - need to rewrite to the real customer IP address
                searchstring = 'search sourcetype="realipsourcetype" clientIP=* user=%s | head 1' % (result[userf])
                # perform the lookup (a Splunk search around the event's timestamp)
                ts = int(result[timestampf])
                job = splunk.search.dispatch(searchstring, earliest_time=ts-15,
                                             latest_time=ts+15, required_field_list='clientIP')
                # wait for results (no guard here if the lookup search returns nothing)
                while not job.isDone:
                    time.sleep(.25)
                result[realipf] = str(job.results[0].fields['clientIP'])
                w.writerow(result)
            else:
                # IP is not proxied, leave it as-is
                result[realipf] = result[sourceipf]
                w.writerow(result)
        else:
            # row is not in the expected format - pass it through unchanged
            w.writerow(result)

main()
Hope it will be useful to others.
Oh, you should also be aware of a bug in early 4.1.x releases: http://splunk-base.splunk.com/answers/3238/can-a-custom-search-command-launch-a-splunk-search
Thanks a lot, I'll try to use it this way.
Just FYI, if you use a custom search command instead of an external "lookup", you can pass in authentication and avoid hard-coding your login credentials (which raises security concerns). I created an app that provides an example of that: http://splunk-base.splunk.com/apps/22370/runsavedsearch-alert-action
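As a sketch of that approach (this only runs inside Splunk's own Python runtime, so it is not standalone; it also assumes `passauth = true` is set in the command's commands.conf stanza so that splunkd hands the script a session key):

```python
# Custom search command sketch: receive the session key from splunkd
# instead of hard-coding credentials.
import splunk.Intersplunk
import splunk.search

# getOrganizedResults() parses the events piped into the command;
# with passauth enabled, settings carries the caller's session key.
results, dummyresults, settings = splunk.Intersplunk.getOrganizedResults()
sessionKey = settings.get('sessionKey')

for result in results:
    # dispatch per-event searches with the passed-in key, e.g.:
    # job = splunk.search.dispatch(searchstring, sessionKey=sessionKey)
    pass

splunk.Intersplunk.outputResults(results)
```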
Kudos on getting back with such a detailed explanation on how you managed to solve the problem. Nice!
In addition to Lowell's answer, which covers most of it: from what I've gathered, what you want to achieve cannot be easily or efficiently done by issuing searches on indexed data. I've been wishing for a similar feature myself, i.e. something that can perform "lookups" on indexed data as easily as you can perform lookups using .csv files and external scripts.
With manageable volumes of data, Splunk can perform time-based lookups that automatically grab values that were valid for an event's specific time period. Typical use-case is to feed Splunk with a DHCP log with time, IP and MAC address, and then use it as a time-based lookup in order to automatically tie an IP to a MAC address for each event in your searches (as long as the IP address exists in the DHCP log of course). I'm guessing your proxy logs are pretty big so feeding them to Splunk as a lookup csv is far from an ideal solution. Nevertheless, should you want to try it there's information on time-based lookups available in the docs: http://docs.splunk.com/Documentation/Splunk/latest/Knowledge/Addfieldsfromexternaldatasources#Exampl...
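For reference, a time-based CSV lookup is configured in transforms.conf roughly like this (the stanza name, file name, and offset value here are made up for illustration; `time_field`, `time_format`, and `max_offset_secs` are the attribute names documented for temporal lookups):

```
[dhcp_history]
filename = dhcp.csv
time_field = timestamp
time_format = %s
max_offset_secs = 300
```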
If I'm understanding what you're trying to do correctly, it sounds like you want to launch a sub-search for each and every event returned by your base search (sourcetype=foo). It also looks like you only want the sub-search to run conditionally, when you don't have a valid clientIP value. Does that sound correct?
First off, I don't think there is any way to conditionally launch a subsearch. Each subsearch runs only once: it is evaluated and expanded into the main search, and then the main search runs. Now, you can launch a search per-event using the map command, so you may need something like that; however, in that case the output of the individual "map" searches is your final output, so you lose all the events from your base search. So you'll need to do all of this inside a sub-search and then recombine the two sets of results using a join command, or something...
For the record, this is a complete guess. This would take lots of examples and probably a few hours of messing around to get something that actually works properly...
sourcetype=foo | join type=outer user [ search sourcetype=foo clientIP="x.x.x.x" | map search="search earliest=$earliest$ latest=$latest$ sourcetype=proxy user=$username$" | fields _time, clientIP, user ]
I'm not sure if you can get join to do the correct time correlation that you need. It's possible that you'll need to do some sort of funky transaction instead (since it's the only search command that lets you pull events together based on a range of time), but there are also a number of down-sides to that approach.
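To make the transaction idea concrete, one speculative (and untested) shape, assuming the proxy events carry a realIP field and share the user field with the base events:

```
sourcetype=foo OR sourcetype=proxy user=*
| transaction user maxspan=30s
| eval clientIP=coalesce(realIP, clientIP)
```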
Good luck 😉
Alternate approaches: Don't forget that you can always programmatically call Splunk searches. It sounds like you have something complex enough that it may warrant that kind of effort, and it would give you FULL control. It's also possible that generating lookups periodically could be a much better way to handle this.
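For the periodic-lookup idea, something along these lines could run as a scheduled search to build a CSV lookup from the proxy logs (the field and lookup names are illustrative):

```
sourcetype=proxy | stats latest(realIP) AS RealIP by user | outputlookup proxy_realip.csv
```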