
Webhooks solution

Some words about webhooks

In a P2P network, webhooks are a classic solution for keeping clients informed of what is happening on our node. The pattern is described in many books, such as Designing Web APIs, and is often the best solution in a real-time context like live bootstrapping.

Solution

Returning to our initial problem: whatever solution you choose, ordering is the main concern. Once you have a deterministic order, you can stream in any context.

Here is a casual example of bootstrap streaming between two instances. Imagine two programs that maintain a form of cache consistency for safety: if one instance crashes, the second one still stores a cached copy of the data.

Start with a server that exposes the following endpoints:

Before reading the code, you should be familiar with hyper and its simple server syntax. I chose that solution because it is fast to implement and easy to read.

(&Method::POST, "/bootstrap") => {
    // ask to be registered as a bootstraper,
    // - addr: entry point (ex http://127.0.0.1:3031/insert)
    // - begin: index of the first entry in the database we want
    // - end: index of the last entry in the database we want
    let (addr, begin, end) =
        deserialize::<(String, usize, usize)>(&to_bytes(req.into_body()).await.unwrap());
    // add a subscriber into the db and start the bootstrap loop if not
    // already done.
    spawn_bootstapper_sender(addr, begin, end, db.clone()).await;
    Ok(Response::new(Body::default()))
}
(&Method::GET, "/size") => Ok(Response::new(Body::from(format!("{}", db.len().await)))),

The bootstrap route is the entry point of the bootstrapping strategy: it activates the webhook for our node and starts streaming the database in real time.

The loop that sends the stream is throttled by a period, a constant used in the code to limit the bandwidth and the load on the database, which in our example is shared between multiple threads. The loop is launched only once and runs until the end of the program's life.
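
For reference, the two constants used in the code could look like the following; the actual values here are assumptions, the real ones live in the repository.

use std::time::Duration;

/// Pause between two chunk-sending passes, bounding bandwidth and lock contention.
pub const BOOTSTRAP_SEND_PERIOD: Duration = Duration::from_millis(100);
/// Upper bound on the number of entries sent to a subscriber in one pass.
pub const MAX_CHUNK_SIZE: usize = 100;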

/// Add a subscriber to my db that will stream it in real time and start the
/// _"bootstrap thread"_ if not already done
pub async fn spawn_bootstapper_sender(addr: String, begin: usize, end: usize, db: SharedDB) {
    db.add_subscriber(Subscriber::new(addr, begin, end)).await;

    static SPAWN_ONCE: std::sync::Once = std::sync::Once::new();
    SPAWN_ONCE.call_once(|| {
        tokio::spawn(async move {
            loop {
                let f = async {
                    db.send_chunks().await;
                    tokio::time::sleep(BOOTSTRAP_SEND_PERIOD).await;
                };
                tokio::select! {
                    _ = f => continue,
                    // cancel and return if ctrl-c
                    _ = tokio::signal::ctrl_c() => return
                };
            }
        });
    });
}

An attentive reader will have noticed that we could also stop the task when all subscribers become “stale”: when nothing happens for a given, arbitrary amount of time, we could stop the task and restart it on the next spawn_bootstapper_sender call. We will look at how to do that in another article.

The database implements the send_chunks function. In it, we simply iterate through the subscribers and send each one a chunk, within the range it asked for. The chunk size is computed dynamically as the minimum between what remains to be sent to the remote node and the MAX_CHUNK_SIZE limit.

    pub async fn send_chunks(&self) {
        let guard = &mut *self.0.lock().await;
        for subscriber in guard.subscribers.iter_mut() {
            if subscriber.index == subscriber.end {
                continue;
            }
            let chunk_size = min(MAX_CHUNK_SIZE, subscriber.end - subscriber.index);
            let modifs = take_chunk(&guard.data, subscriber.index, chunk_size);
            forward_all(&subscriber.addr, &modifs).await;
            subscriber.index += chunk_size;
        }
    }
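
send_chunks relies on two helpers from the repository. Here is a hedged sketch of what they might look like, assuming the data is an ordered list of (key, value) pairs and that chunks travel as JSON over HTTP; the real implementations may differ.

fn take_chunk(data: &[(String, String)], index: usize, chunk_size: usize) -> Vec<EntryModif> {
    // turn the [index, index + chunk_size) slice into a list of updates
    data[index..index + chunk_size]
        .iter()
        .map(|(k, v)| EntryModif::Update((k.clone(), v.clone())))
        .collect()
}

async fn forward_all(addr: &str, modifs: &[EntryModif]) {
    // POST the serialized chunk to the subscriber's entry point ("/insert")
    let client = hyper::Client::new();
    let req = hyper::Request::post(addr)
        .body(hyper::Body::from(serde_json::to_vec(modifs).unwrap()))
        .unwrap();
    // errors are ignored in this sketch; a real implementation would retry
    let _ = client.request(req).await;
}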

If you visit the repository, you will also find some ways the implementation could be improved. Those would be great contributions if someone wanted to implement them ;-)

To finish presenting the solution, I have to introduce you to the “/insert” route. As I said before, an insertion takes either a Delete or an Update. Insertions are managed as updates, so all of CRUD (except the Read) is covered.
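
The route consumes a list of EntryModif values. A plausible definition, assuming serde and string keys and values (the exact type lives in the repository):

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum EntryModif {
    /// remove the entry stored under this key
    Delete(String),
    /// insert or overwrite the entry stored under this key
    Update((String, String)),
}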

(&Method::POST, "/insert") => {
    // update or remove an element from my database
    let modifs = deserialize::<Vec<EntryModif>>(&to_bytes(req.into_body()).await.unwrap());
    for modif in modifs.iter() {
        match modif {
            EntryModif::Delete(key) => db.remove(key).await,
            EntryModif::Update((key, value)) => db.update(key, value).await,
        };
    }
    Ok(Response::new(Body::default()))
}

The remove and the update are basically the same: one calls a remove on our database and the other calls an insert. Each of them looks at the subscribers that are streaming the particular chunk of the map being touched. That is why we care so much about a deterministic memory ordering: thanks to the index, we can tell whether we have already sent the value, and resend it if we want the remote streamer to stay up to date.

// inform the bootstrapers of an update
let pos = guard.data.iter().position(|(k, _)| *k == *key).unwrap();
for subscriber in guard.subscribers.iter() {
    let end = match subscriber.eof {
        true => guard.data.len(),
        false => subscriber.end,
    };
    // forward only if the entry lies inside the subscribed range and has
    // already been streamed (pos <= index), so the remote copy stays fresh
    if subscriber.begin <= pos && pos <= end && pos <= subscriber.index {
        forward_all(
            &subscriber.addr,
            &vec![EntryModif::Update((key.clone(), value.to_string()))],
        )
        .await;
    }
}

The end value can also be pinned to the end of the database: if, at subscription time, the requested end equals data.len(), the subscriber is flagged as eof and we always substitute the current full length. This keeps the tail of the memory consistent, since new insertions can land past the originally requested end and would otherwise be missed by the last chunks.
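
Putting the pieces together, the Subscriber record presumably carries the following fields; this is inferred from the code above, the real definition is in the repository.

pub struct Subscriber {
    /// entry point of the remote node (ex: http://127.0.0.1:3031/insert)
    pub addr: String,
    /// first index of the requested range
    pub begin: usize,
    /// end of the requested range
    pub end: usize,
    /// next entry to stream; everything in [begin, index) was already sent
    pub index: usize,
    /// true when `end` matched data.len() at subscription time
    pub eof: bool,
}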

Finally, the subscription part! There are two possible implementations. One bootstraps from a single node (the current example). The other splits the bootstrap between multiple databases that are, in theory, synchronized copies.

pub async fn _subscribe_to_one(to_addr: String, my_addr: String) {
    let client = Client::new();
    let size = size_request(&client, &to_addr).await;
    // 0: begin, size: end
    subscribe_request(&client, &to_addr, &my_addr, 0, size).await;
}
pub async fn _subscribe_multiple(to_addr: &[String], my_addr: String) {
    assert!(!to_addr.is_empty());
    let client = Client::new();
    let size = size_request(&client, &to_addr[0]).await;
    let c = size / to_addr.len();
    let mut i = 0;
    for t in to_addr {
        subscribe_request(&client, t, &my_addr, i, i + c).await;
        i += c;
    }
    if i < size {
        subscribe_request(&client, &to_addr[0], &my_addr, i, size).await;
    }
}
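
For example, with size = 10 and three nodes, each node is asked for a range of 10 / 3 = 3 entries ([0, 3), [3, 6), [6, 9)), and the remaining range [9, 10) is requested again from the first node.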

The multiple subscription simply shares its request between several nodes offering the bootstrap route, whereas the “mono-subscription” asks a single node. Both work, but both would need some “retry” management in case a remote node crashes.
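
As an illustration of that missing piece, a naive retry loop could look like the sketch below. It assumes a hypothetical fallible variant of subscribe_request, here called try_subscribe_request, returning a Result; nothing like it exists in the repository yet.

use std::time::Duration;
use hyper::Client;

async fn subscribe_with_retry(
    client: &Client<hyper::client::HttpConnector>,
    to_addr: &str,
    my_addr: &str,
    begin: usize,
    end: usize,
) {
    for attempt in 1..=3 {
        // try_subscribe_request is assumed to return Result<(), hyper::Error>
        match try_subscribe_request(client, to_addr, my_addr, begin, end).await {
            Ok(()) => return,
            Err(e) => eprintln!("subscribe attempt {attempt} to {to_addr} failed: {e}"),
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}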

Staling

In the previous part, we saw how a subscriber attaches to the DB, but we didn’t define when a DB can dismiss one of them.

One strategy, well suited to a blockchain, would be to give each subscriber a heartbeat timeout. Each new piece of information sent by our local node to a DB keeps that subscriber alive. As long as its heartbeat is healthy, we keep sending it new information; once it times out, we can remove it from the subscriber list.

But that means implementing a heartbeat mechanism, which can take quite a few lines of code.
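
To give an idea of its shape, here is a minimal pruning pass, assuming a hypothetical last_seen timestamp on each subscriber, refreshed on every successful send; this field does not exist in the current code.

use std::time::{Duration, Instant};

// hypothetical timeout, to be tuned to the network's update rate
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(30);

/// Drop every subscriber whose heartbeat expired.
fn prune_stale(subscribers: &mut Vec<Subscriber>, now: Instant) {
    subscribers.retain(|s| now.duration_since(s.last_seen) < HEARTBEAT_TIMEOUT);
}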

Another strategy is to stop informing a subscriber once it starts being informed by another node!

In other words, when the bootstrap is about to finish, the subscriber can connect itself to the main network, or to the main feeder. It can perfectly well be informed by multiple sources.

If “/insert” returns a boolean, true when the subscriber accepts the new insertion and false when it is already aware of the modification, then the node helping with the bootstrap knows that the subscriber has joined the network, and can mark it as stale once its index reaches the end.
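
A sketch of that variant of the “/insert” handler, assuming, hypothetically, that db.remove and db.update return true when the modification was new to them:

(&Method::POST, "/insert") => {
    let modifs = deserialize::<Vec<EntryModif>>(&to_bytes(req.into_body()).await.unwrap());
    let mut fresh = false;
    for modif in modifs.iter() {
        // fresh stays false only if every modification was already known
        fresh |= match modif {
            EntryModif::Delete(key) => db.remove(key).await,
            EntryModif::Update((key, value)) => db.update(key, value).await,
        };
    }
    Ok(Response::new(Body::from(format!("{}", fresh))))
}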

[Figure: bootstrap phase]

[Figure: staling phase]

In conclusion, both methods are viable depending on the topology and the nature of the database you’re sharing. The heartbeat, even if it seems harder to maintain, is more flexible and avoids desynchronization problems in some cases. The stale detection is more precise, but can lead to hard consistency bugs and has to be tested thoroughly before going to production.

Both strategies are welcome in the github repository, so that we could judge which one is more maintainable.

Tests

You can look at the full code in this github repository. Some test cases are described in the README. Basically, we initialize one or two DBs with random data. Then, with the help of a python script like the one just below, we run some bootstrapping nodes in parallel. After a while, we stop the python script and check that the values are the same in each instance of the linked DBs.

# Ex: python insert.py & cargo run -- 3001 3000
# random insertions in the database, for perhaps 5 seconds
import random
import time

import requests

for _ in range(0, 100):
    key = random.randint(0, 100)
    value = random.randint(0, 999)
    requests.post('http://127.0.0.1:3000/insert', json=[{'Update': [f'{key}', f'{value}']}])
    time.sleep(0.02)


