...
Part 1 (db tables & units)
- create "
TaskQueueRunner
ITaskQueueRunner
" interface with "processTaskQueueRun(kDBItem $task_queue_run)
" method - create "
TaskQueue
" database table with following columns:Id
QueuedOn
- when task was queuedQueuedById
- who queued the taskScheduledOn
- when task needs to be processed (set at queuing time)TaskClass
- FQCN of PHP class, which is responsible for processing this task queue record (must implement "ITaskQueueRunner
" interface)TaskData
- JSON-encoded data, that is needed for task execution (e.g. e-mail recipient, IDs of records to be processed)LastStatus
- status from last attempt of this queue record processing - {scheduled
(default),processing
,success
,failed
,timeout
}; when queue processed in parallel, then last ended task statusMaxRetries
- if task fails specified number of times (5 by default), then don't retry itFailedRetries
- failed retry count (number is reset, when task execution was successful)
- create "
TaskQueueRuns
" database table with following columns:Id
TaskQueueId
- associated task queue recordStartedOn
- when task was started executingPercentsCompleted
- "0
" by default, but will be updated as task is being processedFinishedOn
- when task was finished executing (regardless of status)Results
- JSON-encoded results in any format, that be later displayed in human-readable formStatus
- same statuses as for "TaskQueue.LastStatus" column- "
scheduled
" - initially, when record is created; - "
processing
" - when somebody is processing the record; - "
success
" - when execution finished without errors; - "
error
" - when known error happened during processing; - "
timeout
" - when status wasn't updated within 1 day (configurable per-queue record or system-wide, but can't be empty)
- "
ErrorCode
- non-empty when known error happenedErrorMessage
- non-empty when known error happenedProcessId
- the PID process ID of the process , that is handling (or was handling in past) this task (process that starts running this task run will put own PID in here)that runs this task run (can be a runner process id, or just a regular website visit process); NULL, when not being processed by anybody right now
- create "TaskQueueRunners" database table with following columns:
- Id
- ProcessId - the PID of associated task runner process
- StartedOn - when process was started
- FinishedOn - when process was finished; NULL initially
- create units (the "
task-queue
" and "task-queue-run
", "task-queue-runner
"), that corresponding to above described database tables - in "
TaskQueueRunEventHandler::OnAfterItemUpdate
" aggregate totals from all runs from associated task queue record and update it (task queue record)
Part 2 (adding tasks & runs)
- create new "
ParallelTaskQueueRunCount
" system setting set to "8
" by defaultcreate "TaskQueueHelper
" class - add public "
TaskQueueHelper::queueTask($task_class, $task_data, $scheduled_on, $max_retries = null)
" method, that will:- create task queue record with given settings
- consider "
$max_retries
" as "5
" when not given - throw an exception, when specified "
$task_class
" doesn't exist
- add protected "
TaskQueueHelper::synchronizeTaskRunStatus
" method that will:- get all task runs, that are running currently
- get status of their PIDs
- for all task runs which PIDs are dead set their status to "timeout"
- add protected "
TaskQueueHelper::createTaskRun(kDBItem $task_queue)
" method, that:- will create new task run (and return it's ID) for given queue record, when all of following rules aren't violated:
- only 1 active (status = processing) task run can exist at same time (for a given queue record)
- sequential failed task run count (both "error" and "timeout" statuses are considered as failed) can't be more than max allowed retry count
- return "null" otherwise
- will create new task run (and return it's ID) for given queue record, when all of following rules aren't violated:
- add protected "
TaskQueueHelper::createMissingTaskRuns()
" method, that will:- get value of "
ParallelTaskQueueRunCount
" system setting and compare it to number of currently running task runsif there are smaller number of tasks running, then possible:
get all task queue records, for which all records from "TaskQueue
" table, for which task runs can be created (:ScheduledOn
<NOW()
+ not running right now + failed retry count < max retry count)Status
is not "processing
"FailedRetries
<MaxRetries
- call the "
TaskQueueHelper::createTaskRun
" on each of them (method can return "NULL" in some cases, but that's ok)
- get value of "
- add public "
TaskQueueHelper::createTaskRuns()
" method, that will:- call "
TaskQueueHelper::synchronizeTaskRunStatus
" method - call "
TaskQueueHelper::createMissingTaskRuns
" method
- call "
- add "
TaskQueueEventHandler::OnCreateTaskRuns
" event, that would be called as Scheduled Task on a regular basis (e.g. each 5 minutes)
Part 3 (running runs)
- create new "
TaskQueueRunnerLimit
" system setting set to "8
" by default - add public "
TaskQueueHelper::processTaskRun($task_run_id)
", that will:- load task run by given ID from the database (if failed throw an exception)
- set following fields and save changes to db immediately:
- "
ProcessId
" to current process id - "
Status
" to "processing
"
- "
- create instance of class from "
TaskClass
" field of associated task queue record - call the "
processTaskQueueRun
" (wrapped within try/catch block) on that object providing task run object (was loaded above) as an argument - the above method can update given object fields at will and save to db (e.g. "
PercentsCompleted
" and "Results
") - when exception was caught, then:
- set "
Status
" to "error
" - set "
ErrorCode
" to exception code - set "
ErrorMessage
" to exception message
- set "
- when no exception was caught, then:
- set "
Status
" to "success
" - set "
ErrorCode
" and "ErrorMessage
" to empty value
- set "
- set "
FinishedOn
" to time, when task was finished (with error or not)
- save changes to db
- add protected "
TaskQueueHelper::getQueueTaskRunnerCount()
" method, that will:- get all task queue runners, that are running currently ("
FinishedOn IS NULL
") - get status of their PIDs
for all task queue runners which PIDs are dead set their "FinishedOn" field to NOW()
- return number of running task runners (don't include ones updated above)
- get all task queue runners, that are running currently ("
- add public "
TaskQueueHelper::registerAsTaskQueueRunner()
" method, that will:- get value of "
TaskQueueRunnerLimit
" system setting - call "
TaskQueueHelper::
getTaskQueueRunnerCount
" method - if currently running task queue runner count is larger or equal to allowed count, then return "false"
- add record to "
TaskQueueRunners
" table with current PID - return "
true
"
- get value of "
- add "
TaskQueueEventHandler::OnProcessTaskRun
" event, that will:- call the "
TaskQueueHelper::
- when executed from web context throw an exception
registerAsTaskQueueRunner
" method - if it returns "false", then do nothing and exit
- acquire WRITE lock "TaskQueueRuns" database table (prevent 2 events executed at same time using same task run)
- pick 1st available run (FIFO logic)
- release above acquired lock
- if none found, then exit
- call "
TaskQueueHelper::
" method with found task run idprocessTaskRun
- call the "
...