...
Part 1 (db tables & units)
- create "ITaskHandler" interface with "handleTaskRun(kDBItem $task_queue_run)" method
- create "TaskQueue" database table (unit name "task-queue") with following columns:
  - "Id"
  - "QueuedOn" - when task was queued
  - "QueuedById" - who queued the task
  - "ScheduledOn" - when task needs to be processed (set at queuing time)
  - "TaskHandlerClass" - FQCN of PHP class, which is responsible for processing this task queue record (must implement "ITaskHandler" interface)
  - "TaskData" - JSON-encoded data, that is needed for task execution (e.g. e-mail recipient, IDs of records to be processed)
  - "LastStatus" - status from last attempt of this queue record processing (same values as for "TaskRuns.Status" field)
  - "MaxRetries" - if task fails specified number of times (5 by default), then don't retry it
  - "TaskRunsFailed" - number of failed task runs (number is reset, when task execution was successful)
- create "TaskRuns" database table (unit name: "task-run") with following columns:
  - "Id"
  - "TaskQueueId" - ID of record from "TaskQueue" table, that is responsible for creation of this run
  - "StartedOn" - when task run started executing; set to moment, when status changes from "scheduled" to "running"
  - "PercentsCompleted" - "0" by default, but will be updated as task is being processed
  - "FinishedOn" - when task finished executing (regardless of status)
  - "Results" - JSON-encoded results in any format, that can be later displayed in human-readable form
  - "Status":
    - "scheduled" - initially, when record is created
    - "running" - when somebody is processing the record
    - "success" - when execution finished without errors
    - "error" - when known error happened during processing
    - "timeout" - when associated task runner died unexpectedly
  - "ErrorCode" - non-empty when known error happened
  - "ErrorMessage" - non-empty when known error happened
  - "TaskRunnerId" - task runner that is processing/has processed given task run
- create "TaskRunners" database table (unit name: "task-runner") with following columns:
  - "Id"
  - "ProcessId" - the PID of process, that started/created task runner
  - "StartedOn" - when process was started
  - "FinishedOn" - when process was finished; NULL initially
  - "Status" - status of task runner:
    - "running" - default; means task runner is running
    - "success" - set, when task runner decides to kill itself
    - "timeout" - set by overseer when task runner is in "running" status and associated process isn't running
- in "task-run:OnAfterItemUpdate" event:
  - load "task-queue" object associated with updated task run
  - get all "task-run" records for that "task-queue" (via sql); then sort them from recent to old (via php)
  - set following fields on "task-queue" object:
    - "LastStatus" to "Status" of most recent "task-run"
    - "TaskRunsFailed" to count of "task-run" records in "error" and "timeout" statuses (if last run has failed)
    - "TaskRunsFailed" to "0" (if last run was successful)
- in "task-runner:OnBeforeItemCreate" event set "ProcessId" to PID of current process
- in "task-runner:OnAfterItemUpdate" event, when "Status" changes from "running" to "timeout", change status of all "task-run" records, that are processed by this task runner, from "running" to "timeout" as well
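The aggregation done in the "task-run:OnAfterItemUpdate" event could look roughly like the sketch below. It works on plain arrays instead of In-Portal objects; the "aggregateTaskRuns" function name and the row shape are hypothetical, and a larger "Id" is assumed to mean a more recent run. When the most recent run is neither failed nor successful, "TaskRunsFailed" is left out, because the rules above don't cover that case.

```php
<?php
/**
 * Derives "LastStatus" and "TaskRunsFailed" values for a task queue record.
 *
 * @param array $task_runs Rows with "Id" and "Status" keys (hypothetical shape).
 *
 * @return array Field values to set on the "task-queue" object.
 */
function aggregateTaskRuns(array $task_runs)
{
    if (!$task_runs) {
        return array();
    }

    // Sort from recent to old (assumption: larger "Id" means more recent run).
    usort($task_runs, function (array $a, array $b) {
        return $b['Id'] - $a['Id'];
    });

    $failed_statuses = array('error', 'timeout');
    $last_status = $task_runs[0]['Status'];
    $fields = array('LastStatus' => $last_status);

    if (in_array($last_status, $failed_statuses, true)) {
        // Last run has failed: count all runs in "error" and "timeout" statuses.
        $failed_runs = array_filter($task_runs, function (array $task_run) use ($failed_statuses) {
            return in_array($task_run['Status'], $failed_statuses, true);
        });
        $fields['TaskRunsFailed'] = count($failed_runs);
    } elseif ($last_status === 'success') {
        // Last run was successful: reset the counter.
        $fields['TaskRunsFailed'] = 0;
    }

    return $fields;
}
```

In the real event handler the returned values would be set on the loaded "task-queue" object and saved.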
Part 2 (adding tasks & runs)
- create "TaskQueue" class
- add protected "TaskQueue::createTaskHandler($class_name)" method, that will:
  - create instance of given class or throw an exception when failed
  - if created object doesn't implement "ITaskHandler" interface, then throw an exception
  - return the object
- add public "TaskQueue::addTask($task_handler_class, $task_data, $scheduled_on, $max_retries = null)" method, that will:
  - call "TaskQueue::createTaskHandler" method to verify, that class given in "$task_handler_class" parameter is valid
  - consider "$max_retries" as "5" when not given
  - create task queue record with given settings
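A minimal sketch of the handler validation and of the values "addTask" would store, written as standalone functions so it runs outside of In-Portal. The "kDBItem" stub, the "SendEmailTaskHandler" class and the "buildTaskQueueFields" function are hypothetical and exist only for illustration; real code would create the "task-queue" record instead of returning an array.

```php
<?php
// Stand-in so the sketch runs outside of In-Portal (where "kDBItem" already exists).
if (!class_exists('kDBItem')) {
    class kDBItem {}
}

interface ITaskHandler
{
    public function handleTaskRun(kDBItem $task_queue_run);
}

// Hypothetical task handler, used only for illustration.
class SendEmailTaskHandler implements ITaskHandler
{
    public function handleTaskRun(kDBItem $task_queue_run)
    {
        // A real handler would read "TaskData" of associated task queue record here.
    }
}

/**
 * Creates task handler of given class or throws an exception.
 */
function createTaskHandler($class_name)
{
    if (!class_exists($class_name)) {
        throw new InvalidArgumentException('Task handler class "' . $class_name . '" not found');
    }

    $task_handler = new $class_name();

    if (!($task_handler instanceof ITaskHandler)) {
        throw new InvalidArgumentException('Class "' . $class_name . '" must implement "ITaskHandler" interface');
    }

    return $task_handler;
}

/**
 * Validates handler class and builds field values for a new task queue record (nothing is saved).
 */
function buildTaskQueueFields($task_handler_class, array $task_data, $scheduled_on, $max_retries = null)
{
    createTaskHandler($task_handler_class); // Throws, when class is invalid.

    return array(
        'TaskHandlerClass' => $task_handler_class,
        'TaskData' => json_encode($task_data),
        'ScheduledOn' => $scheduled_on,
        'MaxRetries' => $max_retries === null ? 5 : $max_retries,
    );
}
```

For example, "buildTaskQueueFields('SendEmailTaskHandler', array('recipient' => 'user@example.com'), time())" would return "MaxRetries" of "5" and JSON-encoded "TaskData".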
- add public "TaskQueue::refreshTaskRunnersStatus()" method that will:
  - get all "task-runner" records in "running" status
  - if associated process isn't running anymore, then change "task-runner" status from "running" to "timeout" (the "task-runner:OnAfterItemUpdate" event would update connected task runs)
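One possible way to detect dead task runner processes is sending signal "0" via "posix_kill" (needs the "posix" extension). This is a sketch under assumptions: the function names and the row shape are hypothetical, and "posix_kill" also returns "false" for a live process owned by another user, so it fits best when all task runners are started by the same user.

```php
<?php
/**
 * Tells if a process with given PID is running (requires "posix" extension).
 */
function isProcessRunning($pid)
{
    // Signal "0" only performs existence and permission checks; nothing is delivered.
    return posix_kill((int)$pid, 0);
}

/**
 * Returns IDs of task runners, that need to be switched from "running" to "timeout".
 *
 * @param array    $task_runners Rows with "Id" and "ProcessId" keys (hypothetical shape).
 * @param callable $is_running   Process check; replaceable to simplify testing.
 */
function findDeadTaskRunnerIds(array $task_runners, $is_running = 'isProcessRunning')
{
    $dead_ids = array();

    foreach ($task_runners as $task_runner) {
        if (!call_user_func($is_running, $task_runner['ProcessId'])) {
            $dead_ids[] = $task_runner['Id'];
        }
    }

    return $dead_ids;
}
```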
- add protected "TaskQueue::createTaskRun(kDBItem $task_queue)" method, that:
  - will create new task run (and return its ID) for given queue record, when all of following rules aren't violated:
    - only 1 (or less) running "task-run" can exist for one "task-queue" record
    - "TaskRunsFailed" must be smaller than "MaxRetries" on associated "task-queue" record
  - will return "null" otherwise
- add protected "TaskQueue::createMissingTaskRuns()" method, that will:
  - get all records from "TaskQueue" table, for which task runs can be created:
    - "ScheduledOn" < NOW()
    - "LastStatus" is not "running"
    - "TaskRunsFailed" must be smaller than "MaxRetries"
  - call the "TaskQueue::createTaskRun" method on each of them (method can return "null" in some cases, but that's ok)
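The selection rules above can be sketched as a filter over plain arrays. The "filterRunnableTaskQueues" function and the row shape are hypothetical, the status condition is read as applying to the "LastStatus" column with "running" as the in-progress value (an assumption), and real code would most likely express the same conditions in the SQL query.

```php
<?php
/**
 * Returns task queue records, for which a new task run can be created.
 *
 * @param array   $task_queues Rows with "ScheduledOn" (unix timestamp), "LastStatus",
 *                             "TaskRunsFailed" and "MaxRetries" keys (hypothetical shape).
 * @param integer $now         Current unix timestamp.
 */
function filterRunnableTaskQueues(array $task_queues, $now)
{
    $runnable = array_filter($task_queues, function (array $task_queue) use ($now) {
        return $task_queue['ScheduledOn'] < $now
            && $task_queue['LastStatus'] !== 'running'
            && $task_queue['TaskRunsFailed'] < $task_queue['MaxRetries'];
    });

    // Re-index, because "array_filter" preserves original keys.
    return array_values($runnable);
}
```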
- add public "TaskQueue::processQueue()" method, that will:
  - call "TaskQueue::refreshTaskRunnersStatus" method
  - call "TaskQueue::createMissingTaskRuns" method
- create "task-queue:OnProcess" event, that:
  - would be called as Scheduled Task on a regular basis (e.g. each 5 minutes)
  - would call "TaskQueue::processQueue" method
Part 3 (running runs)
- add "declare(ticks = 1);" on top of "/tools/run_event.php" file
- create new "TaskRunnerLimit" system setting set to "8" by default
- create "TaskRunner" class with:
  - "TaskRunner::taskRunnerId" property
  - "TaskRunner::lastSignal" property
- add protected "TaskRunner::signalHandler" method, that will store received signal in the "TaskRunner::lastSignal" property
- add "TaskRunner::__construct($task_runner_id)" method, that will:
  - store given "$task_runner_id" into "TaskRunner::taskRunnerId" property
  - if executed from CLI (PHP_SAPI constant check), then use "pcntl_signal" function to register "TaskRunner::signalHandler" method as listener for following signals: SIGINT, SIGTERM, SIGHUP (a handler can't be registered for SIGKILL, because operating system doesn't allow catching it)
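A hedged sketch of the constructor and signal handler described above (needs the "pcntl" extension and PHP 7.1+ for "Closure::fromCallable"). The "getLastSignal" accessor is hypothetical and exists only to make the result observable; SIGKILL is left out, because it can't be caught. Handlers run only when signals get dispatched, which "declare(ticks = 1);" takes care of in "/tools/run_event.php"; the usage below calls "pcntl_signal_dispatch()" explicitly instead.

```php
<?php
class TaskRunner
{
    protected $taskRunnerId;

    protected $lastSignal;

    public function __construct($task_runner_id)
    {
        $this->taskRunnerId = $task_runner_id;

        if (PHP_SAPI === 'cli' && function_exists('pcntl_signal')) {
            // Closure keeps access to the protected method, when invoked from outside of the class.
            $handler = Closure::fromCallable(array($this, 'signalHandler'));

            // SIGKILL isn't listed, because it can't be caught.
            foreach (array(SIGINT, SIGTERM, SIGHUP) as $signal) {
                pcntl_signal($signal, $handler);
            }
        }
    }

    protected function signalHandler($signal)
    {
        $this->lastSignal = $signal;
    }

    // Hypothetical accessor, added only for this demo.
    public function getLastSignal()
    {
        return $this->lastSignal;
    }
}
```

Usage (CLI only): after "posix_kill(posix_getpid(), SIGHUP); pcntl_signal_dispatch();" the "getLastSignal()" method of an existing "TaskRunner" object returns SIGHUP.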
- add protected "TaskQueue::getTaskRunnerCount()" method, that will return number of "task-runner" records in "running" status
- add public "TaskQueue::getTaskRunner()" method, that will:
  - get value of "TaskRunnerLimit" system setting
  - call "TaskQueue::getTaskRunnerCount" method
  - if currently running task runner count is larger or equal to allowed count, then return "null"
  - create new "task-runner" object
  - return instance of "TaskRunner" class initialized with ID of just created task runner
- add protected "TaskRunner::getNextTaskRunId()" method, that will:
  - acquire WRITE lock on "TaskRuns" database table (solves race condition in parallel environment)
  - pick 1st available "task-run" in "scheduled" status (FIFO logic)
  - release above acquired lock
  - return found task run id or "null" when nothing was found
- add protected "TaskRunner::processTaskRun($task_run_id)" method, that will:
  - load "task-run" by given ID from the database or throw an exception if it wasn't found
  - if given "task-run" isn't in "scheduled" status, then throw an exception
  - set following fields and save changes to db immediately:
    - "TaskRunnerId" to value of "TaskRunner::taskRunnerId" property
    - "Status" to "running"
  - create instance of task handler by calling "TaskQueue::createTaskHandler" method
  - call the "handleTaskRun" method (wrapped within try/catch block) on that object providing task run object (was loaded above) as an argument
  - the above method can update given object fields at will and save to db (e.g. "PercentsCompleted" and "Results")
  - when exception was caught, then:
    - set "Status" to "error"
    - set "ErrorCode" to exception code
    - set "ErrorMessage" to exception message
  - when no exception was caught, then:
    - set "Status" to "success"
    - set "ErrorCode" and "ErrorMessage" to empty value
  - set "FinishedOn" to time, when task was finished (with error or not)
  - save changes to db
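The status transitions of "processTaskRun" can be sketched without a database as shown below. An "ArrayObject" stands in for the task run "kDBItem" object and a callable stands in for the "handleTaskRun" method of the task handler; both are assumptions made to keep the sketch runnable. The in-progress status is written as "running", as in the "TaskRuns.Status" value list.

```php
<?php
/**
 * Runs given handler for a task run and records the outcome.
 *
 * @param ArrayObject $task_run       Stand-in for task run object (hypothetical).
 * @param callable    $task_handler   Stand-in for "ITaskHandler::handleTaskRun" (hypothetical).
 * @param integer     $task_runner_id ID of task runner, that processes the run.
 */
function processTaskRun(ArrayObject $task_run, callable $task_handler, $task_runner_id)
{
    if ($task_run['Status'] !== 'scheduled') {
        throw new LogicException('Task run must be in "scheduled" status');
    }

    $task_run['TaskRunnerId'] = $task_runner_id;
    $task_run['Status'] = 'running';
    // Real code would save changes to db here.

    try {
        // Handler can update fields (e.g. "PercentsCompleted", "Results") at will.
        call_user_func($task_handler, $task_run);

        $task_run['Status'] = 'success';
        $task_run['ErrorCode'] = '';
        $task_run['ErrorMessage'] = '';
    } catch (Exception $e) {
        $task_run['Status'] = 'error';
        $task_run['ErrorCode'] = $e->getCode();
        $task_run['ErrorMessage'] = $e->getMessage();
    }

    // Set regardless of the outcome.
    $task_run['FinishedOn'] = time();
    // Real code would save changes to db here.

    return $task_run;
}
```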
- add public "TaskRunner::process()" method, that will consist of while loop, where each iteration will:
  - call "TaskRunner::getNextTaskRunId" method
  - call "TaskRunner::processTaskRun" with ID found above (if ID was found)
  - in either of following cases set "FinishedOn" to NOW() on associated "task-runner" record and exit:
    - "TaskRunner::lastSignal" is set
    - overall memory consumption is more than 100MB
    - it's not CLI mode
  - sleep for X seconds
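The loop and its exit conditions could be sketched as below. Both function names are hypothetical, the callables stand in for the "TaskRunner" methods named above, and 100MB is taken as 100 * 1024 * 1024 bytes (an assumption about how the limit is measured).

```php
<?php
/**
 * Tells if task runner should finish its work.
 */
function shouldStopTaskRunner($last_signal, $memory_usage, $sapi, $memory_limit = 104857600)
{
    return $last_signal !== null       // A signal was received.
        || $memory_usage > $memory_limit // More than 100MB is consumed.
        || $sapi !== 'cli';              // Not in CLI mode: process one iteration only.
}

/**
 * Processes task runs one by one until asked to stop; returns number of processed task runs.
 */
function runTaskRunnerLoop(callable $get_next_id, callable $process, callable $should_stop, $sleep_seconds)
{
    $processed = 0;

    while (true) {
        $task_run_id = call_user_func($get_next_id);

        if ($task_run_id !== null) {
            call_user_func($process, $task_run_id);
            $processed++;
        }

        if (call_user_func($should_stop)) {
            // Real code would set "FinishedOn" on "task-runner" record here.
            break;
        }

        sleep($sleep_seconds);
    }

    return $processed;
}
```

Inside "TaskRunner::process()" the stop check would be fed with current values, e.g. "shouldStopTaskRunner($this->lastSignal, memory_get_usage(), PHP_SAPI)".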
- add "task-runner:OnProcess" event, that will:
  - call the "TaskQueue::getTaskRunner" method
  - if an object is returned, then call "->process()" method on it
Part 4 (rotation)
- create "TaskQueueRotationInterval" setting (same configuration as for e-mail logs)
- create "TaskQueueEventHandler::OnRotate" event (scheduled task), that would delete old (same concept as for e-mail logs) successful task queue records along with their runs
...
configure either of following, but not as scheduled task, because it will block all other scheduled tasks:
command:
/usr/bin/env php /path/to/in-portal/tools/run_event.php task-runner:OnProcess password_here
- setup "upstart" or "supervisord" or any other tool to ensure presence of X processes powered by above command
- add X records to "crontab" file powered by above command
there won't be any built-in UI for this functionality, because it's too general to be usable by user, but specialized sections (e.g. "E-mail Queue") can read data from these tables to keep user informed
- task can be created through calling "TaskQueue::addTask" method by whoever needs it, e.g.:
  - user presses a button
  - scheduled task decides to offload some work
  - etc.
...