Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Part 1 (db tables & units)

  1. create "TaskQueueRunnerITaskQueueRunner" interface with "processTaskQueueRun(kDBItem $task_queue_run)" method
  2. create "TaskQueue" database table with following columns:
    • Id
    • QueuedOn - when task was queued
    • QueuedById - who queued the task
    • ScheduledOn - when task needs to be processed (set at queuing time)
    • TaskClass - FQCN of PHP class, which is responsible for processing this task queue record (must implement "ITaskQueueRunner" interface)
    • TaskData - JSON-encoded data, that is needed for task execution (e.g. e-mail recipient, IDs of records to be processed)
    • LastStatus - status from last attempt of this queue record processing - {scheduled (default), processing, success, failed, timeout}; when queue processed in parallel, then last ended task status
    • MaxRetries - if task fails specified number of times (5 by default), then don't retry it
    • FailedRetries - failed retry count (number is reset, when task execution was successful)
  3. create "TaskQueueRuns" database table with following columns:
    • Id
    • TaskQueueId - associated task queue record
    • StartedOn - when task was started executing
    • PercentsCompleted - "0" by default, but will be updated as task is being processed
    • FinishedOn - when task was finished executing (regardless of status)
    • Results - JSON-encoded results in any format, that be later displayed in human-readable form
    • Status - same statuses as for "TaskQueue.LastStatus" column
      • "scheduled" - initially, when record is created;
      • "processing" - when somebody is processing the record;
      • "success" - when execution finished without errors;
      • "error" - when known error happened during processing;
      • "timeout" - when status wasn't updated within 1 day (configurable per-queue record or system-wide, but can't be empty)
    • ErrorCode - non-empty when known error happened
    • ErrorMessage - non-empty when known error happened
    • ProcessId - the PID process ID of the process , that is handling (or was handling in past) this task (process that starts running this task run will put own PID in here)that runs this task run (can be a runner process id, or just a regular website visit process); NULL, when not being processed by anybody right now
  4. create "TaskQueueRunners" database table with following columns:
    1. Id
    2. ProcessId - the PID of associated task runner process
    3. StartedOn - when process was started
    4. FinishedOn - when process was finished; NULL initially
  5. create units (the "task-queue" and "task-queue-run", "task-queue-runner"), that corresponding to above described database tables
  6. in "TaskQueueRunEventHandler::OnAfterItemUpdate" aggregate totals from all runs from associated task queue record and update it (task queue record)

Part 2 (adding tasks & runs)

  1. create new "ParallelTaskQueueRunCount" system setting set to "8" by defaultcreate "TaskQueueHelper" class
  2. add public "TaskQueueHelper::queueTask($task_class, $task_data, $scheduled_on, $max_retries = null)" method, that will:
    1. create task queue record with given settings
    2. consider "$max_retries" as "5" when not given
    3. throw an exception, when specified "$task_class" doesn't exist
  3. add protected "TaskQueueHelper::synchronizeTaskRunStatus" method that will:
    1. get all task runs, that are running currently
    2. get status of their PIDs
    3. for all task runs which PIDs are dead set their status to "timeout"
  4. add protected "TaskQueueHelper::createTaskRun(kDBItem $task_queue)" method, that:
    1. will create new task run (and return it's ID) for given queue record, when all of following rules aren't violated:
      1. only 1 active (status = processing) task run can exist at same time (for a given queue record)
      2. sequential failed task run count (both "error" and "timeout" statuses are considered as failed) can't be more than max allowed retry count
    2. return "null" otherwise
  5. add protected "TaskQueueHelper::createMissingTaskRuns()" method, that will:
    1. get value of "ParallelTaskQueueRunCount" system setting and compare it to number of currently running task runsif there are smaller number of tasks running, then possible:
      get all task queue records, for which all records from "TaskQueue" table, for which task runs can be created (:
      1. ScheduledOn < NOW() + not running right now + failed retry count < max retry count)
      2. Status is not "processing"
      3. FailedRetries < MaxRetries
    2. call the "TaskQueueHelper::createTaskRun" on each of them (method can return "NULL" in some cases, but that's ok)
  6. add public "TaskQueueHelper::createTaskRuns()" method, that will:
    1. call "TaskQueueHelper::synchronizeTaskRunStatus" method
    2. call "TaskQueueHelper::createMissingTaskRuns" method
  7. add "TaskQueueEventHandler::OnCreateTaskRuns" event, that would be called as Scheduled Task on a regular basis (e.g. each 5 minutes)

Part 3 (running runs)

  1. create new "TaskQueueRunnerLimit" system setting set to "8" by default
  2. add public "TaskQueueHelper::processTaskRun($task_run_id)", that will:
    1. load task run by given ID from the database (if failed throw an exception)
    2. set following fields and save changes to db immediately:
      1. "ProcessId"  to current process id
      2. "Status" to "processing"
    3. create instance of class from "TaskClass" field of associated task queue record
    4. call the "processTaskQueueRun" (wrapped within try/catch block) on that object providing task run object (was loaded above) as an argument
    5. the above method can update given object fields at will and save to db (e.g. "PercentsCompleted" and "Results")
    6. when exception was caught, then:
      1. set "Status" to "error"
      2. set "ErrorCode" to exception code
      3. set "ErrorMessage" to exception message
    7. when no exception was caught, then:
      1. set "Status" to "success"
      2. set "ErrorCode" and "ErrorMessage" to empty value
    8. set "FinishedOn" to time, when task was finished (with error or not)
    1. save changes to db
  3. add protected "TaskQueueHelper::getQueueTaskRunnerCount()" method, that will:
    1. get all task queue runners, that are running currently ("FinishedOn IS NULL")
    2. get status of their PIDs
    3. for all task queue runners which PIDs are dead set their "FinishedOn" field to NOW()

    4. return number of running task runners (don't include ones updated above)
  4. add public "TaskQueueHelper::registerAsTaskQueueRunner()" method, that will:
    1. get value of  "TaskQueueRunnerLimit" system setting
    2. call "TaskQueueHelper::getTaskQueueRunnerCount" method
    3. if currently running task queue runner count is larger or equal to allowed count, then return "false"
    4. add record to "TaskQueueRunners" table with current PID
    5. return "true"
  5. add "TaskQueueEventHandler::OnProcessTaskRun" event, that will:
    1. call the "TaskQueueHelper::
    2. when executed from web context throw an exceptionregisterAsTaskQueueRunner" method
    3. if it returns "false", then do nothing and exit
    4. acquire WRITE lock "TaskQueueRuns" database table (prevent 2 events executed at same time using same task run)
    5. pick 1st available run (FIFO logic)
    6. release above acquired lock
    7. if none found, then exit
    8. call "TaskQueueHelper::processTaskRun" method with found task run id

...