Commit 016da17b authored by Martin Marinov's avatar Martin Marinov
Browse files

ReForm, DOcloud: Added job stop criteria based on parsing the log output from...

ReForm, DOcloud: Added job stop criteria based on parsing the log output from CPLEX supplied by the DOcloud REST API. Because this is controlled on the client (and not on the server), the results can change due to synchronization differences, i.e., in many cases exact test results would be difficult to reproduce. The stopping criteria are as follows:

1. 15 sec on stalled jobs after the last new integer solution is found 
2. 5 min on stalled jobs with no solutions found

[git-p4: depot-paths = "//ReForm/ReForm/main/CoMISo/": change = 11992]
parent b5b1b8ae
......@@ -135,8 +135,8 @@ public:
THROW_OUTCOME(TODO);
}
DEB_line(3, "Received Header: " << hdr_);
DEB_line(4, "Received Body: " << bdy_);
DEB_line(6, "Received Header: " << hdr_);
DEB_line(6, "Received Body: " << bdy_);
finalize();
}
......@@ -351,15 +351,19 @@ private:
PTree ptree_;
};
// Debug-stream output of a property tree: the tree is written as JSON into a
// temporary string stream, the resulting text is pushed to _ds, and _ds is
// returned so that the call can be chained.
// NOTE(review): this span lists two signatures and two write_json() calls
// (one taking JsonTokens, one taking JsonTokens::PTree); presumably only the
// PTree variant is meant to remain -- confirm against the actual file.
Debug::Stream& operator<<(Debug::Stream& _ds, const JsonTokens& _json_tkns)
Debug::Stream& operator<<(Debug::Stream& _ds, const JsonTokens::PTree& _ptree)
{
DEB_enter_func;
std::stringstream os;
boost::property_tree::json_parser::write_json(os, _json_tkns.ptree());
boost::property_tree::json_parser::write_json(os, _ptree);
_ds << os.str();
return _ds;
}
// Debug-stream output of a JsonTokens object: streams the property tree it
// exposes through ptree() and returns the result of that stream operation.
Debug::Stream& operator<<(Debug::Stream& _ds, const JsonTokens& _json_tkns)
{
  const auto& ptree = _json_tkns.ptree();
  return _ds << ptree;
}
void throw_http_error(const int _err_code, const std::string& _bdy)
{
DEB_enter_func;
......@@ -422,13 +426,27 @@ public:
void wait();
void sync_status();
void sync_log();
bool active() const; // requires synchronized status
// Tells whether the job counts as stalled, based on the counters maintained
// by sync_log():
//  - at least one solution found: stalled after 15 sec without a new one;
//  - no solution found so far: stalled after 5 min (300 sec).
bool stalled() const
{
  if (sol_nmbr_ > 0) // we have a solution, so exit quickly
    return stld_sec_nmbr_ >= 15;
  if (sol_nmbr_ == 0) // nothing found yet, be patient
    return stld_sec_nmbr_ >= 300;
  return false; // neither criterion applies to a negative solution count
}
void abort();
void solution(std::vector<double>& _x) const;
private:
const std::string filename_;
std::string url_;
JsonTokens stts_;
// these variables are initialized in start()
int log_seq_idx_; // the log sequence number, used to get DOcloud log entries
int sol_nmbr_; // number of solutions found so far, according to the log
int sol_sec_nmbr_; // number of seconds at the last new solution
int stld_sec_nmbr_; // number of seconds since the last new solution
private:
void make();
......@@ -502,10 +520,14 @@ void Job::start()
post.perform();
HeaderTokens hdr_tkns(post.header());
check_http_error(post, hdr_tkns, 204);
log_seq_idx_ = sol_nmbr_ = sol_sec_nmbr_ = stld_sec_nmbr_ = 0;
}
void Job::sync_status()
{
DEB_enter_func;
cURLpp::Get get;
THROW_OUTCOME_if(!get.valid(), TODO); //Failed to initialize the request
get.set_url(url_.data());
......@@ -515,6 +537,84 @@ void Job::sync_status()
check_http_error(get, hdr_tkns, 200);
stts_.set(get.body());
/*
// The code below attempted to analyse the status data to find out the
// progress of the solver. This is an undocumented use and does not seem to
// work so far. Archived here for potential use in the future.
DEB_line(2, stts_);
const auto& details = stts_.ptree().get_child_optional("details");
if (!details)
return;
DEB_line(2, details.get());
const auto& prg_gap =
details.get().get_child("PROGRESS_GAP").get_value<std::string>();
std::string mip_gap;
const auto mip_gap_it = details.get().find("cplex.mipabsgap");
if (mip_gap_it != details.get().not_found())
mip_gap = mip_gap_it->second.get_value<std::string>();
DEB_line(2, "Status, MIP gap: " << mip_gap << "; Progress gap: " << prg_gap);
*/
}
void Job::sync_log()
{
DEB_enter_func;
cURLpp::Get get;
THROW_OUTCOME_if(!get.valid(), TODO); //Failed to initialize the request
const std::string url = url_ + "/log/items?start=" +
std::to_string(log_seq_idx_) + "&continuous=true";
get.set_url(url.data());
get.add_http_header(api_key__);
get.perform();
HeaderTokens hdr_tkns(get.header());
check_http_error(get, hdr_tkns, 200);
JsonTokens log(get.body());
// iterate the log items, deb_out messages and analyze for solutions #
for (const auto& log_item : log.ptree())
{
DEB_line_if(log_seq_idx_ == 0, 2, "**** DOcloud log ****");
const auto& records = log_item.second.get_child("records");
for (const auto& record : records)
{// the message ends with \n
const std::string msg = record.second.get_child("message").
get_value<std::string>();
DEB_out(2, record.second.get_child("level").get_value<std::string>() <<
": " << msg);
const int time_str_len = 15;
const char time_str[time_str_len + 1] = "Elapsed time = ";
const auto time_str_idx = msg.find(time_str);
if (time_str_idx == std::string::npos)
continue;
const int sec_nmbr = atoi(msg.data() + time_str_idx + time_str_len);
//DEB_line(1, "# seconds elapsed : " << sec_nmbr);
const int sol_str_len = 12;
const char sol_str[sol_str_len + 1] = "solutions = ";
const auto sol_str_idx = msg.find(sol_str);
if (sol_str_idx == std::string::npos)
continue;
const int sol_nmbr = atoi(msg.data() + sol_str_idx + sol_str_len);
//DEB_line(1, "# solutions found so far: " << sol_nmbr);
if (sol_nmbr > sol_nmbr_) // new solution(s) found
{// update the number of solutions and the time of the last solution found
sol_nmbr_ = sol_nmbr;
sol_sec_nmbr_ = sec_nmbr;
}
stld_sec_nmbr_ = sec_nmbr - sol_sec_nmbr_;
}
log_seq_idx_ = log_item.second.get_child("seqid").get_value<int>() + 1;
}
}
bool Job::active() const
......@@ -546,12 +646,33 @@ bool Job::active() const
*/
}
void Job::abort()
{
std::string exct_stts;
stts_.find_value("executionStatus", exct_stts);
if (exct_stts != "RUNNING")
return; // already aborted or aborting
cURLpp::Delete del;
THROW_OUTCOME_if(!del.valid(), TODO); //Failed to initialize the request
const std::string url = url_ + "/execute";
del.set_url(url.data());
del.add_http_header(api_key__);
del.perform();
HeaderTokens hdr_tkns(del.header());
check_http_error(del, hdr_tkns, 204);
}
// Blocks until the job is no longer active. Once per second the status and
// the log are synchronized; if the log analysis reports the job as stalled,
// an abort is requested. At least one polling cycle is always executed.
void Job::wait()
{
  for (;;)
  {
    std::this_thread::sleep_for(std::chrono::seconds(1));

    sync_status();
    sync_log();

    if (stalled())
      abort();

    if (!active()) // evaluated against the status synchronized above
      break;
  }
}
......@@ -578,7 +699,7 @@ void Job::solution(std::vector<double>& _x) const
check_http_error(get, hdr_tkns, 200);
JsonTokens bdy_tkns(get.body());
DEB_line(3, bdy_tkns);
DEB_line(7, bdy_tkns);
const auto& vrbls = bdy_tkns.ptree().get_child("CPLEXSolution.variables");
const auto n_vrbls = vrbls.size();
......@@ -594,11 +715,11 @@ void Job::solution(std::vector<double>& _x) const
THROW_OUTCOME_if(idx < 0 || idx > n_vrbls, TODO); // Invalid index
_x[idx] = v.second.get_child("value").get_value<double>();
DEB_out(1, "#" << idx << "=" <<
DEB_out(7, "#" << idx << "=" <<
v.second.get_child("value").get_value<std::string>() << "; ");
}
DEB_line(1, "X=" << _x);
DEB_line(3, "X=" << _x);
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment