where('entity', $endpoint->name)
                ->where('date_from', $dateFrom)
                ->when(
                    $dateTo === null,
                    fn($q) => $q->whereNull('date_to'),
                    fn($q) => $q->where('date_to', $dateTo)
                )
                ->lockForUpdate()
                ->first();

            if (!$sync) {
                $sync = SyncState::create([
                    'entity' => $endpoint->name,
                    'date_from' => $dateFrom,
                    'date_to' => $dateTo,
                    'status' => SyncStatus::PENDING,
                    'started_at' => $now,
                ]);

                Log::info('WB sync created', [
                    'id' => $sync->id,
                    'entity' => $endpoint->name,
                    'date_from' => $dateFrom,
                ]);

                WbSyncJob::dispatch($endpoint, $dateFrom, $dateTo, 1);

                return;
            }

            if ($sync->status === SyncStatus::SUCCESS) {
                Log::info('WB sync skipped, already SUCCESS', [
                    'id' => $sync->id,
                ]);

                return;
            }

            if ($sync->status === SyncStatus::PENDING) {
                if ($sync->started_at && $sync->started_at->diffInMinutes($now) < config('wb-sync.timeout')) {
                    Log::info('WB sync already PENDING, skipping', [
                        'id' => $sync->id,
                    ]);

                    return;
                }

                Log::warning('WB sync stuck > wb-sync.timeout, restart', [
                    'id' => $sync->id,
                    'started_at' => $sync->started_at,
                ]);
            }

            // NOTE(review): batch_id is left untouched on restart, while handleFirstPage()
            // returns early whenever batch_id is set — confirm that a restarted sync can
            // still dispatch its remaining pages.
            $sync->update([
                'status' => SyncStatus::PENDING,
                'started_at' => $now,
                'updated_at' => $now,
            ]);

            Log::info('WB sync started', ['id' => $sync->id]);

            WbSyncJob::dispatch($endpoint, $dateFrom, $dateTo, 1);
        });
    }

    /**
     * Records the page count learned from the first page and fans out the remaining pages.
     *
     * Runs in a transaction with the sync row locked. When a batch id is already stored the
     * call only logs and returns. Otherwise total_pages is set to $lastPage and
     * processed_pages to 1; a single-page sync is marked successful right away, while for
     * more pages one WbSyncJob per page 2..$lastPage is dispatched as a batch whose id is
     * stored on the sync row. The batch marks the sync successful when it completes and
     * failed when a job failure is caught.
     *
     * @param WbEndpoint  $endpoint Endpoint whose name identifies the sync row.
     * @param string      $dateFrom Lower date bound identifying the sync row.
     * @param string|null $dateTo   Upper date bound, or null for rows without one.
     * @param int         $lastPage Number of the last page to process.
     *
     * @throws Throwable
     */
    public function handleFirstPage(
        WbEndpoint $endpoint,
        string $dateFrom,
        ?string $dateTo,
        int $lastPage
    ): void {
        DB::transaction(function () use ($endpoint, $dateFrom, $dateTo, $lastPage): void {
            $sync = $this->getSyncForUpdate($endpoint, $dateFrom, $dateTo);

            if ($sync->batch_id) {
                Log::info('WB sync batch already created', [
                    'id' => $sync->id,
                ]);

                return;
            }

            $sync->update([
                'total_pages' => $lastPage,
                'processed_pages' => 1,
            ]);

            if ($lastPage <= 1) {
                $this->markSuccess($endpoint, $dateFrom, $dateTo);

                return;
            }

            $jobs = [];
            for ($page = 2; $page <= $lastPage; $page++) {
                $jobs[] = new WbSyncJob($endpoint, $dateFrom, $dateTo, $page);
            }

            // Batch callbacks are serialized and executed later by a queue worker, so they
            // must not capture $this. Only the class name is captured and a fresh instance
            // is resolved from the container when the callback actually runs.
            $serviceClass = static::class;

            $batch = Bus::batch($jobs)
                ->then(static fn() => app($serviceClass)->markSuccess($endpoint, $dateFrom, $dateTo))
                ->catch(static fn() => app($serviceClass)->markFailed($endpoint, $dateFrom, $dateTo))
                ->name("sync-{$endpoint->name}-{$dateFrom}")
                ->dispatch();

            $sync->update([
                'batch_id' => $batch->id,
            ]);

            Log::info('WB batch dispatched', [
                'batch_id' => $batch->id,
                'pages' => $lastPage,
            ]);
        });
    }

    /**
     * Increments the processed-page counter of the matching sync row and logs the new value.
     *
     * The row is locked, incremented and re-read inside one transaction so the row lock is
     * actually held for the duration of the update and the logged counter is the persisted
     * value rather than an in-memory estimate.
     *
     * @param WbEndpoint  $endpoint Endpoint whose name identifies the sync row.
     * @param string      $dateFrom Lower date bound identifying the sync row.
     * @param string|null $dateTo   Upper date bound, or null for rows without one.
     *
     * @throws Throwable
     */
    public function incrementProgress(
        WbEndpoint $endpoint,
        string $dateFrom,
        ?string $dateTo
    ): void {
        DB::transaction(function () use ($endpoint, $dateFrom, $dateTo): void {
            $sync = $this->getSyncForUpdate($endpoint, $dateFrom, $dateTo);

            $sync->incrementProcessedPages();

            // Reload so the log shows what is stored, whether or not
            // incrementProcessedPages() also updates the in-memory attribute.
            $sync->refresh();

            Log::info('WB progress incremented', [
                'id' => $sync->id,
                'processed_pages' => $sync->processed_pages,
            ]);
        });
    }

    /**
     * Sets the matching sync row's status to SyncStatus::SUCCESS and logs it.
     *
     * Runs in its own transaction so the row lock taken by getSyncForUpdate() is held until
     * the status is written; when called from within another transaction it simply nests.
     *
     * @param WbEndpoint  $endpoint Endpoint whose name identifies the sync row.
     * @param string      $dateFrom Lower date bound identifying the sync row.
     * @param string|null $dateTo   Upper date bound, or null for rows without one.
     *
     * @throws Throwable
     */
    public function markSuccess(
        WbEndpoint $endpoint,
        string $dateFrom,
        ?string $dateTo
    ): void {
        DB::transaction(function () use ($endpoint, $dateFrom, $dateTo): void {
            $sync = $this->getSyncForUpdate($endpoint, $dateFrom, $dateTo);

            $sync->update([
                'status' => SyncStatus::SUCCESS,
            ]);

            Log::info('WB sync success', [
                'id' => $sync->id,
            ]);
        });
    }

    /**
     * Sets the matching sync row's status to SyncStatus::FAILED and logs it as an error.
     *
     * Runs in its own transaction so the row lock taken by getSyncForUpdate() is held until
     * the status is written.
     *
     * @param WbEndpoint  $endpoint Endpoint whose name identifies the sync row.
     * @param string      $dateFrom Lower date bound identifying the sync row.
     * @param string|null $dateTo   Upper date bound, or null for rows without one.
     *
     * @throws Throwable
     */
    public function markFailed(
        WbEndpoint $endpoint,
        string $dateFrom,
        ?string $dateTo
    ): void {
        DB::transaction(function () use ($endpoint, $dateFrom, $dateTo): void {
            $sync = $this->getSyncForUpdate($endpoint, $dateFrom, $dateTo);

            $sync->update([
                'status' => SyncStatus::FAILED,
            ]);

            Log::error('WB sync failed', [
                'id' => $sync->id,
            ]);
        });
    }

    /**
     * Loads the sync row for the given endpoint and date range with a FOR UPDATE lock.
     *
     * The row is matched on entity (the endpoint's name), date_from and date_to, where a
     * null $dateTo matches rows whose date_to IS NULL. The lock is only held while the
     * caller has a transaction open.
     *
     * @param WbEndpoint  $endpoint Endpoint whose name identifies the sync row.
     * @param string      $dateFrom Lower date bound identifying the sync row.
     * @param string|null $dateTo   Upper date bound, or null for rows without one.
     *
     * @return SyncState The matching row.
     *
     * @throws \Illuminate\Database\Eloquent\ModelNotFoundException When no row matches.
     */
    private function getSyncForUpdate(
        WbEndpoint $endpoint,
        string $dateFrom,
        ?string $dateTo
    ): SyncState {
        $query = SyncState::query()
            ->where('entity', $endpoint->name)
            ->where('date_from', $dateFrom);

        if ($dateTo === null) {
            $query->whereNull('date_to');
        } else {
            $query->where('date_to', $dateTo);
        }

        return $query->lockForUpdate()->firstOrFail();
    }
}