Commit c564bd00 authored by Ivan Vilata-i-Balaguer's avatar Ivan Vilata-i-Balaguer
Browse files

Merge branch 'handle-injector-swarm-growth'.

This includes some enhancements to cope with transitory addresses appearing in
injector and bridge swarms, be they fake (attackers or BitTorrent spy nodes)
or genuine (like clients no longer available), as everything seen in the
swarms was never forgotten by the client, which caused the local view of the
swarm to grow indefinitely, causing issues like legitimate injectors never
getting picked for pinging in long-running clients.

  - Use an LRU cache for swarm entries in the client (to eventually drop
    spurious entries).
  - Also, do not ping injectors if a connection to one of them was
    successfully established while waiting for the next ping round.

Further refinement in swarm and ping handling paramenters may be needed, as
well as more sophisticated handling of the ping process.
parents ed49cda0 a414f011
#include <boost/functional/hash.hpp>
#include "client.h"
#include "../utp.h"
#include "../connect_proxy.h"
......@@ -8,6 +10,7 @@
#include "../../bittorrent/is_martian.h"
#include "../../logger.h"
#include "../../util/hash.h"
#include "../../util/lru_cache.h"
#include "../../ssl/util.h"
#include "../../util/handler_tracker.h"
......@@ -17,6 +20,22 @@
#define _INFO(...) LOG_INFO(_LOGPFX, __VA_ARGS__)
#define _ERROR(...) LOG_ERROR(_LOGPFX, __VA_ARGS__)
// It is ok to have many of these as a resort if injectors are not reachable,
// as long as they are fresh in the DHT.
static const std::size_t helper_swarm_capacity = 100;
// It is probably good to drop entries more aggressively from here
// to avoid accumulating spurious fake injector entries
// which may impede trying to ping good injectors.
static const std::size_t injector_swarm_capacity = 50;
// Choose values which would allow trying to ping a single injector entry
// if it was always available in the DHT,
// before we go over the "questionable" period (15 minutes according to BEP5)
// several times in a row.
static const std::size_t injectors_to_ping = 30;
static const auto injector_ping_period = std::chrono::minutes(10);
static const auto injector_ping_period_debug = std::chrono::minutes(2);
static const auto injector_pong_timeout = std::chrono::seconds(60);
using namespace std;
using namespace ouinet;
using namespace ouiservice;
......@@ -51,11 +70,54 @@ choose_multiplexer_for(bt::MainlineDht& dht, const udp::endpoint& ep)
return boost::none;
}
// Based on <https://stackoverflow.com/a/7222201>
// and `../dht_lookup.h`.
namespace boost {
static inline void hash_combine(std::size_t& seed, const asio::ip::address& addr)
{
// IPv4 addresses are encoded as IPv4-mapped IPv6 addresses
// to hash every address consistently regardless of the protocol version,
// so no need to take the protocol into account.
//boost::hash_combine(seed, ep.protocol().protocol());
asio::ip::address_v6::bytes_type addr6{0,0,0,0, 0,0,0,0, 0xff,0xff,0xff,0xff, 0,0,0,0};
if (addr.is_v4()) {
auto addr4 = addr.to_v4().to_ulong();
for (unsigned i = 15; i > 11; --i) {
addr6[i] = addr4 & 0x000000fful;
addr4 >>= 8;
}
} else {
assert(addr.is_v6());
addr6 = addr.to_v6().to_bytes();
}
boost::hash_combine(seed, addr6);
}
static inline void hash_combine(std::size_t& seed, const asio::ip::udp::endpoint& ep)
{
boost::hash_combine(seed, ep.address());
boost::hash_combine(seed, ep.port());
}
}
template <>
struct std::hash<asio::ip::udp::endpoint>
{
inline std::size_t operator() (const asio::ip::udp::endpoint& ep) const
{
std::size_t seed = 0;
boost::hash_combine(seed, ep);
return seed;
}
};
struct Bep5Client::Swarm
{
private:
using Peer = AbstractClient;
using Peers = std::map<asio::ip::udp::endpoint, std::shared_ptr<Peer>>;
using Peers = util::LruCache<asio::ip::udp::endpoint, std::shared_ptr<Peer>>;
private:
Bep5Client* _owner;
......@@ -71,12 +133,14 @@ public:
Swarm( Bep5Client* owner
, bt::NodeID infohash
, shared_ptr<bt::MainlineDht> dht
, size_t capacity
, Cancel& cancel
, bool connect_proxy)
: _owner(owner)
, _dht(move(dht))
, _infohash(infohash)
, _lifetime_cancel(cancel)
, _peers(capacity)
, _connect_proxy(connect_proxy)
{}
......@@ -196,13 +260,11 @@ private:
// Don't connect to self
if (wan_eps.count(ep) || lan_eps.count(ep)) continue;
auto r = _peers.emplace(ep, nullptr);
if (r.second) {
auto p = make_peer(ep);
if (!p) continue;
r.first->second = move(p);
}
auto r = _peers.get(ep);
if (r) continue; // already known, moved to front
auto p = make_peer(ep);
if (!p) continue;
_peers.put(ep, move(p));
}
}
};
......@@ -227,11 +289,11 @@ public:
~InjectorPinger() { _lifetime_cancel(); }
// Let this pinger known that injector was seen from somewhere else so that
// Let this pinger known that injector was directly seen from somewhere else so that
// it can postpone pinging.
void injector_was_seen_now()
{
_last_ping_time = Clock::now();
_injector_was_seen = true;
}
private:
......@@ -242,10 +304,10 @@ private:
_injector_swarm->wait_for_ready(cancel, yield[ec]);
return_or_throw_on_error(yield, cancel, ec);
boost::optional<chrono::steady_clock::time_point> _last_ping_time;
while (!cancel) {
auto injs = _injector_swarm->peers();
_DEBUG("Waiting to ping injectors...");
_injector_was_seen = false;
if (_last_ping_time && (Clock::now() - *_last_ping_time) < _ping_frequency) {
auto d = (*_last_ping_time + _ping_frequency) - Clock::now();
async_sleep(get_executor(), d, cancel, yield);
......@@ -253,17 +315,24 @@ private:
}
_DEBUG("Waiting to ping injectors: done");
bool got_reply = ping_injectors(select_injectors_to_ping(), cancel, yield[ec]);
if (!cancel && ec)
_ERROR("Failed to ping injectors ec:", ec.message());
return_or_throw_on_error(yield, cancel, ec);
bool got_reply = _injector_was_seen;
if (got_reply)
// A succesful direct connection during the pause is taken as a sign of reachability.
_DEBUG("Made connection to injector, announcing as helper (bridge)");
else {
got_reply = ping_injectors(select_injectors_to_ping(), cancel, yield[ec]);
if (!cancel && ec)
_ERROR("Failed to ping injectors ec:", ec.message());
return_or_throw_on_error(yield, cancel, ec);
if (got_reply)
_DEBUG("Got pong from injectors, announcing as helper (bridge)");
}
_last_ping_time = Clock::now();
if (got_reply) {
_DEBUG("Got pong from injectors, announcing as helper (bridge)");
if (got_reply)
_helper_announcer->update();
} else
else
_VERBOSE("Did not get pong from injectors,"
" the network may be down or they may be blocked");
}
......@@ -296,7 +365,7 @@ private:
auto sc = success_cancel.connect([&] { c(); });
sys::error_code ec;
WatchDog wd(ex, chrono::seconds(60), [&] { c(); });
WatchDog wd(ex, injector_pong_timeout, [&] { c(); });
if (ping_one_injector(inj, c, yield[ec])) {
success_cancel();
}
......@@ -311,9 +380,7 @@ private:
}
std::vector<shared_ptr<AbstractClient>> select_injectors_to_ping() {
// Select the first (at most) `max` injectors after shuffling them.
static const unsigned max = 30;
// Select the first (at most) `injectors_to_ping` injectors after shuffling them.
auto injector_map = _injector_swarm->peers();
std::vector<shared_ptr<AbstractClient>> injectors;
injectors.reserve(injector_map.size());
......@@ -321,8 +388,8 @@ private:
injectors.push_back(p.second);
std::shuffle(injectors.begin(), injectors.end(), _random_generator);
if (injectors.size() > max)
injectors.resize(max);
if (injectors.size() > injectors_to_ping)
injectors.resize(injectors_to_ping);
return injectors;
}
......@@ -333,8 +400,8 @@ private:
static const bool _debug = false; // for development testing only
Cancel _lifetime_cancel;
shared_ptr<Bep5Client::Swarm> _injector_swarm;
boost::optional<chrono::steady_clock::time_point> _last_ping_time;
const Clock::duration _ping_frequency = chrono::minutes(_debug ? 2 : 10);
bool _injector_was_seen = false;
const Clock::duration _ping_frequency = (_debug ? injector_ping_period_debug : injector_ping_period);
std::mt19937 _random_generator;
std::unique_ptr<bt::Bep5ManualAnnouncer> _helper_announcer;
};
......@@ -380,7 +447,7 @@ void Bep5Client::start(asio::yield_context)
_INFO("Injector swarm: sha1('", _injector_swarm_name, "'): ", infohash.to_hex());
_injector_swarm.reset(new Swarm(this, infohash, _dht, _cancel, false));
_injector_swarm.reset(new Swarm(this, infohash, _dht, injector_swarm_capacity, _cancel, false));
_injector_swarm->start();
}
......@@ -389,7 +456,7 @@ void Bep5Client::start(asio::yield_context)
_INFO("Helper swarm (bridges): sha1('", _helpers_swarm_name, "'): ", infohash.to_hex());
_helpers_swarm.reset(new Swarm(this, infohash, _dht, _cancel, true));
_helpers_swarm.reset(new Swarm(this, infohash, _dht, helper_swarm_capacity, _cancel, true));
_helpers_swarm->start();
_injector_pinger.reset(new InjectorPinger(_injector_swarm, _helpers_swarm_name, _dht, _cancel));
......@@ -583,12 +650,11 @@ GenericStream Bep5Client::connect( asio::yield_context yield
, " ec:", ec.message());
} else {
_last_working_ep = ret_ep;
if (_injector_pinger) {
_injector_pinger->injector_was_seen_now();
}
if (ret_target == Target::injectors)
if (ret_target == Target::injectors) {
if (_injector_pinger)
_injector_pinger->injector_was_seen_now();
_DEBUG("Connected to injector peer directly; rep:", ret_ep);
else if (ret_target == Target::helpers)
} else if (ret_target == Target::helpers)
_DEBUG("Connected to injector via helper peer (bridge); rep:", ret_ep);
else
assert(0 && "Invalid peer type");
......
......@@ -13,34 +13,34 @@ private:
using KeyVal = std::pair<Key, Value>;
using ListIter = typename std::list<KeyVal>::iterator;
using Map = std::unordered_map<Key, ListIter>;
using MapIter = typename Map::iterator;
using MapIter = typename Map::const_iterator;
public:
class iterator {
class const_iterator {
friend class LruCache;
MapIter i;
public:
iterator(MapIter i) : i(i) {}
KeyVal& operator*() { return *i->second; }
KeyVal* operator->() { return &*i->second; }
const_iterator(MapIter i) : i(i) {}
const KeyVal& operator*() const { return *i->second; }
const KeyVal* operator->() const { return &*i->second; }
iterator& operator++() {
const_iterator& operator++() {
++i;
return *this;
}
iterator operator++(int) {
iterator ret{i};
const_iterator operator++(int) {
const_iterator ret{i};
++i;
return ret;
}
bool operator==(iterator j) const {
bool operator==(const_iterator j) const {
return i == j.i;
}
bool operator!=(iterator j) const {
bool operator!=(const_iterator j) const {
return i != j.i;
}
};
......@@ -100,17 +100,17 @@ public:
// TODO: Currently the returned iterator is not
// ordered by usage.
iterator begin() {
return iterator{_map.begin()};
const_iterator begin() const {
return const_iterator{_map.begin()};
}
// TODO: Currently the returned iterator is not
// ordered by usage.
iterator end() {
return iterator{_map.end()};
const_iterator end() const {
return const_iterator{_map.end()};
}
iterator erase(iterator i) {
const_iterator erase(const_iterator i) {
auto j = i;
++j;
_list.erase(i.i->second);
......@@ -118,7 +118,7 @@ public:
return j;
}
void move_to_front(iterator i) {
void move_to_front(const_iterator i) {
_list.splice(_list.begin(), _list, i.i->second);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment