[Pkg-owncloud-commits] [owncloud-client] 35/333: Fix chunking

Sandro Knauß hefee-guest at moszumanska.debian.org
Thu Apr 17 23:16:30 UTC 2014


This is an automated email from the git hooks/post-receive script.

hefee-guest pushed a commit to branch master
in repository owncloud-client.

commit 7b6269b4bff982b0231262dcf9d76fd02db6effd
Author: Olivier Goffart <ogoffart at woboq.com>
Date:   Thu Feb 13 14:02:05 2014 +0100

    Fix chunking
    
    - Do not start them in parallel, start them in sequence instead, as they are quite
      and there are already other jobs running in parallel normaly
    - Do not load the fill into memory
    - Support resuming
---
 src/mirall/owncloudpropagator_qnam.cpp | 275 ++++++++++++++-------------------
 src/mirall/owncloudpropagator_qnam.h   |  43 +-----
 2 files changed, 126 insertions(+), 192 deletions(-)

diff --git a/src/mirall/owncloudpropagator_qnam.cpp b/src/mirall/owncloudpropagator_qnam.cpp
index 0ba8d85..0c8e082 100644
--- a/src/mirall/owncloudpropagator_qnam.cpp
+++ b/src/mirall/owncloudpropagator_qnam.cpp
@@ -17,7 +17,9 @@
 #include "account.h"
 #include "syncjournaldb.h"
 #include "syncjournalfilerecord.h"
+#include "utility.h"
 #include <QNetworkAccessManager>
+#include <cmath>
 
 namespace Mirall {
 
@@ -39,191 +41,113 @@ void PUTFileJob::start() {
 
 }
 
-#define CHUNKING_SIZE (10*1024*1024)
+static const int CHUNKING_SIZE = (10*1024);
 
-int ChunkedPUTFileJob::computeChunks()
+void PropagateUploadFileQNAM::start()
 {
-    qint64 size = _device->size();
-    qint64 blockSize = CHUNKING_SIZE;
-    qint64 overall;
-    qint64 numBlocks = size / blockSize;
-
-    /* there migth be a remainder. */
-    qint64 remainder = size - numBlocks * blockSize;
-    qsrand(QTime::currentTime().msec());
-
-    _transferId = qrand(); // FIXME: Sufficient?
-
-    /* if there is a remainder, add one block */
-    if( remainder > 0 ) {
-        numBlocks++;
-    }
+    if (_propagator->_abortRequested.fetchAndAddRelaxed(0))
+        return;
 
-    /* The file has size 0. There still needs to be at least one block. */
-    if( size == 0 ) {
-      numBlocks = 1;
-      blockSize   = 0;
+    _file = new QFile(_propagator->_localDir + _item._file, this);
+    if (!_file->open(QIODevice::ReadOnly)) {
+        done(SyncFileItem::NormalError, _file->errorString());
+        delete _file;
+        return;
     }
 
-    qDebug() << "Chunks: " << numBlocks << "a" << blockSize << ", remainder " << remainder;
+    quint64 fileSize = _file->size();
+    _chunkCount = std::ceil(fileSize/double(CHUNKING_SIZE));
+    _startChunk = 0;
+    _transferId = qrand() ^ _item._modtime ^ (_item._size << 16);
 
+    const SyncJournalDb::UploadInfo progressInfo = _propagator->_journal->getUploadInfo(_item._file);
 
-    for( qint64 cnt=0; cnt < numBlocks; cnt++ ) {
-        /* allocate a block struct and fill */
-        ChunkBlock block;
-
-        block._sequenceNo = cnt;
-        block._start = cnt * blockSize;
-        block._size  = blockSize;
-        block._state = ChunkBlock::NotTransfered;
-
-        _device->seek(block._start);
-        block._buffer = new QBuffer;
-        block._buffer->setData(_device->read(block._size));
-
-        /* consider the remainder if we're already at the end */
-        if( cnt == numBlocks-1 && remainder > 0 ) {
-            block._size = remainder;
-        }
-        overall += block._size;
-
-        _chunks.append(block);
-        qDebug() << "  computed chunk " << cnt << "from" << block._start << block._size << "bytes";
+    if (progressInfo._valid && Utility::qDateTimeToTime_t(progressInfo._modtime) == _item._modtime ) {
+        _startChunk = progressInfo._chunk;
+        _transferId = progressInfo._transferid;
+        qDebug() << Q_FUNC_INFO << _item._file << ": Resuming from chunk " << _startChunk;
     }
-    return numBlocks;
-}
-
-void ChunkedPUTFileJob::start()
-{
-    int blockCount = _chunks.size();
 
-    _transferedChunks = 0;
+    _currentChunk = 0;
 
-    foreach( ChunkBlock block, _chunks ) {
-        QMap<QByteArray, QByteArray> headers;
-        headers["OC-Total-Length"] = QByteArray::number(qint64(block._size));
-        headers["Content-Type"] = "application/octet-stream";
-        headers["X-OC-Mtime"] = QByteArray::number(qint64(_modtime));
-        headers["OC-Chunked"] = QByteArray::number(1);
-
-        if (!_item._etag.isEmpty() && _item._etag != "empty_etag") {
-            // We add quotes because the owncloud server always add quotes around the etag, and
-            //  csync_owncloud.c's owncloud_file_id always strip the quotes.
-            headers["If-Match"] = '"' + _item._etag + '"';
-        }
-
-        PUTFileJob *job;
-        QString url = _item._file;
-        url += QString("-chunking-%1-%2-%3").arg(_transferId).arg(blockCount).arg(block._sequenceNo);
+    _propagator->_activeJobs++;
+    emit progress(Progress::StartUpload, _item, 0, fileSize);
+    emitReady();
+    this->startNextChunk();
+}
 
-        job = new PUTFileJob(account(), url, block._buffer, headers);
+struct ChunkDevice : QIODevice {
+    QIODevice *_file;
+    qint64 _read;
 
-        _chunkUploadJobs.insert(job, block);
-        connect(job, SIGNAL(finishedSignal()), this, SLOT(slotChunkFinished()));
-        job->start();
+    ChunkDevice(QIODevice *file,  qint64 start)
+            : QIODevice(file), _file(file), _read(0) {
+        _file->seek(start);
     }
-}
 
-void ChunkedPUTFileJob::slotChunkFinished()
-{
-    PUTFileJob *job = qobject_cast<PUTFileJob *>(sender());
-    Q_ASSERT(job);
-
-    QNetworkReply::NetworkError err = job->reply()->error();
-    if (err != QNetworkReply::NoError) {
-        Q_ASSERT(_chunkUploadJobs.contains(job));
-        ChunkBlock block = _chunkUploadJobs[job];
-        _chunkUploadJobs[job]._state = ChunkBlock::TransferSuccess;
-        delete block._buffer;
+    virtual qint64 writeData(const char* , qint64 ) {
+        Q_ASSERT(!"write to read only device");
+        return 0;
     }
-    _transferedChunks++;
-
-    if( _transferedChunks >= _chunkUploadJobs.size() ) {
-        bool completed = true;
-        // FIXME: Build
-        foreach( ChunkBlock block, _chunkUploadJobs ) {
-            if( block._state != ChunkBlock::TransferSuccess ) {
-                completed = false;
-                break;
-            }
-        }
-        if( completed ) {
-            emit finishedSignal();
-        }
+
+    virtual qint64 readData(char* data, qint64 maxlen) {
+        maxlen = qMin(maxlen, CHUNKING_SIZE - _read);
+        if (maxlen == 0)
+            return 0;
+        qint64 ret = _file->read(data, maxlen);
+        _read += ret;
+        return ret;
     }
 
-}
+    virtual bool atEnd() const {
+        return  _read >= CHUNKING_SIZE || _file->atEnd();
+    }
+};
 
-void PropagateUploadFileQNAM::start()
+void PropagateUploadFileQNAM::startNextChunk()
 {
-    if (_propagator->_abortRequested.fetchAndAddRelaxed(0))
-        return;
-
-    QFile *file = new QFile(_propagator->_localDir + _item._file);
-    if (!file->open(QIODevice::ReadOnly)) {
-        done(SyncFileItem::NormalError, file->errorString());
-        delete file;
-        return;
+    /*
+     *        // If the source file has changed during upload, it is detected and the
+     *        // variable _previousFileSize is set accordingly. The propagator waits a
+     *        // couple of seconds and retries.
+     *        if(_previousFileSize > 0) {
+     *            qDebug() << "File size changed underway: " << trans->stat_size - _previousFileSize;
+     *            // Report the change of the overall transmission size to the propagator
+     *            _propagator->overallTransmissionSizeChanged(qint64(trans->stat_size - _previousFileSize));
+     *            // update the item's values to the current from trans. hbf_splitlist does a stat
+     *            _item._size = trans->stat_size;
+     *            _item._modtime = trans->modtime;
+     *
+     */
+    quint64 fileSize = _item._size;
+    QMap<QByteArray, QByteArray> headers;
+    headers["OC-Total-Length"] = QByteArray::number(fileSize);
+    headers["Content-Type"] = "application/octet-stream";
+    headers["X-OC-Mtime"] = QByteArray::number(qint64(_item._modtime));
+    if (!_item._etag.isEmpty() && _item._etag != "empty_etag") {
+        // We add quotes because the owncloud server always add quotes around the etag, and
+        //  csync_owncloud.c's owncloud_file_id always strip the quotes.
+        headers["If-Match"] = '"' + _item._etag + '"';
     }
 
-    //TODO
-    //const SyncJournalDb::UploadInfo progressInfo = _propagator->_journal->getUploadInfo(_item._file);
-    QMap<QByteArray, QByteArray> headers;
-    qint64 fileSize = file->size();
-
-    AbstractNetworkJob *job;
-    if( fileSize < CHUNKING_SIZE ) {
-        headers["OC-Total-Length"] = QByteArray::number(fileSize);
-        headers["Content-Type"] = "application/octet-stream";
-        headers["X-OC-Mtime"] = QByteArray::number(qint64(_item._modtime));
-
-        if (!_item._etag.isEmpty() && _item._etag != "empty_etag") {
-            // We add quotes because the owncloud server always add quotes around the etag, and
-            //  csync_owncloud.c's owncloud_file_id always strip the quotes.
-            headers["If-Match"] = '"' + _item._etag + '"';
-        }
-        job = new PUTFileJob(AccountManager::instance()->account(), _item._file, file, headers);
+    QString path = _item._file;
+    QIODevice *device;
+    if (_chunkCount > 1) {
+        int sendingChunk = (_currentChunk + _startChunk) % _chunkCount;
+        path +=  QString("-chunking-%1-%2-%3").arg(uint(_transferId)).arg(_chunkCount).arg(sendingChunk);
+        headers["OC-Chunked"] = "1";
+        device = new ChunkDevice(_file, CHUNKING_SIZE * sendingChunk);
     } else {
-        job = new ChunkedPUTFileJob(AccountManager::instance()->account(), _item._file, _item, file);
+        device = _file;
     }
-    connect(job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished()));
-
-    //     if( transfer->block_cnt > 1 ) {
-    //         ne_add_request_header(req, "OC-Chunked", "1");
-    //     }
-
-
-        /*
-        // If the source file has changed during upload, it is detected and the
-        // variable _previousFileSize is set accordingly. The propagator waits a
-        // couple of seconds and retries.
-        if(_previousFileSize > 0) {
-            qDebug() << "File size changed underway: " << trans->stat_size - _previousFileSize;
-            // Report the change of the overall transmission size to the propagator
-            _propagator->overallTransmissionSizeChanged(qint64(trans->stat_size - _previousFileSize));
-            // update the item's values to the current from trans. hbf_splitlist does a stat
-            _item._size = trans->stat_size;
-            _item._modtime = trans->modtime;
-
-        }
-        if (progressInfo._valid) {
-            if (Utility::qDateTimeToTime_t(progressInfo._modtime) == _item._modtime) {
-                trans->start_id = progressInfo._chunk;
-                trans->transfer_id = progressInfo._transferid;
-            }
-        }
-        */
 
-    emit progress(Progress::StartUpload, _item, 0, file->size());
-    job->start();
-
-    _propagator->_activeJobs++;
-    emitReady();
+    _job = new PUTFileJob(AccountManager::instance()->account(), path, device, headers);
+    connect(_job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished()));
+    _job->start();
 }
 
 void PropagateUploadFileQNAM::slotPutFinished()
 {
-    _propagator->_activeJobs--;
     PUTFileJob *job = qobject_cast<PUTFileJob *>(sender());
     Q_ASSERT(job);
 
@@ -254,10 +178,46 @@ void PropagateUploadFileQNAM::slotPutFinished()
 //                 // FIXME: find out the error class.
 //                 _item._httpErrorCode = hbf_fail_http_code(trans.data());
         _item._httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
+        _propagator->_activeJobs--;
         done(SyncFileItem::NormalError, job->reply()->errorString());
         return;
     }
 
+    bool finished = job->reply()->hasRawHeader("ETag");
+    if (!finished) {
+        // Proceed to next chunk.
+        _currentChunk++;
+        if (_currentChunk >= _chunkCount) {
+            _propagator->_activeJobs--;
+            done(SyncFileItem::NormalError, tr("The server did not aknoledge the last chunk. (No e-tag were present)"));
+            return;
+        }
+
+        SyncJournalDb::UploadInfo pi;
+        pi._valid = true;
+        pi._chunk = _currentChunk; // next chunk to start with
+        pi._transferid = _transferId;
+        pi._modtime =  Utility::qDateTimeFromTime_t(_item._modtime);
+        _propagator->_journal->setUploadInfo(_item._file, pi);
+        _propagator->_journal->commit("Upload info");
+        startNextChunk();
+        return;
+    }
+
+    _propagator->_activeJobs--;
+
+    // FIXME:  hack to check that the server did not accept the first chunk as a file
+//     if (transfer->block_cnt > 1 && state == HBF_SUCCESS && cnt == 0) {
+//         /* Success on the first chunk is suspicious.
+//          *      It could happen that the server did not support chunking */
+//         int rc = ne_delete(session, transfer_url);
+//         if (rc == NE_OK && _hbf_http_error_code(session) == 204) {
+//             /* If delete suceeded, it means some proxy strips the OC_CHUNKING header
+//              *          start again without chunking: */
+//             free( transfer_url );
+//             return _hbf_transfer_no_chunk(session, transfer, verb);
+//         }
+//     }
 
     // the file id should only be empty for new files up- or downloaded
     QString fid = QString::fromUtf8(job->reply()->rawHeader("OC-FileID"));
@@ -270,6 +230,7 @@ void PropagateUploadFileQNAM::slotPutFinished()
 
     _item._etag = parseEtag(job->reply()->rawHeader("ETag"));
 
+
     if (job->reply()->rawHeader("X-OC-MTime") != "accepted") {
         //FIXME
 //             updateMTimeAndETag(uri.data(), _item._modtime);
diff --git a/src/mirall/owncloudpropagator_qnam.h b/src/mirall/owncloudpropagator_qnam.h
index 6634f8a..23f4fd8 100644
--- a/src/mirall/owncloudpropagator_qnam.h
+++ b/src/mirall/owncloudpropagator_qnam.h
@@ -70,47 +70,20 @@ signals:
 class PropagateUploadFileQNAM : public PropagateItemJob {
     Q_OBJECT
     QPointer<PUTFileJob> _job;
+    QFile *_file;
+    int _startChunk;
+    int _currentChunk;
+    int _chunkCount;
+    int _transferId;
 public:
-    PropagateUploadFileQNAM(OwncloudPropagator* propagator,const SyncFileItem& item)  : PropagateItemJob(propagator, item) {}
+    PropagateUploadFileQNAM(OwncloudPropagator* propagator,const SyncFileItem& item)
+        : PropagateItemJob(propagator, item), _startChunk(0), _currentChunk(0), _chunkCount(0), _transferId(0) {}
     void start();
 private slots:
     void slotPutFinished();
     void abort();
-};
-
-class ChunkedPUTFileJob : public AbstractNetworkJob {
-    Q_OBJECT
-
-private:
-    QIODevice* _device;
-    QMap<QByteArray, QByteArray> _headers;
-    QMap <PUTFileJob*, ChunkBlock>_chunkUploadJobs;
-    QVector<ChunkBlock> _chunks;
-    time_t _modtime;
-    SyncFileItem _item;
-    int _transferId;
-    int _transferedChunks;
-
-public:
-    explicit ChunkedPUTFileJob(Account* account, const QString& path, SyncFileItem& item, QIODevice *device,
-                               QObject* parent = 0) :
-        AbstractNetworkJob(account, path, parent), _device(device), _item(item)
-    {}
-
-    virtual void start();
-
-    virtual void finished() {
-        emit finishedSignal();
-    }
-
-    int computeChunks();
+    void startNextChunk();
 
-private slots:
-    void slotChunkFinished();
-
-signals:
-    void finishedSignal();
 };
 
-
 }

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-owncloud/owncloud-client.git



More information about the Pkg-owncloud-commits mailing list