OpenXE/classes/Modules/Pipedrive/RequestQueues/PipedriveRequestQueuesService.php
2021-05-21 08:49:41 +02:00

329 lines
11 KiB
PHP

<?php
declare(strict_types=1);
namespace Xentral\Modules\Pipedrive\RequestQueues;
use ApplicationCore;
use Exception;
use Xentral\Components\Database\Database;
use Xentral\Components\Database\Exception\DatabaseExceptionInterface;
use Xentral\Core\DependencyInjection\Exception\ServiceNotFoundException;
use Xentral\Modules\Pipedrive\Exception\PipedriveConfigurationException;
use Xentral\Modules\Pipedrive\Exception\PipedriveEventException;
use Xentral\Modules\Pipedrive\Exception\PipedriveMetaException;
use Xentral\Modules\Pipedrive\Exception\PipedriveRequestQueuesException;
use Xentral\Modules\Pipedrive\Service\PipedriveConfigurationService;
use Xentral\Modules\Pipedrive\Service\PipedriveEventService;
use Xentral\Modules\Pipedrive\Service\PipedriveServerResponseInterface;
use DateTime;
use RuntimeException;
final class PipedriveRequestQueuesService
{
/** @var int */
private const _WAITING_TIME = 10000000;
/** @var int */
private const _BATCH = 1;
/** @var PipedriveRequestQueuesGateway $gateway */
private $gateway;
/** @var ApplicationCore $app */
private $app;
/** @var Database $db */
private $db;
/** @var array $completedIds */
private $completedIds = [];
/** @var PipedriveConfigurationService $confService */
private $confService;
/** @var PipedriveEventService $eventService */
private $eventService;
/**
* @param PipedriveRequestQueuesGateway $gateway
* @param Database $database
* @param ApplicationCore $app
* @param PipedriveConfigurationService $confService
* @param PipedriveEventService $eventService
*/
public function __construct(
PipedriveRequestQueuesGateway $gateway,
Database $database,
ApplicationCore $app,
PipedriveConfigurationService $confService,
PipedriveEventService $eventService
) {
$this->gateway = $gateway;
$this->app = $app;
$this->db = $database;
$this->confService = $confService;
$this->eventService = $eventService;
}
/**
* @param array $option
*
* @throws PipedriveRequestQueuesException
*
* @return int
*/
public function addRequest(array $option): int
{
$default = [
'check_sum' => '',
'command' => '',
'on_after_done' => '',
'not_before' => 0,
'call_type' => 'pipedrive',
'is_looped' => 0,
'setting_name' => '',
];
$hCommand = [];
if (array_key_exists('method', $option)) {
$hCommand['method'] = $option['method'];
unset($option['method']);
}
if (array_key_exists('args', $option)) {
$hCommand['args'] = $option['args'];
unset($option['args']);
}
if (!empty($hCommand)) {
$option['command'] = json_encode($hCommand, JSON_HEX_TAG | JSON_HEX_APOS | JSON_HEX_AMP | JSON_HEX_QUOT);
}
if (!empty($option['check_sum'])) {
$default['check_sum'] = $option['check_sum'];
} else {
$default['check_sum'] = array_key_exists('command', $option) ? md5($option['command']) : '';
}
$option = array_merge($default, $option);
if (!array_key_exists('runner', $option)) {
throw new PipedriveRequestQueuesException('Runner is missing!');
}
if (!empty($option['on_after_done']) && is_array($option['on_after_done'])) {
$option['on_after_done'] = json_encode(
$option['on_after_done'],
JSON_HEX_TAG | JSON_HEX_APOS | JSON_HEX_AMP | JSON_HEX_QUOT
);
}
$check = 'SELECT EXISTS(
SELECT prq.id FROM `pipedrive_request_queues` AS `prq`
WHERE prq.deleted=0 AND prq.completed=0 AND prq.check_sum=:check_sum AND prq.runner=:runner
)';
if (empty(
$this->db->fetchValue(
$check,
['runner' => $option['runner'], 'check_sum' => $option['check_sum']]
)
)) {
$add = 'INSERT INTO `pipedrive_request_queues` (`command`, `on_after_done`, `runner`, `not_before`, `check_sum`, `call_type`, `is_looped`, `setting_name`, `created_at` )
VALUES (:command, :on_after_done, :runner, :not_before, :check_sum, :call_type, :is_looped, :setting_name, NOW())';
try {
$this->db->perform($add, $option);
} catch (DatabaseExceptionInterface $exception) {
throw new PipedriveRequestQueuesException($exception->getMessage());
}
return $this->db->lastInsertId();
}
return 0;
}
/**
* @param string|null $callType
*
* @throws PipedriveConfigurationException
* @throws PipedriveEventException
* @throws PipedriveRequestQueuesException
* @throws PipedriveMetaException
* @throws Exception
*
* @return void
*/
public function execute(?string $callType = null): void
{
$callType = $callType ?? 'pipedrive';
$jobs = $this->gateway->getNewRequestsByCallType($callType);
if (!empty($jobs)) {
$batch_loop = self::_BATCH;
$iCount = 0;
$bSkipWait = count($jobs) <= 1;
foreach ($jobs as $job) {
$settings = $this->confService->getSettings();
// HANDLE CAN EXECUTE
if (!empty($job['setting_name']) && array_key_exists(
$job['setting_name'],
$settings
) && $settings[$job['setting_name']] === false) {
// SKIP
continue;
}
// HANDLE is_looped
if (!empty($job['is_looped']) && !empty($job['setting_name'])) {
$settingInterval = sprintf('%s_interval', $job['setting_name']);
if (!array_key_exists($settingInterval, $settings)) {
// SKIP or ERROR?
continue;
}
$interval = $settings[$settingInterval];
$modifiedAt = new DateTime($job['modified_at']);
$iModifiedAt = $modifiedAt->getTimestamp();
if (time() < $iModifiedAt + $interval) {
// SKIP, waiting for next running time
continue;
}
}
if (!empty($job['is_looped'])) {
$this->db->perform(
'UPDATE `pipedrive_request_queues` SET `modified_at` = NOW() WHERE `id` = :id',
['id' => $job['id']]
);
} else {
$this->db->perform(
'UPDATE `pipedrive_request_queues`
SET `amount_attempts` = `amount_attempts` +1 WHERE `id` = :id',
['id' => $job['id']]
);
}
try {
$oClass = $this->app->Container->get($job['runner']);
} catch (ServiceNotFoundException $exception) {
throw new PipedriveRequestQueuesException($exception->getMessage());
}
$hCommand = json_decode($job['command'], true);
if (is_array($hCommand)) {
$xArg = $hCommand['args'];
$sMethod = $hCommand['method'];
try {
/** @var PipedriveServerResponseInterface $xReturn */
$xReturn = call_user_func_array([$oClass, $sMethod], $xArg);
} catch (RuntimeException $exception) {
$this->eventService->add($exception->getMessage());
continue;
}
$isLooped = !empty($job['is_looped']);
if (empty($job['on_after_done'])) {
$this->markJobAsComplete($job['id'], $isLooped);
} else {
$this->onAfterDone($job['id'], $xReturn, $job['on_after_done'], $isLooped);
}
//BATCH PROCESS CHECK - take a nap
if ($bSkipWait === false) {
if ($iCount === $batch_loop) {
$batch_loop += self::_BATCH;
@usleep(self::_WAITING_TIME);
}
echo 'Script always alive... ';
}
} else {
continue;
}
}
}
}
/**
* @param int $id
* @param PipedriveServerResponseInterface|null $response
* @param string|null $onAfter
* @param bool $looped
*
* @throws PipedriveEventException
*
* @return void
*/
private function onAfterDone(
int $id,
?PipedriveServerResponseInterface $response,
?string $onAfter = null,
bool $looped = false
): void {
$hOnAfter = !empty($onAfter) ? json_decode($onAfter, true) : [];
if (!empty($hOnAfter) && array_key_exists('runner', $hOnAfter) && array_key_exists(
'method',
$hOnAfter
) && array_key_exists('args', $hOnAfter)) {
$oClass = $this->app->Container->get($hOnAfter['runner']);
if (!empty($hOnAfter['replace_in_args']) && in_array($response->getStatusCode(), [200, 201], true)) {
$jsonData = $response->getData();
foreach ($hOnAfter['args'] as &$xArg) {
if (is_string($xArg)) {
foreach ($hOnAfter['replace_in_args'] as $replace_with) {
if (!empty($jsonData[$replace_with])) {
$xArg = sprintf($xArg, $jsonData[$replace_with]);
}
}
}
}
}
unset($xArg);
try {
call_user_func_array(
[$oClass, $hOnAfter['method']],
$hOnAfter['args']
);
// ADD EVENT
if (array_key_exists('event', $hOnAfter) && !empty($hOnAfter['event']) && is_string(
$hOnAfter['event']
) &&
in_array($response->getStatusCode(), [200, 201], true)) {
$this->eventService->add($hOnAfter['event']);
}
} catch (RuntimeException $exception) {
$this->eventService->add($exception->getMessage());
}
}
$this->markJobAsComplete($id, $looped);
}
/**
* @param int $id
* @param bool $looped
*
* @return void
*/
private function markJobAsComplete(int $id, bool $looped): void
{
if (is_numeric($id) && $looped === false) {
$this->db->perform('UPDATE `pipedrive_request_queues` SET `completed` = 1 WHERE `id` = :id', ['id' => $id]);
$this->completedIds[] = $id;
}
}
/**
* @return void
*/
public function cleanup(): void
{
$this->db->perform('DELETE FROM `pipedrive_request_queues` WHERE `completed` = 1');
$this->eventService->deleteByInterval();
unset($this->completedIds);
}
}