cult3

Marshalling SQS jobs in Laravel

Nov 09, 2015

Table of contents:

  1. What will the end product look like?
  2. Creating the Queue implementation
  3. Creating the Job implementation
  4. Creating the Connector class
  5. Configuring the Queue
  6. Using the new Queue
  7. Conclusion

The ease of being able to queue jobs in Laravel is one of my favourite features of the framework. All you have to do is implement the ShouldQueue interface and the job will automatically be placed onto the queue. It’s so easy!

However, if you are like me and you use AWS, one drawback of the SQS provider is that it’s not so easy to accept jobs pushed from the queue and process them.

Ideally we could just call the marshal() method on the Queue facade from the Controller that accepts the job.

In today’s tutorial we’ll be looking at what you need to build to add this functionality to your Laravel application.

What will the end product look like?

If you are not familiar with the marshal() method, the introduction of this article might be a bit abstract.

When a job is dispatched from SQS to the application, we need to accept it through a Controller and then run the job.

This involves a little bit of jiggery-pokery and so we can hide this behind a simple method call.

The final product of this tutorial will allow your Controller method to look like this:

/**
 * Accept and process the queued job
 *
 * @return Response
 */
public function queue()
{
    return Queue::marshal();
}

Nice and easy, right?

Creating the Queue implementation

The first thing we need to do is to create our SQSQueue implementation:

use Exception;
use Aws\Sqs\SqsClient;
use Illuminate\Http\Request;
use Illuminate\Http\Response;
use Illuminate\Queue\Queue as AbstractQueue;
use Illuminate\Contracts\Queue\Queue as QueueContract;

class SQSQueue extends AbstractQueue implements QueueContract
{
}

This is only possible because Laravel allows you to use your own implementation as long as it abides by the Queue contract.

First I inject an instance of the SqsClient, the Request object, and the URL of the default queue:

/**
 * The Amazon SQS instance
 *
 * @var SqsClient
 */
protected $sqs;

/**
 * The current request instance
 *
 * @var Request
 */
protected $request;

/**
 * The URL of the default queue
 *
 * @var string
 */
protected $default;

/**
 * @param SqsClient $sqs
 * @param Request $request
 * @param string $default
 * @return void
 */
public function __construct(SqsClient $sqs, Request $request, $default)
{
    $this->sqs = $sqs;
    $this->request = $request;
    $this->default = $default;
}

Next we need to implement each method of the Contract. I’m not going to go through each method as I don’t think it really matters how it works for this tutorial:

/**
 * Push a new job onto the queue
 *
 * @param string $job
 * @param mixed $data
 * @param string $queue
 * @return mixed
 */
public function push($job, $data = "", $queue = null)
{
    return $this->pushRaw($this->createPayload($job, $data), $queue);
}

/**
 * Push a raw payload onto the queue
 *
 * @param string $payload
 * @param string $queue
 * @param array $options
 * @return mixed
 */
public function pushRaw($payload, $queue = null, array $options = array())
{
    $response = $this->sqs->sendMessage([
        'QueueUrl' => $this->getQueue($queue),
        'MessageBody' => $payload
    ]);

    return $response->get('MessageId');
}

/**
 * Push a new job onto the queue after a delay
 *
 * @param DateTime|int $delay
 * @param string $job
 * @param mixed $data
 * @param string $queue
 * @return mixed
 */
public function later($delay, $job, $data = "", $queue = null)
{
    $payload = $this->createPayload($job, $data);

    $delay = $this->getSeconds($delay);

    return $this->sqs->sendMessage([
        'QueueUrl' => $this->getQueue($queue),
        'MessageBody' => $payload,
        'DelaySeconds' => $delay,
    ])->get('MessageId');
}

/**
 * Pop the next job off of the queue
 *
 * @param string $queue
 * @return Job|null
 */
public function pop($queue = null)
{
    $queue = $this->getQueue($queue);

    $response = $this->sqs->receiveMessage([
        'QueueUrl' => $queue,
        'AttributeNames' => ['ApproximateReceiveCount']
    ]);

    // 'Messages' is absent when the queue is empty, so avoid count() on null
    if (! empty($response['Messages'])) {
        return new Job($this->container, $this->sqs, $response['Messages'][0]);
    }

    return null;
}

/**
 * Marshal a push queue request and fire the job
 *
 * @return Response
 */
public function marshal()
{
    try {
        $this->createPushedSqsJob($this->marshalPushedJob())->fire();
    } catch (Exception $e) {
        if ('Release' == $e->getMessage()) {
            return new Response('Release job', 412);
        }

        throw $e;
    }

    return new Response('OK');
}

/**
 * Marshal out the pushed job and payload
 *
 * @return array
 */
protected function marshalPushedJob()
{
    $r = $this->request;

    return [
        'MessageId' => $r->header('X-aws-sqsd-msgid'),
        'Body' => $r->getContent(),
        'Attributes' => ['ApproximateReceiveCount' => $r->header('X-aws-sqsd-receive-count')],
    ];
}

/**
 * Create a new SqsPushJob for a pushed job
 *
 * @param array $job
 * @return Job
 */
protected function createPushedSqsJob($job)
{
    return new Job($this->container, $this->sqs, $job);
}

/**
 * Get the queue or return the default
 *
 * @param string|null $queue
 * @return string
 */
public function getQueue($queue)
{
    return $queue ?: $this->default;
}

/**
 * Get the underlying SQS instance
 *
 * @return SqsClient
 */
public function getSqs()
{
    return $this->sqs;
}

/**
 * Get the request instance
 *
 * @return Request
 */
public function getRequest()
{
    return $this->request;
}

/**
 * Set the request instance
 *
 * @param Request $request
 * @return void
 */
public function setRequest(Request $request)
{
    $this->request = $request;
}

The interesting bit of this class is the marshal() method which we will be calling from the Controller.
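To make that flow a little more concrete, here is a minimal, framework-free sketch of the two things marshal() does: reshaping the incoming HTTP request into the array shape an SQS message would have, and turning a "Release" exception into a 412 response. The function names marshalPushedJobFromHttp() and statusForJob() are hypothetical and exist only for this illustration; the header names are the ones used in the class above.

```php
<?php

// Hypothetical helper: reshape an HTTP push request into the array shape
// used for a single SQS message elsewhere in this class.
function marshalPushedJobFromHttp(array $headers, $body)
{
    return [
        'MessageId' => isset($headers['X-aws-sqsd-msgid']) ? $headers['X-aws-sqsd-msgid'] : null,
        'Body' => $body,
        'Attributes' => [
            'ApproximateReceiveCount' => isset($headers['X-aws-sqsd-receive-count'])
                ? $headers['X-aws-sqsd-receive-count']
                : null,
        ],
    ];
}

// Hypothetical helper: run a job callback and translate the outcome into an
// HTTP status code, mirroring the try/catch in marshal().
function statusForJob(callable $fire)
{
    try {
        $fire();
    } catch (Exception $e) {
        if ('Release' === $e->getMessage()) {
            return 412; // the job asked to be released
        }

        throw $e; // any other failure is not swallowed
    }

    return 200;
}

$job = marshalPushedJobFromHttp(
    ['X-aws-sqsd-msgid' => 'abc-123', 'X-aws-sqsd-receive-count' => '2'],
    '{"job":"SendEmail","data":{"id":1}}'
);

$ok = statusForJob(function () {});
$released = statusForJob(function () { throw new Exception('Release'); });
```

The non-200 status is what tells the sender that the job was not completed. How the sender reacts to it depends on how your SQS worker is set up, so treat the 412 here as a convention of this tutorial rather than a requirement.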

Creating the Job implementation

Again we need to supply our own implementation of the Job class:

use Exception;
use Aws\Sqs\SqsClient;
use Illuminate\Container\Container;
use Illuminate\Queue\Jobs\Job as AbstractJob;
use Illuminate\Contracts\Queue\Job as JobContract;

class Job extends AbstractJob implements JobContract
{
}

First I inject the dependencies:

/**
 * @var Container
 */
protected $container;

/**
 * @var SqsClient
 */
protected $sqs;

/**
 * @var array
 */
protected $job;

/**
 * Create a new job instance
 *
 * @param Container $container
 * @param SqsClient $sqs
 * @param array $job
 * @return void
 */
public function __construct(Container $container, SqsClient $sqs, array $job)
{
    $this->sqs = $sqs;
    $this->job = $job;
    $this->container = $container;
}

Next I’ll implement the methods:

/**
 * Fire the job
 *
 * @return void
 */
public function fire()
{
    $body = json_decode($this->getRawBody(), true);

    $this->resolveAndFire($body);
}

/**
 * Get the raw body string for the job
 *
 * @return string
 */
public function getRawBody()
{
    return $this->job['Body'];
}

/**
 * Release the job back into the queue
 *
 * @param int $delay
 * @return void
 */
public function release($delay = 0)
{
    throw new Exception('Release');
}

/**
 * Get the number of times the job has been attempted
 *
 * @return int
 */
public function attempts()
{
    return (int) $this->job['Attributes']['ApproximateReceiveCount'];
}

/**
 * Get the job identifier
 *
 * @return string
 */
public function getJobId()
{
    return $this->job['MessageId'];
}

/**
 * Get the IoC container instance
 *
 * @return Container
 */
public function getContainer()
{
    return $this->container;
}

/**
 * Get the underlying SQS client instance
 *
 * @return SqsClient
 */
public function getSqs()
{
    return $this->sqs;
}

/**
 * Get the underlying raw SQS job
 *
 * @return array
 */
public function getSqsJob()
{
    return $this->job;
}

This class deals with actually firing the queued job.
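If you are wondering what "firing" amounts to, the following is a simplified, standalone imitation of the decode-and-dispatch step. It assumes a payload shaped like {"job": "Class@method", "data": {...}} and uses plain `new` where Laravel would resolve the handler from the container. The SendWelcomeEmail class and the fireFromRawBody() function are hypothetical names used only for this sketch.

```php
<?php

// Hypothetical handler standing in for a real queued job class.
class SendWelcomeEmail
{
    public $handled = [];

    public function fire($job, $data)
    {
        $this->handled[] = $data['email'];
    }
}

// Simplified stand-in for fire(): decode the raw body, split "Class@method"
// (defaulting to "fire"), build the handler, and call it with the data.
function fireFromRawBody($rawBody)
{
    $payload = json_decode($rawBody, true);

    if (! is_array($payload) || ! isset($payload['job'])) {
        throw new InvalidArgumentException('Malformed job payload');
    }

    $segments = explode('@', $payload['job']);
    $class = $segments[0];
    $method = isset($segments[1]) ? $segments[1] : 'fire';

    $handler = new $class; // Laravel would resolve this from the container
    $data = isset($payload['data']) ? $payload['data'] : [];
    $handler->{$method}(null, $data); // a real job passes itself, not null

    return $handler;
}

$handler = fireFromRawBody('{"job":"SendWelcomeEmail","data":{"email":"jane@example.com"}}');
```

In the real class the first argument to the handler is the Job instance itself, which is how a handler can call release() or attempts() while it runs.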

Creating the Connector class

Finally we can create the Connector class which will be responsible for setting up the SQSClient and the Queue object:

use Aws\Sqs\SqsClient;
use Illuminate\Http\Request;
use Illuminate\Queue\Connectors\ConnectorInterface;

class Connector implements ConnectorInterface
{
    /**
     * @var Request
     */
    protected $request;

    /**
     * @param Request $request
     * @return void
     */
    public function __construct(Request $request)
    {
        $this->request = $request;
    }

    /**
     * Establish a queue connection
     *
     * @param array $config
     * @return \Illuminate\Contracts\Queue\Queue
     */
    public function connect(array $config)
    {
        // Add the keys the AWS SDK expects without overwriting existing ones
        $config += [
            "version" => "latest",
            "credentials" => [
                "key" => $config["key"],
                "secret" => $config["secret"],
            ],
        ];

        unset($config["key"], $config["secret"]);

        $sqs = SqsClient::factory($config);

        // Return our own SQSQueue class, not Laravel's built-in SqsQueue
        return new SQSQueue($sqs, $this->request, $config["queue"]);
    }
}
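The `+=` in connect() is PHP's array union operator: keys that already exist in $config are kept, and only the missing ones are added. The standalone sketch below shows what the array looks like after that step. All of the values, including the queue URL, are dummies made up for the example.

```php
<?php

// Dummy values standing in for the 'sqspush' entry in config/queue.php.
$config = [
    'driver' => 'sqspush',
    'key' => 'my-key',
    'secret' => 'my-secret',
    'queue' => 'https://sqs.example.com/123/jobs',
    'region' => 'eu-west-1',
];

// Array union: keys already in $config win, missing keys are added.
$config += [
    'version' => 'latest',
    'credentials' => [
        'key' => $config['key'],
        'secret' => $config['secret'],
    ],
];

// The flat keys now live under "credentials", so drop the originals.
unset($config['key'], $config['secret']);

$keys = array_keys($config);
```

One consequence of using the union is that a "version" set in the config file would take precedence over the "latest" default here.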

Configuring the Queue

Now that we have each of the classes set up, we can configure it to work as part of Laravel.

First I’m going to add a new config option to the queue.php config file:

'sqspush' => [
    'driver' => 'sqspush',
    'key' => env('SQS_KEY', ''),
    'secret' => env('SQS_SECRET', ''),
    'queue' => env('SQS_URL', ''),
    'region' => env('SQS_REGION', ''),
],

Next I will create a QueueServiceProvider and I will place the following in the boot() method.

First I will extend the queue to add my new Connector:

$this->app["queue"]->extend("sqspush", function () {
    return new Connector($this->app["request"]);
});

Next I will rebind the request:

$this->app->rebinding("request", function ($app, $request) {
    if ($app["queue"]->connected("sqspush")) {
        $app["queue"]->connection("sqspush")->setRequest($request);
    }
});

Using the new Queue

Now that you have everything set up you can start using the Queue!

There’s actually nothing to do really. Implementing the ShouldQueue interface will automatically push your jobs onto the queue.

To accept and process the jobs you can simply create a Controller with the following method:

/**
 * Accept and process the queued job
 *
 * @return Response
 */
public function queue()
{
    return Queue::marshal();
}

Conclusion

I really love the simplicity of queuing work in Laravel. It’s incredibly easy to change a job to become asynchronous.

However, it’s not so easy to accept jobs from SQS and process them.

Fortunately, Laravel is built in a way that makes satisfying our own requirements pretty straightforward.

All we have to do is define three classes that implement the required interfaces.

Although this can seem like we’re getting into the murky depths of the framework, hopefully you’ve seen today that it needn’t be too scary.

Philip Brown

@philipbrown
