
Before reading this, you might want to check out my posts on Minikube, protobufs, and the gRPC client-server example.

Recently at work, we ran into an interesting problem (challenge?). We run our microservice architecture on Kubernetes, and we keep all of our internal communication over gRPC. Even though we are able to scale up particular parts of our applications, we started to face some bottlenecks. After a while we realized that even though some parts run as multiple instances, there is only one active gRPC (HTTP/2) connection between each pair of services, which results in a single connection between particular pods.

Like everyone, we wanted a solution that would be as simple and as unintrusive as possible. For the purpose of this post, I created a simple infrastructure that recreates the issue we had. In this case, we have a single Employer service acting as an HTTP server with an endpoint for calculating the power of a given base and exponent. To do the calculations, it connects to the Employee service, which waits 10 seconds and then returns the result. The sleep simulates some heavy calculations that might introduce a bottleneck, but more importantly it lets us measure how much time each approach spends setting up connections, without being distracted by the work that has to be performed either way.

Setting the stage

As we'll run the example on Minikube, we need to build the binaries and Docker images and deploy them to our local cluster. Before we do that, we need to define the protobuf messages:

// proto/message/message.proto
syntax = "proto3";
...
service Worker {
    rpc Work(JobRequest) returns (JobResponse) {}
}

message JobRequest {
    string id = 1;
    float base = 2;
    float exponent = 3;
}

message JobResponse {
    string id = 1;
    string worker_id = 2;
    float result = 3;
}

As you can see, apart from the job's id and the result, we return a worker_id, which will show us which pod did the calculations. Our Employer application looks like this:

// employer/main.go
...
func main() {
    ...
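    // Dial the employee service once, at startup; every request will reuse this single connection.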
    conn, err := grpc.Dial(employeeAddr, grpc.WithInsecure())
    ...
    client := pb.NewWorkerClient(conn)
    ...
    http.HandleFunc("/power", func(rw http.ResponseWriter, req *http.Request) {
        ...
        workResp, err := client.Work(req.Context(), &pb.JobRequest{
            Id:       uuid.NewV4().String(),
            Base:     float32(base),
            Exponent: float32(exponent),
        })
        ...
        result := workResp.GetResult()
        log.Infof("Job %s, Worker: %s, Result: %f", workResp.GetId(), workResp.GetWorkerId(), result)
        rw.WriteHeader(http.StatusOK)
        rw.Write([]byte(fmt.Sprintf("(%f)^(%f) = %f\n", base, exponent, result)))
        ...
    })
    ...
}
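
The parts elided above are mostly plumbing: parsing the query parameters and starting the HTTP server. A rough sketch of what they might look like (the variable names, listen address, and error handling are my assumptions, not necessarily the original code):

// employer/main.go (hypothetical sketch of the elided pieces)
// inside the /power handler:
base, err := strconv.ParseFloat(req.URL.Query().Get("base"), 64)
if err != nil {
    http.Error(rw, "invalid base", http.StatusBadRequest)
    return
}
exponent, err := strconv.ParseFloat(req.URL.Query().Get("exponent"), 64)
if err != nil {
    http.Error(rw, "invalid exponent", http.StatusBadRequest)
    return
}

// and at the end of main:
log.Fatal(http.ListenAndServe(":8000", nil))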

We create a connection once, at the start of the application, and then the same connection is reused for every request between the services. In most cases that is sufficient, but you might hit a wall just as we did. Now the simpler part, the Employee service:

// employee/main.go
...
func main() {
    ...
    hn, err := os.Hostname()
    ...
    worker := employeeServer{WorkerID: hn}
    server := grpc.NewServer()
    pb.RegisterWorkerServer(server, worker)
    ...
}

type employeeServer struct {
    WorkerID string
}

func (eS employeeServer) Work(ctx context.Context, req *pb.JobRequest) (*pb.JobResponse, error) {
    base := req.GetBase()
    exponent := req.GetExponent()

    result := math.Pow(float64(base), float64(exponent))

    // Simulate heavy calculations: wait 10 seconds before returning the result,
    // or give up early if the caller cancels the request.
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    case <-time.After(time.Second * 10):
        return &pb.JobResponse{
            Id:       req.GetId(),
            WorkerId: eS.WorkerID,
            Result:   float32(result),
        }, nil
    }
}
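
The part elided in main above is just the standard gRPC server startup: listen on a TCP port and serve. A minimal sketch, assuming the port and error handling (neither is shown in the original):

// employee/main.go (hypothetical sketch of the elided server startup)
lis, err := net.Listen("tcp", ":8000")
if err != nil {
    log.Fatalf("Failed to listen: %v", err)
}
if err := server.Serve(lis); err != nil {
    log.Fatalf("Failed to serve: %v", err)
}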

Now that we have the applications and some simple Dockerfiles, we can deploy everything to Kubernetes. We'll run a single replica of the employer pod and five replicas of the employee.

# employer/manifests/employer.yaml
apiVersion: v1
kind: ReplicationController
metadata:
  name: employer
spec:
  replicas: 1
  selector:
    app: employer
...

# employee/manifests/employee.yaml
apiVersion: v1
kind: ReplicationController
metadata:
  name: employee
spec:
  replicas: 5
  selector:
    app: employee
...

We can build the binaries and images and deploy them with simple commands, which are available in the Makefile in the source code:

$ make employer/deploy
...
$ make employee/deploy
...

The problem

First, we check if the services were set up correctly:

$ kubectl get pods
NAME                          READY     STATUS    RESTARTS   AGE
employee-895js                1/1       Running   0          14h
employee-92rqz                1/1       Running   0          14h
employee-c95zg                1/1       Running   0          14h
employee-mjv3t                1/1       Running   0          14h
employee-rph33                1/1       Running   0          14h
employer-dt3m4                1/1       Running   0          4h

Now let's check the HTTP endpoint we can hit:

$ kubectl get svc
NAME         CLUSTER-IP   EXTERNAL-IP   PORT(S)          AGE
employee     10.0.0.89    <nodes>       8000:30817/TCP   14h
employer     10.0.0.252   <nodes>       8000:30180/TCP   4h
...
$ minikube ip
192.168.99.100

We can access the employer service at 192.168.99.100:30180:

$ curl "http://192.168.99.100:30180/power?base=1&exponent=4"
(1.000000)^(4.000000) = 1.000000

Now, if we do this multiple times and check the logs, we can see the same worker_id (hostname) doing all the work every time:

$ kubectl logs employer-dt3m4
...
time="2017-09-25T15:31:07Z" level=info msg="Finished in time: 10.001236803s" 
time="2017-09-25T15:31:07Z" level=info msg="Job cf1cd8b6-8306-4815-a91c-e93f4693cc0f, Worker: employee-895js, Result: 1.000000" 
time="2017-09-25T15:31:07Z" level=info msg="Finished in time: 10.002781024s" 
time="2017-09-25T15:31:07Z" level=info msg="Job 65f47e03-b16b-41a3-a1e6-2b2c85345b3a, Worker: employee-895js, Result: 1.000000" 
time="2017-09-25T15:31:07Z" level=info msg="Finished in time: 10.001165404s" 
time="2017-09-25T15:31:07Z" level=info msg="Job d482a227-db74-4795-a7af-b93ace8877ff, Worker: employee-895js, Result: 1.000000" 
time="2017-09-25T15:31:07Z" level=info msg="Finished in time: 10.001037447s" 
time="2017-09-25T15:31:07Z" level=info msg="Job f5842890-5020-4c9b-a82e-5307d6f5188c, Worker: employee-895js, Result: 1.000000"
...

Ad-hoc connections

The simplest approach would be not to create a single connection at startup, but to dial on each request instead. This adds connection-setup overhead to every request, but it gives us as many connections as we need:

// employer/main.go
...
func main() {
    ...
    http.HandleFunc("/power", func(rw http.ResponseWriter, req *http.Request) {
        ...
        // Dial a new connection for every request; the Service may route it to a different pod each time.
        conn, err := grpc.Dial(employeeAddr, grpc.WithInsecure())
        if err != nil {
            // Don't take the whole process down on a failed dial; just fail this request.
            log.Errorf("Failed to start gRPC connection: %v", err)
            rw.WriteHeader(http.StatusInternalServerError)
            return
        }
        defer conn.Close()
        log.Debugf("Connected to employee at %s", employeeAddr)
        ctx := req.Context()
        client := pb.NewWorkerClient(conn)

        workResp, err := client.Work(ctx, &pb.JobRequest{
            Id:       uuid.NewV4().String(),
            Base:     float32(base),
            Exponent: float32(exponent),
        })
        ...
        log.Infof("Job %s, Worker: %s, Result: %f", workResp.GetId(), workResp.GetWorkerId(), result)
        ...
    })
    ...
}

Since every request now opens a fresh connection through the employee Service, the connections (and the work) get spread across the pods, and the service logs look much better:

$ kubectl logs employer-zb5lv
...
time="2017-09-25T19:51:56Z" level=info msg="Job ad6d2b0d-b022-45d2-a539-493e2addeb33, Worker: employee-416nw, Result: 1.000000" 
time="2017-09-25T19:51:56Z" level=info msg="Finished in time: 10.004083968s" 
time="2017-09-25T19:51:56Z" level=info msg="Job 942afa78-085c-4e0e-bc33-d27b5f3b0ef7, Worker: employee-hxh9l, Result: 1.000000" 
time="2017-09-25T19:51:56Z" level=info msg="Finished in time: 10.005112791s" 
time="2017-09-25T19:51:56Z" level=info msg="Job 5edf6d87-2f15-4e80-88aa-aeadd91dd255, Worker: employee-416nw, Result: 1.000000" 
time="2017-09-25T19:51:56Z" level=info msg="Finished in time: 10.005831464s" 
time="2017-09-25T19:51:56Z" level=info msg="Job ac298546-6d77-4414-bed1-d6e6e09ce3d8, Worker: employee-3g0v8, Result: 1.000000" 
time="2017-09-25T19:51:56Z" level=info msg="Finished in time: 10.005964618s" 
...

Pooling

The previous approach doesn't feel right: we have to set up a connection on every request, and that has to take some time, right? What if we set up multiple connections up front and then reused them? We can do exactly that using a simple library called processout/grpc-go-pool. First, we define a Factory function that returns a connection. Then we use it to create a Pool (in our case 5 connections with a one-second idle timeout) that exposes a Get(context.Context) method and hands us one of the pooled connections (or creates a new one if needed). This way we don't have to dial on every request, and we don't dump all the work on a single employee pod:

// employer/main.go
...
func main() {
    ...
    // The factory tells the pool how to open a new connection when it needs one.
    var factory grpcpool.Factory
    factory = func() (*grpc.ClientConn, error) {
        conn, err := grpc.Dial(employeeAddr, grpc.WithInsecure())
        if err != nil {
            log.Errorf("Failed to start gRPC connection: %v", err)
            return nil, err
        }
        log.Infof("Connected to employee at %s", employeeAddr)
        return conn, nil
    }

    pool, err := grpcpool.New(factory, 5, 5, time.Second)
    if err != nil {
        log.Fatalf("Failed to create gRPC pool: %v", err)
    }

    http.HandleFunc("/power", func(rw http.ResponseWriter, req *http.Request) {
        ...
        ctx := req.Context()
        conn, err := pool.Get(ctx)
        if err != nil {
            msg := "failed to connect to worker"
            l.Errorln(errors.Wrap(err, msg))

            rw.WriteHeader(http.StatusInternalServerError)
            rw.Write([]byte(msg))
            return
        }
        // Closing a pooled connection returns it to the pool instead of tearing it down.
        defer conn.Close()
        client := pb.NewWorkerClient(conn)

        workResp, err := client.Work(ctx, &pb.JobRequest{
            Id:       uuid.NewV4().String(),
            Base:     float32(base),
            Exponent: float32(exponent),
        })
        ...
    })
    ...
}

Load tests

We might rely on our instincts, but it's always better to run some actual tests and compare response times. For that I used rakyll/hey. I called each of the three versions 5000 times, with 100 concurrent requests:

$ hey -n 5000 -c 100 "http://192.168.99.100:30180/power?base=1&exponent=4" > reports/one-client.txt
...
$ hey -n 5000 -c 100 "http://192.168.99.100:30180/power?base=1&exponent=4" > reports/connection-per-request.txt
...
$ hey -n 5000 -c 100 "http://192.168.99.100:30180/power?base=1&exponent=4" > reports/pool-of-connections.txt
...

The measure I'm most interested in is the latency distribution. Not surprisingly, the lower percentiles look better for a single connection (10% in 10.0006s vs 10.0027s vs 10.0018s), but the higher the percentile, the more we gain from pooling connections (99% in 10.0267s vs 10.0633s vs 10.0190s).

The full source code of this example is available on GitHub.