Commit 2acfa9ca authored by Ivan Vilata-i-Balaguer

Merge branch 'multi-peer-reader' into master.

This includes a new working implementation of multi-peer downloads using a
two-phase protocol for the concurrent retrieval of content seeded by several
other clients.

Ouinet protocol v6 and signed HTTP storage v3 are introduced.  They are both
interim versions until some cleanups affecting protocol consistency are applied,
the specs are updated, and v7 is released.
parents 44794e30 470c4f17
# Partial content signing
# Streamable content signing
- Allow streaming content from origin to client.
- Less memory usage in injector: do not "slurp" whole response.
......@@ -56,7 +56,7 @@ Please note that neither the initial signature nor framing headers (`Transfer-En
```
HTTP/1.1 200 OK
X-Ouinet-Version: 5
X-Ouinet-Version: 6
X-Ouinet-URI: https://example.com/foo
X-Ouinet-Injection: id=d6076384-2295-462b-a047-fe2c9274e58d,ts=1516048310
Date: Mon, 15 Jan 2018 20:31:50 GMT
......@@ -74,9 +74,9 @@ Trailer: Digest, X-Ouinet-Data-Size, X-Ouinet-Sig1
0123456789...
100000;ouisig=BASE64(BSIG(d607…e58d NUL 0 NUL CHASH[0]=SHA2-512(SHA2-512(BLOCK[0]))))
0123456789...
4;ouisig=BASE64(BSIG(d607…e58d NUL 1048576 NUL CHASH[1]=SHA2-512(CHASH[0] SHA2-512(BLOCK[1]))))
4;ouisig=BASE64(BSIG(d607…e58d NUL 1048576 NUL CHASH[1]=SHA2-512(SIG[0] CHASH[0] SHA2-512(BLOCK[1]))))
abcd
0;ouisig=BASE64(BSIG(d607…e58d NUL 2097152 NUL CHASH[2]=SHA2-512(CHASH[1] SHA2-512(BLOCK[2]))))
0;ouisig=BASE64(BSIG(d607…e58d NUL 2097152 NUL CHASH[2]=SHA2-512(SIG[1] CHASH[1] SHA2-512(BLOCK[2]))))
Digest: SHA-256=BASE64(SHA2-256(COMPLETE_BODY))
X-Ouinet-Data-Size: 1048580
X-Ouinet-Sig1: keyId="ed25519=????",algorithm="hs2019",created=1516048311,
......@@ -96,13 +96,15 @@ The signature string for each block covers the following values (separated by nu
This helps detect an attacker who replies to a range request with a range of the expected length, with correctly signed and ordered blocks, that nonetheless starts at the wrong offset.
- A **chain hash** (binary) computed from the chain hash of the previous block and the **data hash** of the block itself: for the i-th block, `DHASH[i]=SHA2-512(BLOCK[i])` and `CHASH[i]=SHA2-512(CHASH[i-1] DHASH[i])`, with `CHASH[0]=SHA2-512(DHASH[0])`.
- A **chain hash** (binary) computed from the chain hash of the previous block and the **data hash** of the block itself: for the i-th block, `DHASH[i]=SHA2-512(BLOCK[i])` and `CHASH[i]=SHA2-512(SIG[i-1] CHASH[i-1] DHASH[i])`, with `CHASH[0]=SHA2-512(DHASH[0])`.
Signing the hash instead of the block data itself spares the signer from keeping the whole block in memory to produce the signature (the hash algorithm can be fed data as it comes in from the origin).
Using the data block hash instead of its data makes it possible to verify the signatures independently, without possessing the data itself, just the hashes.
Keeping the injection identifier out of the hash allows comparing the hashes at particular blocks of different injections (if transmitted independently) to ascertain that their data is the same up to that block.
**TODOv6 REVIEW,OBSOLETE** Keeping the injection identifier out of the hash allows comparing the hashes at particular blocks of different injections (if transmitted independently) to ascertain that their data is the same up to that block. **TODO contradicts below**
**TODOv6 REVIEW** Including the previous signature in the hash allows transitively verifying the signatures of previous blocks by verifying just the last signature (in case signatures and hashes are retrieved on their own, without the data, beforehand). **TODO contradicts above**
The chaining precludes the attacker from reordering correctly signed blocks for this injection. SHA2-512 is used as a compromise between security and speed on 64-bit platforms; although the hash is longer than that of the slower SHA2-256, it will seldom be transmitted (e.g. for range requests as indicated below).
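Putting the pieces above together, the chained hashes and per-block signature strings can be sketched as follows. This is only an illustration of the v6 scheme: `chain_blocks` and its `sign` callback are made-up names, and a real implementation would pass an Ed25519 signing function over the injector's private key.

```python
import hashlib

def chain_blocks(injection_id, blocks, sign):
    """Yield (offset, CHASH[i], SIG[i]) for each block, following:
    DHASH[i] = SHA2-512(BLOCK[i]); CHASH[0] = SHA2-512(DHASH[0]);
    CHASH[i] = SHA2-512(SIG[i-1] CHASH[i-1] DHASH[i])."""
    offset = 0
    prev_sig = prev_chash = None
    for block in blocks:
        dhash = hashlib.sha512(block).digest()
        h = hashlib.sha512()
        if prev_chash is not None:
            h.update(prev_sig)    # SIG[i-1]
            h.update(prev_chash)  # CHASH[i-1]
        h.update(dhash)           # DHASH[i]
        chash = h.digest()
        # Values covered by the block signature, separated by NUL bytes.
        to_sign = b"\0".join([injection_id.encode(),
                              str(offset).encode(), chash])
        sig = sign(to_sign)
        yield offset, chash, sig
        offset += len(block)
        prev_sig, prev_chash = sig, chash
```

Note that `SIG[i]` signs `CHASH[i]`, and `CHASH[i+1]` in turn covers `SIG[i]`, which is what makes the transitive verification described above possible.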
......@@ -132,7 +134,7 @@ For example, a client having stored the complete response shown above may reply
```
HTTP/1.1 200 OK
X-Ouinet-Version: 4
X-Ouinet-Version: 6
X-Ouinet-URI: https://example.com/foo
X-Ouinet-Injection: id=d6076384-2295-462b-a047-fe2c9274e58d,ts=1516048310
Date: Mon, 15 Jan 2018 20:31:50 GMT
......@@ -157,7 +159,7 @@ In contrast, a client having stored only an incomplete response from the injecto
```
HTTP/1.1 200 OK
X-Ouinet-Version: 4
X-Ouinet-Version: 6
X-Ouinet-URI: https://example.com/foo
X-Ouinet-Injection: id=d6076384-2295-462b-a047-fe2c9274e58d,ts=1516048310
Date: Mon, 15 Jan 2018 20:31:50 GMT
......
......@@ -375,6 +375,8 @@ A cache entry signed using implementations of these primitives different from th
#### Examples
**TODOv6 OBSOLETE**
An injector server using Ed25519 private key `KEY` might construct the following as-yet unsigned cache entry:
```
......@@ -398,7 +400,7 @@ X-Ouinet-Data-Size: 12
The injector server would create the following complete cache entry signature:
```
keyId="ed25519=<key>",algorithm="hs2019",created=1584748800, headers="(response-status) (created) x-Ouinet-version x-Ouinet-uri x-Ouinet-injection date content-type digest x-Ouinet-data-size",signature="<signature-base64>"
keyId="ed25519=<key>",algorithm="hs2019",created=1584748800, headers="(response-status) (created) x-ouinet-version x-ouinet-uri x-ouinet-injection date content-type digest x-ouinet-data-size",signature="<signature-base64>"
```
In this signature, `<key>` stands for the public key associated with the `KEY` private key, and `<signature-base64>` is the base64 encoding of the Ed25519 signature of the following string:
......@@ -406,13 +408,13 @@ In this signature, `<key>` stands for the public key associated with the `KEY` p
```
(response-status): 200
(created): 1584748800
x-Ouinet-version: 4
x-Ouinet-uri: https://example.com/hello
x-Ouinet-injection: id=qwertyuiop-12345,ts=1584748800
x-ouinet-version: 4
x-ouinet-uri: https://example.com/hello
x-ouinet-injection: id=qwertyuiop-12345,ts=1584748800
date: Sat, 21 Mar 2020 00:00:00 GMT
content-type: text/plain
digest: SHA-256=wFNeS+K3n/2TKRMFQ2v4iTFOSj+uwF7P/Lt98xrZ5Ro=
x-Ouinet-data-size: 12
x-ouinet-data-size: 12
```
Lines in this string are separated by newline `\n` characters. The string does not begin with or end in a newline character.
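The construction of that signing string (header names lowercased, one `name: value` pair per line, lines joined by `\n`, no leading or trailing newline) can be sketched with a small helper; the function name is illustrative:

```python
def signing_string(pairs):
    """Build the HTTP-signature signing string: one 'name: value'
    line per covered header (names lowercased), separated by '\n',
    with no leading or trailing newline."""
    return "\n".join(f"{name.lower()}: {value}" for name, value in pairs)
```

The base64-encoded Ed25519 signature of this string is what goes into the `signature` parameter of the signature header.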
......@@ -421,7 +423,7 @@ The injector server might choose not to create a signature stream for this cache
* `block_size`: `5`
* `injection-id`: `qwertyuiop-12345`
* `header-signature`: `keyId="ed25519=<key>",algorithm="hs2019",created=1584748800, headers="(response-status) (created) x-Ouinet-version x-Ouinet-uri x-Ouinet-injection date content-type",signature="<header-signature-base64>"`
* `header-signature`: `keyId="ed25519=<key>",algorithm="hs2019",created=1584748800, headers="(response-status) (created) x-ouinet-version x-ouinet-uri x-ouinet-injection date content-type",signature="<header-signature-base64>"`
* `block(0)`: `Hello`
* `block(1)`: ` worl`
* `block(2)`: `d!`
......@@ -439,9 +441,9 @@ In the computation of `header-signature` in the above, `<key>` stands for the pu
```
(response-status): 200
(created): 1584748800
x-Ouinet-version: 4
x-Ouinet-uri: https://example.com/hello
x-Ouinet-injection: id=qwertyuiop-12345,ts=1584748800
x-ouinet-version: 4
x-ouinet-uri: https://example.com/hello
x-ouinet-injection: id=qwertyuiop-12345,ts=1584748800
date: Sat, 21 Mar 2020 00:00:00 GMT
content-type: text/plain
```
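A receiver can check a stream of blocks with the mirror computation. This sketch (illustrative names; `verify` stands in for Ed25519 signature verification against the injector's public key) recomputes each chain hash from the received data and checks each block signature over the NUL-separated string:

```python
import hashlib

def verify_chain(injection_id, items, verify):
    """items: iterable of (block_bytes, chash, sig) as received.
    Recomputes CHASH[i] from the data and the previous (sig, chash),
    rejects on mismatch, and checks each block signature over the
    string 'injection-id NUL offset NUL CHASH[i]'."""
    offset = 0
    prev_sig = prev_chash = None
    for block, chash, sig in items:
        dhash = hashlib.sha512(block).digest()
        h = hashlib.sha512()
        if prev_chash is not None:
            h.update(prev_sig)
            h.update(prev_chash)
        h.update(dhash)
        if h.digest() != chash:
            return False  # reordered or tampered block
        to_sign = b"\0".join([injection_id.encode(),
                              str(offset).encode(), chash])
        if not verify(to_sign, sig):
            return False
        offset += len(block)
        prev_sig, prev_chash = sig, chash
    return True
```

Because each chain hash covers the previous signature and chain hash, swapping two correctly signed blocks makes the recomputed `CHASH[i]` diverge and the check fail.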
......@@ -574,6 +576,8 @@ Of these three examples, the last two would be considered equivalent by a recipi
### Peer-to-peer cache entry exchange
**TODOv6 OBSOLETE,INCOMPLETE(multi-peer)**
When a Ouinet client stores a collection of cache entries in its local device storage, it can share these cache entries with other users that wish to access them. By fetching cache entries from other users in this way, without involvement of the injector servers, a Ouinet client can access web content even in cases when it cannot reach the injector servers.
A Ouinet client willing to share its cache entries with others can serve HTTP requests using a protocol very similar to that used by the injector servers. Unlike injector servers, a Ouinet client participating in the distributed cache will only respond to such requests by serving a copy of a cache entry it has stored in its local device storage. Using this system, a client wishing to fetch a cached resource from another client that stores a cache entry for that resource can establish a peer-to-peer connection to that client, send an HTTP request for the cached resource, and retrieve the cache entry. The recipient can then verify the legitimacy of the cache entry, use the resource in a user application, and optionally store the resource in its own local storage.
......
Subproject commit 318c1f0f41594532317bcd03dd8d993da4e390d6
Subproject commit 95888ecdd232f705551b377a0b7afd2fa79c8631
#pragma once
#include "../util/crypto.h"

#include <boost/format.hpp>
#include <boost/optional.hpp>
#include <boost/variant.hpp>

#include <string>
namespace ouinet { namespace cache {
class ChainHash {
public:
using PrivateKey = util::Ed25519PrivateKey;
using PublicKey = util::Ed25519PublicKey;
using Signature = PublicKey::sig_array_t;
using Hash = util::SHA512;
using Digest = Hash::digest_type;
size_t offset;
Digest chain_digest;
Signature chain_signature;
bool verify(const PublicKey& pk, const std::string& injection_id) const {
return pk.verify(str_to_sign(injection_id, offset, chain_digest), chain_signature);
}
private:
friend class ChainHasher;
static
std::string str_to_sign(
const std::string& injection_id,
size_t offset,
Digest digest)
{
static const auto fmt_ = "%s%c%d%c%s";
return ( boost::format(fmt_)
% injection_id % '\0'
% offset % '\0'
% util::bytes::to_string_view(digest)).str();
}
};
class ChainHasher {
public:
using PrivateKey = ChainHash::PrivateKey;
using Signature = ChainHash::Signature;
using Hash = ChainHash::Hash;
using Digest = ChainHash::Digest;
struct Signer {
const std::string& injection_id;
const PrivateKey& key;
Signature sign(size_t offset, const Digest& chained_digest) const {
return key.sign(ChainHash::str_to_sign(injection_id, offset, chained_digest));
}
};
using SigOrSigner = boost::variant<Signature, Signer>;
public:
ChainHasher()
: _offset(0)
{}
ChainHash calculate_block(size_t data_size, Digest data_digest, SigOrSigner sig_or_signer)
{
Hash chained_hasher;
if (_prev_chained_signature) {
chained_hasher.update(*_prev_chained_signature);
}
if (_prev_chained_digest) {
chained_hasher.update(*_prev_chained_digest);
}
chained_hasher.update(data_digest);
Digest chained_digest = chained_hasher.close();
Signature chained_signature = util::apply(sig_or_signer,
[&] (const Signature& s) { return s; },
[&] (const Signer& s) { return s.sign(_offset, chained_digest); });
size_t old_offset = _offset;
// Prepare for next block
_offset += data_size;
_prev_chained_digest = chained_digest;
_prev_chained_signature = chained_signature;
return {old_offset, chained_digest, chained_signature};
}
void set_prev_chained_digest(Digest prev_chained_digest) {
_prev_chained_digest = prev_chained_digest;
}
void set_offset(size_t offset) {
_offset = offset;
}
const boost::optional<Digest>& prev_chained_digest() const {
return _prev_chained_digest;
}
private:
size_t _offset;
boost::optional<Digest> _prev_chained_digest;
boost::optional<Signature> _prev_chained_signature;
};
}} // namespaces
......@@ -9,7 +9,6 @@
#include "../parse/number.h"
#include "../util/wait_condition.h"
#include "../util/set_io.h"
#include "../util/async_generator.h"
#include "../util/lru_cache.h"
#include "../util/handler_tracker.h"
#include "../bittorrent/dht.h"
......@@ -20,6 +19,7 @@
#include "../constants.h"
#include "../session.h"
#include "../bep5_swarms.h"
#include "multi_peer_reader.h"
#include <ctime>
#include <map>
......@@ -54,24 +54,24 @@ struct GarbageCollector {
TRACK_HANDLER();
Cancel cancel(_cancel);
LOG_DEBUG("Bep5HTTP: Garbage collector started");
LOG_DEBUG("cache/client: Garbage collector started");
while (!cancel) {
sys::error_code ec;
async_sleep(_executor, chrono::minutes(7), cancel, yield[ec]);
if (cancel || ec) break;
LOG_DEBUG("Bep5HTTP: Collecting garbage...");
LOG_DEBUG("cache/client: Collecting garbage...");
http_store.for_each([&] (auto rr, auto y) {
sys::error_code e;
auto k = keep(std::move(rr), y[e]);
if (cancel) ec = asio::error::operation_aborted;
return or_throw(y, e, k);
}, cancel, yield[ec]);
if (ec) LOG_WARN("Bep5HTTP: Collecting garbage: failed"
if (ec) LOG_WARN("cache/client: Collecting garbage: failed"
" ec:", ec.message());
LOG_DEBUG("Bep5HTTP: Collecting garbage: done");
LOG_DEBUG("cache/client: Collecting garbage: done");
}
LOG_DEBUG("Bep5HTTP: Garbage collector stopped");
LOG_DEBUG("cache/client: Garbage collector stopped");
});
}
};
......@@ -79,7 +79,7 @@ struct GarbageCollector {
struct Client::Impl {
// The newest protocol version number seen in a trusted exchange
// (i.e. from injector-signed cached content).
unsigned newest_proto_seen = http_::protocol_version_current;
std::shared_ptr<unsigned> _newest_proto_seen;
asio::executor _ex;
shared_ptr<bt::MainlineDht> _dht;
......@@ -92,7 +92,7 @@ struct Client::Impl {
Announcer _announcer;
GarbageCollector _gc;
map<string, udp::endpoint> _peer_cache;
util::LruCache<bt::NodeID, unique_ptr<DhtLookup>> _dht_lookups;
util::LruCache<std::string, shared_ptr<DhtLookup>> _dht_lookups;
log_level_t _log_level = INFO;
LocalPeerDiscovery _local_peer_discovery;
std::unique_ptr<DhtGroups> _dht_groups;
......@@ -107,7 +107,8 @@ struct Client::Impl {
, unique_ptr<cache::HttpStore> http_store_
, boost::posix_time::time_duration max_cached_age
, log_level_t log_level)
: _ex(dht_->get_executor())
: _newest_proto_seen(std::make_shared<unsigned>(http_::protocol_version_current))
, _ex(dht_->get_executor())
, _dht(move(dht_))
, _uri_swarm_prefix(bep5::compute_uri_swarm_prefix
(cache_pk, http_::protocol_version_current))
......@@ -130,7 +131,18 @@ struct Client::Impl {
dht_group);
}
void serve_local( const http::request<http::empty_body>& req
template<class Body>
static
boost::optional<util::HttpRequestByteRange> get_range(const http::request<Body>& rq)
{
auto rs = util::HttpRequestByteRange::parse(rq[http::field::range]);
if (!rs) return boost::none;
// XXX: We currently support at most one range in the request
if ((*rs).size() != 1) return boost::none;
return (*rs)[0];
}
bool serve_local( const http::request<http::empty_body>& req
, GenericStream& sink
, Cancel& cancel
, Yield& yield)
......@@ -139,6 +151,10 @@ struct Client::Impl {
sys::error_code ec;
if (do_log) {
yield.log("cache/client: start\n", req);
}
// Usually we would
// (1) check that the request matches our protocol version, and
// (2) check that we can derive a key to look up the local cache.
......@@ -153,39 +169,85 @@ struct Client::Impl {
if (!boost::regex_match( req_proto.begin(), req_proto.end()
, http_::protocol_version_rx)) {
if (do_log) {
yield.log("Bep5HTTP: Not a Ouinet request\n", req);
yield.log("cache/client: Not a Ouinet request\n", req);
}
return handle_bad_request(sink, req, yield[ec]);
handle_bad_request(sink, req, yield[ec]);
return or_throw(yield, ec, req.keep_alive());
}
auto key = key_from_http_req(req);
if (!key) {
if (do_log) {
yield.log("Bep5HTTP: Cannot derive key from request\n", req);
yield.log("cache/client: Cannot derive key from request\n", req);
}
return handle_bad_request(sink, req, yield[ec]);
handle_bad_request(sink, req, yield[ec]);
return or_throw(yield, ec, req.keep_alive());
}
if (do_log) {
yield.log("Bep5HTTP: Received request for ", *key);
yield.log("cache/client: Received request for ", *key);
}
if (req.method() == http::verb::propfind) {
if (do_log) {
yield.log("cache/client: Serving propfind for ", *key);
}
auto hl = _http_store->load_hash_list(*key, cancel, yield[ec]);
if (do_log) {
yield.log("cache/client: load ec:\"", ec.message(), "\"");
}
if (ec) {
ec = {};
handle_not_found(sink, req, yield[ec]);
return or_throw(yield, ec, bool(!ec));
}
return_or_throw_on_error(yield, cancel, ec, false);
hl.write(sink, cancel, yield[ec].tag("write-propfind"));
if (do_log) {
yield.log("cache/client: write ec:\"", ec.message(), "\"");
}
return or_throw(yield, ec, bool(!ec));
}
cache::reader_uptr rr;
auto range = get_range(req);
if (range) {
rr = _http_store->range_reader(*key, range->first, range->last, ec);
assert(rr);
} else {
rr = _http_store->reader(*key, ec);
}
auto rr = _http_store->reader(*key, ec);
if (ec) {
if (!cancel && do_log) {
yield.log("Bep5HTTP: Not Serving ", *key, " ec:", ec.message());
yield.log("cache/client: Not Serving ", *key, " ec:", ec.message());
}
return handle_not_found(sink, req, yield[ec]);
handle_not_found(sink, req, yield[ec]);
return or_throw(yield, ec, req.keep_alive());
}
if (do_log) {
yield.log("Bep5HTTP: Serving ", *key);
yield.log("cache/client: Serving ", *key);
}
auto s = Session::create(move(rr), cancel, yield[ec].tag("read_header"));
if (!ec) s.flush_response(sink, cancel, yield[ec].tag("flush"));
return or_throw(yield, ec);
if (ec) return or_throw(yield, ec, false);
bool keep_alive = req.keep_alive() && s.response_header().keep_alive();
auto& head = s.response_header();
if (req.method() == http::verb::head) {
head.async_write(sink, cancel, yield[ec].tag("write-head"));
} else {
s.flush_response(sink, cancel, yield[ec].tag("flush"));
}
return or_throw(yield, ec, keep_alive);
}
std::size_t local_size( Cancel cancel
......@@ -198,7 +260,7 @@ struct Client::Impl {
, asio::yield_context yield)
{
// TODO: avoid overlapping with garbage collector
LOG_DEBUG("Bep5HTTP: Purging local cache...");
LOG_DEBUG("cache/client: Purging local cache...");
sys::error_code ec;
_http_store->for_each([&] (auto rr, auto y) {
......@@ -214,12 +276,12 @@ struct Client::Impl {
return false; // remove all entries
}, cancel, yield[ec]);
if (ec) {
LOG_ERROR("Bep5HTTP: Purging local cache: failed"
LOG_ERROR("cache/client: Purging local cache: failed"
" ec:", ec.message());
return or_throw(yield, ec);
}
LOG_DEBUG("Bep5HTTP: Purging local cache: done");
LOG_DEBUG("cache/client: Purging local cache: done");
}
void handle_http_error( GenericStream& con
......@@ -247,18 +309,16 @@ struct Client::Impl {
, http_::response_error_hdr_retrieval_failed, yield);
}
std::set<udp::endpoint> dht_get_peers( bt::NodeID infohash
, Cancel& cancel
, asio::yield_context yield)
shared_ptr<DhtLookup> dht_lookup(std::string swarm_name)
{
auto* lookup = _dht_lookups.get(infohash);
auto* lookup = _dht_lookups.get(swarm_name);
if (!lookup) {
lookup = _dht_lookups.put( infohash
, make_unique<DhtLookup>(_dht, infohash));
lookup = _dht_lookups.put( swarm_name
, make_shared<DhtLookup>(_dht, swarm_name));
}
return (*lookup)->get(cancel, yield);
return *lookup;
}
Session load( const std::string& key
......@@ -266,86 +326,53 @@ struct Client::Impl {
, Cancel cancel
, Yield yield_)
{
Yield yield = yield_.tag("load");
Yield yield = yield_.tag("cache/client/load");
using Clock = std::chrono::steady_clock;
auto start = Clock::now();
bool dbg;
bool dbg = false;
if (log_debug()) dbg = true;
unique_ptr<util::AsyncGenerator<pair<Session, udp::endpoint>>> gen;
namespace err = asio::error;
auto or_throw_ = [&] (sys::error_code ec, Session session = {}) {
if (gen) gen->async_shut_down(yield);
if (dbg && ec != asio::error::operation_aborted) {
using namespace std::chrono;
auto now = Clock::now();
yield.log("Bep5Http: Done. ec: ", ec.message()
, " took:", duration_cast<seconds>(now - start).count()
, "s");
}
return or_throw<Session>(yield, ec, std::move(session));
};
sys::error_code ec;
namespace err = asio::error;
if (dbg) {
yield.log("Requesting from the cache: ", key);
}
{
sys::error_code ec;
auto rs = load_from_local(key, cancel, yield[ec]);
if (dbg) yield.log("Bep5Http: looking up local cache ec:", ec.message());
if (ec == err::operation_aborted) return or_throw_(ec);
if (dbg) yield.log("looking up local cache ec:", ec.message());
if (ec == err::operation_aborted) return or_throw<Session>(yield, ec);
// TODO: Check its age, store it if it's too old but keep trying
// other peers.
if (!ec) return rs;
// Try distributed cache on other errors.
ec = {};
}
gen = make_connection_generator(key, dht_group, dbg, yield);
sys::error_code ec;
while (auto opt_res = gen->async_get_value(cancel, yield[ec])) {
assert(!cancel || ec == err::operation_aborted);
if (cancel) ec = err::operation_aborted;
if (ec == err::operation_aborted) {
return or_throw_(ec);
}
if (ec) continue;
if (dbg) {
yield.log("Bep5Http: Connect to clients done, ec:", ec.message(),
" chosen ep:", opt_res->second, "; fetching...");
}
auto session = std::move(opt_res->first);
auto& hdr = session.response_header();
if (dbg) {
yield.log("Bep5Http: fetch done,",
" ec:", ec.message(), " result:", hdr.result());
}
assert(!cancel || ec == err::operation_aborted);
if (cancel) {
return or_throw_(err::operation_aborted);
}
if (ec || hdr.result() == http::status::not_found) {
continue;
}
// We found the entry
// TODO: Check its age, store it if it's too old but keep trying
// other peers.
_peer_cache[dht_group] = opt_res->second;
return or_throw_(ec, std::move(session));
string debug_tag;
if (dbg) { debug_tag = yield.tag() + "/multi_peer_reader"; };
auto reader = std::make_unique<MultiPeerReader>
( _ex
, _cache_pk
, _local_peer_discovery.found_peers()
, key
, _dht
, dht_group
, dht_lookup(compute_swarm_name(dht_group))
, _newest_proto_seen
, debug_tag);
auto s = Session::create(std::move(reader), cancel, yield[ec].tag("create_session"));
if (!ec) {
s.response_header().set( http_::response_source_hdr // for agent
, http_::response_source_hdr_dist_cache);
}
if (cancel) return or_throw_(asio::error::operation_aborted);
return or_throw_(err::not_found);
return or_throw<Session>(yield, ec, move(s));
}
Session load_from_local( const std::string& key
......@@ -362,193 +389,6 @@ struct Client::Impl {
return or_throw(yield, ec, move(rs));
}
Session load_from_connection( const string& key
, udp::endpoint ep
, Cancel cancel
, asio::yield_context yield)
{
sys::error_code ec;
Cancel timeout_cancel(cancel);