Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to simple built-in FileFetcher on import #3881

Merged
merged 16 commits into from
Dec 14, 2022
Merged
1 change: 0 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
"fmizzell/maquina": "^1.1.0",
"getdkan/contracts": "^1.0.0",
"getdkan/csv-parser": "^1.2.3",
"getdkan/file-fetcher" : "^4.1.0",
"getdkan/harvest": "^1.0.0",
"getdkan/json-schema-provider": "^0.1.2",
"getdkan/locker": "^1.1.0",
Expand Down
5 changes: 0 additions & 5 deletions modules/common/common.services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@ services:
factory: entity_type.manager:getStorage
arguments: ['node']

dkan.common.file_fetcher:
class: \Drupal\common\FileFetcher\Factory
arguments:
- '@dkan.common.job_store'

dkan.common.dataset_info:
class: \Drupal\common\DatasetInfo
calls:
Expand Down
56 changes: 0 additions & 56 deletions modules/common/src/FileFetcher/Factory.php

This file was deleted.

4 changes: 2 additions & 2 deletions modules/common/tests/src/Traits/CleanUp.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace Drupal\Tests\common\Traits;

use Drupal\node\Entity\Node;
use FileFetcher\FileFetcher;
use Drupal\datastore\Plugin\QueueWorker\FileFetcherJob;

/**
*
Expand Down Expand Up @@ -50,7 +50,7 @@ private function removeAllFileFetchingJobs() {
$jobStoreFactory = \Drupal::service('dkan.common.job_store');

/** @var \Drupal\common\Storage\JobStore $jobStore */
$jobStore = $jobStoreFactory->getInstance(FileFetcher::class);
$jobStore = $jobStoreFactory->getInstance(FileFetcherJob::class);
foreach ($jobStore->retrieveAll() as $id) {
$jobStore->remove($id);
}
Expand Down
18 changes: 9 additions & 9 deletions modules/common/tests/src/Unit/Storage/JobStoreTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
use Drupal\Core\Database\Schema;
use Drupal\Core\Database\StatementWrapper;
use Drupal\common\Storage\JobStore;
use Drupal\datastore\Plugin\QueueWorker\FileFetcherJob;

use Contracts\Mock\Storage\Memory;
use FileFetcher\FileFetcher;
use MockChain\Chain;
use MockChain\Sequence;
use PHPUnit\Framework\TestCase;
Expand All @@ -30,7 +30,7 @@ public function testConstruction() {
->add(Connection::class, "schema", Schema::class)
->add(Schema::class, "tableExists", FALSE);

$jobStore = new JobStore(FileFetcher::class, $chain->getMock());
$jobStore = new JobStore(FileFetcherJob::class, $chain->getMock());
$this->assertTrue(is_object($jobStore));
}

Expand Down Expand Up @@ -59,8 +59,8 @@ public function testRetrieve() {
->add(Connection::class, 'query', StatementWrapper::class)
->add(StatementWrapper::class, 'fetchAll', $fieldInfo);

$jobStore = new JobStore(FileFetcher::class, $chain->getMock());
$this->assertEquals($job_data, $jobStore->retrieve("1", FileFetcher::class));
$jobStore = new JobStore(FileFetcherJob::class, $chain->getMock());
$this->assertEquals($job_data, $jobStore->retrieve("1", FileFetcherJob::class));
}

/**
Expand Down Expand Up @@ -90,7 +90,7 @@ public function testRetrieveAll() {
->add(Connection::class, 'query', StatementWrapper::class)
->add(StatementWrapper::class, 'fetchAll', $sequence);

$jobStore = new JobStore(FileFetcher::class, $chain->getMock());
$jobStore = new JobStore(FileFetcherJob::class, $chain->getMock());
$this->assertTrue(is_array($jobStore->retrieveAll()));
}

Expand Down Expand Up @@ -127,7 +127,7 @@ public function testStore() {
->add(StatementWrapper::class, 'fetchAll', $fieldInfo)
->getMock();

$jobStore = new JobStore(FileFetcher::class, $connection);
$jobStore = new JobStore(FileFetcherJob::class, $connection);

$this->assertEquals("1", $jobStore->store(json_encode($jobObject), "1"));
}
Expand All @@ -151,16 +151,16 @@ public function testRemove() {
->add(StatementWrapper::class, 'fetchAll', $fieldInfo)
->getMock();

$jobStore = new JobStore(FileFetcher::class, $connection);
$jobStore = new JobStore(FileFetcherJob::class, $connection);

$this->assertEquals("", $jobStore->remove("1", FileFetcher::class));
$this->assertEquals("", $jobStore->remove("1", FileFetcherJob::class));
}

/**
* Private.
*/
private function getFileFetcher() {
return FileFetcher::get("1", new Memory(), ["filePath" => "file://" . __DIR__ . "/../../data/countries.csv"]);
return FileFetcherJob::get("1", new Memory(), ["filePath" => "file://" . __DIR__ . "/../../data/countries.csv"]);
}

}
1 change: 0 additions & 1 deletion modules/datastore/datastore.services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ services:
class: \Drupal\datastore\Service\ResourceLocalizer
arguments:
- '@dkan.metastore.resource_mapper'
- '@dkan.common.file_fetcher'
- '@dkan.common.drupal_files'
- '@dkan.common.job_store'

Expand Down
165 changes: 165 additions & 0 deletions modules/datastore/src/Plugin/QueueWorker/FileFetcherJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
<?php

namespace Drupal\datastore\Plugin\QueueWorker;

use GuzzleHttp\Client;
use Procrastinator\Job\AbstractPersistentJob;
use Procrastinator\Result;

/**
* These can be utilized to make a local copy of a remote file aka fetch a file.
*/
class FileFetcherJob extends AbstractPersistentJob {

/**
* Constructor.
*/
public function __construct(string $identifier, $storage, array $config = NULL) {
parent::__construct($identifier, $storage, $config);

if (!isset($config['filePath'])) {
throw new \Exception("Constructor missing expected config filePath.");
}

$state = [
'source' => $config['filePath'],
'total_bytes' => 0,
'total_bytes_copied' => 0,
'temporary' => FALSE,
'temporary_directory' => $config['temporaryDirectory'] ?? '/tmp',
];

$this->getResult()->setData(json_encode($state));
}

/**
* {@inheritdoc}
*/
protected function runIt() {
$state = $this->setupState($this->getState());
$this->getResult()->setData(json_encode($state));
$info = $this->copy($this->getState(), $this->getResult(), $this->getTimeLimit());
$this->setState($info['state']);
return $info['result'];
}

/**
* Set up the job state.
*
* @param array $state
* Incoming state array.
*
* @return array
* Modified state array.
*/
protected function setupState(array $state): array {
$state['total_bytes'] = PHP_INT_MAX;
$state['temporary'] = TRUE;
$state['destination'] = $this->getTemporaryFilePath($state);

return $state;
}

/**
* Get temporary file path, depending on flag keep_original_filename value.
*
* @param array $state
* State.
*
* @return string
* Temporary file path.
*/
private function getTemporaryFilePath(array $state): string {
$file_name = basename($state['source']);
return "{$state['temporary_directory']}/{$file_name}";
}

/**
* Copy the file to local storage.
*
* @param array $state
* State array.
* @param \Procrastinator\Result $result
* Job result object.
*
* @return array
* Array with two elements: state and result.
*/
public function copy(array $state, Result $result): array {
if (stream_is_local($state['source'])) {
return $this->copyLocal($state, $result);
}
else {
return $this->copyRemote($state, $result);
}
}

/**
* Copy local file to proper local storage.
*
* @param array $state
* State array.
* @param \Procrastinator\Result $result
* Job result object.
*
* @return array
* Array with two elements: state and result.
*/
protected function copyLocal(array $state, Result $result): array {
$this->ensureCreatingForWriting($state['destination']);
if (copy($state['source'], $state['destination'])) {
$result->setStatus(Result::DONE);
}
else {
throw new \Exception("File copy failed.");
}
$state['total_bytes_copied'] = $state['total_bytes'] = filesize($state['destination']);
return ['state' => $state, 'result' => $result];
}

/**
* Copy remote file to local storage.
*
* @param array $state
* State array.
* @param \Procrastinator\Result $result
* Job result object.
*
* @return array
* Array with two elements: state and result.
*/
protected function copyRemote(array $state, Result $result): array {
$client = new Client();
try {
$fout = $this->ensureCreatingForWriting($state['destination']);
$client->get($state['source'], ['sink' => $fout]);
$result->setStatus(Result::DONE);
}
catch (\Exception $e) {
$result->setStatus(Result::ERROR);
$result->setError($e->getMessage());
}

$state['total_bytes_copied'] = $state['total_bytes'] = filesize($state['destination']);
return ['state' => $state, 'result' => $result];
}

/**
* Ensure the destination file can be created.
*
* @param string $to
* The destination filename.
*
* @return false|resource
* File resource.
*/
private function ensureCreatingForWriting(string $to) {
// Delete destination first to avoid appending if existing.
if (file_exists($to)) {
unlink($to);
}
$fout = fopen($to, "w");
return $fout;
}

}
2 changes: 1 addition & 1 deletion modules/datastore/src/Plugin/QueueWorker/ImportJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ protected function setStorageSchema(array $header) {
* Verify headers are unique.
*
* @param array $header
* List of strings
* List of strings.
*
* @throws \Exception
*/
Expand Down
11 changes: 9 additions & 2 deletions modules/datastore/src/Service.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Drupal\datastore;

use Drupal\common\DataResource;
use Drupal\datastore\Plugin\QueueWorker\FileFetcherJob;
use Drupal\common\Storage\JobStoreFactory;
use Procrastinator\Result;
use Symfony\Component\DependencyInjection\ContainerInterface;
Expand All @@ -12,7 +13,6 @@
use Drupal\datastore\Service\ResourceLocalizer;
use Drupal\datastore\Service\Factory\ImportFactoryInterface;
use Drupal\datastore\Service\Info\ImportInfoList;
use FileFetcher\FileFetcher;

/**
* Main services for the datastore.
Expand All @@ -33,6 +33,13 @@ class Service implements ContainerInjectionInterface {
*/
private $importServiceFactory;

/**
* Import info list service.
*
* @var \Drupal\datastore\Service\Info\ImportInfoList
*/
private ImportInfoList $importInfoList;

/**
* Drupal queue.
*
Expand Down Expand Up @@ -204,7 +211,7 @@ public function drop(string $identifier, ?string $version = NULL, bool $local_re
if ($local_resource) {
$this->resourceLocalizer->remove($identifier, $version);
$this->jobStoreFactory
->getInstance(FileFetcher::class)
->getInstance(FileFetcherJob::class)
->remove(substr(str_replace('__', '_', $resource_id), 0, -11));
}
}
Expand Down
Loading