Thursday, December 26, 2013

Pipelining and flow control

If you are about to create your own application-level protocol on top of TCP to load your backend to its limit, you should know how to design such a protocol. Two things that come to mind immediately are pipelining and flow control.
Pipelining is what you get out of the box when using a stream-based transport layer. Higher-level protocols need not throw it away; for instance, HTTP/1.1 supports pipelining. In short, it hides round-trip latency and jitter between client and server, because you don't wait for a reply before sending the next request.
To see what flow control is, take a look at Akka IO's Write models. Flow control gives the server a way to say: don't push so hard, slow down. TCP itself implements ack-based flow control as well.
How do you use it? For instance, imagine you have a queue of tasks on the server side that is filled by clients and drained by the backend. If clients send tasks too quickly, the queue grows. To cope, introduce a so-called high watermark and low watermark: when the queue length exceeds the high watermark, stop reading from the sockets and the queue will shrink; when it drops below the low watermark, start reading tasks from the sockets again.
Note that for clients to adapt to the speed at which you process tasks (effectively, to adapt their window size), the gap between the high and low watermarks shouldn't be too big. On the other hand, a small gap means you'll be adding and removing sockets from the event loop too often.
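To make pipelining concrete, below is a minimal client sketch (the address 127.0.0.1:9000 and the newline-delimited request/response format are assumptions for illustration, not part of the project excerpted further down). It writes several requests back to back and only then reads the replies, so the whole batch pays roughly one round trip of latency instead of one per request.

// Pipelining sketch: assumes a server at 127.0.0.1:9000 speaking a
// newline-delimited request/response protocol (one reply per request).
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>

int main(void) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        perror("socket");
        return 1;
    }

    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(9000);
    inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("connect");
        return 1;
    }

    // Pipelined: send all requests back to back without waiting for the
    // reply to the previous one (a real client would also handle partial writes).
    const char *reqs[] = { "task 1\n", "task 2\n", "task 3\n" };
    for (int i = 0; i < 3; i++) {
        if (write(fd, reqs[i], strlen(reqs[i])) < 0) {
            perror("write");
            return 1;
        }
    }

    // Now collect the replies; the whole batch cost one round trip.
    // For this sketch we simply read until the peer closes the connection.
    char buf[4096];
    ssize_t n;
    while ((n = read(fd, buf, sizeof(buf))) > 0)
        fwrite(buf, 1, (size_t)n, stdout);

    close(fd);
    return 0;
}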
Below is an excerpt from a real project that uses libev.
//------------------------------------------------------------------

static request_s *request_new(connection_s *con) {
    request_s *new_request;

    new_request = alloc_data(request_mem_mng);
    if (!new_request) {
        log_err("cannot allocate memory");
        goto err;
    }
    memset(new_request, 0, sizeof(request_s));
    
    // Add to connection's list of requests
    list_add_tail(&new_request->request_list, &con->request_list);
    
    new_request->con = con;
    
    {
        // Flow control
        num_reqs++;
        
        if (num_reqs == REQUEST_HIGH_WATERMARK) {
            // Backlog is too long: pause the read watcher on every
            // connection so no new requests are read from the sockets.
            list_s *elt;
            connection_s *c;
            for (elt = connection_list.next; elt != &connection_list; elt = elt->next) {
                c = list_elt(elt, connection_s, connection_list);
                ev_io_stop(e_loop, &c->read_watcher);
            }
        }
    
    }
    
    return new_request;
err:
    return NULL;
}

//------------------------------------------------------------------

static void request_del(request_s *req) {
    list_del(&req->request_list);
    list_del(&req->request_wait);

    if (req->data)
        free_data(data_mem_mng, req->data);

    free_data(request_mem_mng, req);

    {
        // Flow control
        num_reqs--;
        
        if (num_reqs == REQUEST_LOW_WATERMARK) {
            // Backlog has drained enough: resume the read watcher on every
            // connection so new requests are accepted again.
            list_s *elt;
            connection_s *con;
            for (elt = connection_list.next; elt != &connection_list; elt = elt->next) {
                con = list_elt(elt, connection_s, connection_list);
                ev_io_start(e_loop, &con->read_watcher);
            }
        }
    
    }
}
The full source is available.
