[Pkg-owncloud-commits] [owncloud-client] 35/333: Fix chunking
Sandro Knauß
hefee-guest at moszumanska.debian.org
Thu Apr 17 23:16:30 UTC 2014
This is an automated email from the git hooks/post-receive script.
hefee-guest pushed a commit to branch master
in repository owncloud-client.
commit 7b6269b4bff982b0231262dcf9d76fd02db6effd
Author: Olivier Goffart <ogoffart at woboq.com>
Date: Thu Feb 13 14:02:05 2014 +0100
Fix chunking
- Do not start them in parallel, start them in sequence instead, as they are quite
and there are already other jobs running in parallel normaly
- Do not load the fill into memory
- Support resuming
---
src/mirall/owncloudpropagator_qnam.cpp | 275 ++++++++++++++-------------------
src/mirall/owncloudpropagator_qnam.h | 43 +-----
2 files changed, 126 insertions(+), 192 deletions(-)
diff --git a/src/mirall/owncloudpropagator_qnam.cpp b/src/mirall/owncloudpropagator_qnam.cpp
index 0ba8d85..0c8e082 100644
--- a/src/mirall/owncloudpropagator_qnam.cpp
+++ b/src/mirall/owncloudpropagator_qnam.cpp
@@ -17,7 +17,9 @@
#include "account.h"
#include "syncjournaldb.h"
#include "syncjournalfilerecord.h"
+#include "utility.h"
#include <QNetworkAccessManager>
+#include <cmath>
namespace Mirall {
@@ -39,191 +41,113 @@ void PUTFileJob::start() {
}
-#define CHUNKING_SIZE (10*1024*1024)
+static const int CHUNKING_SIZE = (10*1024);
-int ChunkedPUTFileJob::computeChunks()
+void PropagateUploadFileQNAM::start()
{
- qint64 size = _device->size();
- qint64 blockSize = CHUNKING_SIZE;
- qint64 overall;
- qint64 numBlocks = size / blockSize;
-
- /* there migth be a remainder. */
- qint64 remainder = size - numBlocks * blockSize;
- qsrand(QTime::currentTime().msec());
-
- _transferId = qrand(); // FIXME: Sufficient?
-
- /* if there is a remainder, add one block */
- if( remainder > 0 ) {
- numBlocks++;
- }
+ if (_propagator->_abortRequested.fetchAndAddRelaxed(0))
+ return;
- /* The file has size 0. There still needs to be at least one block. */
- if( size == 0 ) {
- numBlocks = 1;
- blockSize = 0;
+ _file = new QFile(_propagator->_localDir + _item._file, this);
+ if (!_file->open(QIODevice::ReadOnly)) {
+ done(SyncFileItem::NormalError, _file->errorString());
+ delete _file;
+ return;
}
- qDebug() << "Chunks: " << numBlocks << "a" << blockSize << ", remainder " << remainder;
+ quint64 fileSize = _file->size();
+ _chunkCount = std::ceil(fileSize/double(CHUNKING_SIZE));
+ _startChunk = 0;
+ _transferId = qrand() ^ _item._modtime ^ (_item._size << 16);
+ const SyncJournalDb::UploadInfo progressInfo = _propagator->_journal->getUploadInfo(_item._file);
- for( qint64 cnt=0; cnt < numBlocks; cnt++ ) {
- /* allocate a block struct and fill */
- ChunkBlock block;
-
- block._sequenceNo = cnt;
- block._start = cnt * blockSize;
- block._size = blockSize;
- block._state = ChunkBlock::NotTransfered;
-
- _device->seek(block._start);
- block._buffer = new QBuffer;
- block._buffer->setData(_device->read(block._size));
-
- /* consider the remainder if we're already at the end */
- if( cnt == numBlocks-1 && remainder > 0 ) {
- block._size = remainder;
- }
- overall += block._size;
-
- _chunks.append(block);
- qDebug() << " computed chunk " << cnt << "from" << block._start << block._size << "bytes";
+ if (progressInfo._valid && Utility::qDateTimeToTime_t(progressInfo._modtime) == _item._modtime ) {
+ _startChunk = progressInfo._chunk;
+ _transferId = progressInfo._transferid;
+ qDebug() << Q_FUNC_INFO << _item._file << ": Resuming from chunk " << _startChunk;
}
- return numBlocks;
-}
-
-void ChunkedPUTFileJob::start()
-{
- int blockCount = _chunks.size();
- _transferedChunks = 0;
+ _currentChunk = 0;
- foreach( ChunkBlock block, _chunks ) {
- QMap<QByteArray, QByteArray> headers;
- headers["OC-Total-Length"] = QByteArray::number(qint64(block._size));
- headers["Content-Type"] = "application/octet-stream";
- headers["X-OC-Mtime"] = QByteArray::number(qint64(_modtime));
- headers["OC-Chunked"] = QByteArray::number(1);
-
- if (!_item._etag.isEmpty() && _item._etag != "empty_etag") {
- // We add quotes because the owncloud server always add quotes around the etag, and
- // csync_owncloud.c's owncloud_file_id always strip the quotes.
- headers["If-Match"] = '"' + _item._etag + '"';
- }
-
- PUTFileJob *job;
- QString url = _item._file;
- url += QString("-chunking-%1-%2-%3").arg(_transferId).arg(blockCount).arg(block._sequenceNo);
+ _propagator->_activeJobs++;
+ emit progress(Progress::StartUpload, _item, 0, fileSize);
+ emitReady();
+ this->startNextChunk();
+}
- job = new PUTFileJob(account(), url, block._buffer, headers);
+struct ChunkDevice : QIODevice {
+ QIODevice *_file;
+ qint64 _read;
- _chunkUploadJobs.insert(job, block);
- connect(job, SIGNAL(finishedSignal()), this, SLOT(slotChunkFinished()));
- job->start();
+ ChunkDevice(QIODevice *file, qint64 start)
+ : QIODevice(file), _file(file), _read(0) {
+ _file->seek(start);
}
-}
-void ChunkedPUTFileJob::slotChunkFinished()
-{
- PUTFileJob *job = qobject_cast<PUTFileJob *>(sender());
- Q_ASSERT(job);
-
- QNetworkReply::NetworkError err = job->reply()->error();
- if (err != QNetworkReply::NoError) {
- Q_ASSERT(_chunkUploadJobs.contains(job));
- ChunkBlock block = _chunkUploadJobs[job];
- _chunkUploadJobs[job]._state = ChunkBlock::TransferSuccess;
- delete block._buffer;
+ virtual qint64 writeData(const char* , qint64 ) {
+ Q_ASSERT(!"write to read only device");
+ return 0;
}
- _transferedChunks++;
-
- if( _transferedChunks >= _chunkUploadJobs.size() ) {
- bool completed = true;
- // FIXME: Build
- foreach( ChunkBlock block, _chunkUploadJobs ) {
- if( block._state != ChunkBlock::TransferSuccess ) {
- completed = false;
- break;
- }
- }
- if( completed ) {
- emit finishedSignal();
- }
+
+ virtual qint64 readData(char* data, qint64 maxlen) {
+ maxlen = qMin(maxlen, CHUNKING_SIZE - _read);
+ if (maxlen == 0)
+ return 0;
+ qint64 ret = _file->read(data, maxlen);
+ _read += ret;
+ return ret;
}
-}
+ virtual bool atEnd() const {
+ return _read >= CHUNKING_SIZE || _file->atEnd();
+ }
+};
-void PropagateUploadFileQNAM::start()
+void PropagateUploadFileQNAM::startNextChunk()
{
- if (_propagator->_abortRequested.fetchAndAddRelaxed(0))
- return;
-
- QFile *file = new QFile(_propagator->_localDir + _item._file);
- if (!file->open(QIODevice::ReadOnly)) {
- done(SyncFileItem::NormalError, file->errorString());
- delete file;
- return;
+ /*
+ * // If the source file has changed during upload, it is detected and the
+ * // variable _previousFileSize is set accordingly. The propagator waits a
+ * // couple of seconds and retries.
+ * if(_previousFileSize > 0) {
+ * qDebug() << "File size changed underway: " << trans->stat_size - _previousFileSize;
+ * // Report the change of the overall transmission size to the propagator
+ * _propagator->overallTransmissionSizeChanged(qint64(trans->stat_size - _previousFileSize));
+ * // update the item's values to the current from trans. hbf_splitlist does a stat
+ * _item._size = trans->stat_size;
+ * _item._modtime = trans->modtime;
+ *
+ */
+ quint64 fileSize = _item._size;
+ QMap<QByteArray, QByteArray> headers;
+ headers["OC-Total-Length"] = QByteArray::number(fileSize);
+ headers["Content-Type"] = "application/octet-stream";
+ headers["X-OC-Mtime"] = QByteArray::number(qint64(_item._modtime));
+ if (!_item._etag.isEmpty() && _item._etag != "empty_etag") {
+ // We add quotes because the owncloud server always add quotes around the etag, and
+ // csync_owncloud.c's owncloud_file_id always strip the quotes.
+ headers["If-Match"] = '"' + _item._etag + '"';
}
- //TODO
- //const SyncJournalDb::UploadInfo progressInfo = _propagator->_journal->getUploadInfo(_item._file);
- QMap<QByteArray, QByteArray> headers;
- qint64 fileSize = file->size();
-
- AbstractNetworkJob *job;
- if( fileSize < CHUNKING_SIZE ) {
- headers["OC-Total-Length"] = QByteArray::number(fileSize);
- headers["Content-Type"] = "application/octet-stream";
- headers["X-OC-Mtime"] = QByteArray::number(qint64(_item._modtime));
-
- if (!_item._etag.isEmpty() && _item._etag != "empty_etag") {
- // We add quotes because the owncloud server always add quotes around the etag, and
- // csync_owncloud.c's owncloud_file_id always strip the quotes.
- headers["If-Match"] = '"' + _item._etag + '"';
- }
- job = new PUTFileJob(AccountManager::instance()->account(), _item._file, file, headers);
+ QString path = _item._file;
+ QIODevice *device;
+ if (_chunkCount > 1) {
+ int sendingChunk = (_currentChunk + _startChunk) % _chunkCount;
+ path += QString("-chunking-%1-%2-%3").arg(uint(_transferId)).arg(_chunkCount).arg(sendingChunk);
+ headers["OC-Chunked"] = "1";
+ device = new ChunkDevice(_file, CHUNKING_SIZE * sendingChunk);
} else {
- job = new ChunkedPUTFileJob(AccountManager::instance()->account(), _item._file, _item, file);
+ device = _file;
}
- connect(job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished()));
-
- // if( transfer->block_cnt > 1 ) {
- // ne_add_request_header(req, "OC-Chunked", "1");
- // }
-
-
- /*
- // If the source file has changed during upload, it is detected and the
- // variable _previousFileSize is set accordingly. The propagator waits a
- // couple of seconds and retries.
- if(_previousFileSize > 0) {
- qDebug() << "File size changed underway: " << trans->stat_size - _previousFileSize;
- // Report the change of the overall transmission size to the propagator
- _propagator->overallTransmissionSizeChanged(qint64(trans->stat_size - _previousFileSize));
- // update the item's values to the current from trans. hbf_splitlist does a stat
- _item._size = trans->stat_size;
- _item._modtime = trans->modtime;
-
- }
- if (progressInfo._valid) {
- if (Utility::qDateTimeToTime_t(progressInfo._modtime) == _item._modtime) {
- trans->start_id = progressInfo._chunk;
- trans->transfer_id = progressInfo._transferid;
- }
- }
- */
- emit progress(Progress::StartUpload, _item, 0, file->size());
- job->start();
-
- _propagator->_activeJobs++;
- emitReady();
+ _job = new PUTFileJob(AccountManager::instance()->account(), path, device, headers);
+ connect(_job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished()));
+ _job->start();
}
void PropagateUploadFileQNAM::slotPutFinished()
{
- _propagator->_activeJobs--;
PUTFileJob *job = qobject_cast<PUTFileJob *>(sender());
Q_ASSERT(job);
@@ -254,10 +178,46 @@ void PropagateUploadFileQNAM::slotPutFinished()
// // FIXME: find out the error class.
// _item._httpErrorCode = hbf_fail_http_code(trans.data());
_item._httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
+ _propagator->_activeJobs--;
done(SyncFileItem::NormalError, job->reply()->errorString());
return;
}
+ bool finished = job->reply()->hasRawHeader("ETag");
+ if (!finished) {
+ // Proceed to next chunk.
+ _currentChunk++;
+ if (_currentChunk >= _chunkCount) {
+ _propagator->_activeJobs--;
+ done(SyncFileItem::NormalError, tr("The server did not aknoledge the last chunk. (No e-tag were present)"));
+ return;
+ }
+
+ SyncJournalDb::UploadInfo pi;
+ pi._valid = true;
+ pi._chunk = _currentChunk; // next chunk to start with
+ pi._transferid = _transferId;
+ pi._modtime = Utility::qDateTimeFromTime_t(_item._modtime);
+ _propagator->_journal->setUploadInfo(_item._file, pi);
+ _propagator->_journal->commit("Upload info");
+ startNextChunk();
+ return;
+ }
+
+ _propagator->_activeJobs--;
+
+ // FIXME: hack to check that the server did not accept the first chunk as a file
+// if (transfer->block_cnt > 1 && state == HBF_SUCCESS && cnt == 0) {
+// /* Success on the first chunk is suspicious.
+// * It could happen that the server did not support chunking */
+// int rc = ne_delete(session, transfer_url);
+// if (rc == NE_OK && _hbf_http_error_code(session) == 204) {
+// /* If delete suceeded, it means some proxy strips the OC_CHUNKING header
+// * start again without chunking: */
+// free( transfer_url );
+// return _hbf_transfer_no_chunk(session, transfer, verb);
+// }
+// }
// the file id should only be empty for new files up- or downloaded
QString fid = QString::fromUtf8(job->reply()->rawHeader("OC-FileID"));
@@ -270,6 +230,7 @@ void PropagateUploadFileQNAM::slotPutFinished()
_item._etag = parseEtag(job->reply()->rawHeader("ETag"));
+
if (job->reply()->rawHeader("X-OC-MTime") != "accepted") {
//FIXME
// updateMTimeAndETag(uri.data(), _item._modtime);
diff --git a/src/mirall/owncloudpropagator_qnam.h b/src/mirall/owncloudpropagator_qnam.h
index 6634f8a..23f4fd8 100644
--- a/src/mirall/owncloudpropagator_qnam.h
+++ b/src/mirall/owncloudpropagator_qnam.h
@@ -70,47 +70,20 @@ signals:
class PropagateUploadFileQNAM : public PropagateItemJob {
Q_OBJECT
QPointer<PUTFileJob> _job;
+ QFile *_file;
+ int _startChunk;
+ int _currentChunk;
+ int _chunkCount;
+ int _transferId;
public:
- PropagateUploadFileQNAM(OwncloudPropagator* propagator,const SyncFileItem& item) : PropagateItemJob(propagator, item) {}
+ PropagateUploadFileQNAM(OwncloudPropagator* propagator,const SyncFileItem& item)
+ : PropagateItemJob(propagator, item), _startChunk(0), _currentChunk(0), _chunkCount(0), _transferId(0) {}
void start();
private slots:
void slotPutFinished();
void abort();
-};
-
-class ChunkedPUTFileJob : public AbstractNetworkJob {
- Q_OBJECT
-
-private:
- QIODevice* _device;
- QMap<QByteArray, QByteArray> _headers;
- QMap <PUTFileJob*, ChunkBlock>_chunkUploadJobs;
- QVector<ChunkBlock> _chunks;
- time_t _modtime;
- SyncFileItem _item;
- int _transferId;
- int _transferedChunks;
-
-public:
- explicit ChunkedPUTFileJob(Account* account, const QString& path, SyncFileItem& item, QIODevice *device,
- QObject* parent = 0) :
- AbstractNetworkJob(account, path, parent), _device(device), _item(item)
- {}
-
- virtual void start();
-
- virtual void finished() {
- emit finishedSignal();
- }
-
- int computeChunks();
+ void startNextChunk();
-private slots:
- void slotChunkFinished();
-
-signals:
- void finishedSignal();
};
-
}
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-owncloud/owncloud-client.git
More information about the Pkg-owncloud-commits
mailing list