diff options
Diffstat (limited to 'plugins/jetpack/sync/class.jetpack-sync-module-full-sync.php')
-rw-r--r-- | plugins/jetpack/sync/class.jetpack-sync-module-full-sync.php | 253 |
1 files changed, 161 insertions, 92 deletions
diff --git a/plugins/jetpack/sync/class.jetpack-sync-module-full-sync.php b/plugins/jetpack/sync/class.jetpack-sync-module-full-sync.php index b5b73244..2b8bd231 100644 --- a/plugins/jetpack/sync/class.jetpack-sync-module-full-sync.php +++ b/plugins/jetpack/sync/class.jetpack-sync-module-full-sync.php @@ -12,16 +12,10 @@ * - we fire a trigger for the entire array which the Jetpack_Sync_Listener then serializes and queues. */ -require_once 'class.jetpack-sync-wp-replicastore.php'; - class Jetpack_Sync_Module_Full_Sync extends Jetpack_Sync_Module { const STATUS_OPTION_PREFIX = 'jetpack_sync_full_'; const FULL_SYNC_TIMEOUT = 3600; - private $items_added_since_last_pause; - private $last_pause_time; - private $queue_rate_limit; - public function name() { return 'full-sync'; } @@ -38,14 +32,12 @@ class Jetpack_Sync_Module_Full_Sync extends Jetpack_Sync_Module { add_action( 'jetpack_sync_processed_actions', array( $this, 'update_sent_progress_action' ) ); } - function start( $modules = null ) { + function start( $module_configs = null ) { $was_already_running = $this->is_started() && ! $this->is_finished(); // remove all evidence of previous full sync items and status $this->reset_data(); - $this->enable_queue_rate_limit(); - if ( $was_already_running ) { /** * Fires when a full sync is cancelled. @@ -55,71 +47,124 @@ class Jetpack_Sync_Module_Full_Sync extends Jetpack_Sync_Module { do_action( 'jetpack_full_sync_cancelled' ); } - /** - * Fires when a full sync begins. This action is serialized - * and sent to the server so that it knows a full sync is coming. - * - * @since 4.2.0 - */ - do_action( 'jetpack_full_sync_start', $modules ); $this->update_status_option( 'started', time() ); + $this->update_status_option( 'params', $module_configs ); - // configure modules - if ( ! is_array( $modules ) ) { - $modules = array(); - } - - if ( isset( $modules['users'] ) && 'initial' === $modules['users'] ) { - $user_module = Jetpack_Sync_Modules::get_module( 'users' ); - $modules['users'] = $user_module->get_initial_sync_user_config(); - } + $enqueue_status = array(); + $full_sync_config = array(); - // by default, all modules are fully enabled - if ( count( $modules ) === 0 ) { - $default_module_config = true; - } else { - $default_module_config = false; + // default value is full sync + if ( ! is_array( $module_configs ) ) { + $module_configs = array(); + foreach ( Jetpack_Sync_Modules::get_modules() as $module ) { + $module_configs[ $module->name() ] = true; + } } // set default configuration, calculate totals, and save configuration if totals > 0 foreach ( Jetpack_Sync_Modules::get_modules() as $module ) { $module_name = $module->name(); - if ( ! isset( $modules[ $module_name ] ) ) { - $modules[ $module_name ] = $default_module_config; + $module_config = isset( $module_configs[ $module_name ] ) ? $module_configs[ $module_name ] : false; + + if ( ! $module_config ) { + continue; } - // check if this module is enabled - if ( ! ( $module_config = $modules[ $module_name ] ) ) { - continue; + if ( 'users' === $module_name && 'initial' === $module_config ) { + $module_config = $module->get_initial_sync_user_config(); } + $enqueue_status[ $module_name ] = false; + $total_items = $module->estimate_full_sync_actions( $module_config ); + // if there's information to process, configure this module if ( ! is_null( $total_items ) && $total_items > 0 ) { - $this->update_status_option( "{$module_name}_total", $total_items ); - $this->update_status_option( "{$module_name}_config", $module_config ); + $full_sync_config[ $module_name ] = $module_config; + $enqueue_status[ $module_name ] = array( + $total_items, // total + 0, // queued + false, // current state + ); } } + $this->set_config( $full_sync_config ); + $this->set_enqueue_status( $enqueue_status ); + + /** + * Fires when a full sync begins. This action is serialized + * and sent to the server so that it knows a full sync is coming. + * + * @since 4.2.0 + */ + do_action( 'jetpack_full_sync_start', $full_sync_config ); + + $this->continue_enqueuing( $full_sync_config, $enqueue_status ); + + return true; + } + + function continue_enqueuing( $configs = null, $enqueue_status = null ) { + if ( ! $this->is_started() || $this->get_status_option( 'queue_finished' ) ) { + return; + } + + // if full sync queue is full, don't enqueue more items + $max_queue_size_full_sync = Jetpack_Sync_Settings::get_setting( 'max_queue_size_full_sync' ); + $full_sync_queue = new Jetpack_Sync_Queue( 'full_sync' ); + + $available_queue_slots = $max_queue_size_full_sync - $full_sync_queue->size(); + + if ( $available_queue_slots <= 0 ) { + return; + } else { + $remaining_items_to_enqueue = min( Jetpack_Sync_Settings::get_setting( 'max_enqueue_full_sync' ), $available_queue_slots ); + } + + if ( ! $configs ) { + $configs = $this->get_config(); + } + + if ( ! $enqueue_status ) { + $enqueue_status = $this->get_enqueue_status(); + } + foreach ( Jetpack_Sync_Modules::get_modules() as $module ) { - $module_name = $module->name(); - $module_config = $modules[ $module_name ]; + $module_name = $module->name(); - // check if this module is enabled - if ( ! $module_config ) { + // skip module if not configured for this sync or module is done + if ( ! isset( $configs[ $module_name ] ) + || // no module config + ! $configs[ $module_name ] + || // no enqueue status + ! $enqueue_status[ $module_name ] + || // finished enqueuing this module + true === $enqueue_status[ $module_name ][ 2 ] ) { continue; } - $items_enqueued = $module->enqueue_full_sync_actions( $module_config ); + list( $items_enqueued, $next_enqueue_state ) = $module->enqueue_full_sync_actions( $configs[ $module_name ], $remaining_items_to_enqueue, $enqueue_status[ $module_name ][ 2 ] ); + + $enqueue_status[ $module_name ][ 2 ] = $next_enqueue_state; + // if items were processed, subtract them from the limit if ( ! is_null( $items_enqueued ) && $items_enqueued > 0 ) { - $this->update_status_option( "{$module_name}_queued", $items_enqueued ); + $enqueue_status[ $module_name ][ 1 ] += $items_enqueued; + $remaining_items_to_enqueue -= $items_enqueued; } - } - $this->update_status_option( 'queue_finished', time() ); + // stop processing if we've reached our limit of items to enqueue + if ( 0 >= $remaining_items_to_enqueue ) { + $this->set_enqueue_status( $enqueue_status ); + return; + } + } + + $this->set_enqueue_status( $enqueue_status ); - $store = new Jetpack_Sync_WP_Replicastore(); + // setting autoload to true means that it's faster to check whether we should continue enqueuing + $this->update_status_option( 'queue_finished', time(), true ); /** * Fires when a full sync ends. This action is serialized @@ -129,10 +174,6 @@ class Jetpack_Sync_Module_Full_Sync extends Jetpack_Sync_Module { * @since 4.2.0 */ do_action( 'jetpack_full_sync_end', '' ); - - $this->disable_queue_rate_limit(); - - return true; } function update_sent_progress_action( $actions ) { @@ -145,7 +186,7 @@ class Jetpack_Sync_Module_Full_Sync extends Jetpack_Sync_Module { } if ( isset( $actions_with_counts['jetpack_full_sync_start'] ) ) { - $this->update_status_option( 'sent_started', time() ); + $this->update_status_option( 'send_started', time() ); } foreach ( Jetpack_Sync_Modules::get_modules() as $module ) { @@ -188,32 +229,37 @@ class Jetpack_Sync_Module_Full_Sync extends Jetpack_Sync_Module { $status = array( 'started' => $this->get_status_option( 'started' ), 'queue_finished' => $this->get_status_option( 'queue_finished' ), - 'sent_started' => $this->get_status_option( 'sent_started' ), + 'send_started' => $this->get_status_option( 'send_started' ), 'finished' => $this->get_status_option( 'finished' ), 'sent' => array(), 'queue' => array(), - 'config' => array(), + 'config' => $this->get_status_option( 'params' ), 'total' => array(), ); + $enqueue_status = $this->get_enqueue_status(); + $module_config = $this->get_config(); + foreach ( Jetpack_Sync_Modules::get_modules() as $module ) { $name = $module->name(); - if ( $total = $this->get_status_option( "{$name}_total" ) ) { + if ( ! isset( $enqueue_status[ $name ] ) ) { + continue; + } + + list( $total, $queued, $state ) = $enqueue_status[ $name ]; + + if ( $total ) { $status[ 'total' ][ $name ] = $total; } - if ( $queued = $this->get_status_option( "{$name}_queued" ) ) { + if ( $queued ) { $status[ 'queue' ][ $name ] = $queued; } if ( $sent = $this->get_status_option( "{$name}_sent" ) ) { $status[ 'sent' ][ $name ] = $sent; } - - if ( $config = $this->get_status_option( "{$name}_config" ) ) { - $status[ 'config' ][ $name ] = $config; - } } return $status; @@ -222,15 +268,13 @@ class Jetpack_Sync_Module_Full_Sync extends Jetpack_Sync_Module { public function clear_status() { $prefix = self::STATUS_OPTION_PREFIX; delete_option( "{$prefix}_started" ); + delete_option( "{$prefix}_params" ); delete_option( "{$prefix}_queue_finished" ); - delete_option( "{$prefix}_sent_started" ); + delete_option( "{$prefix}_send_started" ); delete_option( "{$prefix}_finished" ); foreach ( Jetpack_Sync_Modules::get_modules() as $module ) { - delete_option( "{$prefix}_{$module->name()}_total" ); - delete_option( "{$prefix}_{$module->name()}_queued" ); delete_option( "{$prefix}_{$module->name()}_sent" ); - delete_option( "{$prefix}_{$module->name()}_config" ); } } @@ -241,10 +285,10 @@ class Jetpack_Sync_Module_Full_Sync extends Jetpack_Sync_Module { $listener->get_full_sync_queue()->reset(); } - private function get_status_option( $option, $default = null ) { + private function get_status_option( $name, $default = null ) { $prefix = self::STATUS_OPTION_PREFIX; - $value = get_option( "{$prefix}_{$option}", $default ); + $value = get_option( "{$prefix}_{$name}", $default ); if ( ! $value ) { // don't cast to int if we didn't find a value - we want to preserve null or false as sentinals @@ -254,43 +298,68 @@ class Jetpack_Sync_Module_Full_Sync extends Jetpack_Sync_Module { return is_numeric( $value ) ? intval( $value ) : $value; } - private function update_status_option( $name, $value ) { + private function update_status_option( $name, $value, $autoload = false ) { $prefix = self::STATUS_OPTION_PREFIX; - update_option( "{$prefix}_{$name}", $value, false ); + update_option( "{$prefix}_{$name}", $value, $autoload ); } - private function enable_queue_rate_limit() { - $this->queue_rate_limit = Jetpack_Sync_Settings::get_setting( 'queue_max_writes_sec' ); - $this->items_added_since_last_pause = 0; - $this->last_pause_time = microtime( true ); + private function set_enqueue_status( $new_status ) { + $this->write_option( 'jetpack_sync_full_enqueue_status', $new_status ); + } - add_action( 'jpsq_item_added', array( $this, 'queue_item_added' ) ); - add_action( 'jpsq_items_added', array( $this, 'queue_items_added' ) ); + private function get_enqueue_status() { + return $this->read_option( 'jetpack_sync_full_enqueue_status' ); } - private function disable_queue_rate_limit() { - remove_action( 'jpsq_item_added', array( $this, 'queue_item_added' ) ); - remove_action( 'jpsq_items_added', array( $this, 'queue_items_added' ) ); + private function set_config( $config ) { + $this->write_option( 'jetpack_sync_full_config', $config ); + } + + private function get_config() { + return $this->read_option( 'jetpack_sync_full_config' ); } - public function queue_item_added() { - $this->queue_items_added( 1 ); + private function write_option( $name, $value ) { + // we write our own option updating code to bypass filters/caching/etc on set_option/get_option + global $wpdb; + $serialized_value = maybe_serialize( $value ); + // try updating, if no update then insert + // TODO: try to deal with the fact that unchanged values can return updated_num = 0 + // below we used "insert ignore" to at least suppress the resulting error + $updated_num = $wpdb->query( + $wpdb->prepare( + "UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s", + $serialized_value, + $name + ) + ); + + if ( ! $updated_num ) { + $updated_num = $wpdb->query( + $wpdb->prepare( + "INSERT IGNORE INTO $wpdb->options ( option_name, option_value, autoload ) VALUES ( %s, %s, 'no' )", + $name, + $serialized_value + ) + ); + } + return $updated_num; } - public function queue_items_added( $item_count ) { - // jpsq_item_added and jpsq_items_added both exec 1 db query, - // so we ignore $item_count and treat it as always 1 - $this->items_added_since_last_pause += 1; - - if ( $this->items_added_since_last_pause > $this->queue_rate_limit ) { - // sleep for the rest of the second - $sleep_til = $this->last_pause_time + 1.0; - $sleep_duration = $sleep_til - microtime( true ); - if ( $sleep_duration > 0.0 ) { - usleep( $sleep_duration * 1000000 ); - $this->last_pause_time = microtime( true ); - } - $this->items_added_since_last_pause = 0; + private function read_option( $name, $default = null ) { + global $wpdb; + $value = $wpdb->get_var( + $wpdb->prepare( + "SELECT option_value FROM $wpdb->options WHERE option_name = %s LIMIT 1", + $name + ) + ); + $value = maybe_unserialize( $value ); + + if ( $value === null && $default !== null ) { + return $default; } + + return $value; } } |