summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jetpack/sync/class.jetpack-sync-module-full-sync.php')
-rw-r--r--plugins/jetpack/sync/class.jetpack-sync-module-full-sync.php253
1 files changed, 161 insertions, 92 deletions
diff --git a/plugins/jetpack/sync/class.jetpack-sync-module-full-sync.php b/plugins/jetpack/sync/class.jetpack-sync-module-full-sync.php
index b5b73244..2b8bd231 100644
--- a/plugins/jetpack/sync/class.jetpack-sync-module-full-sync.php
+++ b/plugins/jetpack/sync/class.jetpack-sync-module-full-sync.php
@@ -12,16 +12,10 @@
* - we fire a trigger for the entire array which the Jetpack_Sync_Listener then serializes and queues.
*/
-require_once 'class.jetpack-sync-wp-replicastore.php';
-
class Jetpack_Sync_Module_Full_Sync extends Jetpack_Sync_Module {
const STATUS_OPTION_PREFIX = 'jetpack_sync_full_';
const FULL_SYNC_TIMEOUT = 3600;
- private $items_added_since_last_pause;
- private $last_pause_time;
- private $queue_rate_limit;
-
public function name() {
return 'full-sync';
}
@@ -38,14 +32,12 @@ class Jetpack_Sync_Module_Full_Sync extends Jetpack_Sync_Module {
add_action( 'jetpack_sync_processed_actions', array( $this, 'update_sent_progress_action' ) );
}
- function start( $modules = null ) {
+ function start( $module_configs = null ) {
$was_already_running = $this->is_started() && ! $this->is_finished();
// remove all evidence of previous full sync items and status
$this->reset_data();
- $this->enable_queue_rate_limit();
-
if ( $was_already_running ) {
/**
* Fires when a full sync is cancelled.
@@ -55,71 +47,124 @@ class Jetpack_Sync_Module_Full_Sync extends Jetpack_Sync_Module {
do_action( 'jetpack_full_sync_cancelled' );
}
- /**
- * Fires when a full sync begins. This action is serialized
- * and sent to the server so that it knows a full sync is coming.
- *
- * @since 4.2.0
- */
- do_action( 'jetpack_full_sync_start', $modules );
$this->update_status_option( 'started', time() );
+ $this->update_status_option( 'params', $module_configs );
- // configure modules
- if ( ! is_array( $modules ) ) {
- $modules = array();
- }
-
- if ( isset( $modules['users'] ) && 'initial' === $modules['users'] ) {
- $user_module = Jetpack_Sync_Modules::get_module( 'users' );
- $modules['users'] = $user_module->get_initial_sync_user_config();
- }
+ $enqueue_status = array();
+ $full_sync_config = array();
- // by default, all modules are fully enabled
- if ( count( $modules ) === 0 ) {
- $default_module_config = true;
- } else {
- $default_module_config = false;
+ // default value is full sync
+ if ( ! is_array( $module_configs ) ) {
+ $module_configs = array();
+ foreach ( Jetpack_Sync_Modules::get_modules() as $module ) {
+ $module_configs[ $module->name() ] = true;
+ }
}
// set default configuration, calculate totals, and save configuration if totals > 0
foreach ( Jetpack_Sync_Modules::get_modules() as $module ) {
$module_name = $module->name();
- if ( ! isset( $modules[ $module_name ] ) ) {
- $modules[ $module_name ] = $default_module_config;
+ $module_config = isset( $module_configs[ $module_name ] ) ? $module_configs[ $module_name ] : false;
+
+ if ( ! $module_config ) {
+ continue;
}
- // check if this module is enabled
- if ( ! ( $module_config = $modules[ $module_name ] ) ) {
- continue;
+ if ( 'users' === $module_name && 'initial' === $module_config ) {
+ $module_config = $module->get_initial_sync_user_config();
}
+ $enqueue_status[ $module_name ] = false;
+
$total_items = $module->estimate_full_sync_actions( $module_config );
+ // if there's information to process, configure this module
if ( ! is_null( $total_items ) && $total_items > 0 ) {
- $this->update_status_option( "{$module_name}_total", $total_items );
- $this->update_status_option( "{$module_name}_config", $module_config );
+ $full_sync_config[ $module_name ] = $module_config;
+ $enqueue_status[ $module_name ] = array(
+ $total_items, // total
+ 0, // queued
+ false, // current state
+ );
}
}
+ $this->set_config( $full_sync_config );
+ $this->set_enqueue_status( $enqueue_status );
+
+ /**
+ * Fires when a full sync begins. This action is serialized
+ * and sent to the server so that it knows a full sync is coming.
+ *
+ * @since 4.2.0
+ */
+ do_action( 'jetpack_full_sync_start', $full_sync_config );
+
+ $this->continue_enqueuing( $full_sync_config, $enqueue_status );
+
+ return true;
+ }
+
+ function continue_enqueuing( $configs = null, $enqueue_status = null ) {
+ if ( ! $this->is_started() || $this->get_status_option( 'queue_finished' ) ) {
+ return;
+ }
+
+ // if full sync queue is full, don't enqueue more items
+ $max_queue_size_full_sync = Jetpack_Sync_Settings::get_setting( 'max_queue_size_full_sync' );
+ $full_sync_queue = new Jetpack_Sync_Queue( 'full_sync' );
+
+ $available_queue_slots = $max_queue_size_full_sync - $full_sync_queue->size();
+
+ if ( $available_queue_slots <= 0 ) {
+ return;
+ } else {
+ $remaining_items_to_enqueue = min( Jetpack_Sync_Settings::get_setting( 'max_enqueue_full_sync' ), $available_queue_slots );
+ }
+
+ if ( ! $configs ) {
+ $configs = $this->get_config();
+ }
+
+ if ( ! $enqueue_status ) {
+ $enqueue_status = $this->get_enqueue_status();
+ }
+
foreach ( Jetpack_Sync_Modules::get_modules() as $module ) {
- $module_name = $module->name();
- $module_config = $modules[ $module_name ];
+ $module_name = $module->name();
- // check if this module is enabled
- if ( ! $module_config ) {
+ // skip module if not configured for this sync or module is done
+ if ( ! isset( $configs[ $module_name ] )
+ || // no module config
+ ! $configs[ $module_name ]
+ || // no enqueue status
+ ! $enqueue_status[ $module_name ]
+ || // finished enqueuing this module
+ true === $enqueue_status[ $module_name ][ 2 ] ) {
continue;
}
- $items_enqueued = $module->enqueue_full_sync_actions( $module_config );
+ list( $items_enqueued, $next_enqueue_state ) = $module->enqueue_full_sync_actions( $configs[ $module_name ], $remaining_items_to_enqueue, $enqueue_status[ $module_name ][ 2 ] );
+
+ $enqueue_status[ $module_name ][ 2 ] = $next_enqueue_state;
+ // if items were processed, subtract them from the limit
if ( ! is_null( $items_enqueued ) && $items_enqueued > 0 ) {
- $this->update_status_option( "{$module_name}_queued", $items_enqueued );
+ $enqueue_status[ $module_name ][ 1 ] += $items_enqueued;
+ $remaining_items_to_enqueue -= $items_enqueued;
}
- }
- $this->update_status_option( 'queue_finished', time() );
+ // stop processing if we've reached our limit of items to enqueue
+ if ( 0 >= $remaining_items_to_enqueue ) {
+ $this->set_enqueue_status( $enqueue_status );
+ return;
+ }
+ }
+
+ $this->set_enqueue_status( $enqueue_status );
- $store = new Jetpack_Sync_WP_Replicastore();
+ // setting autoload to true means that it's faster to check whether we should continue enqueuing
+ $this->update_status_option( 'queue_finished', time(), true );
/**
* Fires when a full sync ends. This action is serialized
@@ -129,10 +174,6 @@ class Jetpack_Sync_Module_Full_Sync extends Jetpack_Sync_Module {
* @since 4.2.0
*/
do_action( 'jetpack_full_sync_end', '' );
-
- $this->disable_queue_rate_limit();
-
- return true;
}
function update_sent_progress_action( $actions ) {
@@ -145,7 +186,7 @@ class Jetpack_Sync_Module_Full_Sync extends Jetpack_Sync_Module {
}
if ( isset( $actions_with_counts['jetpack_full_sync_start'] ) ) {
- $this->update_status_option( 'sent_started', time() );
+ $this->update_status_option( 'send_started', time() );
}
foreach ( Jetpack_Sync_Modules::get_modules() as $module ) {
@@ -188,32 +229,37 @@ class Jetpack_Sync_Module_Full_Sync extends Jetpack_Sync_Module {
$status = array(
'started' => $this->get_status_option( 'started' ),
'queue_finished' => $this->get_status_option( 'queue_finished' ),
- 'sent_started' => $this->get_status_option( 'sent_started' ),
+ 'send_started' => $this->get_status_option( 'send_started' ),
'finished' => $this->get_status_option( 'finished' ),
'sent' => array(),
'queue' => array(),
- 'config' => array(),
+ 'config' => $this->get_status_option( 'params' ),
'total' => array(),
);
+ $enqueue_status = $this->get_enqueue_status();
+ $module_config = $this->get_config();
+
foreach ( Jetpack_Sync_Modules::get_modules() as $module ) {
$name = $module->name();
- if ( $total = $this->get_status_option( "{$name}_total" ) ) {
+ if ( ! isset( $enqueue_status[ $name ] ) ) {
+ continue;
+ }
+
+ list( $total, $queued, $state ) = $enqueue_status[ $name ];
+
+ if ( $total ) {
$status[ 'total' ][ $name ] = $total;
}
- if ( $queued = $this->get_status_option( "{$name}_queued" ) ) {
+ if ( $queued ) {
$status[ 'queue' ][ $name ] = $queued;
}
if ( $sent = $this->get_status_option( "{$name}_sent" ) ) {
$status[ 'sent' ][ $name ] = $sent;
}
-
- if ( $config = $this->get_status_option( "{$name}_config" ) ) {
- $status[ 'config' ][ $name ] = $config;
- }
}
return $status;
@@ -222,15 +268,13 @@ class Jetpack_Sync_Module_Full_Sync extends Jetpack_Sync_Module {
public function clear_status() {
$prefix = self::STATUS_OPTION_PREFIX;
delete_option( "{$prefix}_started" );
+ delete_option( "{$prefix}_params" );
delete_option( "{$prefix}_queue_finished" );
- delete_option( "{$prefix}_sent_started" );
+ delete_option( "{$prefix}_send_started" );
delete_option( "{$prefix}_finished" );
foreach ( Jetpack_Sync_Modules::get_modules() as $module ) {
- delete_option( "{$prefix}_{$module->name()}_total" );
- delete_option( "{$prefix}_{$module->name()}_queued" );
delete_option( "{$prefix}_{$module->name()}_sent" );
- delete_option( "{$prefix}_{$module->name()}_config" );
}
}
@@ -241,10 +285,10 @@ class Jetpack_Sync_Module_Full_Sync extends Jetpack_Sync_Module {
$listener->get_full_sync_queue()->reset();
}
- private function get_status_option( $option, $default = null ) {
+ private function get_status_option( $name, $default = null ) {
$prefix = self::STATUS_OPTION_PREFIX;
- $value = get_option( "{$prefix}_{$option}", $default );
+ $value = get_option( "{$prefix}_{$name}", $default );
if ( ! $value ) {
// don't cast to int if we didn't find a value - we want to preserve null or false as sentinals
@@ -254,43 +298,68 @@ class Jetpack_Sync_Module_Full_Sync extends Jetpack_Sync_Module {
return is_numeric( $value ) ? intval( $value ) : $value;
}
- private function update_status_option( $name, $value ) {
+ private function update_status_option( $name, $value, $autoload = false ) {
$prefix = self::STATUS_OPTION_PREFIX;
- update_option( "{$prefix}_{$name}", $value, false );
+ update_option( "{$prefix}_{$name}", $value, $autoload );
}
- private function enable_queue_rate_limit() {
- $this->queue_rate_limit = Jetpack_Sync_Settings::get_setting( 'queue_max_writes_sec' );
- $this->items_added_since_last_pause = 0;
- $this->last_pause_time = microtime( true );
+ private function set_enqueue_status( $new_status ) {
+ $this->write_option( 'jetpack_sync_full_enqueue_status', $new_status );
+ }
- add_action( 'jpsq_item_added', array( $this, 'queue_item_added' ) );
- add_action( 'jpsq_items_added', array( $this, 'queue_items_added' ) );
+ private function get_enqueue_status() {
+ return $this->read_option( 'jetpack_sync_full_enqueue_status' );
}
- private function disable_queue_rate_limit() {
- remove_action( 'jpsq_item_added', array( $this, 'queue_item_added' ) );
- remove_action( 'jpsq_items_added', array( $this, 'queue_items_added' ) );
+ private function set_config( $config ) {
+ $this->write_option( 'jetpack_sync_full_config', $config );
+ }
+
+ private function get_config() {
+ return $this->read_option( 'jetpack_sync_full_config' );
}
- public function queue_item_added() {
- $this->queue_items_added( 1 );
+ private function write_option( $name, $value ) {
+ // we write our own option updating code to bypass filters/caching/etc on set_option/get_option
+ global $wpdb;
+ $serialized_value = maybe_serialize( $value );
+ // try updating, if no update then insert
+ // TODO: try to deal with the fact that unchanged values can return updated_num = 0
+ // below we used "insert ignore" to at least suppress the resulting error
+ $updated_num = $wpdb->query(
+ $wpdb->prepare(
+ "UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s",
+ $serialized_value,
+ $name
+ )
+ );
+
+ if ( ! $updated_num ) {
+ $updated_num = $wpdb->query(
+ $wpdb->prepare(
+ "INSERT IGNORE INTO $wpdb->options ( option_name, option_value, autoload ) VALUES ( %s, %s, 'no' )",
+ $name,
+ $serialized_value
+ )
+ );
+ }
+ return $updated_num;
}
- public function queue_items_added( $item_count ) {
- // jpsq_item_added and jpsq_items_added both exec 1 db query,
- // so we ignore $item_count and treat it as always 1
- $this->items_added_since_last_pause += 1;
-
- if ( $this->items_added_since_last_pause > $this->queue_rate_limit ) {
- // sleep for the rest of the second
- $sleep_til = $this->last_pause_time + 1.0;
- $sleep_duration = $sleep_til - microtime( true );
- if ( $sleep_duration > 0.0 ) {
- usleep( $sleep_duration * 1000000 );
- $this->last_pause_time = microtime( true );
- }
- $this->items_added_since_last_pause = 0;
+ private function read_option( $name, $default = null ) {
+ global $wpdb;
+ $value = $wpdb->get_var(
+ $wpdb->prepare(
+ "SELECT option_value FROM $wpdb->options WHERE option_name = %s LIMIT 1",
+ $name
+ )
+ );
+ $value = maybe_unserialize( $value );
+
+ if ( $value === null && $default !== null ) {
+ return $default;
}
+
+ return $value;
}
}