So far we've managed to play with NATS pub/sub and extend it with a streaming service to create a more reliable message queue. The problem is that with the default configuration we have little to no control over where the messages, subscriptions, and information about the queue's clients reside. Fortunately, NATS Streaming allows you to use a SQL database as storage, which is exactly what we will explore in this post.

Default configuration

By default, when you start your NATS Streaming instance, you get a bunch of useful information printed out to the terminal:

2018/05/23 18:15:50.024705 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.9.2
2018/05/23 18:15:50.025236 [INF] STREAM: ServerID: 2XfP8bSut2sc4duyL9S0Qc
2018/05/23 18:15:50.025253 [INF] STREAM: Go version: go1.9.5
2018/05/23 18:15:50.035027 [INF] Starting nats-server version 1.0.7
2018/05/23 18:15:50.035083 [INF] Git commit [not set]
2018/05/23 18:15:50.042019 [INF] Starting http monitor on 0.0.0.0:8222
2018/05/23 18:15:50.042985 [INF] Listening for client connections on 0.0.0.0:4222
2018/05/23 18:15:50.043060 [INF] Server is ready
2018/05/23 18:15:50.072117 [INF] STREAM: Recovering the state...
2018/05/23 18:15:50.072158 [INF] STREAM: No recovered state
2018/05/23 18:15:50.325427 [INF] STREAM: Message store is MEMORY
2018/05/23 18:15:50.335761 [INF] STREAM: ---------- Store Limits ----------
2018/05/23 18:15:50.335818 [INF] STREAM: Channels:                  100 *
2018/05/23 18:15:50.335862 [INF] STREAM: --------- Channels Limits --------
2018/05/23 18:15:50.335872 [INF] STREAM:   Subscriptions:          1000 *
2018/05/23 18:15:50.335881 [INF] STREAM:   Messages     :       1000000 *
2018/05/23 18:15:50.335889 [INF] STREAM:   Bytes        :     976.56 MB *
2018/05/23 18:15:50.335898 [INF] STREAM:   Age          :     unlimited *
2018/05/23 18:15:50.335906 [INF] STREAM:   Inactivity   :     unlimited *
2018/05/23 18:15:50.335915 [INF] STREAM: ----------------------------------

As you can see, unless configured otherwise, everything that happens in your NATS cluster is held in the server's memory, which is, as you probably realize, quite risky. If your server is managed by Kubernetes, for example, and for some reason has to be torn down and recreated on a different node, you lose the memory and everything stored in it. This alone is a good reason to consider persistent storage, such as files or a database. In my opinion, files come at a similar price to memory, because you need to provision a dedicated location, accessible from different nodes, so that the server can still reach its data after being redeployed. That leaves us with SQL databases, which have been around long enough that we have mature tooling to keep them running 24/7, and that is why I would recommend them as the storage here.
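For reference, the file-based store boils down to two flags. A minimal example (the datastore directory name is an arbitrary choice of mine):

nats-streaming-server --store FILE --dir datastore

You would still need to make sure that directory survives redeployments, which is exactly the operational burden described above.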

Preparing the SQL database

In order to use SQL as the storage for NATS, we first need to seed the database with the appropriate schema, available in nats-io/nats-streaming-server on GitHub (there are seed files for both Postgres and MySQL). Once that is done, you have to configure the NATS Streaming server to actually use the database by setting three flags (--store, --sql_driver and --sql_source) in the startup command:

# docker-compose.yml
services:
    nats:
        image: nats-streaming:latest
        command: -m 8222 --store SQL --sql_driver postgres --sql_source postgres://postgres@db?sslmode=disable
        ports:
        - "4222:4222"
        - "8222:8222"
    db:
        image: postgres:10
        ports:
        - "15432:5432"
...
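Note that the server does not create the schema for you, so the database has to be seeded before the first start. A one-off step along these lines should do, assuming the postgres.db.sql seed file from the nats-streaming-server repository and the 15432 port mapping defined above:

psql -h localhost -p 15432 -U postgres -f postgres.db.sql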

The rest of our docker-compose configuration stays the same because, after all, the services don't care how the server stores subscriptions and messages, as long as it lets them publish and subscribe. Once you start the environment, you can take a look at the database to see what information is stored there:

postgres=# \dt
             List of relations
 Schema |     Name      | Type  |  Owner
--------+---------------+-------+----------
 public | channels      | table | postgres
 public | clients       | table | postgres
 public | messages      | table | postgres
 public | serverinfo    | table | postgres
 public | storelock     | table | postgres
 public | subscriptions | table | postgres
 public | subspending   | table | postgres
(7 rows)

We are most interested in the data stored in the channels, clients, messages, and subscriptions tables:

postgres=# SELECT * FROM channels LIMIT 1;
 id |       name       | maxseq | maxmsgs |  maxbytes  | maxage | deleted
----+------------------+--------+---------+------------+--------+---------
  1 | episodes:publish |      0 | 1000000 | 1024000000 |      0 | f

The data about channels seems simple enough: there is an ID and a name, while the remaining columns mirror the store limits from the startup log (1000000 messages and 1024000000 bytes, which is exactly the 976.56 MB we saw earlier). So far so good. What about clients?

postgres=# SELECT * FROM clients LIMIT 1;
                  id                  |            hbinbox
--------------------------------------+-------------------------------
 09956893-cd6b-44d8-a829-aec0bc706e0b | _INBOX.kEOCu1EHiUDTa6173STzb3

Well, not as pretty as we'd like, but we still get a client ID, which may come in handy at some point; the hbinbox column holds the inbox subject on which the server exchanges heartbeats with that client. Now let's look at subscriptions:

postgres=# SELECT * FROM subscriptions LIMIT 1;
 id | subid | lastsent |                                                                                                                                                                                                 proto                                                                                                       | deleted
----+-------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------
  1 |    13 |        0 | \x080d122464383237346636312d396235632d343737392d383038352d643961316562666338643563221d5f494e424f582e6c7a596163763737326130526a355071537a355173302a485f5354414e2e61636b2e7061354f7856387941576f4d4355524d59336d5478422e657069736f6465733a7075626c6973682e7061354f7856387941576f4d4355524d59336e425a56308008381e422438666136613535632d646530382d346233362d393965392d373632636133333333366138480550015801 | f

Wait, what? Oh, the data is stored as a binary representation of a proto message... I guess we should not have high expectations about the messages table either:

postgres=# SELECT * FROM messages LIMIT 1;
 id | seq |      timestamp      | size |                                                                                data
----+-----+---------------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------
  1 |   1 | 1526502359154219800 |   80 | \x08011210657069736f6465733a7075626c69736822300a07467269656e6473100c1804222168747470733a2f2f667269656e2e64732f31322f342f657069736f64652e6d70342898b6bb91edb9ce9715

Exactly as we thought. The data is stored in binary form, which we cannot read directly. Fortunately, the NATS Streaming protocol documentation and the proto definitions can help us dig a bit deeper.
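By the way, if all you want is a quick confirmation that a column really holds protobuf, you don't need any code: protoc can dump raw fields from binary input. A rough sketch (paste the hex from psql without the \x prefix):

echo '<hex from the proto column>' | xxd -r -p | protoc --decode_raw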

Taking a peek

In order to see the contents of those mysterious binary fields, we will create a small tool called sql-spy. It will connect to the database and decode the information so that we can actually understand what is stored there.
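The skeleton of such a tool is unexciting: open a database/sql connection and iterate over the rows of the table we care about. A minimal sketch, assuming the lib/pq driver and the connection parameters from our docker-compose setup (both are illustrative choices):

// sql-spy/main.go (sketch)
package main

import (
    "database/sql"
    "log"

    _ "github.com/lib/pq" // registers the postgres driver
)

func main() {
    db, err := sql.Open("postgres", "postgres://postgres@localhost:15432?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // The proto column holds the binary-encoded subscription state.
    rows, err := db.Query("SELECT proto FROM subscriptions")
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()

    for rows.Next() {
        var data []byte
        if err := rows.Scan(&data); err != nil {
            log.Fatal(err)
        }
        // data gets decoded with the proto definitions described below.
    }
    if err := rows.Err(); err != nil {
        log.Fatal(err)
    }
}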

First, we would like to see what the data in the subscriptions table looks like. We need to unmarshal the data into the SubscriptionResponse proto message, which has two fields:

// github.com/nats-io/go-nats-streaming/pb/protocol.proto
...
type SubscriptionResponse struct {
    AckInbox string `protobuf:"bytes,2,opt,name=ackInbox,proto3" json:"ackInbox,omitempty"`
    Error    string `protobuf:"bytes,3,opt,name=error,proto3" json:"error,omitempty"`
}
...
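The decoding step itself is then a single proto.Unmarshal call per row. A sketch of what sql-spy does with each proto column (the file and function names are mine; the imports come from the repositories referenced throughout this post):

// sql-spy/subscriptions.go (sketch)
import (
    "log"

    "github.com/golang/protobuf/proto"
    "github.com/nats-io/go-nats-streaming/pb"
    "github.com/pkg/errors"
)

// readSubscription decodes a single row of the subscriptions table.
func readSubscription(data []byte) error {
    var sr pb.SubscriptionResponse
    if err := proto.Unmarshal(data, &sr); err != nil {
        return errors.Wrap(err, "failed to unmarshal subscription")
    }
    log.Printf("Ack Inbox: %s", sr.AckInbox)
    return nil
}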

When we process the rows from the database, we can see the following:

...
2018/05/23 20:34:07 Read subscription:
2018/05/23 20:34:07 Ack Inbox: 09956893-cd6b-44d8-a829-aec0bc706e0b
...

As you can see, the Ack Inbox matches the ID of one of the clients found in the clients table.

Now let's move on to arguably the most important table, messages. Messages are stored using the MsgProto proto message, which looks as follows:

// github.com/nats-io/go-nats-streaming/pb/protocol.proto
...
type MsgProto struct {
    Sequence    uint64 `protobuf:"varint,1,opt,name=sequence,proto3" json:"sequence,omitempty"`
    Subject     string `protobuf:"bytes,2,opt,name=subject,proto3" json:"subject,omitempty"`
    Reply       string `protobuf:"bytes,3,opt,name=reply,proto3" json:"reply,omitempty"`
    Data        []byte `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"`
    Timestamp   int64  `protobuf:"varint,5,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
    Redelivered bool   `protobuf:"varint,6,opt,name=redelivered,proto3" json:"redelivered,omitempty"`
    CRC32       uint32 `protobuf:"varint,10,opt,name=CRC32,proto3" json:"CRC32,omitempty"`
}
...

Although fields such as Subject and Timestamp might be useful on their own, we should also unmarshal the Data field, since it is itself a proto message (we chose to serialize payloads that way in the first part of the NATS series). That message is called PublishEpisodeMessage, so to get the full information about a NATS message we need to run proto.Unmarshal(..) at both levels:

// sql-spy/main.go

// First level: the MsgProto envelope that NATS Streaming stores.
var p pb.MsgProto
if err := proto.Unmarshal(data, &p); err != nil {
    return errors.Wrap(err, "failed to unmarshal proto")
}

// Second level: our own payload, serialized by the publisher.
var msg mypb.PublishEpisodeMessage
if err := proto.Unmarshal(p.Data, &msg); err != nil {
    return errors.Wrap(err, "failed to unmarshal message data")
}

Running this code against the database results in the following output:

...
2018/05/23 20:41:34 Read message:
2018/05/23 20:41:34 Subject: episodes:publish
2018/05/23 20:41:34 Timestamp: 1526587460522460800
2018/05/23 20:41:34 Message: {SeriesName:Friends SeasonNo:12 EpisodeNo:6 EpisodeUrl:https://frien.ds/12/6/episode.mp4}
...

Does this look familiar? This is the exact data we send when we hit the /publish endpoint in the neatflyx service, which acts as the publisher in our small environment.
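One more detail worth knowing: the Timestamp field holds Unix time in nanoseconds, so turning it into something readable in Go is a one-liner (p is the MsgProto value from the snippet above):

// MsgProto.Timestamp is Unix time in nanoseconds
t := time.Unix(0, p.Timestamp)
log.Printf("published at: %s", t.Format(time.RFC3339))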

If you wish, you can look at the other information stored in SQL as well, such as serverinfo (decoded with the ServerInfo proto message), but the tables described here should be enough to show that the most important, business-related data ends up in a reliable data store in an accessible form. The full source code of this example is available on GitHub.

Versions

  • go -> 1.10
  • protoc -> 3.0.0
  • nats-server -> 1.1.0
  • postgres -> 10.4
  • docker -> 18.03.0-ce, API -> 1.37
  • docker-compose -> 1.20.1