feat(legacy): replace massivescale/celery-php with jooola/celery-php

This commit is contained in:
jo 2022-03-04 18:46:57 +01:00 committed by Kyle Robbertze
parent f5bb31e971
commit dc7560e1df
3 changed files with 161 additions and 134 deletions

View file

@ -1,5 +1,9 @@
<?php
use Celery\Celery;
use Celery\CeleryException;
use Celery\CeleryTimeoutException;
class CeleryManager
{
/**
@ -38,12 +42,11 @@ class CeleryManager
$config['rabbitmq']['user'],
$config['rabbitmq']['password'],
$config['rabbitmq']['vhost'],
$exchange, // Exchange name
$queue, // Binding/queue
$config['rabbitmq']['port'],
$exchange,
$queue,
$config['rabbitmq']['port'],
false,
true, // Persistent messages
self::$_CELERY_MESSAGE_TIMEOUT
self::$_CELERY_MESSAGE_TIMEOUT
); // Result expiration
}
@ -93,13 +96,11 @@ class CeleryManager
// track reference here in case it was a deletion that failed, for example.
$task->setDbStatus(CELERY_FAILED_STATUS)->save();
throw new CeleryTimeoutException('Celery task ' . $task->getDbName()
. ' with ID ' . $task->getDbTaskId() . ' timed out');
throw new CeleryTimeoutException('Celery task ' . $task->getDbName() . ' with ID ' . $task->getDbTaskId() . ' timed out');
}
// The message hasn't timed out, but it's still false, which means it hasn't been
// sent back from Celery yet.
throw new CeleryException('Waiting on Celery task ' . $task->getDbName()
. ' with ID ' . $task->getDbTaskId());
throw new CeleryException('Waiting on Celery task ' . $task->getDbName() . ' with ID ' . $task->getDbTaskId());
}
return $message;
@ -132,8 +133,9 @@ class CeleryManager
*/
public static function pollBrokerTaskQueue($taskName = '', $serviceName = '')
{
$pendingTasks = empty(self::$_pendingTasks) ? static::_getPendingTasks($taskName, $serviceName)
: self::$_pendingTasks;
$pendingTasks = empty(self::$_pendingTasks)
? static::_getPendingTasks($taskName, $serviceName)
: self::$_pendingTasks;
foreach ($pendingTasks as $task) {
try {
$message = static::_getTaskMessage($task);