214 lines
5.9 KiB
PHP
214 lines
5.9 KiB
PHP
<?php
|
|
|
|
declare(strict_types=1);
|
|
|
|
namespace App\Services\WbService;
|
|
|
|
use App\Contracts\WbSyncManagerInterface;
|
|
use App\Enums\SyncStatus;
|
|
use App\Enums\WbEndpoint;
|
|
use App\Jobs\WbSyncJob;
|
|
use App\Models\SyncState;
|
|
use Illuminate\Support\Facades\Bus;
|
|
use Illuminate\Support\Facades\DB;
|
|
use Illuminate\Support\Facades\Log;
|
|
use Throwable;
|
|
|
|
/**
 * Tracks Wildberries sync runs in SyncState rows and dispatches the queue work
 * (a first-page WbSyncJob, then a batch with the remaining pages).
 *
 * A run is identified by the triple (endpoint name, date_from, date_to).
 * Every read-modify-write on a SyncState row happens inside a DB transaction
 * with a row lock taken via lockForUpdate().
 */
final readonly class WbSyncManager implements WbSyncManagerInterface
{
    /**
     * Start a sync run, or restart one that is FAILED or stuck in PENDING.
     *
     * - No row yet: a PENDING row is created and the first-page job is dispatched.
     * - Row is SUCCESS: nothing happens.
     * - Row is PENDING and younger than config('wb-sync.timeout') minutes: nothing happens.
     * - Otherwise: the row is reset to PENDING and the first-page job is dispatched.
     *
     * The job is dispatched only after the transaction has committed, so a worker
     * can never observe a missing or stale SyncState row.
     *
     * @param WbEndpoint  $endpoint Endpoint being synced; its enum case name is stored as `entity`.
     * @param string      $dateFrom Lower bound of the range, stored in `date_from`.
     * @param string|null $dateTo   Upper bound of the range, or null for an open-ended run.
     *
     * @throws Throwable
     */
    public function start(
        WbEndpoint $endpoint,
        string $dateFrom,
        ?string $dateTo = null,
    ): void {
        $now = now();

        $shouldDispatch = DB::transaction(function () use ($endpoint, $dateFrom, $dateTo, $now): bool {
            $sync = $this->lockedSyncQuery($endpoint, $dateFrom, $dateTo)->first();

            if ($sync === null) {
                $sync = SyncState::create([
                    'entity' => $endpoint->name,
                    'date_from' => $dateFrom,
                    'date_to' => $dateTo,
                    'status' => SyncStatus::PENDING,
                    'started_at' => $now,
                ]);

                Log::info('WB sync created', [
                    'id' => $sync->id,
                    'entity' => $endpoint->name,
                    'date_from' => $dateFrom,
                ]);

                return true;
            }

            if ($sync->status === SyncStatus::SUCCESS) {
                Log::info('WB sync skipped, already SUCCESS', [
                    'id' => $sync->id,
                ]);

                return false;
            }

            if ($sync->status === SyncStatus::PENDING) {
                $stillRunning = $sync->started_at
                    && $sync->started_at->diffInMinutes($now) < config('wb-sync.timeout');

                if ($stillRunning) {
                    Log::info('WB sync already PENDING, skipping', [
                        'id' => $sync->id,
                    ]);

                    return false;
                }

                Log::warning('WB sync stuck > wb-sync.timeout, restart', [
                    'id' => $sync->id,
                    'started_at' => $sync->started_at,
                ]);
            }

            // Clearing batch_id is required for a restart: handleFirstPage() returns
            // early when batch_id is set, so a stale id would stop pages 2..N from
            // ever being dispatched again.
            // NOTE(review): assumes the batch_id column is nullable - confirm against the migration.
            $sync->update([
                'status' => SyncStatus::PENDING,
                'started_at' => $now,
                'updated_at' => $now,
                'batch_id' => null,
            ]);

            Log::info('WB sync started', ['id' => $sync->id]);

            return true;
        });

        // Dispatch outside the transaction so the job cannot run before the row is committed.
        if ($shouldDispatch) {
            WbSyncJob::dispatch($endpoint, $dateFrom, $dateTo, 1);
        }
    }

    /**
     * Record the page count after the first page was fetched and fan out the rest.
     *
     * Does nothing if a batch id is already stored on the row. With a single page
     * the run is marked SUCCESS right away; otherwise one WbSyncJob per page
     * 2..$lastPage is dispatched as a batch and the batch id is stored on the row.
     *
     * @param WbEndpoint  $endpoint Endpoint being synced.
     * @param string      $dateFrom Lower bound of the range.
     * @param string|null $dateTo   Upper bound of the range, or null.
     * @param int         $lastPage Total number of pages reported for this run.
     *
     * @throws Throwable
     */
    public function handleFirstPage(
        WbEndpoint $endpoint,
        string $dateFrom,
        ?string $dateTo,
        int $lastPage,
    ): void {
        DB::transaction(function () use ($endpoint, $dateFrom, $dateTo, $lastPage): void {
            $sync = $this->getSyncForUpdate($endpoint, $dateFrom, $dateTo);

            if ($sync->batch_id) {
                Log::info('WB sync batch already created', [
                    'id' => $sync->id,
                ]);

                return;
            }

            $sync->update([
                'total_pages' => $lastPage,
                'processed_pages' => 1,
            ]);

            if ($lastPage <= 1) {
                $this->markSuccess($endpoint, $dateFrom, $dateTo);

                return;
            }

            $jobs = array_map(
                static fn (int $page): WbSyncJob => new WbSyncJob($endpoint, $dateFrom, $dateTo, $page),
                range(2, $lastPage),
            );

            // Batch callbacks are serialized and run later by a queue worker, so they
            // must not capture $this. The manager is resolved from the container
            // when the callback actually runs.
            $managerClass = self::class;

            // NOTE(review): the batch is dispatched while this transaction is still open;
            // verify that workers cannot pick up batch jobs before the commit.
            $batch = Bus::batch($jobs)
                ->then(static fn () => app($managerClass)->markSuccess($endpoint, $dateFrom, $dateTo))
                ->catch(static fn () => app($managerClass)->markFailed($endpoint, $dateFrom, $dateTo))
                ->name("sync-{$endpoint->name}-{$dateFrom}")
                ->dispatch();

            $sync->update([
                'batch_id' => $batch->id,
            ]);

            Log::info('WB batch dispatched', [
                'batch_id' => $batch->id,
                'pages' => $lastPage,
            ]);
        });
    }

    /**
     * Count one more processed page for the run.
     *
     * Runs in a transaction so the row lock is held for the whole increment.
     *
     * @param WbEndpoint  $endpoint Endpoint being synced.
     * @param string      $dateFrom Lower bound of the range.
     * @param string|null $dateTo   Upper bound of the range, or null.
     *
     * @throws Throwable
     */
    public function incrementProgress(
        WbEndpoint $endpoint,
        string $dateFrom,
        ?string $dateTo,
    ): void {
        DB::transaction(function () use ($endpoint, $dateFrom, $dateTo): void {
            $sync = $this->getSyncForUpdate($endpoint, $dateFrom, $dateTo);

            $sync->incrementProcessedPages();

            // Re-read the row so the logged counter is the persisted value and does not
            // depend on whether incrementProcessedPages() updates the in-memory model.
            $sync->refresh();

            Log::info('WB progress incremented', [
                'id' => $sync->id,
                'processed_pages' => $sync->processed_pages,
            ]);
        });
    }

    /**
     * Set the run's status to SUCCESS.
     *
     * @param WbEndpoint  $endpoint Endpoint being synced.
     * @param string      $dateFrom Lower bound of the range.
     * @param string|null $dateTo   Upper bound of the range, or null.
     *
     * @throws Throwable
     */
    public function markSuccess(
        WbEndpoint $endpoint,
        string $dateFrom,
        ?string $dateTo,
    ): void {
        $sync = $this->updateStatus($endpoint, $dateFrom, $dateTo, SyncStatus::SUCCESS);

        Log::info('WB sync success', [
            'id' => $sync->id,
        ]);
    }

    /**
     * Set the run's status to FAILED.
     *
     * @param WbEndpoint  $endpoint Endpoint being synced.
     * @param string      $dateFrom Lower bound of the range.
     * @param string|null $dateTo   Upper bound of the range, or null.
     *
     * @throws Throwable
     */
    public function markFailed(
        WbEndpoint $endpoint,
        string $dateFrom,
        ?string $dateTo,
    ): void {
        $sync = $this->updateStatus($endpoint, $dateFrom, $dateTo, SyncStatus::FAILED);

        Log::error('WB sync failed', [
            'id' => $sync->id,
        ]);
    }

    /**
     * Lock the run's row and write a new status, all within one transaction.
     *
     * @throws Throwable
     */
    private function updateStatus(
        WbEndpoint $endpoint,
        string $dateFrom,
        ?string $dateTo,
        SyncStatus $status,
    ): SyncState {
        return DB::transaction(function () use ($endpoint, $dateFrom, $dateTo, $status): SyncState {
            $sync = $this->getSyncForUpdate($endpoint, $dateFrom, $dateTo);

            $sync->update([
                'status' => $status,
            ]);

            return $sync;
        });
    }

    /**
     * Fetch the run's row with a row lock; fails if the row does not exist.
     *
     * The lock is only held when the caller is inside a DB transaction.
     */
    private function getSyncForUpdate(
        WbEndpoint $endpoint,
        string $dateFrom,
        ?string $dateTo,
    ): SyncState {
        return $this->lockedSyncQuery($endpoint, $dateFrom, $dateTo)->firstOrFail();
    }

    /**
     * Build the locking query that identifies a run by entity, date_from and date_to.
     *
     * A null $dateTo matches rows whose date_to IS NULL.
     */
    private function lockedSyncQuery(
        WbEndpoint $endpoint,
        string $dateFrom,
        ?string $dateTo,
    ): \Illuminate\Database\Eloquent\Builder {
        $query = SyncState::query()
            ->where('entity', $endpoint->name)
            ->where('date_from', $dateFrom);

        $query = $dateTo === null
            ? $query->whereNull('date_to')
            : $query->where('date_to', $dateTo);

        return $query->lockForUpdate();
    }
}
|