Skip to content

Commit

Permalink
WIP 3 - Scraper thread safety
Browse files Browse the repository at this point in the history
  • Loading branch information
jamescowens committed Aug 28, 2021
1 parent 93de2a9 commit 3873f1d
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 124 deletions.
27 changes: 17 additions & 10 deletions src/gridcoin/quorum.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1039,17 +1039,22 @@ class SuperblockValidator

const CScraperManifest_shared_ptr manifest = iter->second;

LOCK(manifest->cs_manifest);

// If the manifest for the beacon list is now empty, we cannot
// proceed, but ProjectResolver should always select manifests
// with a beacon list part:
if (manifest->vParts.empty()) {
LogPrintf("ValidateSuperblock(): beacon list part missing.");
return;
}

convergence.AddPart("BeaconList", manifest->vParts[0]);
// Note using fine-grained locking here to avoid lock-order issues and
// also to deal with Clang potential false positive below.
{
LOCK(manifest->cs_manifest);

if (manifest->vParts.empty()) {
LogPrintf("ValidateSuperblock(): beacon list part missing.");
return;
}

convergence.AddPart("BeaconList", manifest->vParts[0]);
}

// Find the offset of the verified beacons project part. Typically
// this exists at vParts offset 1 when a scraper verified at least
Expand All @@ -1058,10 +1063,14 @@ class SuperblockValidator
const auto verified_beacons_entry_iter = std::find_if(
manifest->projects.begin(),
manifest->projects.end(),
[](const CScraperManifest::dentry& entry) {
[manifest](const CScraperManifest::dentry& entry) {
// Clang was giving a false positive on thread safety without this.
LOCK(manifest->cs_manifest);
return entry.project == "VerifiedBeacons";
});

LOCK2(CSplitBlob::cs_mapParts, manifest->cs_manifest);

if (verified_beacons_entry_iter == manifest->projects.end()) {
LogPrintf("ValidateSuperblock(): verified beacon project missing.");
return;
Expand All @@ -1074,8 +1083,6 @@ class SuperblockValidator
return;
}

LOCK(CSplitBlob::cs_mapParts);

convergence.AddPart("VerifiedBeacons", manifest->vParts[part_offset]);
}
}; // ProjectCombiner
Expand Down
217 changes: 116 additions & 101 deletions src/gridcoin/scraper/scraper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3819,6 +3819,9 @@ bool ScraperSaveCScraperManifestToFiles(uint256 nManifestHash)
// This is from the map find above.
const CScraperManifest_shared_ptr manifest = pair->second;

LOCK(manifest->cs_manifest);
_log(logattribute::INFO, "LOCK", "cs_manifest");

// Write out to files the parts. Note this assumes one-to-one part to file. Needs to
// be fixed for more than one part per file.
int iPartNum = 0;
Expand Down Expand Up @@ -3859,6 +3862,7 @@ bool ScraperSaveCScraperManifestToFiles(uint256 nManifestHash)
iPartNum++;
}

_log(logattribute::INFO, "ENDLOCK", "cs_manifest");
_log(logattribute::INFO, "ENDLOCK", "CScraperManifest::cs_mapManifest");

return true;
Expand Down Expand Up @@ -4178,6 +4182,9 @@ unsigned int ScraperDeleteUnauthorizedCScraperManifests()
{
CScraperManifest_shared_ptr manifest = iter->second;

LOCK(manifest->cs_manifest);
_log(logattribute::INFO, "LOCK", "cs_manifest");

// We are not going to do anything with the banscore here, but it is an out parameter of IsManifestAuthorized.
unsigned int banscore_out = 0;

Expand All @@ -4194,6 +4201,8 @@ unsigned int ScraperDeleteUnauthorizedCScraperManifests()
iter = CScraperManifest::DeleteManifest(iter, true);
nDeleted++;
}

_log(logattribute::INFO, "ENDLOCK", "cs_manifest");
}

// End LOCK(CScraperManifest::cs_mapManifest)
Expand All @@ -4211,25 +4220,30 @@ bool ScraperSendFileManifestContents(CBitcoinAddress& Address, CKey& Key) EXCLUS

auto manifest = std::shared_ptr<CScraperManifest>(new CScraperManifest());

// The manifest name is the authorized address of the scraper.
manifest->sCManifestName = Address.ToString();

// Also store local sCManifestName, because the manifest will be std::moved by addManifest.
std::string sCManifestName = Address.ToString();

manifest->nTime = StructScraperFileManifest.timestamp;

// Also store local nTime, because the manifest will be std::moved by addManifest.
int64_t nTime = StructScraperFileManifest.timestamp;

manifest->ConsensusBlock = StructScraperFileManifest.nConsensusBlockHash;
std::string sCManifestName;
int64_t nTime = 0;

// This will have to be changed to support files bigger than 32 MB, where more than one
// part per object will be required.
int iPartNum = 0;

// Inject the BeaconList part.
{
LOCK2(CSplitBlob::cs_mapParts, manifest->cs_manifest);
_log(logattribute::INFO, "LOCK2", "cs_mapParts, cs_manifest");

// The manifest name is the authorized address of the scraper.
manifest->sCManifestName = Address.ToString();

// Also store local sCManifestName, because the manifest will be std::moved by addManifest.
sCManifestName = Address.ToString();

manifest->nTime = StructScraperFileManifest.timestamp;

// Also store local nTime, because the manifest will be std::moved by addManifest.
nTime = StructScraperFileManifest.timestamp;

manifest->ConsensusBlock = StructScraperFileManifest.nConsensusBlockHash;

// Read in BeaconList
fs::path inputfile = "BeaconList.csv.gz";
fs::path inputfilewpath = pathScraper / inputfile;
Expand Down Expand Up @@ -4268,136 +4282,132 @@ bool ScraperSendFileManifestContents(CBitcoinAddress& Address, CKey& Key) EXCLUS

CDataStream part(std::move(vchData), SER_NETWORK, 1);

LOCK(CSplitBlob::cs_mapParts);
_log(logattribute::INFO, "LOCK", "cs_mapParts");

manifest->addPartData(std::move(part));

iPartNum++;

_log(logattribute::INFO, "ENDLOCK", "cs_mapParts");
}
// Inject the VerifiedBeaconList as a "project" called VerifiedBeacons. This is inelegant, but
// will maintain compatibility with older nodes. The older nodes will simply ignore this extra part
// because it will never match any whitelisted project. Only include it if it is not empty.
{
LOCK(cs_VerifiedBeacons);
_log(logattribute::INFO, "LOCK", "cs_VerifiedBeacons");

// Inject the VerifiedBeaconList as a "project" called VerifiedBeacons. This is inelegant, but
// will maintain compatibility with older nodes. The older nodes will simply ignore this extra part
// because it will never match any whitelisted project. Only include it if it is not empty.
{
LOCK(cs_VerifiedBeacons);
_log(logattribute::INFO, "LOCK", "cs_VerifiedBeacons");
ScraperVerifiedBeacons& ScraperVerifiedBeacons = GetVerifiedBeacons();

ScraperVerifiedBeacons& ScraperVerifiedBeacons = GetVerifiedBeacons();
if (!ScraperVerifiedBeacons.mVerifiedMap.empty())
{
CScraperManifest::dentry ProjectEntry;

if (!ScraperVerifiedBeacons.mVerifiedMap.empty())
{
CScraperManifest::dentry ProjectEntry;
ProjectEntry.project = "VerifiedBeacons";
ProjectEntry.LastModified = ScraperVerifiedBeacons.timestamp;
ProjectEntry.current = true;

ProjectEntry.project = "VerifiedBeacons";
ProjectEntry.LastModified = ScraperVerifiedBeacons.timestamp;
ProjectEntry.current = true;
// For now each object will only have one part.
ProjectEntry.part1 = iPartNum;
ProjectEntry.partc = 0;
ProjectEntry.GridcoinTeamID = -1; //Not used anymore

// For now each object will only have one part.
ProjectEntry.part1 = iPartNum;
ProjectEntry.partc = 0;
ProjectEntry.GridcoinTeamID = -1; //Not used anymore
ProjectEntry.last = 1;

ProjectEntry.last = 1;
manifest->projects.push_back(ProjectEntry);

manifest->projects.push_back(ProjectEntry);
CDataStream part(SER_NETWORK, 1);

CDataStream part(SER_NETWORK, 1);
part << ScraperVerifiedBeacons.mVerifiedMap;

part << ScraperVerifiedBeacons.mVerifiedMap;
LOCK(CSplitBlob::cs_mapParts);
_log(logattribute::INFO, "LOCK", "cs_mapParts");

LOCK(CSplitBlob::cs_mapParts);
_log(logattribute::INFO, "LOCK", "cs_mapParts");
manifest->addPartData(std::move(part));

manifest->addPartData(std::move(part));
iPartNum++;

iPartNum++;
_log(logattribute::INFO, "ENDLOCK", "cs_mapParts");
}

_log(logattribute::INFO, "ENDLOCK", "cs_mapParts");
_log(logattribute::INFO, "ENDLOCK", "cs_VerifiedBeacons");
}

_log(logattribute::INFO, "ENDLOCK", "cs_VerifiedBeacons");
}

for (auto const& entry : StructScraperFileManifest.mScraperFileManifest)
{
auto scraper_cmanifest_include_noncurrent_proj_files =
[]() { LOCK(cs_ScraperGlobals); return SCRAPER_CMANIFEST_INCLUDE_NONCURRENT_PROJ_FILES; };

for (auto const& entry : StructScraperFileManifest.mScraperFileManifest)
{
auto scraper_cmanifest_include_noncurrent_proj_files =
[]() { LOCK(cs_ScraperGlobals); return SCRAPER_CMANIFEST_INCLUDE_NONCURRENT_PROJ_FILES; };
// If SCRAPER_CMANIFEST_INCLUDE_NONCURRENT_PROJ_FILES is false, only include current files to send across the network.
// Also continue (exclude) if it is a non-publishable entry (excludefromcsmanifest is true).
if ((!scraper_cmanifest_include_noncurrent_proj_files() && !entry.second.current) || entry.second.excludefromcsmanifest)
continue;

// If SCRAPER_CMANIFEST_INCLUDE_NONCURRENT_PROJ_FILES is false, only include current files to send across the network.
// Also continue (exclude) if it is a non-publishable entry (excludefromcsmanifest is true).
if ((!scraper_cmanifest_include_noncurrent_proj_files() && !entry.second.current) || entry.second.excludefromcsmanifest)
continue;
fs::path inputfile = entry.first;

fs::path inputfile = entry.first;
//_log(logattribute::INFO, "ScraperSendFileManifestContents", "Input file for CScraperManifest is " + inputfile.string());

//_log(logattribute::INFO, "ScraperSendFileManifestContents", "Input file for CScraperManifest is " + inputfile.string());
fs::path inputfilewpath = pathScraper / inputfile;

fs::path inputfilewpath = pathScraper / inputfile;
// open input file, and associate with CAutoFile
FILE *file = fsbridge::fopen(inputfilewpath, "rb");
CAutoFile filein(file, SER_DISK, CLIENT_VERSION);

// open input file, and associate with CAutoFile
FILE *file = fsbridge::fopen(inputfilewpath, "rb");
CAutoFile filein(file, SER_DISK, CLIENT_VERSION);
if (filein.IsNull())
{
_log(logattribute::ERR, "ScraperSendFileManifestContents", "Failed to open file (" + inputfile.string() + ")");
return false;
}

if (filein.IsNull())
{
_log(logattribute::ERR, "ScraperSendFileManifestContents", "Failed to open file (" + inputfile.string() + ")");
return false;
}
// use file size to size memory buffer
int dataSize = fs::file_size(inputfilewpath);
std::vector<unsigned char> vchData;
vchData.resize(dataSize);

// use file size to size memory buffer
int dataSize = fs::file_size(inputfilewpath);
std::vector<unsigned char> vchData;
vchData.resize(dataSize);
// read data from file
try
{
filein.read((char *)&vchData[0], dataSize);
}
catch (std::exception &e)
{
_log(logattribute::ERR, "ScraperSendFileManifestContents", "Failed to read file (" + inputfile.string() + ")");
return false;
}

// read data from file
try
{
filein.read((char *)&vchData[0], dataSize);
}
catch (std::exception &e)
{
_log(logattribute::ERR, "ScraperSendFileManifestContents", "Failed to read file (" + inputfile.string() + ")");
return false;
}
filein.fclose();

filein.fclose();

CScraperManifest::dentry ProjectEntry;

CScraperManifest::dentry ProjectEntry;
ProjectEntry.project = entry.second.project;
std::string sProject = entry.second.project + "-";

ProjectEntry.project = entry.second.project;
std::string sProject = entry.second.project + "-";
std::string sinputfile = inputfile.string();
std::string suffix = ".csv.gz";

std::string sinputfile = inputfile.string();
std::string suffix = ".csv.gz";
// Remove project-
sinputfile.erase(sinputfile.find(sProject), sProject.length());
// Remove suffix. What is left is the ETag.
ProjectEntry.ETag = sinputfile.erase(sinputfile.find(suffix), suffix.length());

// Remove project-
sinputfile.erase(sinputfile.find(sProject), sProject.length());
// Remove suffix. What is left is the ETag.
ProjectEntry.ETag = sinputfile.erase(sinputfile.find(suffix), suffix.length());
ProjectEntry.LastModified = entry.second.timestamp;

ProjectEntry.LastModified = entry.second.timestamp;
// For now each object will only have one part.
ProjectEntry.part1 = iPartNum;
ProjectEntry.partc = 0;
ProjectEntry.GridcoinTeamID = -1; //Not used anymore

// For now each object will only have one part.
ProjectEntry.part1 = iPartNum;
ProjectEntry.partc = 0;
ProjectEntry.GridcoinTeamID = -1; //Not used anymore
ProjectEntry.current = entry.second.current;

ProjectEntry.current = entry.second.current;
ProjectEntry.last = 1;

ProjectEntry.last = 1;
manifest->projects.push_back(ProjectEntry);

manifest->projects.push_back(ProjectEntry);
CDataStream part(vchData, SER_NETWORK, 1);

CDataStream part(vchData, SER_NETWORK, 1);
manifest->addPartData(std::move(part));

manifest->addPartData(std::move(part));
iPartNum++;
}

iPartNum++;
_log(logattribute::INFO, "ENDLOCK2", "cs_mapParts, cs_manifest");
}

// "Sign" and "send".
Expand Down Expand Up @@ -4447,6 +4457,9 @@ ConvergedManifest::ConvergedManifest()

ConvergedManifest::ConvergedManifest(CScraperManifest_shared_ptr& in)
{
// Make Clang happy.
LOCK(in->cs_manifest);

ConsensusBlock = in->ConsensusBlock;
timestamp = GetAdjustedTime();
bByParts = false;
Expand All @@ -4462,6 +4475,8 @@ ConvergedManifest::ConvergedManifest(CScraperManifest_shared_ptr& in)

bool ConvergedManifest::operator()(const CScraperManifest_shared_ptr& in)
{
LOCK(in->cs_manifest);

ConsensusBlock = in->ConsensusBlock;
timestamp = GetAdjustedTime();
bByParts = false;
Expand Down
Loading

0 comments on commit 3873f1d

Please sign in to comment.