Hi - I have a data source which is ingested regularly via DB Connect. When indexed it has the same sourcetype and source.
I need to report on the latest set of events included from the last ingestion of that data source.
I can't simply set earliest time to be -1d as there are times when the ingestion runs will not generate events. So I need to search back over a 7 day period, find the time range for the last set of events ingested and then report on these events. In the example below I need to report on the events ingested between 2018-03-19 15:00:00 and 2018-03-19 15:00:01
index=stk source=sktest earliest=-7d | stats count by source , _time
source _time counts
sktest 2018-03-19 15:00:01 1800
sktest 2018-03-19 15:00:00 220
sktest 2018-03-17 15:00:01 200
sktest 2018-03-16 15:00:01 1623
sktest 2018-03-16 15:00:00 478
So I had been looking at a sub-search to capture the time range - its easy enough to get the latest event time in the range but not found a way to capture the actual value for the earliest time. I've got a workaround which uses the latest time and subtracts 30 seconds to give me a 30 second window which I can search for events on.
index=sk source=sksource [ |tstats latest(_time) as ltime where index=stk source=sksource earliest=-7d@
| eval ltime=etime-30
| return earliest=etime latest=ltime ]
However I would like to somehow capture the earliest and latest times using the indexed timestamps rather than using a static value of 30 seconds. My concern is that over time 30 second might not be sufficient and this time period would need to be regularly revisited.
Appreciate any comments.
could you add a bit of metadata when you execute the scheduled batch from dbconnect? say add another column to the output results based on now converted to epoch:
select A, B, DATEDIFF(SECOND, '19000101', GETDATE()) as JobExecutionTimeID
then in your splunk search since JobExecutionTimeID will be an additional field:
yoursearch | eventstats max(JobExecutionTimeID) as LatestRun | where JobExecutionTimeID=LatestRun
OR prehaps you can do something like this:
yoursearch | stats count by source , _time | sort _time desc | head 2 | eventstats max(_time) as ltime, min(_time) as etime