diff --git a/phpBB/phpbb/messenger/method/base.php b/phpBB/phpbb/messenger/method/base.php index 71e0b798d1..28b9cde636 100644 --- a/phpBB/phpbb/messenger/method/base.php +++ b/phpBB/phpbb/messenger/method/base.php @@ -96,6 +96,15 @@ abstract class base return; } + /** + * get messenger method fie queue object name + * @return string + */ + abstract public function get_queue_object_name($user) + { + return ''; + } + /** * Sets the use of messenger queue flag * @@ -180,6 +189,16 @@ abstract class base { } + /** + * Send messages from the queue + * + * @param array $queue_data Queue data array + * @return void + */ + abstract public function process_queue(&$queue_data) + { + } + /** * Set email template to use * diff --git a/phpBB/phpbb/messenger/method/email.php b/phpBB/phpbb/messenger/method/email.php index d8025d431b..1968c6771e 100644 --- a/phpBB/phpbb/messenger/method/email.php +++ b/phpBB/phpbb/messenger/method/email.php @@ -92,6 +92,15 @@ class email extends base return NOTIFY_EMAIL; } + /** + * get messenger method fie queue object name + * @return string + */ + abstract public function get_queue_object_name($user) + { + return 'email'; + } + /** * Check if the messenger method is enabled * @return void @@ -438,6 +447,71 @@ class email extends base } } + /** + * Send messages from the queue + * + * @param array $queue_data Queue data array + * @return void + */ + public function process_queue(&$queue_data) + { + $queue_object_name = $this->get_queue_object_name(); + $messages_count = count($queue_data[$queue_object_name]['data']; + + if (!$this->is_enabled() || !$messages_count) + { + unset($queue_data[$queue_object_name]); + return; + } + + @set_time_limit(0); + + $package_size = $queue_data[$queue_object_name]['package_size'] ?? 0; + $num_items = (!$package_size || $messages_count < $package_size) ? $messages_count : $package_size; + $mailer = new Mailer($this->transport); + + for ($i = 0; $i < $num_items; $i++) + { + // Make variables available... + extract(array_shift($queue_data[$queue_object_name]['data'])); + + $break = false; + /** + * Event to send message via external transport + * + * @event core.notification_message_process + * @var bool break Flag indicating if the function return after hook + * @var Symfony\Component\Mime\Email email The Symfony Email object + * @since 3.2.4-RC1 + * @changed 4.0.0-a1 Added vars: email. Removed vars: addresses, subject, msg. + */ + $vars = [ + 'break', + 'email', + ]; + extract($this->dispatcher->trigger_event('core.notification_message_process', compact($vars))); + + if (!$break) + { + try + { + $mailer->send($email); + } + catch (TransportExceptionInterface $e) + { + $this->error('EMAIL', $e->getDebug()); + continue; + } + } + } + + // No more data for this object? Unset it + if (!count($this->queue_data[$queue_object_name]['data'])) + { + unset($this->queue_data[$queue_object_name]); + } + } + /** * Get mailer transport object * diff --git a/phpBB/phpbb/messenger/method/jabber.php b/phpBB/phpbb/messenger/method/jabber.php index 439150a013..9661efc64a 100644 --- a/phpBB/phpbb/messenger/method/jabber.php +++ b/phpBB/phpbb/messenger/method/jabber.php @@ -120,6 +120,15 @@ class jabber extends base return NOTIFY_IM; } + /** + * get messenger method fie queue object name + * @return string + */ + abstract public function get_queue_object_name($user) + { + return 'jabber'; + } + /** * Check if the messenger method is enabled * @return void @@ -127,10 +136,10 @@ class jabber extends base public function is_enabled() { return - empty($this->config['jab_enable']) || - empty($this->config['jab_host']) || - empty($this->config['jab_username']) || - empty($this->config['jab_password']); + !empty($this->config['jab_enable']) && + !empty($this->config['jab_host']) && + !empty($this->config['jab_username']) && + !empty($this->config['jab_password']); } /** @@ -416,6 +425,65 @@ class jabber extends base $this->use_queue = !$this->config['jab_package_size'] ? false : $use_queue; } + /** + * Send messages from the queue + * + * @param array $queue_data Queue data array + * @return void + */ + public function process_queue(&$queue_data) + { + $queue_object_name = $this->get_queue_object_name(); + $messages_count = count($queue_data[$queue_object_name]['data']; + + if (!$this->is_enabled() || !$messages_count) + { + unset($queue_data[$queue_object_name]); + return; + } + + @set_time_limit(0); + + $package_size = $queue_data[$queue_object_name]['package_size'] ?? 0; + $num_items = (!$package_size || $messages_count < $package_size) ? $messages_count : $package_size; + $mailer = new Mailer($this->transport); + + for ($i = 0; $i < $num_items; $i++) + { + // Make variables available... + extract(array_shift($queue_data[$queue_object_name]['data'])); + + if (!$this->connect()) + { + $this->error('JABBER', $this->user->lang['ERR_JAB_CONNECT'] . '
' . $this->get_log()); + return false; + } + + if (!$this->login()) + { + $this->error('JABBER', $this->user->lang['ERR_JAB_AUTH'] . '
' . $this->get_log()); + return false; + } + + foreach ($addresses as $address) + { + if ($this->send_message($address, $msg, $subject) === false) + { + $this->error('JABBER', $this->get_log()); + continue; + } + } + } + + // No more data for this object? Unset it + if (!count($this->queue_data[$queue_object_name]['data'])) + { + unset($this->queue_data[$queue_object_name]); + } + + $this->disconnect(); + } + /** * Send jabber message out */ @@ -452,7 +520,11 @@ class jabber extends base foreach ($addresses as $address) { - $this->send_message($address, $this->msg, $this->subject); + if ($this->send_message($address, $this->msg, $this->subject) === false) + { + $this->error('JABBER', $this->get_log()); + continue; + } } $this->disconnect(); @@ -460,11 +532,11 @@ class jabber extends base else { $this->queue->init('jabber', $this->config['jab_package_size']); - $this->queue->put('jabber', array( + $this->queue->put('jabber', [ 'addresses' => $addresses, 'subject' => $this->subject, - 'msg' => $this->msg) - ); + 'msg' => $this->msg, + ]); } unset($addresses); diff --git a/phpBB/phpbb/messenger/queue.php b/phpBB/phpbb/messenger/queue.php new file mode 100644 index 0000000000..eb2406e273 --- /dev/null +++ b/phpBB/phpbb/messenger/queue.php @@ -0,0 +1,217 @@ + + * @license GNU General Public License, version 2 (GPL-2.0) + * + * For full copyright and license information, please see + * the docs/CREDITS.txt file. + * + */ + +namespace phpbb\messenger; + +use phpbb\config\config; +use phpbb\event\dispatcher; +use phpbb\di\service_collection; +use phpbb\filesystem\filesystem; + +/** + * Handling messenger file queue + */ +class queue +{ + /** @var string */ + protected $cache_file; + + /** @var config */ + protected $config; + + /** @var array */ + protected $data = []; + + /** @var dispatcher */ + protected $dispatcher; + + /** @var phpbb\filesystem\filesystem_interface */ + protected $filesystem; + + /** @var service_collection */ + protected $messenger_method_collection; + + /** @var int */ + protected $package_size = 0; + + /** @var array */ + protected $queue_data = []; + + /** + * Messenger queue constructor. + * + * @param config $config + * @param dispatcher $dispatcher + * @param service_collection $messenger_method_collection + * @param string $cache_file + */ + function __construct(config $config, dispatcher $dispatcher, service_collection $messenger_method_collection, $cache_file) + { + $this->config = $config; + $this->dispatcher = $dispatcher; + $this->messenger_method_collection = $messenger_method_collection; + $this->filesystem = new filesystem(); + $this->cache_file = $cache_file; + } + + /** + * Init a queue object + * + * @param string $object Queue object type: email/jabber/etc + * @param int $package_size Size of the messenger package to send + * @return void + */ + public function init($object, $package_size) + { + $this->data[$object] = []; + $this->data[$object]['package_size'] = $package_size; + $this->data[$object]['data'] = []; + } + + /** + * Put message into the messenger file queue + * + * @param string $object Queue object type: email/jabber/etc + * @param mixed $message_data Message data to send + * @return void + */ + public function put($object, $message_data) + { + $this->data[$object]['data'][] = $message_data; + } + + /** + * Process the messenger file queue (using lock file) + * + * @return void + */ + public function process() + { + $lock = new \phpbb\lock\flock($this->cache_file); + $lock->acquire(); + + // avoid races, check file existence once + $have_cache_file = file_exists($this->cache_file); + if (!$have_cache_file || $this->config['last_queue_run'] > time() - $this->config['queue_interval']) + { + if (!$have_cache_file) + { + $this->config->set('last_queue_run', time(), false); + } + + $lock->release(); + return; + } + + $this->config->set('last_queue_run', time(), false); + + include($this->cache_file); + + $messenger_collection_iterator = $this->messenger_method_collection->getIterator(); + while ($messenger_collection_iterator->valid()) + { + $messenger_method = $messenger_collection_iterator->current(); + if (isset($this->queue_data[$messenger_method->get_queue_object_name()]) + { + $messenger_method->process_queue($this->queue_data); + } + $messenger_collection_iterator->next(); + } + + if (!count($this->queue_data)) + { + @unlink($this->cache_file); + } + else + { + if ($fp = @fopen($this->cache_file, 'wb')) + { + fwrite($fp, "queue_data = unserialize(" . var_export(serialize($this->queue_data), true) . ");\n\n?>"); + fclose($fp); + + if (function_exists('opcache_invalidate')) + { + @opcache_invalidate($this->cache_file); + } + + try + { + $this->filesystem->phpbb_chmod($this->cache_file, \phpbb\filesystem\filesystem_interface::CHMOD_READ | \phpbb\filesystem\filesystem_interface::CHMOD_WRITE); + } + catch (\phpbb\filesystem\exception\filesystem_exception $e) + { + // Do nothing + } + } + } + + $lock->release(); + } + + /** + * Save message data to the messenger file queue + * + * @return void + */ + public function save() + { + if (!count($this->data)) + { + return; + } + + $lock = new \phpbb\lock\flock($this->cache_file); + $lock->acquire(); + + if (file_exists($this->cache_file)) + { + include($this->cache_file); + + foreach ($this->queue_data as $object => $data_ary) + { + if (isset($this->data[$object]) && count($this->data[$object])) + { + $this->data[$object]['data'] = array_merge($data_ary['data'], $this->data[$object]['data']); + } + else + { + $this->data[$object]['data'] = $data_ary['data']; + } + } + } + + if ($fp = @fopen($this->cache_file, 'w')) + { + fwrite($fp, "queue_data = unserialize(" . var_export(serialize($this->data), true) . ");\n\n?>"); + fclose($fp); + + if (function_exists('opcache_invalidate')) + { + @opcache_invalidate($this->cache_file); + } + + try + { + $this->filesystem->phpbb_chmod($this->cache_file, \phpbb\filesystem\filesystem_interface::CHMOD_READ | \phpbb\filesystem\filesystem_interface::CHMOD_WRITE); + } + catch (\phpbb\filesystem\exception\filesystem_exception $e) + { + // Do nothing + } + + $this->data = []; + } + + $lock->release(); + } +}