pollora / datamorph
Powerful Flow PHP ETL integration for Laravel
Requires
- php: ^8.2
- flow-php/etl: ^0.5
- guzzlehttp/guzzle: ^7.0
- illuminate/support: ^12.0
- league/csv: ^9.0
- spatie/data-transfer-object: ^3.0
Requires (Dev)
- orchestra/testbench: ^10.0
- phpunit/phpunit: ^11.0
README
Datamorph is a Laravel package that allows you to create and run Flow PHP ETL (Extract, Transform, Load) pipelines in a structured and extensible way. This documentation will guide you through the installation, configuration, and usage of the package.
Table of Contents
- Installation
- Concepts
- Configuration
- Creating an ETL Pipeline
- Running an ETL Pipeline
- Hooks
- Usage Examples
Installation
Install the package via Composer:
composer require pollora/datamorph
Concepts
Datamorph is built around three main components:
- Extractors: Retrieve data from various sources (databases, APIs, files, etc.)
- Transformers: Transform the retrieved data according to your needs
- Loaders: Load the transformed data to their final destination
These three components are orchestrated in a Pipeline that also manages Hooks that allow you to intervene at different stages of the process.
Configuration
Publish the configuration file:
php artisan vendor:publish --tag=datamorph-config
This will create a config/datamorph.php file where you can configure your ETL pipelines:
```php
return [
    'pipelines' => [
        'stock' => [
            'hooks' => [
                'before_extract' => [
                    App\ETL\Stock\Hooks\BeforeStockExtract::class,
                ],
                'after_extract' => [
                    // Hooks to execute after extraction
                ],
                'before_transform' => [
                    // Hooks to execute before transformation
                ],
                'after_transform' => [
                    // Hooks to execute after transformation
                ],
                'before_load' => [
                    // Hooks to execute before loading
                ],
                'after_load' => [
                    // Hooks to execute after loading
                ],
                'before_run' => [
                    App\ETL\Stock\Hooks\BeforeStockRun::class,
                ],
                'after_run' => [
                    // Hooks to execute after complete execution
                ],
            ],
        ],
        // Other pipelines...
    ],
];
```
Creating an ETL Pipeline
Automatic File Generation
Datamorph includes an Artisan command that automatically generates the necessary files for a new pipeline:
php artisan datamorph:make product
This command will create the following files in the app/ETL/Product/ directory:
- ProductExtractor.php - For data extraction
- ProductTransformer.php - For data transformation
- ProductLoader.php - For loading transformed data
Structure of Generated Files
Extractor
```php
<?php

declare(strict_types=1);

namespace App\ETL\Product;

use Flow\ETL\FlowContext;
use Pollora\Datamorph\Contracts\Extractor;

class ProductExtractor extends Extractor
{
    public function handle(FlowContext $context): array
    {
        // Data extraction logic
        // Returns an array of raw data
        return [];
    }
}
```
Transformer
```php
<?php

declare(strict_types=1);

namespace App\ETL\Product;

use Flow\ETL\FlowContext;
use Pollora\Datamorph\Contracts\Transformer;

class ProductTransformer extends Transformer
{
    public function handle(array $rows, FlowContext $context): array
    {
        // Data transformation logic
        // Receives raw data and returns transformed data
        return $rows;
    }
}
```
Loader
```php
<?php

declare(strict_types=1);

namespace App\ETL\Product;

use Flow\ETL\FlowContext;
use Flow\ETL\Loader as FlowLoader;
use Pollora\Datamorph\Contracts\Loader;

class ProductLoader extends Loader
{
    public function handle(FlowContext $context): FlowLoader
    {
        // Data loading logic
        // Returns a Flow ETL loader
        return to_memory();
    }
}
```
Running an ETL Pipeline
Once your components are implemented and your pipeline is configured, you can run it with the Artisan command:
php artisan datamorph:run stock
This command:
- Checks that the pipeline exists in the configuration
- Checks that the Extractor, Transformer, and Loader classes exist
- Instantiates these classes and creates a Pipeline
- Runs the Pipeline with the configured hooks
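Because datamorph:run is a standard Artisan command, you could, for example, run a pipeline on a recurring basis with Laravel's scheduler. A minimal sketch, assuming a hypothetical nightly schedule for the stock pipeline:

```php
<?php

// routes/console.php (Laravel 11+)
use Illuminate\Support\Facades\Schedule;

// Run the stock pipeline every night, skipping a run
// if the previous one is still in progress.
Schedule::command('datamorph:run stock')
    ->dailyAt('02:00')
    ->withoutOverlapping();
```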
Running in Code
You can also run a pipeline programmatically:
```php
use Pollora\Datamorph\Pipeline;
use App\ETL\Stock\StockExtractor;
use App\ETL\Stock\StockTransformer;
use App\ETL\Stock\StockLoader;

$pipeline = new Pipeline(
    'stock',
    new StockExtractor(),
    new StockTransformer(),
    new StockLoader()
);

$pipeline->run();
```
Hooks
Hooks are a powerful mechanism in Datamorph that allows you to intervene at different stages of an ETL pipeline. There are three ways to implement hooks in Datamorph, each with its own use cases.
Configuration-based Hooks
The first approach is to define hooks in the config/datamorph.php configuration file. This method is ideal for recurring hooks that need to be applied to every pipeline execution.
Configuration
```php
// config/datamorph.php
return [
    'pipelines' => [
        'stock' => [
            'hooks' => [
                'before_extract' => [
                    App\ETL\Stock\Hooks\BeforeStockExtract::class,
                ],
                'after_extract' => [
                    App\ETL\Stock\Hooks\AfterStockExtract::class,
                ],
                'before_transform' => [
                    App\ETL\Stock\Hooks\BeforeStockTransform::class,
                ],
                'after_transform' => [
                    App\ETL\Stock\Hooks\AfterStockTransform::class,
                ],
                'before_load' => [
                    App\ETL\Stock\Hooks\BeforeStockLoad::class,
                ],
                'after_load' => [
                    App\ETL\Stock\Hooks\AfterStockLoad::class,
                ],
                'before_run' => [
                    App\ETL\Stock\Hooks\BeforeStockRun::class,
                ],
                'after_run' => [
                    App\ETL\Stock\Hooks\AfterStockRun::class,
                ],
            ],
        ],
    ],
];
```
Hook Implementation
Each hook must implement the HookInterface:
```php
<?php

namespace App\ETL\Stock\Hooks;

use Closure;
use Flow\ETL\DataFrame;
use Flow\ETL\Filesystem\SaveMode;
use Pollora\Datamorph\Contracts\HookInterface;

class BeforeStockRun implements HookInterface
{
    /**
     * Execute the hook with the given dataframe.
     *
     * @param  mixed  $dataframe  The dataframe to process
     * @param  Closure|null  $next  The next hook to execute
     * @return mixed
     */
    public function handle(mixed $dataframe, ?Closure $next = null): mixed
    {
        // Apply hook logic
        if ($dataframe instanceof DataFrame) {
            $dataframe = $dataframe->mode(SaveMode::Overwrite);
        }

        // Pass to the next hook in the chain
        return $next ? $next($dataframe) : $dataframe;
    }
}
```
Dynamic Hooks
The second approach uses the $context->pipeline->before() and $context->pipeline->after() methods to register hooks dynamically during pipeline execution. This method is particularly useful for conditional behaviors or for hooks that depend on the current state of the pipeline.
Usage
```php
// In an ETL component (Extractor, Transformer, Loader)
public function handle(FlowContext $context): array
{
    // Add a hook after the current operation
    $context->pipeline->after(function ($dataframe) {
        // Hook logic
        Log::info('After extraction');

        return $dataframe;
    });

    // The operation is automatically detected
    // ...
}
```
Supported Hook Types
You can pass different types of hooks to the after() method:
- A Closure (anonymous function) - will be automatically wrapped in a DynamicHook
- An instance of a class implementing HookInterface - will be used directly
- A class name - the class will be resolved via Laravel's IoC container
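As a sketch (reusing the AfterStockExtract hook class from the configuration example above), the three forms could be registered like this:

```php
// 1. A Closure - automatically wrapped in a DynamicHook
$context->pipeline->after(function ($dataframe) {
    return $dataframe;
});

// 2. An instance implementing HookInterface - used directly
$context->pipeline->after(new \App\ETL\Stock\Hooks\AfterStockExtract());

// 3. A class name - resolved via Laravel's IoC container
$context->pipeline->after(\App\ETL\Stock\Hooks\AfterStockExtract::class);
```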
Automatic Operation Detection
If you don't explicitly specify the operation, Datamorph will detect it automatically based on the calling context:
- In an Extractor, the operation will be extract
- In a Transformer, the operation will be transform
- In a Loader, the operation will be load
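To illustrate (following the two-argument form used in the conditional logging example later in this README), omitting the operation relies on automatic detection, while passing an operation name overrides it:

```php
// Called inside StockExtractor::handle(): no operation given,
// so the hook is attached to the "extract" operation.
$context->pipeline->after(function ($dataframe) {
    return $dataframe;
});

// Explicit operation name: attach the hook to "transform" instead.
$context->pipeline->after(function ($dataframe) {
    return $dataframe;
}, 'transform');
```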
Hooks via before/after Methods
The third approach is to implement the before() and after() methods directly in your Extractor, Transformer, and Loader classes. This method is the simplest and most direct for standard behaviors.
Implementation
```php
<?php

namespace App\ETL\Stock;

use Flow\ETL\FlowContext;
use Illuminate\Support\Facades\Log;
use Pollora\Datamorph\Contracts\Extractor;

class StockExtractor extends Extractor
{
    /**
     * Extract stock data.
     */
    public function handle(FlowContext $context): array
    {
        // Extraction logic
        $results = [];

        return $results;
    }

    /**
     * Method executed before extraction.
     */
    public function before(mixed $dataframe, FlowContext $context): mixed
    {
        Log::info('Preparing extraction');

        return $dataframe;
    }

    /**
     * Method executed after extraction.
     */
    public function after(mixed $dataframe, FlowContext $context): mixed
    {
        Log::info('Extraction completed');

        return $dataframe;
    }
}
```
The before() and after() methods are automatically called by the pipeline at the appropriate times, without any additional configuration.
Execution Order and Combining Approaches
All three approaches can be combined in the same pipeline. The execution order is as follows:
1. Hooks configured in config/datamorph.php
2. The before() method of the relevant ETL component
3. Main operation (extraction, transformation, loading)
4. The after() method of the relevant ETL component
5. Dynamic hooks registered via $pipeline->before() and $pipeline->after()
This combination offers great flexibility and can address a variety of use cases.
Usage Examples
Example 1: Configuration for Validation Before Extraction
```php
// config/datamorph.php
'hooks' => [
    'before_extract' => [
        App\ETL\Stock\Hooks\ValidateSourceHook::class,
    ],
],
```
```php
// App\ETL\Stock\Hooks\ValidateSourceHook.php
public function handle(mixed $dataframe, ?Closure $next = null): mixed
{
    // Check if the data source is available
    if (! $this->isSourceAvailable()) {
        throw new \RuntimeException('Data source is not available');
    }

    return $next ? $next($dataframe) : $dataframe;
}
```
Example 2: Dynamic Hooks for Conditional Logging
```php
// In StockExtractor
public function handle(FlowContext $context): array
{
    // Add logging hooks if in debug mode
    if (config('app.debug')) {
        $context->pipeline->before(function ($dataframe) {
            Log::debug('Starting transformation');

            return $dataframe;
        }, 'transform');

        $context->pipeline->after(function ($dataframe) {
            Log::debug('Transformation completed');

            return $dataframe;
        }, 'transform');
    }

    // ...
}
```
Example 3: Before/After Methods for Connection Management
```php
// In DatabaseExtractor
public function before(mixed $dataframe, FlowContext $context): mixed
{
    // Open database connection
    $this->connection = DB::connection('source');
    Log::info('Database connection established');

    return $dataframe;
}

public function after(mixed $dataframe, FlowContext $context): mixed
{
    // Close connection after extraction
    if ($this->connection) {
        $this->connection = null;
        Log::info('Database connection closed');
    }

    return $dataframe;
}
```
Example 4: Combining Approaches for a Complete Pipeline
```php
// config/datamorph.php - Global hooks
// ("Global" is a reserved word in PHP and cannot be used as a
// namespace segment, so a Common namespace is used here instead)
'hooks' => [
    'before_run' => [
        App\ETL\Common\Hooks\LogStartHook::class,
    ],
    'after_run' => [
        App\ETL\Common\Hooks\LogEndHook::class,
    ],
],
```

```php
// StockExtractor.php - Before/After methods
public function before(mixed $dataframe, FlowContext $context): mixed
{
    // Extraction-specific preparation
    return $dataframe;
}
```

```php
// In a component's handle method - Dynamic hooks
public function handle(FlowContext $context): array
{
    // Dynamic hook for a specific case
    if ($someCondition) {
        $context->pipeline->after(function ($dataframe) {
            // Conditional logic
            return $dataframe;
        });
    }

    // ...
}
```
This combination of approaches provides you with a flexible and powerful hook system capable of addressing a variety of needs in your ETL pipelines.