Splunk Search

Dynamically rewrite a field value with an eval + subsearch

johnnymc
Path Finder

Hello, I have a subset of results from a search. I know that if an event has clientIP=x.x.x.x, the connection is proxied and I need to run another search against the outer proxy logs to retrieve the client's real IP address; this can be done with a separate search on another sourcetype at the exact same timestamp.

What I need to do, if possible, is rewrite the clientIP value with one retrieved by a subsearch, passing along the time range and the username field.
Something like this (which, of course, cannot work):

sourcetype=foo 
| eval clientIP=if(clientIP == "x.x.x.x", ([ sourcetype=proxy earliest=$earliest$ latest=$latest$ user=$username$ |fields realIP|rename realIP AS search]), clientIP)

The hard part (for me) is passing the field values into the subsearch.

I need to present the data as a single result set, so I have to do all the transforms inside that one search.

Thanks in advance.


johnnymc
Path Finder

Thank you both for your exhaustive and helpful answers.

After a lot of trial and error, I ended up piping the search results to a custom lookup script.

The script runs a search by itself, but only when needed; if the IP is NOT proxied it leaves the field as-is and does nothing else, to preserve resources.

The script is passed the old (proxied; in the script below it is 1.2.3.4) SourceIP value, a timestamp (to be matched against the other log), and a discriminator (in my case, since this is POP3 authentication, the customer username).
The _time value needs to be copied into a regular field, so do an eval (see TS below).

Please note that you need to pass an empty field to be populated by the output; you can 'construct' this field in the search with an eval.

yoursearch | eval TS=_time | lookup RealPopIP TS Account SourceIP OUTPUT RealIP

This took me a long time to figure out; without it, it doesn't work.

Transforms stanza needed (transforms.conf):

[RealPopIP]
external_cmd = RealIP.py TS Account SourceIP RealIP
fields_list = TS,Account,SourceIP,RealIP

For the sake of completeness: the Splunk server passes the script a partially filled CSV table via stdin, and the script writes the filled-in CSV back to stdout. You also need to pass the field names as arguments (see the transforms stanza above).
The script is not polished enough to take as-is, but maybe it can be useful to others 🙂

#!/usr/bin/env python
#
# script to change specific field values
# eg. if IP is proxied
# leave the value as is if doesn't match
#
# 21.09.2011
#
import csv, sys, time
import splunk.auth, splunk.search

USER_NAME = 'admin'
PASSWORD = 'password'

def main():
    # Splunk hands the external lookup a partially filled CSV table on stdin
    r = csv.reader(sys.stdin)

    # wrong number of arguments: echo the input back unchanged
    if len(sys.argv) != 5:
        for l in r:
            csv.writer(sys.stdout).writerow(l)
        sys.exit(0)

    # read cmd line arguments
    timestampf = sys.argv[1]
    userf = sys.argv[2]
    sourceipf = sys.argv[3]
    realipf = sys.argv[4]

    w = None
    header = []
    first = True

    # authenticate to splunk server
    key = splunk.auth.getSessionKey(USER_NAME, PASSWORD)

    for line in r:   
        if first:
            header = line
            t=header
            headers=[]

            for e in t:
                e=e.lstrip('[')
                e=e.rstrip(']')
                e=e.replace('\'','')
                e=e.replace(' ','')
                headers.append(e)

            # echo the header row back and prepare a writer for the filled-in rows
            csv.writer(sys.stdout).writerow(header)
            w = csv.DictWriter(sys.stdout, header)

            first = False
            continue

        # Read the result
        result = {}
        i = 0
        while i < len(header):
            if i < len(line):
                result[header[i]] = line[i]
            else:
                result[header[i]] = ''
            i += 1

        if len(result[timestampf]) and len(result[userf]) and len(result[sourceipf]):
            # proxied IP - need to rewrite to real customer IP address
            if result[sourceipf] == "1.2.3.4":
                searchstring = 'search sourcetype="realipsourcetype" clientIP=* user=%s | head 1' % (result[userf])

                # Perform the lookup (splunk search)
                # TS may come through with fractional seconds, so go via float
                ts = int(float(result[timestampf]))
                job = splunk.search.dispatch(searchstring, earliest_time=ts-15, latest_time=ts+15, required_field_list='clientIP')

                # wait for results
                while not job.isDone:
                    time.sleep(.25)

                # take clientIP from the first matching proxy event;
                # fall back to the original value if nothing was found
                if len(job.results):
                    result[realipf] = str(job.results[0].fields['clientIP'])
                else:
                    result[realipf] = result[sourceipf]

                w.writerow(result)

            else:
                # ip is not proxied, leave as is
                result[realipf]=result[sourceipf]
                w.writerow(result)

        else:
            # format is not correct - spit out as came
            w.writerow(result)

if __name__ == '__main__':
    main()

Hope this will be useful to others.

Lowell
Super Champion

Oh, you should also be aware of a bug in early 4.1.x releases: http://splunk-base.splunk.com/answers/3238/can-a-custom-search-command-launch-a-splunk-search


johnnymc
Path Finder

Thanks a lot, I'll try to use it this way.


Lowell
Super Champion

Just FYI, if you use a custom search command instead of an external "lookup", you can have authentication passed in and not have to hard-code your login credentials, which raises security concerns. (I created an app that provides an example of that: http://splunk-base.splunk.com/apps/22370/runsavedsearch-alert-action)
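
Just to illustrate, a rough and untested sketch of that approach could look like the one below. The field names (SourceIP, Account, RealIP) and realipsourcetype are just the placeholders from the lookup script above, and it assumes the command is registered in commands.conf with passauth = true so that Splunk passes the session key in:

import splunk.Intersplunk
import splunk.search

# classic Intersplunk interface: the events plus a settings dict;
# with passauth enabled the session key arrives in settings, so no
# hard-coded username/password is needed
results, dummyresults, settings = splunk.Intersplunk.getOrganizedResults()
sessionKey = settings.get('sessionKey')

for result in results:
    # only rewrite proxied addresses, as in the lookup script
    if result.get('SourceIP') == '1.2.3.4':
        searchstring = 'search sourcetype="realipsourcetype" clientIP=* user=%s | head 1' % result.get('Account')
        job = splunk.search.dispatch(searchstring, sessionKey=sessionKey)
        # ...wait for the job and copy clientIP into result['RealIP'],
        # exactly as the lookup script above does...

splunk.Intersplunk.outputResults(results)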

Ayn
Legend

Kudos on getting back with such a detailed explanation on how you managed to solve the problem. Nice!

Ayn
Legend

In addition to Lowell's answer, which covers most of it: from what I've gathered, what you want cannot be easily or efficiently achieved by issuing searches on indexed data. I've been wishing for a similar feature myself, i.e. something that can perform "lookups" on indexed data as easily as you can perform lookups using .csv files and external scripts.

With manageable volumes of data, Splunk can perform time-based lookups that automatically grab values that were valid for an event's specific time period. Typical use-case is to feed Splunk with a DHCP log with time, IP and MAC address, and then use it as a time-based lookup in order to automatically tie an IP to a MAC address for each event in your searches (as long as the IP address exists in the DHCP log of course). I'm guessing your proxy logs are pretty big so feeding them to Splunk as a lookup csv is far from an ideal solution. Nevertheless, should you want to try it there's information on time-based lookups available in the docs: http://docs.splunk.com/Documentation/Splunk/latest/Knowledge/Addfieldsfromexternaldatasources#Exampl...
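
For reference, a time-based lookup like that DHCP example is just a normal lookup stanza with time settings added. A rough sketch (the stanza name, dhcp.csv and its column names are made up, and max_offset_secs would need tuning for your environment):

[dhcplookup]
filename = dhcp.csv
time_field = timestamp
time_format = %s
max_offset_secs = 600

which you would then use along the lines of: yoursearch | lookup dhcplookup ip AS clientIP OUTPUT mac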

Lowell
Super Champion

If I'm understanding what you're trying to do correctly, it sounds like you want to launch a sub-search for each and every event returned by your base search (sourcetype=foo). It also looks like you only want the sub-search to run conditionally, when you don't have a valid clientIP value. Does that sound correct?

First off, I don't think there is any way to conditionally launch a subsearch. Each subsearch is only run once: it is evaluated and expanded into the main search, and then the main search runs. Now, you can launch a search per-event using the map command, so you may need to do something like that; however, in that case the output of the individual map-run searches becomes your final output, so you lose all the events from your base search. So you'll need to do all of this inside of a sub-search and then recombine the two sets of results using a join command, or something...

For the record, this is a complete guess. This would take lots of examples and probably a few hours of messing around to get something that actually works properly...

sourcetype=foo | join type=outer user [ search sourcetype=foo clientIP=="x.x.x.x" | map search="search earliest=$earliest$ latest=$latest$ sourcetype=proxy user=$username$" | fields _time, clientIP, user ]

I'm not sure if you can get join to do the correct time correlation that you need. It's possible that you'll need to do some sort of funky transaction instead (since it's the only search command that lets you pull events together based on a range of time, but there are also a number of down-sides to that approach.)
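
Purely as an illustration of the transaction idea (the sourcetypes, the user field and the 30-second window are all placeholders), it might look something like:

(sourcetype=foo OR sourcetype=proxy) | transaction user maxspan=30s | eval clientIP=coalesce(realIP, clientIP)

i.e. pull the base event and the matching proxy event for the same user into one transaction, and prefer the proxy's realIP when it is present.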

Good luck 😉

Alternate approaches: don't forget that you can always programmatically call Splunk searches. It sounds like you have something complex enough that it may warrant that kind of effort, and that would give you FULL control. It's also possible that generating lookups periodically could be a much better way to handle this.
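
For the lookup-generation idea, a scheduled search along these lines could periodically rebuild a CSV lookup of user-to-real-IP mappings (the field names and proxy_realip.csv are placeholders):

sourcetype=proxy | stats latest(realIP) as realIP by user | outputlookup proxy_realip.csv

which you could then use with a plain | lookup in your main search instead of launching per-event searches.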
