FlowEvents

YaEtl extends events implemented by NodalFlow through symfony/event-dispatcher. You can easily register any existing dispatcher implementing Symfony's EventDispatcherInterface.

YaEtl events are compatible and tested with symfony/event-dispatcher versions 2.8.*, 3.4.* and 4.0.* (php > 7.1).

YaEtl provides each dispatch() call with a YaEtlEvent instance, extending NodalFlow's FlowEvent which extends Symfony Event and implement FlowEventInterface. Each YaEtlEvent instance carries the dispatcher's Flow instance, and eventually a Node instance, when the event is tied to a specific Node.

Have a look at NodalFlow event documentation for more details.

Usage

In order to make it simple to use any kind of EventDispatcherInterface implementation, YaEtl does not instantiate the default Symfony implementation until you actually call $yaEtl->getDispatcher() or register a Callback (the old way).

Flow event are stored as constants in YaEtlEvent, but since it extends FlowEvent, follows the same naming convention (flow.name) and late static binding is used, you can refer to FlowEvent events either by FlowEvent::FLOW_NAME or YaEtlEvent::FLOW_NAME, which means that NodalFlow event subscribers and handler will work with YaEtl. Only the events added by YaEtl can only be mentioned as YaEtlEvent::FLOW_NAME.

You can set your own dispatcher before you use it to register YaEtl events (or just set it already setup) :

$yaEtl->setDispatcher(new CustomDispatcher);

or :

 $yaEtl->setDispatcher($alreadySetupDispatcher);

But you can also just let YaEtl handle instantiation :

$yaEtl->getDispatcher()->addListener('flow.name', function(FlowEventInterface $event) {
    // always set 
    $yaEtl = $event->getFlow();
    // not always set
    $node = $event->getNode();

    // do stuff ...
});

or even :

$yaEtl->getDispatcher()->addSubscriber(new EventSubscriberInterfaceImplementation());

It is important to note that each Flow instance carries its own dispatcher instance. In most cases, it is ok to just register events on the Root Flow as it is the one controlling the executions of all its eventual children. You still get the big picture with Root Flow events, such as start, end and exceptions, but you do not have access to children iteration events (the YaEtlEvent::FLOW_PROGRESS event). If you need more granularity, you will need to register events in each Flow you want to observe.

YaEtlEvent::FLOW_START

Triggered when the Flow starts, the event only carries the flow instance.

$yaEtl->getDispatcher()->addListener(FlowEvent::FLOW_START, function(FlowEventInterface $event) {
    $yaEtl = $event->getFlow();
    // do stuff ...
});

YaEtlEvent::FLOW_PROGRESS

Triggered when node iterates in the Flow, the event carries the flow and the iterating node instances.

$yaEtl->getDispatcher()->addListener(FlowEvent::FLOW_PROGRESS, function(FlowEventInterface $event) {
    $yaEtl = $event->getFlow();
    $node = $event->getNode();
    // do stuff ...
});

As this is the most called event, a modulo is implemented to only fire it once every $progressMod iteration, plus one at the first record. The default is 1024, you can set it directly on the Flow :

// increase to 100k
$yaEtl->setProgressMod(100000);
// or for full granularity
$yaEtl->setProgressMod(1);

Since the $progressMod modulo is applied to each iterating node iteration count, each iterating node will have an opportunity to fire the event. For example, using a $progressMod of 10 and extracting 10 categories chained to another extractor extracting items in these categories, the FlowEvent::FLOW_PROGRESS event will be fired once with the first extractor and each 10 items found in each categories from the second extractor.

YaEtlEvent::FLOW_CONTINUE

Triggered when a node triggers a continue on the Flow, the event carries the flow and the node instance triggering the continue.

$yaEtl->getDispatcher()->addListener(FlowEvent::FLOW_CONTINUE, function(FlowEventInterface $event) {
    $yaEtl = $event->getFlow();
    $node = $event->getNode();
    // do stuff ...
});

YaEtlEvent::FLOW_BREAK

Triggered when a node triggers a break on the Flow, the event carries the flow and the node instance triggering the break.

$yaEtl->getDispatcher()->addListener(FlowEvent::FLOW_BREAK, function(FlowEventInterface $event) {
    $yaEtl = $event->getFlow();
    $node = $event->getNode();
    // do stuff ...
});

YaEtlEvent::FLOW_FLUSH

Triggered when a node triggers a break on the Flow, the event carries the flow and the node instance being flushed. This event is purely YaEtl.

$yaEtl->getDispatcher()->addListener(FlowEvent::FLOW_BREAK, function(FlowEventInterface $event) {
    $yaEtl = $event->getFlow();
    $node = $event->getNode();
    // do stuff ...
});

YaEtlEvent::FLOW_SUCCESS

Triggered when the Flow completes successfully (eg with no exceptions), the event only carries the flow instance.

$yaEtl->getDispatcher()->addListener(FlowEvent::FLOW_SUCCESS, function(FlowEventInterface $event) {
    $yaEtl = $event->getFlow();
    // do stuff ...
});

YaEtlEvent::FLOW_FAIL

Triggered when an exception is raised during Flow execution, the event carries the flow instance, and the node current Node instance in the Flow when the exception was thrown.

$yaEtl->getDispatcher()->addListener(FlowEvent::FLOW_FAIL, function(FlowEventInterface $event) {
    $yaEtl = $event->getFlow();
    $node = $event->getNode();
    // if you need to inspect the exeption 
    $exception = $yaEtl->getFlowStatus()->getException();
    // do stuff ...
});

The original exception is re-thrown by NodalFlow after the execution of FlowEvent::FLOW_FAIL events.

ProgressBarSubscriber

YaEtl comes with a ProgressBarSubscriber class you can use if you want to display a symfony ProgressBar during Flow execution. It is tested against symfony/console versions 2.8.*, 3.4.* and 4.0.* (php > 7.1).

TL;DR:

// this is enough to let YaEtl handle all the setup
new ProgressBarSubscriber($yaEtl);

This will trigger a display similar to (example with one extractor, 100 records and a progressMod of 10):

[YaEtl] Start
    0 [>---------------------------]
   ...
  100 [---------------->-----------]

[YaEtl] Clean Success
[YaEtl](clean) 1 Extractor - 1 Extract - 100 Record (100 Iterations)
[YaEtl] 0 Joiner - 0 Join - 0 Continue - 0 Break - 0 Qualifier - 0 Qualify
[YaEtl] 1 Transformer - 100 Transform - 0 Loader - 0 Load
[YaEtl] 0 Branch - 0 Continue - 0 Break - 0 Flush
[YaEtl] Time : 33ms - Memory: 6 MiB

In details:

$progressSubscriber = new ProgressBarSubscriber;

// you can set a custom ConsoleOutputInterface,
$progressSubscriber->setOutput(new ConsoleOutput);

// a custom OutputInterface (useful for tests)
$progressSubscriber->setOutput(new StreamOutput(fopen('php://memory', 'r+', false));

// then inject it form progress instance:
$progressSubscriber->registerFlow($yaEtl);

// or at a lower level:
$yaEtl->getDispatcher()->addSubscriber($progressSubscriber);

// or even, let YaEtl instantiate a new ConsoleOutput automatically
$yaEtl->getDispatcher()->addSubscriber(new ProgressBarSubscriber);

If you know the number of records in advance, you can set the count in the ProgressBarSubscriber:

$progressSubscriber->setNumRecords($count);

This will set the count in the underlying ProgressBar instance and trigger a more accurate advance with percents.

Verbosity can be set in the injected Console instance or by getting a fresh one from ProgressBarSubscriber:

$output = $progressSubscriber->getOutput();
$output->setVerbosity(OutputInterface::VERBOSITY_DEBUG);

Compatibility

The event implementation is fully compatible with the old Callback strategy. Although deprecated, you do not have to do anything to continue using your CallbackInterface implementations.

Under the hood, a CallbackWrapper implementation of Symfony EventSubscriberInterface is used as a proxy to the CallbackInterface implementation.