static uint64_t do_io(struct thread_data *td)
{
    uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
    unsigned int i;
    int ret = 0;
    uint64_t bytes_issued = 0;

    if (in_ramp_time(td))
        td_set_runstate(td, TD_RAMP);
    else
        td_set_runstate(td, TD_RUNNING);

    while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
        (!flist_empty(&td->trim_list)) || !io_bytes_exceeded(td) ||
        td->o.time_based) {
        struct timeval comp_time;
        int min_evts = 0;
        struct io_u *io_u;
        int ret2, full;
        enum fio_ddir ddir;

        check_update_rusage(td);

        if (td->terminate || td->done)
            break;

        update_tv_cache(td);

        if (runtime_exceeded(td, &td->tv_cache)) {
            __update_tv_cache(td);
            if (runtime_exceeded(td, &td->tv_cache)) {
                td->terminate = 1;
                break;
            }
        }

        if (flow_threshold_exceeded(td))
            continue;

        if (bytes_issued >= (uint64_t) td->o.size)
            break;

        io_u = get_io_u(td);
        if (!io_u)
            break;

        ddir = io_u->ddir;

        /*
         * Add verification end_io handler if:
         *    - Asked to verify (!td_rw(td))
         *    - Or the io_u is from our verify list (mixed write/ver)
         */
        if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ &&
            ((io_u->flags & IO_U_F_VER_LIST) || !td_rw(td))) {
            if (td->o.verify_async)
                io_u->end_io = verify_io_u_async;
            else
                io_u->end_io = verify_io_u;
            td_set_runstate(td, TD_VERIFYING);
        } else if (in_ramp_time(td))
            td_set_runstate(td, TD_RAMP);
        else
            td_set_runstate(td, TD_RUNNING);

        ret = td_io_queue(td, io_u);
        switch (ret) {
        case FIO_Q_COMPLETED:
            if (io_u->error) {
                ret = -io_u->error;
                clear_io_u(td, io_u);
            } else if (io_u->resid) {
                int bytes = io_u->xfer_buflen - io_u->resid;
                struct fio_file *f = io_u->file;

                bytes_issued += bytes;
                /*
                 * zero read, fail
                 */
                if (!bytes) {
                    td_verror(td, EIO, "full resid");
                    put_io_u(td, io_u);
                    break;
                }

                io_u->xfer_buflen = io_u->resid;
                io_u->xfer_buf += bytes;
                io_u->offset += bytes;

                if (ddir_rw(io_u->ddir))
                    td->ts.short_io_u[io_u->ddir]++;

                if (io_u->offset == f->real_file_size)
                    goto sync_done;

                requeue_io_u(td, &io_u);
            } else {
sync_done:
                if (__should_check_rate(td, DDIR_READ) ||
                    __should_check_rate(td, DDIR_WRITE) ||
                    __should_check_rate(td, DDIR_TRIM))
                    fio_gettime(&comp_time, NULL);

                ret = io_u_sync_complete(td, io_u, bytes_done);
                if (ret < 0)
                    break;
                bytes_issued += io_u->xfer_buflen;
            }
            break;
        case FIO_Q_QUEUED:
            /*
             * if the engine doesn't have a commit hook,
             * the io_u is really queued. if it does have such
             * a hook, it has to call io_u_queued() itself.
             */
            if (td->io_ops->commit == NULL)
                io_u_queued(td, io_u);
            bytes_issued += io_u->xfer_buflen;
            break;
        case FIO_Q_BUSY:
            requeue_io_u(td, &io_u);
            ret2 = td_io_commit(td);
            if (ret2 < 0)
                ret = ret2;
            break;
        default:
            assert(ret < 0);
            put_io_u(td, io_u);
            break;
        }

        if (break_on_this_error(td, ddir, &ret))
            break;

        /*
         * See if we need to complete some commands. Note that we
         * can get BUSY even without IO queued, if the system is
         * resource starved.
         */
        full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
        if (full || !td->o.iodepth_batch_complete) {
            min_evts = min(td->o.iodepth_batch_complete,
                    td->cur_depth);
            /*
             * if the queue is full, we MUST reap at least 1 event
             */
            if (full && !min_evts)
                min_evts = 1;

            if (__should_check_rate(td, DDIR_READ) ||
                __should_check_rate(td, DDIR_WRITE) ||
                __should_check_rate(td, DDIR_TRIM))
                fio_gettime(&comp_time, NULL);

            do {
                ret = io_u_queued_complete(td, min_evts, bytes_done);
                if (ret < 0)
                    break;

            } while (full && (td->cur_depth > td->o.iodepth_low));
        }

        if (ret < 0)
            break;
        if (!ddir_rw_sum(bytes_done) && !(td->io_ops->flags & FIO_NOIO))
            continue;

        if (!in_ramp_time(td) && should_check_rate(td, bytes_done)) {
            if (check_min_rate(td, &comp_time, bytes_done)) {
                if (exitall_on_terminate)
                    fio_terminate_threads(td->groupid);
                td_verror(td, EIO, "check_min_rate");
                break;
            }
        }

        if (td->o.thinktime) {
            unsigned long long b;

            b = ddir_rw_sum(td->io_blocks);
            if (!(b % td->o.thinktime_blocks)) {
                int left;

                io_u_quiesce(td);

                if (td->o.thinktime_spin)
                    usec_spin(td->o.thinktime_spin);

                left = td->o.thinktime - td->o.thinktime_spin;
                if (left)
                    usec_sleep(td, left);
            }
        }
    }

    check_update_rusage(td);

    if (td->trim_entries)
        log_err("fio: %lu trim entries leaked?\n", td->trim_entries);

    if (td->o.fill_device && td->error == ENOSPC) {
        td->error = 0;
        td->terminate = 1;
    }
    if (!td->error) {
        struct fio_file *f;

        i = td->cur_depth;
        if (i) {
            ret = io_u_queued_complete(td, i, bytes_done);
            if (td->o.fill_device && td->error == ENOSPC)
                td->error = 0;
        }

        if (should_fsync(td) && td->o.end_fsync) {
            td_set_runstate(td, TD_FSYNCING);

            for_each_file(td, f, i) {
                if (!fio_file_fsync(td, f))
                    continue;

                log_err("fio: end_fsync failed for file %s\n",
                                f->file_name);
            }
        }
    } else
        cleanup_pending_aio(td);

    if (!ddir_rw_sum(td->this_io_bytes))
        td->done = 1;

    return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
}