Properly send info between main and dl/parsing threads

This commit is contained in:
Jaidyn Ann 2021-02-26 21:28:12 -06:00
parent eef99bb569
commit 6c63b23491
4 changed files with 117 additions and 54 deletions

View File

@ -8,7 +8,6 @@ Important Features:
Important improvements: Important improvements:
* Stop locking during updates
* Stop memory accumulation * Stop memory accumulation
* A few MB accumulate on every update * A few MB accumulate on every update
* ime it's than .1 MB per feed― but if you have several feeds, it can * ime it's than .1 MB per feed― but if you have several feeds, it can
@ -16,7 +15,6 @@ Important improvements:
and updates every 30 minutes, it got to about 200MB after a week or and updates every 30 minutes, it got to about 200MB after a week or
so. so.
* _Huge_ problem, but I haven't had luck figuring it out yet… * _Huge_ problem, but I haven't had luck figuring it out yet…
* Proper queue list (see Multiple downloads at once?)
* No hardcoded paths * No hardcoded paths
* General input sanitization * General input sanitization
* File error-handling * File error-handling
@ -29,7 +27,6 @@ Important improvements:
Nice Improvements: Nice Improvements:
* 'About' window
* Save window positions * Save window positions
* Main window * Main window
* Subscription-edit window * Subscription-edit window
@ -37,6 +34,8 @@ Nice Improvements:
* Store feeds in kEnqueueFeeds/etc messages as paths? hmm * Store feeds in kEnqueueFeeds/etc messages as paths? hmm
* This way, if the user edits the feed after it is enqueued but before * This way, if the user edits the feed after it is enqueued but before
processing, the changes will be applied. processing, the changes will be applied.
* Move the provisional BList download queue to a BJob + JobQueue system
* This might add some more flexibility
* Multiple downloads at once? * Multiple downloads at once?
* Array of thread_ids for multiple DL threads? * Array of thread_ids for multiple DL threads?
* In which case, it'd probably make sense to only spin them * In which case, it'd probably make sense to only spin them

View File

@ -55,6 +55,7 @@ App::MessageReceived(BMessage* msg)
switch (msg->what) switch (msg->what)
{ {
case kUpdateSubscribed: case kUpdateSubscribed:
case kControllerCheck:
{ {
fFeedController->MessageReceived(msg); fFeedController->MessageReceived(msg);
break; break;

View File

@ -7,6 +7,7 @@
#include <Directory.h> #include <Directory.h>
#include <Message.h> #include <Message.h>
#include <MessageRunner.h>
#include <Notification.h> #include <Notification.h>
#include <iostream> #include <iostream>
@ -19,13 +20,16 @@
FeedController::FeedController() FeedController::FeedController()
: :
fMainThread(find_thread(NULL)),
fDownloadThread(0), fDownloadThread(0),
fParseThread(0) fParseThread(0),
fDownloadQueue(new BList()),
fMessageRunner(new BMessageRunner(be_app, BMessage(kControllerCheck), 50000, -1))
{ {
fDownloadThread = spawn_thread(_DownloadLoop, "here, eat this", fDownloadThread = spawn_thread(_DownloadLoop, "here, eat this",
B_NORMAL_PRIORITY, NULL); B_NORMAL_PRIORITY, &fMainThread);
fParseThread = spawn_thread(_ParseLoop, "oki tnx nomnomnom", fParseThread = spawn_thread(_ParseLoop, "oki tnx nomnomnom",
B_NORMAL_PRIORITY, NULL); B_NORMAL_PRIORITY, &fMainThread);
resume_thread(fDownloadThread); resume_thread(fDownloadThread);
resume_thread(fParseThread); resume_thread(fParseThread);
} }
@ -51,7 +55,7 @@ FeedController::MessageReceived(BMessage* msg)
while (msg->HasData("feeds", B_RAW_TYPE, i)) { while (msg->HasData("feeds", B_RAW_TYPE, i)) {
msg->FindData("feeds", B_RAW_TYPE, i, &data, &size); msg->FindData("feeds", B_RAW_TYPE, i, &data, &size);
send_data(fDownloadThread, msg->what, data, size); _EnqueueFeed((Feed*)data);
i++; i++;
} }
break; break;
@ -60,10 +64,7 @@ FeedController::MessageReceived(BMessage* msg)
{ {
BList subFeeds = SubscribedFeeds(); BList subFeeds = SubscribedFeeds();
for (int i = 0; i < subFeeds.CountItems(); i++) { for (int i = 0; i < subFeeds.CountItems(); i++) {
BMessage getFeed(kEnqueueFeed); _EnqueueFeed((Feed*)subFeeds.ItemAt(i));
getFeed.AddData("feeds", B_RAW_TYPE, subFeeds.ItemAt(i),
sizeof(Feed));
((App*)be_app)->MessageReceived(&getFeed);
} }
break; break;
} }
@ -71,25 +72,10 @@ FeedController::MessageReceived(BMessage* msg)
{ {
break; break;
} }
case kDownloadComplete: case kControllerCheck:
{ {
int i = 0; _ProcessQueueItem();
const void* data; _CheckStatus();
ssize_t size = sizeof(Feed);
while (msg->HasData("feeds", B_RAW_TYPE, i)) {
msg->FindData("feeds", B_RAW_TYPE, i, &data, &size);
if (((Feed*)data)->IsUpdated() == true)
send_data(fParseThread, msg->what, data, size);
else {
BMessage complete(kParseComplete);
complete.AddString("feed_name", ((Feed*)data)->GetTitle());
complete.AddInt32("entry_count", 0);
((App*)be_app)->MessageReceived(&complete);
}
i++;
}
break; break;
} }
default: default:
@ -114,29 +100,96 @@ FeedController::SubscribedFeeds()
} }
int32 void
FeedController::_DownloadLoop(void* ignored) FeedController::_EnqueueFeed(Feed* feed)
{ {
fMessageRunner->SetCount(-1);
fDownloadQueue->AddItem(feed);
}
void
FeedController::_ProcessQueueItem()
{
if (has_data(fDownloadThread) && !fDownloadQueue->IsEmpty()) {
Feed* buffer = (Feed*)(fDownloadQueue->RemoveItem(0));
send_data(fDownloadThread, 0, (void*)buffer, sizeof(Feed));
BMessage downloadInit = BMessage(kDownloadStart);
downloadInit.AddString("feed", buffer->GetTitle());
((App*)be_app)->MessageReceived(&downloadInit);
}
}
void
FeedController::_CheckStatus()
{
thread_id sender;
while (has_data(find_thread(NULL))) {
Feed* feedBuffer = (Feed*)malloc(sizeof(Feed));
int32 code = receive_data(&sender, (void*)feedBuffer, sizeof(Feed));
switch (code)
{
case kDownloadComplete:
{
BMessage complete = BMessage(kDownloadComplete);
complete.AddString("feed_url",
feedBuffer->GetXmlUrl().UrlString());
((App*)be_app)->MessageReceived(&complete);
send_data(fParseThread, 0, (void*)feedBuffer, sizeof(Feed));
break;
}
case kDownloadFail:
{
BMessage failure = BMessage(kDownloadFail);
failure.AddString("feed_url",
feedBuffer->GetXmlUrl().UrlString());
((App*)be_app)->MessageReceived(&failure);
break;
}
case kParseComplete:
{
BMessage complete = BMessage(kParseComplete);
complete.AddString("feed_name", feedBuffer->GetTitle());
complete.AddInt32("entry_count", feedBuffer->GetNewEntries().CountItems());
((App*)be_app)->MessageReceived(&complete);
break;
}
case kParseFail:
{
BMessage failure = BMessage(kParseFail);
failure.AddString("feed_url", feedBuffer->GetXmlUrl().UrlString());
((App*)be_app)->MessageReceived(&failure);
break;
}
}
}
}
int32
FeedController::_DownloadLoop(void* data)
{
thread_id main = *((thread_id*)data);
thread_id sender; thread_id sender;
Feed* feedBuffer = (Feed*)malloc(sizeof(Feed)); Feed* feedBuffer = (Feed*)malloc(sizeof(Feed));
while (receive_data(&sender, (void*)feedBuffer, sizeof(Feed)) != 0) {
while (true) {
int32 code = receive_data(&sender, (void*)feedBuffer, sizeof(Feed));
std::cout << "Downloading feed from " std::cout << "Downloading feed from "
<< feedBuffer->GetXmlUrl().UrlString() << "\n"; << feedBuffer->GetXmlUrl().UrlString() << "\n";
BMessage downloadInit = BMessage(kDownloadStart);
downloadInit.AddString("feed", feedBuffer->GetTitle());
((App*)be_app)->MessageReceived(&downloadInit);
if (feedBuffer->Fetch()) { if (feedBuffer->Fetch()) {
BMessage downloaded = BMessage(kDownloadComplete); send_data(main, kDownloadComplete, (void*)feedBuffer, sizeof(Feed));
downloaded.AddData("feeds", B_RAW_TYPE, feedBuffer, sizeof(Feed));
((App*)be_app)->MessageReceived(&downloaded);
} }
else { else {
BMessage failure = BMessage(kDownloadFail); send_data(main, kDownloadFail, (void*)feedBuffer, sizeof(Feed));
failure.AddString("feed_url", feedBuffer->GetXmlUrl().UrlString());
((App*)be_app)->MessageReceived(&failure);
} }
} }
delete(feedBuffer); delete(feedBuffer);
@ -145,12 +198,15 @@ FeedController::_DownloadLoop(void* ignored)
int32 int32
FeedController::_ParseLoop(void* ignored) FeedController::_ParseLoop(void* data)
{ {
thread_id main = *((thread_id*)data);
thread_id sender; thread_id sender;
Feed* feedBuffer = (Feed*)malloc(sizeof(Feed)); Feed* feedBuffer = (Feed*)malloc(sizeof(Feed));
while (receive_data(&sender, (void*)feedBuffer, sizeof(Feed)) != 0) { while (true) {
int32 code = receive_data(&sender, (void*)feedBuffer, sizeof(Feed));
BList entries; BList entries;
BString feedTitle; BString feedTitle;
BUrl feedUrl = feedBuffer->GetXmlUrl(); BUrl feedUrl = feedBuffer->GetXmlUrl();
@ -181,15 +237,10 @@ FeedController::_ParseLoop(void* ignored)
if (feedBuffer->IsAtom() || feedBuffer->IsRss()) { if (feedBuffer->IsAtom() || feedBuffer->IsRss()) {
BMessage complete = BMessage(kParseComplete); send_data(main, kParseComplete, (void*)feedBuffer, sizeof(Feed));
complete.AddString("feed_name", feedTitle);
complete.AddInt32("entry_count", entries.CountItems());
((App*)be_app)->MessageReceived(&complete);
} }
else { else {
BMessage failure = BMessage(kParseFail); send_data(main, kParseFail, (void*)feedBuffer, sizeof(Feed));
failure.AddString("feed_url", feedUrl.UrlString());
((App*)be_app)->MessageReceived(&failure);
} }
} }

View File

@ -10,6 +10,8 @@
class BList; class BList;
class BMessage; class BMessage;
class BMessageRunner;
class Feed;
enum enum
@ -21,7 +23,8 @@ enum
kDownloadFail = 'fdlf', kDownloadFail = 'fdlf',
kParseComplete = 'fpec', kParseComplete = 'fpec',
kParseFail = 'fpef', kParseFail = 'fpef',
kUpdateSubscribed = 'fups' kUpdateSubscribed = 'fups',
kControllerCheck = 'coch'
}; };
@ -31,14 +34,23 @@ public:
~FeedController(); ~FeedController();
void MessageReceived(BMessage* msg); void MessageReceived(BMessage* msg);
static BList SubscribedFeeds(); static BList SubscribedFeeds();
private: private:
static int32 _DownloadLoop(void* ignored); static int32 _DownloadLoop(void* data);
static int32 _ParseLoop(void* ignored); static int32 _ParseLoop(void* data);
void _EnqueueFeed(Feed* feed);
void _ProcessQueueItem();
void _CheckStatus();
thread_id fMainThread;
thread_id fDownloadThread; thread_id fDownloadThread;
thread_id fParseThread; thread_id fParseThread;
BList* fDownloadQueue;
BMessageRunner* fMessageRunner;
}; };