Commit 79793f02 authored by Ivan Vilata-i-Balaguer's avatar Ivan Vilata-i-Balaguer
Browse files

Merge branch 'http-store-gc'.

This adds several enhancements for HTTP store maintenance:

  - Removal of entries older than `max-cached-age`, as well as temporaries not
    seeing progress.
  - A periodic garbage collector for such entries.
  - Removed entries are no longer announced in the DHT.
  - Reporting of (approximate) local cache size in the status API and client
    front-end.
  - A button to purge the local cache in the client front-end.
  - Cancellable local cache iteration and cleaning operations.
parents 6de0dae0 b279b82f
......@@ -36,6 +36,8 @@ struct Entry {
Clock::time_point successful_update;
Clock::time_point failed_update;
bool to_remove = false;
Entry() = default;
Entry(Announcer::Key key)
......@@ -97,6 +99,18 @@ struct Announcer::Loop {
_timer_cancel = Cancel();
}
void remove(const Key& key) {
Entries::iterator i = entries.begin();
for (; i != entries.end(); ++i)
if (i->first.key == key) break; // found
if (i == entries.end()) return; // not found
// The actual removal is not done here but in the main loop.
i->first.to_remove = true;
// No new entries, so no `_timer_cancel` reset.
}
Clock::duration next_update_after(const Entry& e) const
{
if (e.successful_update == Clock::time_point()
......@@ -211,6 +225,12 @@ struct Announcer::Loop {
assert(!ec);
ec = {};
if (ei->first.to_remove) {
// Marked for removal, drop the entry and get another one.
entries.erase(ei);
continue;
}
// Try inserting three times before moving to the next entry
bool success = false;
for (int i = 0; i != 3; ++i) {
......@@ -234,7 +254,7 @@ struct Announcer::Loop {
Entry e = move(ei->first);
entries.erase(ei);
entries.push_back(move(e));
if (!e.to_remove) entries.push_back(move(e));
if (ll.debug()) { print_entries(); }
}
......@@ -277,6 +297,10 @@ void Announcer::add(Key key)
_loop->add(move(key));
}
void Announcer::remove(const Key& key) {
_loop->remove(key);
}
void Announcer::set_log_level(log_level_t l)
{
_loop->set_log_level(l);
......
......@@ -17,6 +17,7 @@ public:
Announcer(std::shared_ptr<bittorrent::MainlineDht>, log_level_t);
void add(Key key);
void remove(const Key&);
~Announcer();
......
......@@ -6,6 +6,7 @@
#include "../http_sign.h"
#include "../http_store.h"
#include "../../http_util.h"
#include "../../parse/number.h"
#include "../../util/wait_condition.h"
#include "../../util/set_io.h"
#include "../../util/async_generator.h"
......@@ -19,6 +20,7 @@
#include "../../constants.h"
#include "../../session.h"
#include "../../bep5_swarms.h"
#include <ctime>
#include <map>
using namespace std;
......@@ -29,6 +31,51 @@ using udp = asio::ip::udp;
namespace fs = boost::filesystem;
namespace bt = bittorrent;
struct GarbageCollector {
cache::AbstractHttpStore& http_store; // for looping over entries
cache::AbstractHttpStore::keep_func keep; // caller-provided checks
asio::executor _executor;
Cancel _cancel;
GarbageCollector( cache::AbstractHttpStore& http_store
, cache::AbstractHttpStore::keep_func keep
, asio::executor ex)
: http_store(http_store)
, keep(move(keep))
, _executor(ex)
{}
~GarbageCollector() { _cancel(); }
void start()
{
asio::spawn(_executor, [&] (asio::yield_context yield) {
TRACK_HANDLER();
Cancel cancel(_cancel);
LOG_DEBUG("Bep5HTTP: Garbage collector started");
while (!cancel) {
sys::error_code ec;
async_sleep(_executor, chrono::minutes(7), cancel, yield[ec]);
if (cancel || ec) break;
LOG_DEBUG("Bep5HTTP: Collecting garbage...");
http_store.for_each([&] (auto rr, auto y) {
sys::error_code e;
auto k = keep(std::move(rr), y[e]);
if (cancel) ec = asio::error::operation_aborted;
return or_throw(y, e, k);
}, cancel, yield[ec]);
if (ec) LOG_WARN("Bep5HTTP: Collecting garbage: failed"
" ec:", ec.message());
LOG_DEBUG("Bep5HTTP: Collecting garbage: done");
}
LOG_DEBUG("Bep5HTTP: Garbage collector stopped");
});
}
};
struct Client::Impl {
// The newest protocol version number seen in a trusted exchange
// (i.e. from injector-signed cached content).
......@@ -40,8 +87,10 @@ struct Client::Impl {
util::Ed25519PublicKey cache_pk;
fs::path cache_dir;
unique_ptr<cache::AbstractHttpStore> http_store;
boost::posix_time::time_duration max_cached_age;
Cancel lifetime_cancel;
Announcer announcer;
GarbageCollector gc;
map<string, udp::endpoint> peer_cache;
util::LruCache<bt::NodeID, unique_ptr<DhtLookup>> dht_lookups;
log_level_t log_level = INFO;
......@@ -56,7 +105,8 @@ struct Client::Impl {
Impl( shared_ptr<bt::MainlineDht> dht_
, util::Ed25519PublicKey& cache_pk
, fs::path cache_dir
, unique_ptr<cache::AbstractHttpStore> http_store
, unique_ptr<cache::AbstractHttpStore> http_store_
, boost::posix_time::time_duration max_cached_age
, log_level_t log_level)
: ex(dht_->get_executor())
, dht(move(dht_))
......@@ -64,8 +114,12 @@ struct Client::Impl {
(cache_pk, http_::protocol_version_current))
, cache_pk(cache_pk)
, cache_dir(move(cache_dir))
, http_store(move(http_store))
, http_store(move(http_store_))
, max_cached_age(max_cached_age)
, announcer(dht, log_level)
, gc(*http_store, [&] (auto rr, auto y) {
return keep_cache_entry(move(rr), y);
}, ex)
, dht_lookups(256)
, log_level(log_level)
, local_peer_discovery(ex, dht->local_endpoints())
......@@ -130,6 +184,31 @@ struct Client::Impl {
return or_throw(yield, ec);
}
std::size_t local_size( Cancel cancel
, asio::yield_context yield) const
{
return http_store->size(cancel, yield);
}
void local_purge( Cancel cancel
, asio::yield_context yield)
{
// TODO: avoid overlapping with garbage collector
LOG_DEBUG("Bep5HTTP: Purging local cache...");
sys::error_code ec;
http_store->for_each([&] (auto, auto) {
return false; // remove all entries
}, cancel, yield[ec]);
if (ec) {
LOG_ERROR("Bep5HTTP: Purging local cache: failed"
" ec:", ec.message());
return or_throw(yield, ec);
}
LOG_DEBUG("Bep5HTTP: Purging local cache: done");
}
void handle_http_error( GenericStream& con
, const http::request<http::empty_body>& req
, http::status status
......@@ -472,6 +551,70 @@ struct Client::Impl {
return *head;
}
// Return maximum if not available.
boost::posix_time::time_duration
cache_entry_age(const http::response_header<>& head)
{
using ssecs = std::chrono::seconds;
using bsecs = boost::posix_time::seconds;
static auto max_age = bsecs(ssecs::max().count());
auto ts_sv = util::http_injection_ts(head);
if (ts_sv.empty()) return max_age; // missing header or field
auto ts_o = parse::number<ssecs::rep>(ts_sv);
if (!ts_o) return max_age; // malformed creation time stamp
auto now = ssecs(std::time(nullptr)); // as done by injector
auto age = now - ssecs(*ts_o);
return bsecs(age.count());
}
inline
void unpublish_cache_entry(const std::string& key)
{
auto empty_groups = _dht_groups->remove(key);
for (const auto& eg : empty_groups) announcer.remove(eg);
}
// Return whether the entry should be kept in storage.
bool keep_cache_entry(cache::reader_uptr rr, asio::yield_context yield)
{
// This should be available to
// allow removing keys of entries to be evicted.
assert(_dht_groups);
sys::error_code ec;
auto hdr = read_response_header(*rr, yield[ec]);
if (ec) return or_throw<bool>(yield, ec);
if (hdr[http_::protocol_version_hdr] != http_::protocol_version_hdr_current) {
LOG_WARN( "Bep5HTTP: Cached response contains an invalid "
, http_::protocol_version_hdr
, " header field; removing");
return false;
}
auto key = hdr[http_::response_uri_hdr];
if (key.empty()) {
LOG_WARN( "Bep5HTTP: Cached response does not contain a "
, http_::response_uri_hdr
, " header field; removing");
return false;
}
auto age = cache_entry_age(hdr);
if (age > max_cached_age) {
LOG_DEBUG( "Bep5HTTP: Cached response is too old; removing: "
, age, " > ", max_cached_age
, "; uri=", key );
unpublish_cache_entry(key.to_string());
return false;
}
return true;
}
void announce_stored_data(asio::yield_context y)
{
Cancel cancel(lifetime_cancel);
......@@ -483,31 +626,9 @@ struct Client::Impl {
if (e) return or_throw(y, e);
http_store->for_each([&] (auto rr, auto yield) {
sys::error_code ec;
auto hdr = read_response_header(*rr, yield[ec]);
if (ec) return or_throw<bool>(yield, ec);
if (hdr[http_::protocol_version_hdr] != http_::protocol_version_hdr_current) {
LOG_WARN( "Bep5HTTP: Cached response contains an invalid "
, http_::protocol_version_hdr
, " header field; removing");
return false;
}
auto key = hdr[http_::response_uri_hdr];
if (key.empty()) {
LOG_WARN( "Bep5HTTP: Cached response does not contain a "
, http_::response_uri_hdr
, " header field; removing");
_dht_groups->remove(key.to_string());
return false;
}
return true;
}, y[e]);
return keep_cache_entry(std::move(rr), yield);
}, cancel, y[e]);
if (e) return or_throw(y, e);
for (auto dht_group : _dht_groups->groups()) {
announcer.add(compute_swarm_name(dht_group));
......@@ -537,6 +658,7 @@ std::unique_ptr<Client>
Client::build( shared_ptr<bt::MainlineDht> dht
, util::Ed25519PublicKey cache_pk
, fs::path cache_dir
, boost::posix_time::time_duration max_cached_age
, log_level_t log_level
, asio::yield_context yield)
{
......@@ -561,12 +683,12 @@ Client::build( shared_ptr<bt::MainlineDht> dht
unique_ptr<Impl> impl(new Impl( move(dht)
, cache_pk, move(cache_dir)
, move(http_store)
, move(http_store), max_cached_age
, log_level));
impl->announce_stored_data(yield[ec]);
if (ec) return or_throw<ClientPtr>(yield, ec);
impl->gc.start();
return unique_ptr<Client>(new Client(move(impl)));
}
......@@ -597,6 +719,18 @@ void Client::serve_local( const http::request<http::empty_body>& req
_impl->serve_local(req, sink, cancel, yield);
}
std::size_t Client::local_size( Cancel cancel
, asio::yield_context yield) const
{
return _impl->local_size(cancel, yield);
}
void Client::local_purge( Cancel cancel
, asio::yield_context yield)
{
_impl->local_purge(cancel, yield);
}
unsigned Client::get_newest_proto_version() const
{
return _impl->get_newest_proto_version();
......
......@@ -5,6 +5,7 @@
#include "../../util/crypto.h"
#include "../../util/yield.h"
#include "../cache_entry.h"
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/filesystem.hpp>
namespace ouinet {
......@@ -27,6 +28,7 @@ public:
build( std::shared_ptr<bittorrent::MainlineDht>
, util::Ed25519PublicKey cache_pk
, fs::path cache_dir
, boost::posix_time::time_duration max_cached_age
, log_level_t
, asio::yield_context);
......@@ -47,6 +49,12 @@ public:
, Cancel&
, asio::yield_context);
std::size_t local_size( Cancel
, asio::yield_context) const;
void local_purge( Cancel
, asio::yield_context);
// Get the newest protocol version that has been seen in the network
// (e.g. to warn about potential upgrades).
unsigned get_newest_proto_version() const;
......
#include "http_store.h"
#include <array>
#include <ctime>
#include <boost/asio/buffer.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/read_until.hpp>
......@@ -32,6 +35,13 @@
namespace ouinet { namespace cache {
// An entry modified less than this time ago
// is considered recently updated.
//
// Mainly useful to detect temporary entries that
// are no longer being written to.
static const std::time_t recently_updated_secs = 10 * 60; // 10 minutes ago
// Lowercase hexadecimal representation of a SHA1 digest.
static const boost::regex v0_file_name_rx("^[0-9a-f]{40}$");
......@@ -41,9 +51,32 @@ static const boost::regex v1_parent_name_rx("^[0-9a-f]{2}$");
static const boost::regex v1_dir_name_rx("^[0-9a-f]{38}$");
// File names for response components.
static const fs::path head_fname = "head";
static const fs::path body_fname = "body";
static const fs::path sigs_fname = "sigs";
static const fs::path v1_head_fname = "head";
static const fs::path v1_body_fname = "body";
static const fs::path v1_sigs_fname = "sigs";
static
std::size_t
recursive_dir_size(const fs::path& path, sys::error_code& ec)
{
// TODO: make asynchronous?
fs::recursive_directory_iterator dit(path, fs::symlink_option::no_recurse, ec);
if (ec) return 0;
// TODO: take directories themselves into account
// TODO: take block sizes into account
std::size_t total = 0;
for (; dit != fs::recursive_directory_iterator(); ++dit) {
auto p = dit->path();
auto is_file = fs::is_regular_file(p, ec);
if (ec) return 0;
if (!is_file) continue;
auto file_size = fs::file_size(p, ec);
if (ec) return 0;
total += file_size;
}
return total;
}
static
http_response::Head
......@@ -215,7 +248,7 @@ public:
head = http_injection_merge(std::move(h), {});
sys::error_code ec;
auto hf = create_file(head_fname, cancel, ec);
auto hf = create_file(v1_head_fname, cancel, ec);
return_or_throw_on_error(yield, cancel, ec);
headf = std::move(hf);
head.async_write(*headf, cancel, yield);
......@@ -226,7 +259,7 @@ public:
{
if (!sigsf) {
sys::error_code ec;
auto sf = create_file(sigs_fname, cancel, ec);
auto sf = create_file(v1_sigs_fname, cancel, ec);
return_or_throw_on_error(yield, cancel, ec);
sigsf = std::move(sf);
}
......@@ -264,7 +297,7 @@ public:
{
if (!bodyf) {
sys::error_code ec;
auto bf = create_file(body_fname, cancel, ec);
auto bf = create_file(v1_body_fname, cancel, ec);
return_or_throw_on_error(yield, cancel, ec);
bodyf = std::move(bf);
}
......@@ -611,14 +644,14 @@ _http_store_reader_v1( const fs::path& dirp, asio::executor ex
, boost::optional<std::size_t> range_last
, sys::error_code& ec)
{
auto headf = util::file_io::open_readonly(ex, dirp / head_fname, ec);
auto headf = util::file_io::open_readonly(ex, dirp / v1_head_fname, ec);
if (ec) return nullptr;
auto sigsf = util::file_io::open_readonly(ex, dirp / sigs_fname, ec);
auto sigsf = util::file_io::open_readonly(ex, dirp / v1_sigs_fname, ec);
if (ec && ec != sys::errc::no_such_file_or_directory) return nullptr;
ec = {};
auto bodyf = util::file_io::open_readonly(ex, dirp / body_fname, ec);
auto bodyf = util::file_io::open_readonly(ex, dirp / v1_body_fname, ec);
if (ec && ec != sys::errc::no_such_file_or_directory) return nullptr;
ec = {};
......@@ -816,8 +849,21 @@ name_matches_model(const fs::path& name, const fs::path& model)
return true;
}
static
bool
v0_recently_updated(const fs::path& path)
{
auto now = std::time(nullptr);
sys::error_code ec;
auto ts = fs::last_write_time(path, ec);
if (ec) return false;
return (now - ts <= recently_updated_secs);
}
void
HttpStoreV0::for_each(keep_func keep, asio::yield_context yield)
HttpStoreV0::for_each( keep_func keep
, Cancel cancel, asio::yield_context yield)
{
for (auto& p : fs::directory_iterator(path)) {
if (!fs::is_regular_file(p)) {
......@@ -827,8 +873,13 @@ HttpStoreV0::for_each(keep_func keep, asio::yield_context yield)
auto p_name = p.path().filename();
if (name_matches_model(p_name, util::default_temp_model)) {
_DEBUG("Found temporary file: ", p);
v0_try_remove(p); continue;
if (v0_recently_updated(p)) {
_DEBUG("Found recent temporary file: ", p);
} else {
_DEBUG("Found old temporary file: ", p);
v0_try_remove(p);
}
continue;
}
auto& p_name_s = p_name.native();
......@@ -840,7 +891,6 @@ HttpStoreV0::for_each(keep_func keep, asio::yield_context yield)
sys::error_code ec;
auto rr = http_store_reader_v0(p, executor, ec);
if (ec == asio::error::operation_aborted) return;
if (ec) {
_WARN("Failed to open cached response: ", p, " ec:", ec.message());
v0_try_remove(p); continue;
......@@ -848,7 +898,8 @@ HttpStoreV0::for_each(keep_func keep, asio::yield_context yield)
assert(rr);
auto keep_entry = keep(std::move(rr), yield[ec]);
if (ec == asio::error::operation_aborted) return;
if (cancel) ec = asio::error::operation_aborted;
if (ec == asio::error::operation_aborted) return or_throw(yield, ec);
if (ec) {
_WARN("Failed to check cached response: ", p, " ec:", ec.message());
v0_try_remove(p); continue;
......@@ -883,6 +934,17 @@ HttpStoreV0::reader( const std::string& key
return http_store_reader_v0(kpath, executor, ec);
}
std::size_t
HttpStoreV0::size( Cancel cancel
, asio::yield_context yield) const
{
// Do not use `for_each` since it can alter the store.
sys::error_code ec;
auto sz = recursive_dir_size(path, ec);
if (cancel) ec = asio::error::operation_aborted;
return or_throw(yield, ec, sz);
}
// end HttpStoreV0
// begin HttpStoreV0
......@@ -914,8 +976,32 @@ v1_try_remove(const fs::path& path)
// The parent directory may be left empty.
}
static
bool
v1_recently_updated(const fs::path& path)
{
auto now = std::time(nullptr);
std::array<fs::path, 4> paths
{ path
, path / v1_head_fname
, path / v1_body_fname
, path / v1_sigs_fname};
for (const auto& p : paths) {
sys::error_code ec;
auto ts = fs::last_write_time(p, ec);
if (ec) continue;
if (now - ts <= recently_updated_secs)
return true;
}
return false;
}
void
HttpStoreV1::for_each(keep_func keep, asio::yield_context yield)
HttpStoreV1::for_each( keep_func keep
, Cancel cancel, asio::yield_context yield)
{
for (auto& pp : fs::directory_iterator(path)) { // iterate over `DIGEST[:2]` dirs
if (!fs::is_directory(pp)) {
......@@ -937,8 +1023,13 @@ HttpStoreV1::for_each(keep_func keep, asio::yield_context yield)
auto p_name = p.path().filename();
if (name_matches_model(p_name, util::default_temp_model)) {
_DEBUG("Found temporary directory: ", p);
v1_try_remove(p); continue;
if (v1_recently_updated(p)) {
_DEBUG("Found recent temporary directory: ", p);
} else {
_DEBUG("Found old temporary directory: ", p);
v1_try_remove(p);
}
continue;
}
auto& p_name_s = p_name.native();
......@@ -950,7 +1041,6 @@ HttpStoreV1::for_each(keep_func keep, asio::yield_context yield)
sys::error_code ec;
auto rr = http_store_reader_v1(p, executor, ec);
if (ec == asio::error::operation_aborted) return;
if (ec) {
_WARN("Failed to open cached response: ", p, " ec:", ec.message());
v1_try_remove(p); continue;
......@@ -958,7 +1048,8 @@ HttpStoreV1::for_each(keep_func keep, asio::yield_context yield)
assert(rr);
auto keep_entry = keep(std::move(rr), yield[ec]);
if (ec == asio::error::operation_aborted) return;
if (cancel) ec = asio::error::operation_aborted;
if (ec == asio::error::operation_aborted) return or_throw(yield, ec);
if (ec) {
_WARN("Failed to check cached response: ", p, " ec:", ec.message());
v1_try_remove(p); continue;
......@@ -1004,6 +1095,17 @@ HttpStoreV1::reader( const std::string& key
return http_store_reader_v1(kpath, executor, ec);