œ_#ÁÕ§TE NAŒ“KeÉ:”(åŽÖJÞùY’‚ñùž7; «]Û ý`8g“¯B© jdÖÖ¸ðzœ¸¦4Ç3Kó^(ÍÖ¼ Õ€pvìwšõB4df$Èü^0˜…åÌC$#2FŽÑ§±¦ÛZ/÷š&m£ñzÒÖ ’.Î]!Î;ƒ(Õ–¢d/—#Kª+tZyuÏB>NÛÖ†(¸ŒSà'³„Y˜´-_•¦¼´˜OlNK§¶ÒàŠˆTHµƒeTPå·fïM’…þuÏÍüp6دªE£åü‡ZØ'CKF#â«;‹eyO Qp„†l"ö1èíÙP ÏŒúl! BÝ2ñª•_VÁÉ÷3eu`–F¸ìI--ö<¿žë¯4õ캿¢)34Å{wMÉ2ÆÖFŸ¥`e9Ú¶¸P‡.”FÔï rY ‚²ÈTB,{ÛœéJ}«àQ4¹0Rû4D‚B§S‘ dO•v¾„™Sן¯3FeŸ™«+ÓâwH dÕÛÌì·P4ë&¥#rÜÉ Ù¦ê†ý·xòqk¯2,¹§™E\ék‚×Sá”ÚºÙ⺷ö£6…à ʾ qSá³Å|;àû}4Ÿ($â¹VY~óÍ!èÜÒŒËX½Ù1j‚VíÍŸš³+œ]«½g{_{/vµ½\¢¶vÉWKÿ:ñám½ ¥ S²x‘t ŽšÝÙÿÀÇ^ný PK IW™k‚½÷ á _rels/.relsUT dìd dìd dìd’ÏNÃ0‡ï{ŠÈ÷ÕÝ@¡¥» ¤Ý*`%îÑ&QâÁöö‚J£ì°cœŸ¿|¶²ÙÆA½rL½wVE Šñ¶w†çúay * 9Kƒw¬áÈ ¶ÕbóÄIîI]’Ê—4t"á1™ŽGJ…ìòMããH’±Å@æ…ZÆuYÞ`üÍ€jÂT;«!îì T}|Û7MoøÞ›ýÈNN<|v–í2ÄÜ¥ÏèšbË¢Ázó˜Ë )„"£OÏ7ú{ZYÈ’yÞç#1'tuÉM?6o>Z´_å9›ëKÚ˜}?þ³žÏÌ·N>fµx PK IWª½e ¢ U € word/document.xmlUT dìdPK IWþË3” z €J¢ word/settings.xmlUT dìdPK IWC‡{š' ƒ €¤ docProps/custom.xmlUT dìdPK IW츱=Œ €‡¥ [Content_Types].xmlUT dìdPK IWV%ë±" €U§ docProps/app.xmlUT dìdPK IW€RŒ 3 €¶¨ docProps/core.xmlUT dìdPK IWkòDn ô €ª word/_rels/document.xml.relsUT dìdPK IW;$î €Î« word/fontTable.xmlUT dìdPK IW+åäz] ÷. €ý¬ word/numbering.xmlUT dìdPK IW¤2×r- ¿ €›° word/styles.xmlUT dìdPK IWMFÒ ø €´ word/header1.xmlUT dìdPK IWF— T e €· word/media/image1.jpegUT dìdPK IW!Yéáå €°Ë word/media/image2.pngUT dìdPK IW°Àºë ú €ÙÌ word/media/image3.pngUT dìdPK IW$“†ª L €Î word/footer1.xmlUT dìdPK IWzaGôM €ñÑ word/footer2.xmlUT dìdPK IW–µâº P €}Õ word/theme/theme1.xmlUT dìdPK IW™k‚½÷ á €{Û _rels/.relsUT PK ! bîh^ [Content_Types].xml ¢( ¬”ËNÃ0E÷HüCä-Jܲ@5í‚Ç*Q>Àēƪc[žiiÿž‰ûB¡j7±ÏÜ{2ñÍh²nm¶‚ˆÆ»R‹ÈÀU^7/ÅÇì%¿’rZYï @1__f› ˜q·ÃR4DáAJ¬h>€ãÚÇV߯¹ªZ¨9ÈÛÁàNVÞ8Ê©ÓãÑÔji){^óã-I‹"{Üv^¥P!XS)bR¹rú—K¾s(¸3Õ`cÞ0†½ÝÎß»¾7M4²©ŠôªZÆk+¿|\|z¿(Ž‹ôPúº6h_-[ž@!‚ÒØ Pk‹´2nÏ}Ä?£LËð Ýû%áÄßdºždN"m,à¥ÇžDO97*‚~§Èɸ8ÀOíc|n¦Ñ äEøÿöéºóÀBÉÀ!$}‡íàÈé;{ìÐå[ƒîñ–é2þ ÿÿ PK ! 
µU0#ô L _rels/.rels ¢( ¬’MOÃ0†ïHü‡È÷ÕÝBKwAH»!T~€Iܵ£$Ý¿'TƒG½~üÊÛÝ<êÈ!öâ4¬‹;#¶w†—úqu *&r–Fq¬áÄvÕõÕö™GJy(v½*«¸¨¡KÉß#FÓñD±Ï.W ¥†=™ZÆMYÞbø®ÕBS톰·7 ê“Ï›×–¦é ?ˆ9LìÒ™ÈsbgÙ®|Èl!õùUSh9i°bžr:"y_dlÀóD›¿ý|-NœÈR"4ø2ÏGÇ% õZ´4ñËyÄ7 ëÈðÉ‚‹¨Þ ÿÿ PK ! Q48wÛ — xl/workbook.xml¤UÙnâ0}iþ!cñ‡ *–¢AšVU×$dC¬&vÆv UÕŸë@XÊK§/¹p|Žï¹N÷b“¥Ö •Š ÞC¸î"‹òHÄŒ¯zèá~b·‘¥4á1I§=ôJºèÿüÑ] ù¼âÙ ®z(Ñ:GE ͈ª‹œrˆ,…̈†©\9*—”Ä*¡Tg©ã¹nàd„q´Eåg0ÄrÉ":Q‘Q®· ’¦D}•°\UhYô¸ŒÈç"·#‘å ±`)Ó¯%(²²(œ®¸d‘‚ì nZ w v¡ñª• t¶TÆ")”Xê:@;[Ògú±ë`|²›ó=ø’ïHúÂL÷¬dðEVÁ+8€a÷Ûh¬Uz%„Íû"ZsÏÍCýî’¥ôqk]‹äù5ÉL¦Rd¥Dé˘i÷P ¦bM/|dÉ",…¨çãFNoçiûéë>aêiçsó#ðÄ ÕTr¢éHp ÜIú®ÝJìQ"ÀÜÖ-ý[0I¡¦ÀZ Z…d¡nˆN¬B¦=4 g %PDF-1.4 %âãÏÓ 3 0 obj << /Linearized 1 /L 422775 ÿØÿà JFIF ÿÛ C ÿÛ C ÿÀ X" ÿÄ ÿÄ H !1A"Qaq2‘¡#±ÁBRÑ3Cbrá$S‚¢²ð4ñ%6DTc’ÂsÿÄ ÿÄ = !1AQ"aq‘Á2R¡±BÑð#3br’²4á$‚¢ÂñÿÚ ? áHBßÝ`„! !@B„ „! !@B„ „! !@B„ „! !@B„ „! !@B„ „! !@B„ „! !@B„ „! !@B„ „! !@B„ „! !@B„ „! !@B„ „! !@B„ „! !@B„ „! !@B„ „! !@B„ „! !@B„ „! !@B„ „! !@B„ „! !@B„ „! !@B„ „! !@B„ „! !@B„ „! ! stream
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
"""
This module contains RequestProcessor class
"""
import logging
import sys
import time
import traceback
from datetime import datetime, timedelta, timezone
from threading import Thread, RLock, current_thread
from typing import Callable, Any
from .autotracer import AutoTracer
from .common import Common
from .decision_maker import DecisionMaker
from .stat_sender import StatisticsSender
from ..db import session_scope, setup_database, RequestResult, cleanup_old_data, restore_database, is_malformed_database
from ..internal.exceptions import SSAError
from ..internal.utils import (
singleton,
url_split,
switch_schedstats
)
@singleton
class RequestProcessor(Common):
    """
    SSA Request processor implementation.
    Only one instance is allowed to be created
    """
    # Number of buffered RequestResult rows that triggers a flush to DB
    BUFFER_SIZE = 100

    def __init__(self, engine=None):
        """
        :param engine: optional pre-built SQLAlchemy engine; when omitted,
                       a new one is created via setup_database()
        """
        super().__init__()
        self.logger = logging.getLogger('req_processor')
        self.logger.info('Processor enabled: %s', __package__)
        # enable throttling detection kernel mechanism on service start
        switch_schedstats(enabled=True)
        self.engine = engine if engine else setup_database()
        self._lock = RLock()
        # NOTE: pass the resolved engine (self.engine), not the raw
        # constructor argument -- otherwise, when no engine was supplied,
        # the helpers would receive None instead of the engine created
        # by setup_database() above
        self.decision_maker = DecisionMaker(engine=self.engine)
        self.sender = StatisticsSender()
        self.auto_tracer = AutoTracer(engine=self.engine)
        self.start_background_routine()
        # sqlite is not thread-safe and saving results one-by-one
        # into database slows ssa 10x times
        # so let's make a buffer that will contain some elements
        # and flush it periodically
        self._buffer = []

    @property
    def configured_duration(self):
        """
        Return config file value multiplied by 1000000,
        as we receive duration in microseconds
        """
        return self.requests_duration * 1000000

    def send_stats(self, report: dict):
        """
        Call Statistics Sender; log (but do not propagate) SSAError
        so the background routine keeps running.
        """
        try:
            self.sender.send(report)
        except SSAError as e:
            self.logger.error('StatisticsSender failed: %s', str(e))

    def start_background_routine(self) -> None:
        """
        Start dumper|DecisionMaker thread in background
        """
        t = Thread(target=self.background_routine, daemon=True)
        t.start()
        self.logger.info('[%s] Routine started', t.name)

    def background_routine(self) -> None:
        """
        Dumps collected stats to file once an hour.
        Runs DecisionMaker once a day
        Cleanup storage after DecisionMaker run
        """
        while True:
            tick = datetime.now(timezone.utc)
            if tick.minute == 0:
                # top of the hour: run hourly (and, at midnight, daily) tasks
                if tick.hour == 0:
                    if is_malformed_database(self.engine):
                        # bugfix: call _safe_exec (was a typo: _save_exec) and
                        # pass the callable + args so exceptions are captured,
                        # instead of invoking restore_db_with_lock eagerly
                        self._safe_exec(self.restore_db_with_lock, self.engine)
                        self.logger.info(
                            '[%s] Routine thread found Database disk image is malformed and now restored (%s)',
                            current_thread().name, tick)
                self.logger.info(
                    '[%s] Routine thread launching buffer flushing (%s)',
                    current_thread().name, tick)
                self._safe_exec(self.flush_with_lock)
                self.logger.info(
                    '[%s] Routine thread launching AutoTracer (%s)',
                    current_thread().name, tick)
                self._safe_exec(self.auto_tracer)
                self.logger.info(
                    '[%s] Routine thread launching DecisionMaker (%s)',
                    current_thread().name, tick)
                report = self._safe_exec(self.decision_maker)
                self.logger.info(
                    '[%s] Routine thread launching cleanup (%s)',
                    current_thread().name, tick)
                cleanup_old_data(self.engine)
                # report may be None if DecisionMaker failed; send_stats is
                # wrapped too, so a bad report cannot kill the routine
                self._safe_exec(self.send_stats, report)
                # attempt to enable throttling detection kernel mechanism
                # in case it was accidentally switched off
                switch_schedstats(enabled=True)
                # sleep past the :00 minute so this branch fires once per hour
                self._simple_sleep(60)
            else:
                self._sleep_till_next_hour(tick.minute)

    def _safe_exec(self, action: Callable, *args) -> Any:
        """
        Call requested Callable with given args and capture any exception.

        Returns the action's result, or None when it raised (the full
        traceback is attached to the log record as 'orig_traceback').
        """
        try:
            return action(*args)
        except Exception:
            et, ev, _ = sys.exc_info()
            self.logger.exception('%s failed with exception %s, %s',
                                  str(action), et, ev,
                                  extra={'orig_traceback': traceback.format_exc()})

    def _simple_sleep(self, to_sleep: int = 15 * 60):
        """
        Log and sleep given number of seconds or 15 minutes by default
        """
        self.logger.info('[%s] Routine thread sleeping for (%s)',
                         current_thread().name, to_sleep)
        time.sleep(to_sleep)

    def _sleep_till_next_hour(self, start_minute):
        """
        Sleep the number of minutes remaining till next hour
        """
        sleep_for = (timedelta(hours=1) - timedelta(
            minutes=start_minute)).total_seconds()
        self._simple_sleep(int(sleep_for))

    def restore_db_with_lock(self, engine):
        """Restore the database while holding the buffer lock, so no
        thread writes to the DB mid-restore."""
        with self._lock:
            restore_database(engine)

    @staticmethod
    def get_interval_for(timestamp: int) -> int:
        """
        Takes an hour of a day, to which the given timestamp belongs
        """
        return datetime.fromtimestamp(timestamp, timezone.utc).hour

    def flush_with_lock(self):
        """Atomically swap the buffer out under the lock, then persist
        the snapshot outside of any per-request critical section."""
        with self._lock:
            objects = self._buffer[:]
            self._buffer = []
        self.flush_buffer(objects)

    def flush_buffer(self, objects=None):
        """
        Save in-memory buffer into database.

        :param objects: explicit list to persist; defaults to self._buffer
                        (caller is responsible for locking in that case)
        """
        if objects is None:
            objects = self._buffer
        if not objects:
            return
        with session_scope(self.engine) as db:
            db.bulk_save_objects(objects)

    def handle(self, data: dict) -> None:
        """
        Process given request data

        Expects keys: url, timestamp, duration, hitting_limits,
        throttled_time, io_throttled_time, wordpress.
        Buffers a RequestResult row; flushes once BUFFER_SIZE is reached.
        """
        if not data:
            self.logger.info('[%s] has empty request, skipping', current_thread().name)
            return
        url = data.get('url')
        if self.is_ignored(url):
            self.logger.debug('%s ignored', url)
            return
        domain, uri = url_split(url)
        self.logger.debug('[%s] Acquires lock to handle request counters',
                          current_thread().name)
        objects_per_thread = []
        with self._lock:
            self._buffer.append(
                RequestResult(
                    domain=domain,
                    path=url,
                    timestamp=data['timestamp'],
                    duration=data['duration'],
                    is_slow_request=data['duration'] > self.configured_duration,
                    hitting_limits=data['hitting_limits'],
                    throttled_time=data['throttled_time'],
                    io_throttled_time=data['io_throttled_time'],
                    wordpress=data['wordpress'],
                )
            )
            # check buffer in same lock to prevent race conditions
            # between threads modifying buffer
            if len(self._buffer) >= self.BUFFER_SIZE:
                objects_per_thread = self._buffer[:]
                self._buffer = []
        # persist outside the lock: the snapshot is thread-local now
        self.flush_buffer(objects_per_thread)
        self.logger.debug('[%s] Released lock to handle request counters',
                          current_thread().name)