Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Part 1 (db tables & units)

  1. create "ITaskQueueRunnerITaskHandler" interface with "processTaskQueueRunhandleTaskRun(kDBItem $task_queue_run)" method
  2. create "TaskQueue" database table (unit name "task-queue") with following columns:
    • Id
    • QueuedOn - when task was queued
    • QueuedById - who queued the task
    • ScheduledOn - when task needs to be processed (set at queuing time)
    • TaskClass - TaskHandlerClass - FQCN of PHP class, which is responsible for processing this task queue record (must implement "ITaskQueueRunnerITaskHandler" interface)
    • TaskData - JSON-encoded data, that is needed for task execution (e.g. e-mail recipient, IDs of records to be processed)
    • ScheduledOn - when task needs to be processed (set at queuing time)
    • QueuedOn - when task was queued
    • QueuedById - who queued the task
    • LastStatus - status from last attempt of this queue record processing - {scheduled (default), processing, success, failed, timeout}; when queue processed in parallel, then last ended task status(same values as for "TaskRuns.Status" field)
    • MaxRetries - if task fails specified number of times (5 by default), then don't retry it
    • FailedRetries TaskRunsFailed - number failed retry count task runs (number is reset, when task execution was successful)
  3. create "TaskQueueRunsTaskRuns" database table with table (unit name: "task-run") with following columns:
    • Id
    • TaskQueueId - associated task queue recordID of record from "TaskQueue" table, that is responsible for creation this run
    • StartedOn - when task run was started executing; NULL set to moment, when status changes from "scheduled" to "processing"
    • PercentsCompleted - "0" by default, but will be updated as task is being processed
    • FinishedOn - when task was finished executing (regardless of status)
    • Results - JSON-encoded results in any format, that be later displayed in human-readable form
    • Status - same statuses as for "TaskQueue.LastStatus" column
      • "scheduled" - initially, when record is created;
      • "processingrunning" - when somebody is processing the record;
      • "success" - when execution finished without errors;
      • "error" - when known error happened during processing;
      • "timeout" - when status wasn't updated within 1 day (configurable per-queue record or system-wide, but can't be empty)associated task runner died unexpectedly
    • ErrorCode - non-empty when known error happened
    • ErrorMessage - non-empty when known error happened
    • ProcessId - the process TaskRunnerId - NULL by default; ID of process that runs this task run (can be a runner process id, or just a regular website visit process); NULL, when not being processed by anybody right now
    create "TaskQueueRunners" database table
    • task runner that processing/processed given task run
  4. create "TaskRunners" database table (unit name: "task-runner") with following columns:in "TaskQueueRunEventHandler::OnAfterItemUpdate" aggregate totals from all runs from associated task queue record and update it (task queue record)
    1. Id
    2. ProcessId - the PID of associated process, that started/created task runner process
    3. StartedOn - when process was started
    4. FinishedOn - when process was finished; NULL initially
  5. create units (the "task-queue" and "task-queue-run", "task-queue-runner"), that corresponding to above described database tables
    1. Status - status of task runner:
      1. "running" - default; means task runner is running
      2. "success" - set, when task runner decides to kill itself
      3. "timeout" - set by overseer when task runner in "running" status and associated process isn't running
  6. in "task-run:OnAfterItemUpdate" event will:
    1. load "task-queue" object associated with updated task run
    2. get all "task-run" records for that "task-queue" (via sql); then sort them from recent to old (via php)
    3. set following fields on "task-queue" object:
      1. "LastStatus" to "Status" of most recent "task-run"
      2. "TaskRunsFailed" to count of "task-run" records in "error" and "timeout" statuses (if last run is failed)
      3. "TaskRunsFailed" to "0" (if last run was successful)
  7. in "task-runner::OnBeforeItemCreate" event set "ProcessId" to PID of current process
  8. in "task-runner:OnAfterItemUpdate" event, when "Status" changes from "running" to "timeout" set all "task-run" status, that are processed by this task runner from "running" to "timeout" as well

Part 2 (adding tasks & runs)

  1. create "TaskQueueHelperTaskQueue" class
  2. add public "TaskQueueHelper::queueTask($taskprotected "TaskQueue::createTaskHandler($class_name)" method, that will:
    1. create instance of given class or throw an exception when failed
    2. if created object doesn't implement "ITaskHandler" interface, then throw an exception
    3. return the object
  3. add public "TaskQueue::addTask($task_handler_class, $task_data, $scheduled_on, $max_retries = null)" method, that will:
    1. create task queue record with given settingscall "TaskQueue::createTaskHandler" method to verify, that class given in "$task_handler_class" parameter is valid
    2. consider "$max_retries" as "5" when not giventhrow an exception, when specified "$task_class" doesn't exist
  4. add protected public "TaskQueueHelperTaskQueue::synchronizeTaskRunStatusrefreshTaskRunnersStatus()" method that will:
    1. get all task runs, that are running currently
    2. get status of their PIDs
    3. for all task runs which PIDs are dead set their status to "timeout"
    add protected "TaskQueueHelper
    1. "task-runner" in "running" status
    2. if associated process isn't running anymore, then set "task-runner" status from "running" to "timeout" (the "task-runner::OnAfterItemUpdate" would update connected task runs)
  5. add protected "TaskQueue::createTaskRun(kDBItem $task_queue)" method, that:
    1. will create new task run (and return it's ID) for given queue record, when all of following rules aren't violated:
      1. only 1 active (status = processing) task run can exist at same time (for a given queue record)sequential failed task run count (both "error" and "timeout" statuses are considered as failed) can't be more than max allowed retry countor less) running "task-run" can exist for one "task-queue" record
      2. "TaskRunsFailed" must be smaller, then "MaxRetries" on associated "task-queue" record
    2. return "null" otherwise
  6. add protected "TaskQueueHelperTaskQueue::createMissingTaskRuns()" method, that will:
    1. get all records from "TaskQueue" table, for which task runs can be created:
      1. ScheduledOn < NOW()
      2. Status is not "processingrunning"
      3. FailedRetries < TaskRunsFailed must be smaller, then MaxRetries
    2. call the "TaskQueueHelperTaskQueue::createTaskRun" method on each of them (method can return "NULL" in some cases, but that's ok)
  7. add public "TaskQueueHelperTaskQueue::createTaskRunsprocessQueue()" method, that will:
    1. call "TaskQueueHelperTaskQueue::synchronizeTaskRunStatusrefreshTaskRunnersStatus" method
    2. call "TaskQueueHelperTaskQueue::createMissingTaskRuns" method
  8. add create "TaskQueueEventHandler::OnCreateTaskRunstask-queue:OnProcess" event, that:
    1. would be called as Scheduled Task on a regular basis (e.g. each 5 minutes)
    2. would call "TaskQueue::processQueue" method

Part 3 (running runs)

  1. add "declare(ticks = 1);" on top of "/tools/run_event.php" file
  2. create new "TaskQueueRunnerLimitTaskRunnerLimit" system setting set to "8" by default
  3. create "TaskRunner" class with:
    1. add "TaskRunner::taskRunnerId" property
    2. add "TaskRunner::lastSignal" property
  4. add protected "TaskRunner::signalHandler" method, that will store received signal in the "TaskRunner::lastSignal" property
  5. add "TaskRunner::__construct($task_runner_id)" method, that will:
    1. store given "$task_runner_id" into "TaskRunner:taskRunnerId" property
    2. if executed from CLI (PHP_SAPI constant check), then use "pcntl_signal" function to register "TaskRunner::signalHandler" method as signal listeners for following signals:
      1. SIGINT
      2. SIGTERM
      3. SIGKILL
      4. SIGHUP
  6. add protected "TaskQueue::getTaskRunnerCount()" method, that will return number of "task-runner" in "running" status
  7. add public "TaskQueueHelper"TaskQueue::getTaskRunner()" method, that will:
    1. get value of  "TaskRunnerLimit" system setting
    2. call "TaskQueue::getTaskRunnerCount" method
    3. if currently running task runner count is larger or equal to allowed count, then return "null"
    4. create new "task-runner" object
    5. return instance of "TaskRunner" class initialized with ID of just created task runner
  8. add protected "TaskRunner::getNextTaskRunId()" method, that will:
    1. acquire WRITE lock "TaskRuns" database table (solves racing condition in parallel environment)
    2. pick 1st available "task-run" in "scheduled" status (FIFO logic)
    3. release above acquired lock
    4. return found task run id or "null" when nothing was found
  9. add protected "TaskRunner::processTaskRun($task_run_id)", that will:
    1. load "task-run" by given ID from the database (if failed or throw an exception if wasn't found
    2. if given "task-run" isn't in "scheduled" status, then throw an exception)
    3. set following fields and save changes to db immediately:
      1. "ProcessIdTaskRunnerId"  to current process idvalue of "TaskRunner::taskRunnerId" property
      2. "Status" to "processing"
    4. create instance of class from "TaskClass" field of associated task queue recordtask handler by calling "TaskQueue::createTaskHandler" method
    5. call the "processTaskQueueRunhandleTaskRun" (wrapped within try/catch block) on that object providing task run object (was loaded above) as an argument
    6. the above method can update given object fields at will and save to db (e.g. "PercentsCompleted" and "Results")
    7. when exception was caught, then:
      1. set "Status" to "error"
      2. set "ErrorCode" to exception code
      3. set "ErrorMessage" to exception message
    8. when no exception was caught, then:
      1. set "Status" to "success"
      2. set "ErrorCode" and "ErrorMessage" to empty value
    9. set "FinishedOn" to time, when task was finished (with error or not)
    1. save changes to db
  10. add protected public "TaskQueueHelperTaskRunner::getQueueTaskRunnerCountprocess()" method, that will consist of while loop, where each iteration will:
    1. call "TaskRunner::
    1. get all task queue runners, that are running currently ("FinishedOn IS NULL")
    2. get status of their PIDs
    3. for all task queue runners which PIDs are dead set their "FinishedOn" field to NOW()

    4. return number of running task runners (don't include ones updated above)
  11. add public "TaskQueueHelper::registerAsTaskQueueRunner()" method, that will:
    1. get value of  "TaskQueueRunnerLimit" system setting
    2. call "TaskQueueHelper::getTaskQueueRunnerCount" method
    3. if currently running task queue runner count is larger or equal to allowed count, then return "false"
    4. add record to "TaskQueueRunners" table with current PID
    5. return "true"
  12. add "TaskQueueEventHandler::OnProcessTaskRun
    1. getNextTaskRunId" method
    2. call "TaskRunner::processTaskRun" with ID found above (if ID was found)
    3. in either of following cases set "FinishedOn" to NOW() on associated "task-runner" record and exit
      1. "TaskRunner::lastSignal" is set
      2. overall memory consumption is more than 100MB
      3. it's not CLI mode
    4. sleep for X of seconds
  13. add "task-runner:OnProcess" event, that will:
    1. call the "TaskQueueHelperTaskQueue::registerAsTaskQueueRunnergetTaskRunner" method
    2. if it returns "false", then do nothing and exit
    3. acquire WRITE lock "TaskQueueRuns" database table (prevent 2 events executed at same time using same task run)
    4. pick 1st available run (FIFO logic)
    5. release above acquired lock
    6. if none found, then exit
    7. call "TaskQueueHelper::processTaskRun" method with found task run idan object is returned, then call "->process()" method on it

Part 4 (rotation)

  1. create "TaskQueueRotationInterval" setting (same configuration as for e-mail logs)
  2. create "TaskQueueEventHandler::OnRotate" event (scheduled task), that would delete old (same concert as for e-mail logs) successful task queue records along with their runs

...

  1. configure either of following, but not as scheduled task, because it will block all other scheduled tasks:

    • command: /usr/bin/env php /path/to/in-portal/tools/run_event.php task-queuerunner:OnProcessTaskRun OnProcess password_here

    • setup "upstart" or "supervisord" or any other tool to ensure presence of X processes powered by above command

    • add X records to "crontab" file powered by above command

  2. there won't be any built-in UI for this functionality, because it's too general to be usable by user, but specialized sections (e.g. "E-mail Queue") can read data from these tables to keep user informed

  3. task can be created through calling "TaskQueueHelperTaskQueue::queueTaskaddTask" method by whoever needs it, e.g.:
    • user presses a button
    • scheduled task decides to offload some work
    • etc.

...