Consuming SQS Events in Lambda. But with Clojure.

Sean Schulte
Feb 7, 2018

Clojure is for writing functions. AWS Lambda is for executing functions. It’s a match made in heaven!

I’ve never had a pair of pants that fit this well in my life.

Sometimes, you want your code to execute in response to events on an SQS queue. Since SQS is pull-based, AWS won’t spin up your Lambda function in response to a new event; you have to do a little bit of work to make it happen. In this article, we’ll do that work.

You’ll need a Lambda function that’s running Java 8, and you’ll need to deploy the output of lein uberjar to it. You’ll need to set up a CloudWatch Event that fires on some periodic interval to start your Lambda function. And you’ll need an SQS queue. For our purposes, I’ll assume you’ve done all that.

Our Lambda instance has a time limit. There might be a lot of events on our queue, and if we tried to process them all, we’d run out of time. So: the first thing we need to do is try to avoid that. The heuristic we’re going with is that if the time left before we hit the limit is more than twice the average time an event has taken to process so far, then we figure we still have enough time for another one.

(defn ample-time-remaining?
  "We have ample time for another round if the remaining time is more
  than double the average duration so far. If there are somehow no
  recorded durations, we assume we have enough time."
  [durations time-remaining]
  (if (seq durations)
    (let [avg-duration (/ (apply + durations) (count durations))]
      (> time-remaining (* 2 avg-duration)))
    true))

It’s possible this isn’t the ideal timing heuristic for you, so feel free to fool around with it. What you want to avoid is running out of time in the middle of an event: Lambda ends the invocation as soon as the limit is hit, whether or not that event has finished processing.
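If you do want to experiment, one option is to turn the multiplier into a parameter. This is only a sketch under that assumption: the name ample-time-remaining-with-factor? and the factor argument are made up for illustration and are not part of the code in this article.

```clojure
(defn ample-time-remaining-with-factor?
  "Hypothetical variant of the heuristic: we have ample time if the remaining
  time is more than `factor` times the average duration so far. With no
  recorded durations we assume there is enough time."
  [factor durations time-remaining]
  (if (seq durations)
    (let [avg-duration (/ (apply + durations) (count durations))]
      (> time-remaining (* factor avg-duration)))
    true))

;; Durations of 100, 200 and 300 ms average out to 200 ms.
(ample-time-remaining-with-factor? 2 [100 200 300] 500) ; => true,  500 > 400
(ample-time-remaining-with-factor? 3 [100 200 300] 500) ; => false, 500 is not > 600
(ample-time-remaining-with-factor? 2 [] 0)              ; => true,  nothing recorded yet
```

A larger factor makes the loop give up earlier, which is the safer choice if your events vary a lot in how long they take.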

The next thing we’re going to do is “handle all the SQS messages”.

(defn handle-sqs-msgs
  [ctx queue-name work-fn msg->args]
  (md/loop [durations []]
    (let [start-ms (remaining-ms ctx)]
      (md/chain
        (sqs/next-message! queue-name)
        (fn [msg]
          ;; a nil msg means the queue is empty, so the loop ends here
          (when msg
            (md/chain
              (->
                (apply work-fn (msg->args msg))
                (md/catch
                  (fn [e]
                    (println "failed" (.getMessage e)))))
              (fn [_]
                ;; remaining time only goes down, so start minus end is the elapsed time
                (let [end-ms (remaining-ms ctx)
                      duration (- start-ms end-ms)
                      durations (conj durations duration)]
                  (when (ample-time-remaining? durations end-ms)
                    (md/recur durations)))))))))))

What is that function doing? Let’s walk through it.

First, it takes four parameters:

  • ctx is the Lambda “context”; we care about this because it’ll tell us how much time we have until we hit our limit
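  • queue-name identifies the SQS queue we’re reading from; despite the name, it gets passed to receive-message as :queue-url, so it should be the queue’s URL rather than its ARN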
  • work-fn is the function we’re going to execute for each SQS event
  • msg->args is the function that converts an SQS event into a vector we can use to pass arguments into work-fn (this way, our function that actually does work can be decoupled from the need to know anything about the structure of SQS events, or even the fact that it’s executing in response to SQS at all)

Oh, and what’s that remaining-ms function?

(defn remaining-ms
  [ctx]
  (.getRemainingTimeInMillis ctx))

The ctx is a Java object, so we’ve got a little function that wraps it so we don’t have that ugly “I’m calling a Java method” syntax cluttering up our nice code. You don’t have to do it that way if you don’t want. I guess.

We’re going to loop as long as a) there are still events on the queue, and b) there is still “ample” time remaining for this Lambda invocation.

We take each event from the queue and convert it to arguments with our msg->args function, and apply those to our work-fn.

After that, we calculate how long that event took to handle, and determine if we think we have enough time to handle another event.
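To see that control flow without Manifold or SQS in the way, here is a synchronous, in-memory sketch of the same loop. Everything in it is a stand-in made up for illustration: drain-queue, the atom-backed queue, the fake clock, and fake-work! are not part of the real code.

```clojure
(defn drain-queue
  "Synchronous stand-in for handle-sqs-msgs. `queue` is an atom holding a
  vector of messages, `remaining-ms-fn` plays the role of the Lambda context,
  and `enough-time?` is the timing heuristic. Returns the recorded durations."
  [queue remaining-ms-fn enough-time? work-fn msg->args]
  (loop [durations []]
    (let [start-ms (remaining-ms-fn)
          msg      (first @queue)]
      (if (nil? msg)
        durations                          ; a) the queue is empty: stop
        (do
          (swap! queue subvec 1)           ; take the message off the queue
          (try
            (apply work-fn (msg->args msg))
            (catch Exception e
              (println "failed" (.getMessage e))))
          (let [end-ms    (remaining-ms-fn)
                durations (conj durations (- start-ms end-ms))]
            (if (enough-time? durations end-ms)
              (recur durations)
              durations)))))))             ; b) not enough time left: stop

(def clock (atom 1000))                    ; fake "remaining milliseconds"
(def processed (atom []))

(defn fake-work!
  [foo bar]
  (swap! clock - 300)                      ; pretend each message costs 300 ms
  (swap! processed conj [foo bar]))

(def durations
  (drain-queue (atom [{:foo 1 :bar 2} {:foo 3 :bar 4} {:foo 5 :bar 6}])
               #(deref clock)
               (fn [ds remaining]
                 (> remaining (* 2 (/ (apply + ds) (count ds)))))
               fake-work!
               (juxt :foo :bar)))

durations   ; => [300 300]
@processed  ; => [[1 2] [3 4]]
```

The loop stops after two messages even though a third is waiting: with 400 ms left and a 300 ms average, the remaining time is not more than twice the average.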

Oh. What’s that sqs/next-message! function, you ask? It’s just a Manifold-y wrapper around Amazonica’s SQS receive-message function.

(defn next-message!
  [queue]
  (md/chain
    (md/future
      (sqs/receive-message
        :queue-url queue
        :max-number-of-messages 1
        :delete true))
    :messages
    first
    :body
    #(json/parse-string % ->kebab-case-keyword)))

It’s possible you’ll want to write yours differently, but this one has the benefit of only grabbing one event at a time so our loop can process it and move on.
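The :messages, first, :body steps in that chain are ordinary functions applied to the response map, which is easy to try out without AWS. The sketch below uses hand-written maps shaped the way the chain expects, not captured SQS output, and first-body is a made-up helper name.

```clojure
(defn first-body
  "The :messages, first and :body steps of the chain as one plain function."
  [response]
  (-> response :messages first :body))

;; One message on the queue: we get its raw JSON body back.
(first-body {:messages [{:body "{\"foo\": \"FOO\"}"}]}) ; => "{\"foo\": \"FOO\"}"

;; Empty queue: every step passes nil along.
(first-body {:messages []})                             ; => nil
```

Assuming json/parse-string is Cheshire’s, which as far as I know returns nil when given nil, an empty queue makes next-message! yield nil, and that nil is what the (when msg …) check in handle-sqs-msgs uses to end the loop.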

You will notice, my new best friend, that handle-sqs-msgs is pretty generic. That’s good! It means we can use this exact same code for multiple Lambda functions that read from SQS queues, and none of them actually need to know anything SQS-specific. Let’s take a look at that.

We know we need to pass in that work-fn to actually do our work. Let’s write that first: it’ll be pretty dumb for this example, because as the writer of this article, I don’t know what important work you’re going to be doing.

(defn important-work!
  [foo bar]
  (your-db/save! foo bar))

I bet you can do something more interesting. Onward we go!

In our example, we’re going to be sending SQS events that have a “foo” and a “bar”. Maybe more … but we don’t have to care.

(defn msg->args
  [{:keys [foo bar]}]
  [foo bar])

Our msg->args function just needs to take the foo and bar keys and return them in a vector. That’s how we can go from an SQS event that looks like {:foo "FOO" :bar "BAR"} right into being able to call (important-work! "FOO" "BAR"):

(apply important-work! (msg->args {:foo "FOO" :bar "BAR"}))
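A quick REPL-style check shows what “we don’t have to care” means in practice: destructuring ignores keys it wasn’t asked for, and a missing key simply comes through as nil. The :baz key and msg->args-juxt are made up for this sketch.

```clojure
(defn msg->args
  [{:keys [foo bar]}]
  [foo bar])

;; Extra keys are ignored.
(msg->args {:foo "FOO" :bar "BAR" :baz "ignored"}) ; => ["FOO" "BAR"]

;; A missing key becomes nil, so work-fn has to cope with that.
(msg->args {:foo "FOO"})                           ; => ["FOO" nil]

;; An equivalent, terser way to write the same function.
(def msg->args-juxt (juxt :foo :bar))
(msg->args-juxt {:foo "FOO" :bar "BAR"})           ; => ["FOO" "BAR"]
```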

Okay. Anyway. We have the functions that will actually do our work, and we have the code that’ll consume SQS events and pass them along to said functions. How do we hook them up?

Your namespace needs to say that it implements the right Java interface. Remember that we’re running in Lambda’s Java 8 environment: AWS doesn’t know we’re writing in Clojure, we’re just giving them a JAR that they can treat as if it’s written in Java.

(:gen-class
  :implements [com.amazonaws.services.lambda.runtime.RequestStreamHandler])
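That form lives inside your ns declaration. As a rough sketch of how the whole thing might look: the namespace names my-app.handler and my-app.sqs are hypothetical, and the md and sqs aliases are only assumed from the way the code in this article calls them.

```clojure
(ns my-app.handler                         ; hypothetical namespace name
  (:require [manifold.deferred :as md]     ; assumed source of the md/ alias
            [my-app.sqs :as sqs])          ; hypothetical home of next-message!
  (:gen-class
   :implements [com.amazonaws.services.lambda.runtime.RequestStreamHandler]))
```

Keep in mind that :gen-class only produces a Java class when the namespace is AOT-compiled, so the namespace needs to be included in whatever AOT setting your uberjar build uses.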

Now that we’re claiming we’re going to implement that interface, we actually have to implement it. That means we need a -handleRequest function. See that camelCase naming? Thanks, Java.

(defn -handleRequest
  [this input-stream output-stream ctx]
  @(handle-sqs-msgs ctx queue-name important-work! msg->args))

We have that @ there because our code is Manifold-y: handle-sqs-msgs returns a deferred, and dereferencing it blocks until the loop has finished, so -handleRequest doesn’t return while we’re still working. If you’ve used Manifold, you know about this. In this example we’ve got queue-name available; you’ll probably want to pass that in via configuration, or by looking it up dynamically. And we pass in our two logic-specific functions that do the work, so that handle-sqs-msgs can call them for us.

Note that since we’re invoking our Lambda function in response to a periodic CloudWatch Event, we don’t care about input-stream. And since we’re not, in this case, “returning” anything, we also don’t care about output-stream. For other Lambda functions, you might care about these, but probably not when you’re consuming an SQS queue.

And that’s it! Toss that handle-sqs-msgs function somewhere, and you can use it in any number of actual Lambda functions that need to consume events from an SQS queue. You get to focus on your important-work!, which is the whole point.

Now get back to work, Stewart.
