I have a question similar to https://answers.splunk.com/answers/2602 and https://answers.splunk.com/answers/448796.
I would like to match one search (from which I extract a field) and then search the subsequent daemon log lines for a second pattern. If the second pattern repeats x times, mark the field value as an Error; if it appears fewer than x times but at least once, it's a Warning. If the following line is not a match for the second pattern, it's a Pass.
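The rule can be written as a small function. This is a minimal Python sketch, not Splunk code; `classify` and the threshold parameter `x` are illustrative names, and it assumes "x or more" retries count as an Error.

```python
def classify(timeout_count: int, x: int) -> str:
    """Map the number of retry lines that follow a match to a status.

    x is the (assumed) error threshold: x or more retries is an Error.
    """
    if timeout_count >= x:
        return "Error"
    if timeout_count > 0:
        return "Warning"
    return "Pass"


print(classify(5, 5), classify(2, 5), classify(0, 5))  # Error Warning Pass
```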
The daemon is atftpd and its logs of interest are:
Sep 25 10:58:07 caffeine atftpd[6596]: Serving kernels/vmlinuz to IP:1668
Sep 25 10:58:07 caffeine atftpd[6596]: Serving kernels/vmlinuz to IP:1669
Sep 25 10:58:23 caffeine atftpd[6596]: timeout: retrying...
Sep 25 10:58:28 caffeine atftpd[6596]: timeout: retrying...
Sep 25 10:58:33 caffeine atftpd[6596]: timeout: retrying...
Sep 25 10:58:38 caffeine atftpd[6596]: timeout: retrying...
Sep 25 10:58:43 caffeine atftpd[6596]: timeout: retrying...
Sep 25 11:08:07 caffeine atftpd[6596]: Serving kernels/vmlinuz to anotherIP:1211
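To make the field extraction concrete, here is a Python sketch that splits those lines into "serving" and "timeout" records. It is an illustration outside Splunk: the regexes assume the exact syslog layout shown above (with `IP` standing in for a real address), and `parse`, `LINE_RE` and `SERVING_RE` are hypothetical names.

```python
import re

# Assumed layout: "<Mon> <day> <HH:MM:SS> <host> atftpd[<pid>]: <message>"
LINE_RE = re.compile(r"^(\w{3} +\d+ \d{2}:\d{2}:\d{2}) (\S+) atftpd\[(\d+)\]: (.*)$")
# Client is everything before the LAST colon, so "IP:1668" and "10.0.0.5:1668" both parse.
SERVING_RE = re.compile(r"^Serving (\S+) to (\S+):(\d+)$")


def parse(line):
    """Return a dict describing one atftpd syslog line, or None if it does not match."""
    m = LINE_RE.match(line)
    if m is None:
        return None
    stamp, host, pid, msg = m.groups()
    record = {"time": stamp, "host": host, "pid": int(pid)}
    s = SERVING_RE.match(msg)
    if s:
        record.update(kind="serving", file=s.group(1), client=s.group(2), port=int(s.group(3)))
    elif msg.startswith("timeout: retrying"):
        record["kind"] = "timeout"
    else:
        record["kind"] = "other"
    return record


print(parse("Sep 25 10:58:07 caffeine atftpd[6596]: Serving kernels/vmlinuz to IP:1668"))
```

Escaping the dots (`\.`) matters if you match dotted IPv4 addresses directly; an unescaped `.` matches any character.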
There is a deterministic pattern to the "timeout: retrying..." entries: they occur every 5 seconds, and their number is configurable (5 here). So if I see a "Serving..." line followed by exactly 5 "retrying..." lines, I know for sure it's a failure.
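The deterministic pattern (exactly five retries, five seconds apart) can also be checked directly. A minimal Python sketch, assuming the retry count and interval quoted above and timestamps with one-second resolution; `is_full_retry_burst` and its `tolerance_s` parameter (allowing 1 s of jitter) are hypothetical.

```python
from datetime import datetime


def is_full_retry_burst(stamps, retries=5, interval_s=5, tolerance_s=1):
    """True if `stamps` (HH:MM:SS strings) hold exactly `retries` timeouts about `interval_s` apart.

    Only times of day are compared, so a burst that crosses midnight is not handled.
    """
    if len(stamps) != retries:
        return False
    times = [datetime.strptime(s, "%H:%M:%S") for s in stamps]
    gaps = [(b - a).total_seconds() for a, b in zip(times, times[1:])]
    return all(abs(g - interval_s) <= tolerance_s for g in gaps)


burst = ["10:58:23", "10:58:28", "10:58:33", "10:58:38", "10:58:43"]
print(is_full_retry_burst(burst))      # True
print(is_full_retry_burst(burst[:3]))  # False: fewer retries than the configured count
```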
My search so far extracts the IPs and the errors into fields, but the transaction command in Splunk returns only the first "timeout" hit:
sourcetype=syslog AND atftpd AND caffeine | rex field=_raw "Serving.* to (?<ip_address>[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)" | rex field=_raw ".* (?<error>timeout).*" | transaction endswith=(: timeout: retrying...) maxcount=5
I had assumed that maxcount=5 referred to the number of events matching the transaction's end condition, not the total number of events in the transaction.
Try something like this...
| your search that gets the data with either "serving" or "timeout" records
| rename COMMENT as "Put in time order, mark timeout records, copy each onto next record"
| sort 0 _time
| eval Timeout=if(match(_raw,"timeout"),1,0)
| streamstats current=f last(Timeout) as priorTimeout
| rename COMMENT as "Mark as new group if it is the first record or the timeout value changes, calculate the group number"
| eval newgroup=case(isnull(priorTimeout),1, priorTimeout!=Timeout,1, true(),0 )
| streamstats sum(newgroup) as groupno
| rename COMMENT as "Determine how many timeout records are in the group, set to zero if not a timeout group"
| eventstats count as groupcount by groupno
| eval groupcount=if(Timeout=1,groupcount,0)
| rename COMMENT as "Run backwards through the data to copy the number of timeouts onto the PRECEDING serving record."
| reverse
| streamstats current=f last(groupcount) as timeoutCount
| rename COMMENT as "Drop the Timeout records, set the flag."
| where Timeout=0
| eval Flag=case(timeoutCount>=5,"Error", timeoutCount>0,"Warning", timeoutCount=0,"Pass", true(),"Unknown")
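To check the logic of that search outside Splunk, the pipeline can be mirrored step by step in plain Python. This is a sketch under assumptions: events are already reduced to (seconds, raw text) pairs, `flag_serving_records` is a hypothetical name, and the threshold defaults to the 5 used in the `Flag` eval. It returns rows in time order, whereas the SPL leaves them newest-first after `reverse`.

```python
def flag_serving_records(events, threshold=5):
    """Python mirror of the SPL answer.

    events: (seconds, raw_text) pairs. Returns (raw_text, timeoutCount, Flag)
    for every non-timeout record, in time order.
    """
    events = sorted(events, key=lambda e: e[0])             # | sort 0 _time (stable)
    is_timeout = ["timeout" in raw for _, raw in events]    # Timeout=if(match(_raw,"timeout"),1,0)

    # newgroup/groupno: start a new group whenever Timeout changes value.
    group_no, sizes = [], []
    for i, cur in enumerate(is_timeout):
        if i == 0 or cur != is_timeout[i - 1]:
            sizes.append(0)
        sizes[-1] += 1
        group_no.append(len(sizes) - 1)

    # eventstats count by groupno, zeroed for non-timeout groups.
    groupcount = [sizes[g] if is_timeout[i] else 0 for i, g in enumerate(group_no)]

    results = []
    for i, (_, raw) in enumerate(events):
        if is_timeout[i]:
            continue                                         # | where Timeout=0
        # reverse + streamstats current=f last(groupcount): value from the NEXT record in time.
        count = groupcount[i + 1] if i + 1 < len(events) else None
        if count is None:
            status = "Unknown"
        elif count >= threshold:
            status = "Error"
        elif count > 0:
            status = "Warning"
        else:
            status = "Pass"
        results.append((raw, count, status))
    return results


# Sample log from the question, times as seconds after 10:58:00.
sample = [
    (7, "Serving kernels/vmlinuz to IP:1668"),
    (7, "Serving kernels/vmlinuz to IP:1669"),
    (23, "timeout: retrying..."),
    (28, "timeout: retrying..."),
    (33, "timeout: retrying..."),
    (38, "timeout: retrying..."),
    (43, "timeout: retrying..."),
    (607, "Serving kernels/vmlinuz to anotherIP:1211"),
]
for row in flag_serving_records(sample):
    print(row)
```

On the sample, the 1668 request comes out as Pass (the next record is another Serving line), the 1669 request as Error with a count of 5, and the last request as Unknown because no record follows it, matching the `true(),"Unknown"` branch of the `case`.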