Tech · 5 mins read

Google Cloud Pub/Sub with Clojure

Google Cloud Pub/Sub is simple to use from Python, but what about Clojure? There is no official support, and the Java interop is not straightforward. There are a few libraries, but none is actively maintained. We ended up building our own internal mini-library, jonotin, and today we have published the code.

Cheaper infrastructure with Pub/Sub

We process a lot of data. A big part of this is transforming all the patent text into knowledge graphs. That means that every time we improve the code, we need to process the text documents into ~30 million graphs again to get the benefits. With 300 machines this takes around one week. In the beginning we split the data into monthly batches, which the machines went through one by one. This was fine, but as the data grew, the expenses grew too.

We optimised the parsing to the point where the next step would have been rewriting it in C. The problem with batches became apparent when we wanted to use the much cheaper preemptible machines. These are cheap machines that can be shut down at any minute, and as processing a single batch could take an hour, a shutdown would cause serious data loss. Our data guru Juuso came up with a plan to process one document at a time instead of a batch. This required new tools.

Today we are serious about queues.

Message queues are the standard way of solving such a problem. You publish messages to a queue, which can then be processed by multiple subscribers. Pub/Sub is Google's version of this pattern. Ideally, we would have used cloud functions, but the parsing required more memory than the 2 GB maximum that cloud functions offer. Also, the parser machine startup time is never going to be super fast, as just loading the required 1 GB language model into memory takes a few seconds. So we needed machines that could process a queue. This was actually straightforward in Python with the google-cloud-pubsub library. With one week of effort, Juuso had cut graph parsing expenses by 75%, and all was great.
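The queue pattern described above can be sketched in-process. This is not Pub/Sub, only a java.util.concurrent queue with a few worker threads, and process-queue! with its arguments is a hypothetical name used for illustration. Each worker pulls one document at a time, so a worker that disappears takes at most one document with it instead of a whole batch. A real message queue would additionally redeliver a message that was never acked, which this sketch does not do.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(defn process-queue!
  "Starts n-workers threads that each pull one document at a time from
  a shared queue until it is empty. Returns a vector of the results,
  in whatever order the workers finished them."
  [docs n-workers process-fn]
  (let [queue   (LinkedBlockingQueue. ^java.util.Collection (vec docs))
        results (atom [])
        worker  (fn []
                  (loop []
                    ;; .poll returns nil once the queue is empty
                    (when-some [doc (.poll queue)]
                      (swap! results conj (process-fn doc))
                      (recur))))
        workers (doall (repeatedly n-workers #(future (worker))))]
    ;; Block until every worker has drained the queue
    (run! deref workers)
    @results))

;; Two workers share three documents; result order is not guaranteed
(set (process-queue! ["doc-1" "doc-2" "doc-3"] 2 count))
```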

Pub/Sub from Clojure

We do everything except AI in Clojure, mostly because I love the language. We did the earliest data imports with monthly batches, and it worked well. It was not fancy or optimal, but saving 100€ on such a one-time job was worth neither the effort nor adding a new tool to our system. As we started getting new data sources in and some of the data needed to be processed again, queues became attractive. We already had them in use too, so it wouldn’t be that much extra mental load.

There are a few Pub/Sub implementations for Clojure. clj-gcp seems to do the trick, but its Pub/Sub part is written to be used with integrant and is outdated. google-cloud is even more outdated, but the main problem for us was that it didn’t allow setting the maximum number of pulled messages.

So why not use Java interop? With the good experiences from Python and examples from the old libraries, this should be easy. Except that it wasn’t. Clojure interop with Java is decent, but Google has injected some heavy OOP into the Java google-cloud-pubsub library. We needed to read through not only the google-cloud-pubsub code but also some of the code of its dependencies.

Our Pub/Sub wrapper jonotin

We endured more pain than we expected, and that is why we have published the code (GitHub). Hopefully it helps someone out there. It’s 70 lines of Java interop with just two functions: publish! and subscribe!. There will be use cases it doesn’t fit right away, but for us this was the simplest solution that gets the job done. The easiest way to try it is to add it as a Leiningen dependency.

[jonotin "0.2.1"]

Publishing messages is minimal. Messages can be any strings, and jonotin takes only the parameters needed to identify the Pub/Sub topic.

(require '[jonotin.core :as jonotin])

(jonotin/publish! {:project-name "my-gcloud-project"
                   :topic-name "my-pubsub-topic"
                   :messages ["message to queue"
                              "another message"]})

Subscribing is simple as well. Messages are read as long as there are any in the queue. Every error is caught, and handle-error-fn can do whatever is needed with it. Whatever happens, the message is acked in the end.

(require '[jonotin.core :as jonotin])

(jonotin/subscribe! {:project-name "my-gcloud-project"
                     :subscription-name "my-pubsub-subscription"
                     :handle-msg-fn (fn [msg]
                                      (println "Handling" msg))
                     :handle-error-fn (fn [e]
                                        (println "Oops!" e))})

The name jonotin means “a thing that queues” in Finnish, a word that nobody ever uses. Most of the Clojure open source libraries from Finland are made by Metosin, a software house from Tampere. They usually come with funky Finnish names, like reitit, muuntaja or kekkonen. We wanted to follow this tradition.

jonotin from the inside

This section might be of interest to you if you plan to modify jonotin, or if you are simply interested in the trouble we had to go through.

One detail to remember about publishing to Google Pub/Sub is to convert the string into a byte string before creating the message object. The publisher also seems to behave strangely when a large number of messages is passed to it, so we use a safety limit of 10,000 messages.

(import '(com.google.protobuf ByteString)
        '(com.google.pubsub.v1 PubsubMessage))

(-> (PubsubMessage/newBuilder)
    (.setData (ByteString/copyFromUtf8 "message to queue"))
    .build)
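With a safety limit of 10,000 messages per call, a larger workload has to be split up by the caller. A minimal sketch of that splitting follows. The names publish-in-chunks! and max-messages-per-publish are hypothetical, and the publish function is passed in as an argument so the sketch does not assume anything about how jonotin itself reacts when the limit is exceeded.

```clojure
;; Mirrors the 10,000-message safety limit mentioned in the text
(def max-messages-per-publish 10000)

(defn publish-in-chunks!
  "Splits messages into chunks of at most max-messages-per-publish and
  calls publish-fn once per chunk, with the chunk under :messages.
  Returns the number of publish calls made."
  [publish-fn request messages]
  (let [chunks (partition-all max-messages-per-publish messages)]
    (doseq [chunk chunks]
      (publish-fn (assoc request :messages (vec chunk))))
    (count chunks)))

;; Dry run with println standing in for the real publish function
(publish-in-chunks! #(println "publishing" (count (:messages %)) "messages")
                    {:project-name "my-gcloud-project"
                     :topic-name "my-pubsub-topic"}
                    (map str (range 25000)))
```

In real use, jonotin/publish! would take the place of the println function, assuming it accepts the same map shape as in the publishing example earlier.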

Subscribe contains most of the real tricks. To handle the messages, we need to implement the com.google.cloud.pubsub.v1.MessageReceiver interface and pass it to the subscriber.

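(import '(com.google.cloud.pubsub.v1 MessageReceiver Subscriber))

;; Excerpt from subscribe!: subscription-name-obj, handle-msg-fn and
;; handle-error-fn are bound earlier in the function.
(let [msg-receiver (reify MessageReceiver
                     (receiveMessage [_ message consumer]
                       (let [data (.toStringUtf8 (.getData message))]
                         (try
                           (handle-msg-fn data)
                           (catch Exception e
                             ;; Without an error handler, rethrow
                             (if (some? handle-error-fn)
                               (handle-error-fn e)
                               (throw e)))
                           (finally
                             ;; Ack whether or not handling succeeded
                             (.ack consumer))))))
      subscriber (.build (Subscriber/newBuilder subscription-name-obj msg-receiver))]
  ;; ... starting the subscriber is omitted from this excerpt
  )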

jonotin on GitHub

Juho Kallio
September 3, 2019