REST API Modular Input: How to implement a custom response handler to deal with pagination limits when pulling from the Airtable REST API?

gvmorley
Contributor

Hi All,

I've been trying to pull together a proof of concept using Splunk and a product called Airtable (https://airtable.com).

They have a RESTful API that allows records to be accessed, which is great for use with the REST API Modular Input.

The API structure is:

https://api.airtable.com/v0/[Base]/[Table]

I'm 90% there, but I'm struggling with a 'pagination' problem.

The Airtable REST API allows you to pull all of the records in a table, but limits results to a maximum of 100 records per request. If there are more records than that, the JSON response includes an 'offset' field. You then need to issue the request again, with that value as a parameter.
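For illustration, the pagination contract looks something like this as a standalone script outside of Splunk (a minimal sketch; the Bearer-token auth header and the placeholder IDs are my assumptions, not part of the Modular Input setup):

import requests

# Standalone sketch of the Airtable pagination contract (not the
# Modular Input handler). BASE_ID, TABLE_NAME and YOUR_API_KEY are
# hypothetical placeholders.
url = "https://api.airtable.com/v0/BASE_ID/TABLE_NAME"
headers = {"Authorization": "Bearer YOUR_API_KEY"}
params = {}

while True:
    page = requests.get(url, headers=headers, params=params).json()
    for record in page["records"]:
        print(record["id"], record["fields"])
    if "offset" not in page:
        break  # last page: Airtable omits the offset key
    params["offset"] = page["offset"]  # request the next 100 records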

I know very little about JSON and even less about Python, so I don't have the first clue about how to implement a Custom Response Handler to deal with this.

So far, thanks to previous questions and answers, I've got this far with my Response Handler:

class AirTableEventHandler:

    def __init__(self,**args):
        pass

    def __call__(self, response_object,raw_response_output,response_type,req_args,endpoint):
        if response_type == "json":
            output = json.loads(raw_response_output)
            #emit each Airtable record as a separate Splunk event
            for record in output["records"]:
                print_xml_stream(json.dumps(record))
        else:
            #pass non-JSON responses through unchanged
            print_xml_stream(raw_response_output)

This works for breaking up the results into Events, but doesn't begin to deal with the pagination question.

I've truncated the output to 4 records, but this is the response that you'd actually get back when there are more than 100 records:

{"records":[{"id":"rec4rlLs4jDeqhFRR","fields":{"Outlet":"20-001","Location":["rec5lh6NgFH33y6ax"],"Rack":["recUOCOlAycENRdo5"],"Type":"Wall Socket","Line checked":true,"Found":true,"On-Drawing":true,"Location_Static":"AP.1.004","Rack_Static":"Rack 20","Location Name":["Physiotherapy Area (Male)"]},"createdTime":"2016-08-22T08:27:05.000Z"},{"id":"recrFjR0pyu2RIjp6","fields":{"Outlet":"20-002","Location":["rec5lh6NgFH33y6ax"],"Rack":["recUOCOlAycENRdo5"],"Type":"Wall Socket","Line checked":true,"Found":true,"On-Drawing":true,"Location_Static":"AP.1.004","Rack_Static":"Rack 20","Location Name":["Physiotherapy Area (Male)"]},"createdTime":"2016-08-22T08:27:05.000Z"},{"id":"recQxL1aNe7wdpM4g","fields":{"Outlet":"20-003","Location":["rec5lh6NgFH33y6ax"],"Rack":["recUOCOlAycENRdo5"],"Type":"High Socket","Line checked":true,"Found":true,"On-Drawing":true,"Location_Static":"AP.1.004","Rack_Static":"Rack 20","Location Name":["Physiotherapy Area (Male)"]},"createdTime":"2016-08-22T08:27:05.000Z"},{"id":"recLK0w2w1HTF1Jwu","fields":{"Outlet":"20-004","Location":["rec5lh6NgFH33y6ax"],"Rack":["recUOCOlAycENRdo5"],"Type":"High Socket","Found":true,"On-Drawing":true,"Location_Static":"AP.1.004","Rack_Static":"Rack 20","Location Name":["Physiotherapy Area (Male)"]},"createdTime":"2016-08-22T08:27:05.000Z"}],"offset":"itrATjKH4HwrwifHj/rectLCyqqrnhD0jr9"}

The crucial bit here is the:

"offset":"itrATjKH4HwrwifHj/rectLCyqqrnhD0jr9"

This changes with each 100 record response.

So to get the next 100 results, I'd need to do something like:

https://api.airtable.com/v0/[Base]/[Table]?offset=itrATjKH4HwrwifHj/rectLCyqqrnhD0jr9

(And then the rest of the "URL Arguments" as defined in the Modular Input)

Would anyone be able to suggest a Custom Response Handler that could iterate over this offset key, to capture all of the results?

In general, Airtable looks to be a very flexible DB solution for some SMB use-cases, so it would be great if I could get this working with Splunk.

Thanks in advance,

Graham.


Damien_Dallimor
Ultra Champion

Try something along these lines (may need tweaking/debugging):

class AirTableEventHandler2:

    def __init__(self,**args):
        pass

    def __call__(self, response_object,raw_response_output,response_type,req_args,endpoint):
        if response_type == "json":
            output = json.loads(raw_response_output)

            #first response
            for record in output["records"]:
                print_xml_stream(json.dumps(record))

            offset = output["offset"]
            #pagination loop
            while offset is not None:

                next_url = response_object.url+'?offset='+offset
                next_response = requests.get(next_url)
                output = json.loads(next_response.text)
                #print out results from pagination looping
                for record in output["records"]:
                    print_xml_stream(json.dumps(record))
                #hopefully (guessing) at the end of the pagination, there will be
                #no more "offset" values in the JSON response, so this will cause
                #the while loop to exit
                offset = output["offset"]

        else:
            print_xml_stream(raw_response_output)

gvmorley
Contributor

Hi Damien,

Thanks so much for taking a look at this for me.

The logic makes sense. I had to add

import requests

at the top of responsehandlers.py for it to initially run.

Interestingly, it does two iterations (so pulls 200 'records'), but that's it. Looking in splunkd.log I get:

08-26-2016 08:38:55.457 +0100 ERROR ExecProcessor - message from "python /opt/splunk/etc/apps/rest_ta/bin/rest.py" Exception in thread Thread-1:
08-26-2016 08:38:55.458 +0100 ERROR ExecProcessor - message from "python /opt/splunk/etc/apps/rest_ta/bin/rest.py" Traceback (most recent call last):
08-26-2016 08:38:55.458 +0100 ERROR ExecProcessor - message from "python /opt/splunk/etc/apps/rest_ta/bin/rest.py"   File "/opt/splunk/lib/python2.7/threading.py", line 801, in __bootstrap_inner
08-26-2016 08:38:55.458 +0100 ERROR ExecProcessor - message from "python /opt/splunk/etc/apps/rest_ta/bin/rest.py"     self.run()
08-26-2016 08:38:55.458 +0100 ERROR ExecProcessor - message from "python /opt/splunk/etc/apps/rest_ta/bin/rest.py"   File "/opt/splunk/lib/python2.7/threading.py", line 754, in run
08-26-2016 08:38:55.458 +0100 ERROR ExecProcessor - message from "python /opt/splunk/etc/apps/rest_ta/bin/rest.py"     self.__target(*self.__args, **self.__kwargs)
08-26-2016 08:38:55.458 +0100 ERROR ExecProcessor - message from "python /opt/splunk/etc/apps/rest_ta/bin/rest.py"   File "/opt/splunk/etc/apps/rest_ta/bin/rest.py", line 521, in do_run
08-26-2016 08:38:55.458 +0100 ERROR ExecProcessor - message from "python /opt/splunk/etc/apps/rest_ta/bin/rest.py"     handle_output(r,r.text,response_type,req_args,endpoint)
08-26-2016 08:38:55.458 +0100 ERROR ExecProcessor - message from "python /opt/splunk/etc/apps/rest_ta/bin/rest.py"   File "/opt/splunk/etc/apps/rest_ta/bin/rest.py", line 614, in handle_output
08-26-2016 08:38:55.458 +0100 ERROR ExecProcessor - message from "python /opt/splunk/etc/apps/rest_ta/bin/rest.py"     RESPONSE_HANDLER_INSTANCE(response,output,type,req_args,endpoint)
08-26-2016 08:38:55.458 +0100 ERROR ExecProcessor - message from "python /opt/splunk/etc/apps/rest_ta/bin/rest.py"   File "/opt/splunk/etc/apps/rest_ta/bin/responsehandlers.py", line 69, in __call__
08-26-2016 08:38:55.458 +0100 ERROR ExecProcessor - message from "python /opt/splunk/etc/apps/rest_ta/bin/rest.py"     for record in output["records"]:
08-26-2016 08:38:55.458 +0100 ERROR ExecProcessor - message from "python /opt/splunk/etc/apps/rest_ta/bin/rest.py" KeyError: 'records'

So does that mean that some part of the while loop logic isn't working?

Regards,

Graham


gvmorley
Contributor

OK,

After a bit of trial and error, I needed to change:

        next_url = response_object.url+'?offset='+offset

to:

        next_url = response_object.url+'&offset='+offset

So replacing the '?' with '&'. This makes sense, as the Data Input's URL already carries the other 'URL Arguments', so the offset has to be appended to the existing query string rather than start a new one.

It will now pull all of the records that I'm expecting (4 REST requests/iterations in total), but then it doesn't run at the next polling interval.

The splunkd.log has this error:

08-26-2016 09:17:05.240 +0100 ERROR ExecProcessor - message from "python /opt/splunk/etc/apps/rest_ta/bin/rest.py" KeyError: 'offset'

So it looks like the process bombs out due to the missing 'offset' key (which is correct - it's not there in the last iteration).

Therefore just need a way to exit cleanly at the end of the while loop.


Damien_Dallimor
Ultra Champion

Just put some logic in to check for when there is no "offset" key in the JSON.

if "offset" in output:
    offset = output["offset"]
else:
    offset = None
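
(Equivalently, Python's dict.get covers both cases in one line, since it returns None when the key is missing: offset = output.get("offset").)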

gvmorley
Contributor

That's the one!

Working great now.

Thanks again for your support with this. I really appreciate it.

I'm completely new to Python, so it's been a bit of a voyage of discovery.

The final version that's working for me is:

class AirTableEventHandler:

    def __init__(self,**args):
        pass

    def __call__(self, response_object,raw_response_output,response_type,req_args,endpoint):
        if response_type == "json":
            output = json.loads(raw_response_output)

            #Print results from the first response
            for record in output["records"]:
                print_xml_stream(json.dumps(record))

            offset = output["offset"]

            #Create the pagination loop
            while offset is not None:

                #Construct and Get the next URL
                next_url = response_object.url+'&offset='+offset
                next_response = requests.get(next_url)
                output = json.loads(next_response.text)

                #Print out results from pagination looping
                for record in output["records"]:
                    print_xml_stream(json.dumps(record))

                #There is no "offset" value in the last JSON response.
                #Need to check for its existence, otherwise get KeyError
                #If doesn't exist, set to None and the while loop will exit
                if "offset" in output:
                    offset = output["offset"]
                else:
                    offset = None

        else:
            print_xml_stream(raw_response_output)

cudgel
Path Finder

I have tried implementing this code for another REST input (a Cloudflare endpoint) and it does not work after the first page of results. The error I receive is below:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/opt/splunk/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/opt/splunk/lib/python2.7/threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/opt/splunk/etc/apps/rest_ta/bin/rest.py", line 521, in do_run
    handle_output(r,r.text,response_type,req_args,endpoint)
  File "/opt/splunk/etc/apps/rest_ta/bin/rest.py", line 616, in handle_output
    RESPONSE_HANDLER_INSTANCE(response,output,type,req_args,endpoint)
  File "/opt/splunk/etc/apps/rest_ta/bin/responsehandlers.py", line 171, in __call__
    for result in output["result"]:
TypeError: 'NoneType' object is not iterable

I am guessing the inner loop is not getting some of the settings from the original input, specifically the HTTP header properties Cloudflare uses for authentication.
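
If that's the case, one hypothetical tweak (untested, and assuming req_args is a dict of keyword arguments for requests.get(), as the "params" usage later in this thread suggests) would be to forward it on each pagination call:

#Hypothetical, untested: forward the original request arguments
#(headers, auth, params) on each paginated call, assuming req_args
#holds keyword arguments for requests.get()
next_response = requests.get(next_url, **req_args)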


gvmorley
Contributor

Just a follow-up to my initial question.

Thanks to a suggestion from Chester McLaughlin on the Airtable Community forum (https://community.airtable.com/t/list-records-without-pagination/1464/4), I'm now using this as the Custom Response Handler:

class AirTableEventHandler2:

    def __init__(self,**args):
        pass

    def __call__(self, response_object,raw_response_output,response_type,req_args,endpoint):
        if response_type == "json":
            output = json.loads(raw_response_output)
            for record in output["records"]:
                print_xml_stream(json.dumps(record))

            if "params" not in req_args:
                req_args["params"] = {}

            #use .get() so the final page (which has no "offset" key)
            #doesn't raise a KeyError
            if output.get("offset") is not None:
                req_args["params"]["offset"] = output["offset"]

        else:
            print_xml_stream(raw_response_output)

The last piece of the puzzle is that the 'offset' parameter gets added to the 'URL Arguments' field on the Data Input (as seen in the GUI).

This then breaks the process, because at the next Data Input polling interval that stale offset is used as the starting point.

Is there a way to 'default' this parameter or stop it being added to the Data Input URL Arguments?
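
One idea I haven't tested would be to clear the key once the final page arrives, assuming rest.py simply persists whatever is left in req_args["params"]:

#Untested idea: remove the stale offset on the final page so it
#isn't persisted into the Data Input's URL Arguments
if output.get("offset") is not None:
    req_args["params"]["offset"] = output["offset"]
else:
    req_args["params"].pop("offset", None)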

Thanks,

Graham
