Tests for twitter

This commit is contained in:
Jan Kaluza 2015-12-27 16:31:00 +01:00
parent d0f3cbe60c
commit 5f0dc68407
6 changed files with 145 additions and 6 deletions

View file

@ -75,9 +75,10 @@ TwitterPlugin::TwitterPlugin(Config *config, Swift::SimpleEventLoop *loop, Stora
m_conn->connect(Swift::HostAddressPort(Swift::HostAddress(host), port));
tp = new ThreadPool(loop_, 10);
tweet_timer = m_factories->getTimerFactory()->createTimer(90000);
message_timer = m_factories->getTimerFactory()->createTimer(90000);
LOG4CXX_INFO(logger, "Fetch timeout is set to " << CONFIG_INT_DEFAULTED(config, "twitter.fetch_timeout", 90000));
tweet_timer = m_factories->getTimerFactory()->createTimer(CONFIG_INT_DEFAULTED(config, "twitter.fetch_timeout", 90000));
message_timer = m_factories->getTimerFactory()->createTimer(CONFIG_INT_DEFAULTED(config, "twitter.fetch_timeout", 90000));
tweet_timer->onTick.connect(boost::bind(&TwitterPlugin::pollForTweets, this));
message_timer->onTick.connect(boost::bind(&TwitterPlugin::pollForDirectMessages, this));

View file

@ -141,6 +141,7 @@ bool Config::load(std::istream &ifs, boost::program_options::options_description
("proxy.user", value<std::string>()->default_value(""), "Proxy user.")
("proxy.password", value<std::string>()->default_value(""), "Proxy Password.")
("proxy.port", value<int>()->default_value(0), "Proxy port.")
("twitter.fetch_timeout", value<int>()->default_value(0), "Proxy port.")
;

View file

@ -167,23 +167,36 @@ class JabberSlackServerModeConf(BaseTest):
if test.find("bad_password") != -1:
print "Changing password to 'badpassword'"
os.system("sqlite3 slack.sql \"UPDATE users SET password='badpassword' WHERE id=1\"")
#os.system("sqlite3 slack.sql \"SELECT * FROM users\"")
return False
def pre_test(self):
os.system("prosody --config ../slack_jabber/prosody.cfg.lua > prosody.log &")
#time.sleep(3)
#os.system("../../../spectrum_manager/src/spectrum2_manager -c manager.conf localhostxmpp set_oauth2_code xoxb-17213576196-VV2K8kEwwrJhJFfs5YWv6La6 use_bot_token 2>/dev/null >/dev/null")
def post_test(self):
os.system("killall lua-5.1 2>/dev/null")
os.system("killall spectrum2_libpurple_backend 2>/dev/null")
class TwitterServerModeConf(BaseTest):
def __init__(self):
BaseTest.__init__(self, "../twitter/twitter_test.cfg", True, "")
self.directory = "../twitter/"
self.client_password = "testpass123"
def skip_test(self, test):
os.system("cp ../twitter/twitter.sql .")
def pre_test(self):
pass
def post_test(self):
os.system("killall spectrum2_twitter_backend 2>/dev/null")
configurations = []
configurations.append(LibcommuniServerModeSingleServerConf())
configurations.append(LibcommuniServerModeConf())
configurations.append(JabberServerModeConf())
configurations.append(JabberSlackServerModeConf())
configurations.append(TwitterServerModeConf())
exitcode = 0

BIN
tests/twitter/twitter.sql Normal file

Binary file not shown.

View file

@ -0,0 +1,47 @@
[service]
jid = localhostxmpp
password = secret
server = 0.0.0.0
port = 5223
server_mode = 1
backend_host=127.0.0.1
pidfile=./test.pid
# < this option doesn't work yet
#backend_port=10001
admin_jid=admin
admin_password=test
#cert=server.pfx #patch to PKCS#12 certificate
#cert_password=test #password to that certificate if any
users_per_backend=10
#backend=../..//backends/swiften/spectrum2_swiften_backend
#backend=../../../backends/twitter/spectrum2_twitter_backend
backend=../../backends/twitter/spectrum2_twitter_backend
protocol=prpl-jabber
#protocol=prpl-msn
#protocol=any
#protocol=prpl-icq
working_dir=./
portfile=./$jid.port
irc_server=localhost
[twitter]
fetch_timeout=5000
[backend]
#default_avatar=catmelonhead.jpg
#no_vcard_fetch=true
[logging]
#config=logging.cfg # log4cxx/log4j logging configuration file
#backend_config=/home/hanzz/code/libtransport/spectrum/src/backend-logging.cfg # log4cxx/log4j logging configuration file for backends
[database]
type=sqlite3
database=twitter.sql
prefix=
#type = mysql # or "none" without database backend.......................................................................................................................
#database = test
#prefix=
#user=root
#password=yourrootsqlpassword
#encryption_key=hanzzik

View file

@ -0,0 +1,77 @@
import optparse
import sys
import time
import subprocess
import os
import sleekxmpp
class Responder(sleekxmpp.ClientXMPP):
def __init__(self, jid, password, room, room_password, nick):
sleekxmpp.ClientXMPP.__init__(self, jid, password)
self.room = room
self.room_password = room_password
self.nick = nick
self.finished = False
self.tests = {}
class Client(sleekxmpp.ClientXMPP):
def __init__(self, jid, password, room, nick):
sleekxmpp.ClientXMPP.__init__(self, jid, password)
self.room = room
self.nick = nick
self.add_event_handler("session_start", self.start)
self.add_event_handler("message", self.message)
self.finished = False
self.tests = {}
self.tests["timeline_poll"] = ["Timeline received automatically", False]
self.tests["help"] = ["#help command", False]
self.tests["status1"] = ["#status command response", False]
self.tests["status2"] = ["#status command pushes status", False]
self.tests["follow"] = ["#follow command", False]
self.tests["friends"] = ["#friends command", False]
self.tests["unfollow"] = ["#unfollow command", False]
self.tests["friends2"] = ["#friends after unfollow command", False]
self.status = "timeline"
self.timestamp = int(time.time())
def message(self, msg):
if self.status == "timeline" and msg['body'].find("spectrum2tests") != -1 and msg['body'].find("MsgId") != -1:
self.tests["timeline_poll"][1] = True
self.send_message(mto=msg['from'], mbody="#help")
self.status = "help"
elif self.status == "help" and msg['body'].find("You will receive tweets of people you follow") != -1:
self.tests["help"][1] = True
self.send_message(mto=msg['from'], mbody="#status My testing status " + str(self.timestamp))
self.status = "status"
elif self.status == "status" and msg['body'].find("Status Update successful") != -1:
self.tests["status1"][1] = True
elif self.status == "status" and msg['body'].find("spectrum2tests") != -1 and msg['body'].find("MsgId") != -1:
self.tests["status2"][1] = True
self.status = "follow"
self.send_message(mto=msg['from'], mbody="#follow colinpwheeler")
elif self.status == "follow" and msg['body'] == "You are now following colinpwheeler":
self.status = "friends"
self.tests["follow"][1] = True
self.send_message(mto=msg['from'], mbody="#friends")
elif self.status == "friends" and msg['body'].find("USER LIST") != -1 and msg['body'].find("colinpwheeler") != -1:
self.status = "unfollow"
self.tests["friends"][1] = True
self.send_message(mto=msg['from'], mbody="#unfollow colinpwheeler")
elif self.status == "unfollow" and msg['body'] == "You are not following colinpwheeler anymore":
self.status = "friends2"
self.tests["unfollow"][1] = True
self.send_message(mto=msg['from'], mbody="#friends")
elif self.status == "friends2" and msg['body'].find("USER LIST") != -1 and msg['body'].find("colinpwheeler") == -1:
self.tests["friends2"][1] = True
self.finished = True
def start(self, event):
self.getRoster()
self.sendPresence()