aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichał Górny <mgorny@gentoo.org>2013-09-07 00:05:24 +0200
committerMichał Górny <mgorny@gentoo.org>2013-09-12 15:39:26 +0200
commit60bf2d0a9890e484a7c70e1de178f5600ca2db39 (patch)
treea17d8a55d86e4ea336edcfa5ab5018a312132fc0
parentUse session identifiers for unique LDAP db aliases. (diff)
downloadidentity.gentoo.org-60bf2d0a9890e484a7c70e1de178f5600ca2db39.tar.gz
identity.gentoo.org-60bf2d0a9890e484a7c70e1de178f5600ca2db39.tar.bz2
identity.gentoo.org-60bf2d0a9890e484a7c70e1de178f5600ca2db39.zip
Use context managers to clean up settings.DATABASES after binds.
-rw-r--r--okupy/accounts/views.py418
-rw-r--r--okupy/common/ldap_helpers.py55
-rw-r--r--okupy/tests/unit/test_ldapuser.py38
3 files changed, 262 insertions, 249 deletions
diff --git a/okupy/accounts/views.py b/okupy/accounts/views.py
index 96b2614..ab96d87 100644
--- a/okupy/accounts/views.py
+++ b/okupy/accounts/views.py
@@ -376,245 +376,245 @@ def activate(request, token):
@otp_required
def profile_settings(request):
""" Primary account settings, """
- user_info = get_bound_ldapuser(request)
- profile_settings = None
- if request.method == "POST":
- profile_settings = ProfileSettingsForm(request.POST)
- if profile_settings.is_valid():
- try:
- #birthday = profile_settings.cleaned_data['birthday']
- first_name = profile_settings.cleaned_data['first_name']
- last_name = profile_settings.cleaned_data['last_name']
-
- if user_info.first_name != first_name:
- user_info.first_name = first_name
-
- if user_info.last_name != last_name:
- user_info.last_name = last_name
-
- user_info.full_name = '%s %s' % (first_name, last_name)
- user_info.gecos = '%s %s' % (first_name, last_name)
-
- """
- if user_info.birthday != birthday:
- user_info.birthday = birthday
- """
+ with get_bound_ldapuser(request) as user_info:
+ profile_settings = None
+ if request.method == "POST":
+ profile_settings = ProfileSettingsForm(request.POST)
+ if profile_settings.is_valid():
try:
- user_info.save()
- except IntegrityError:
+ #birthday = profile_settings.cleaned_data['birthday']
+ first_name = profile_settings.cleaned_data['first_name']
+ last_name = profile_settings.cleaned_data['last_name']
+
+ if user_info.first_name != first_name:
+ user_info.first_name = first_name
+
+ if user_info.last_name != last_name:
+ user_info.last_name = last_name
+
+ user_info.full_name = '%s %s' % (first_name, last_name)
+ user_info.gecos = '%s %s' % (first_name, last_name)
+
+ """
+ if user_info.birthday != birthday:
+ user_info.birthday = birthday
+ """
+ try:
+ user_info.save()
+ except IntegrityError:
+ pass
+ except ldap.TYPE_OR_VALUE_EXISTS:
pass
- except ldap.TYPE_OR_VALUE_EXISTS:
- pass
- except Exception as error:
- logger.critical(error, extra=log_extra_data(request))
- logger_mail.exception(error)
- raise OkupyError("Can't contact LDAP server")
- else:
- profile_settings = ProfileSettingsForm()
+ except Exception as error:
+ logger.critical(error, extra=log_extra_data(request))
+ logger_mail.exception(error)
+ raise OkupyError("Can't contact LDAP server")
+ else:
+ profile_settings = ProfileSettingsForm()
- return render(request, 'settings-profile.html', {
- 'profile_settings': profile_settings,
- 'user_info': user_info,
- })
+ return render(request, 'settings-profile.html', {
+ 'profile_settings': profile_settings,
+ 'user_info': user_info,
+ })
@strong_auth_required
@otp_required
def password_settings(request):
""" Password settings """
- user_info = get_bound_ldapuser(request)
- password_settings = None
- if request.method == "POST":
- password_settings = PasswordSettingsForm(request.POST)
- if password_settings.is_valid():
- try:
- new_password = password_settings.cleaned_data['new_password']
- new_password_verify = password_settings.cleaned_data[
- 'new_password_verify']
- old_password = password_settings.cleaned_data['old_password']
-
- if old_password and (new_password == new_password_verify):
- for hash in list(user_info.password):
- print hash
- try:
- if ldap_md5_crypt.verify(old_password, hash):
- user_info.password.append(
- ldap_md5_crypt.encrypt(
- new_password_verify))
- user_info.password.remove(hash)
- break
- except ValueError:
- # ignore unknown hashes
- pass
+ with get_bound_ldapuser(request) as user_info:
+ password_settings = None
+ if request.method == "POST":
+ password_settings = PasswordSettingsForm(request.POST)
+ if password_settings.is_valid():
try:
- user_info.save()
- except IntegrityError:
+ new_password = password_settings.cleaned_data['new_password']
+ new_password_verify = password_settings.cleaned_data[
+ 'new_password_verify']
+ old_password = password_settings.cleaned_data['old_password']
+
+ if old_password and (new_password == new_password_verify):
+ for hash in list(user_info.password):
+ print hash
+ try:
+ if ldap_md5_crypt.verify(old_password, hash):
+ user_info.password.append(
+ ldap_md5_crypt.encrypt(
+ new_password_verify))
+ user_info.password.remove(hash)
+ break
+ except ValueError:
+ # ignore unknown hashes
+ pass
+ try:
+ user_info.save()
+ except IntegrityError:
+ pass
+ except ldap.TYPE_OR_VALUE_EXISTS:
pass
- except ldap.TYPE_OR_VALUE_EXISTS:
- pass
- except Exception as error:
- logger.critical(error, extra=log_extra_data(request))
- logger_mail.exception(error)
- raise OkupyError("Can't contact LDAP server")
- else:
- password_settings = PasswordSettingsForm()
+ except Exception as error:
+ logger.critical(error, extra=log_extra_data(request))
+ logger_mail.exception(error)
+ raise OkupyError("Can't contact LDAP server")
+ else:
+ password_settings = PasswordSettingsForm()
- return render(request, 'settings-password.html', {
- 'password_settings': password_settings,
- 'user_info': user_info,
- })
+ return render(request, 'settings-password.html', {
+ 'password_settings': password_settings,
+ 'user_info': user_info,
+ })
@strong_auth_required
@otp_required
def email_settings(request):
""" Email Settings """
- user_info = get_bound_ldapuser(request)
- email_settings = None
- if request.method == "POST":
- email_settings = EmailSettingsForm(request.POST)
- if email_settings.is_valid():
- try:
- email = email_settings.cleaned_data['email']
-
- if request.POST.get('delete'):
- user_info.email.remove(email)
- else:
- user_info.email.append(email)
-
+ with get_bound_ldapuser(request) as user_info:
+ email_settings = None
+ if request.method == "POST":
+ email_settings = EmailSettingsForm(request.POST)
+ if email_settings.is_valid():
try:
- user_info.save()
- except IntegrityError:
+ email = email_settings.cleaned_data['email']
+
+ if request.POST.get('delete'):
+ user_info.email.remove(email)
+ else:
+ user_info.email.append(email)
+
+ try:
+ user_info.save()
+ except IntegrityError:
+ pass
+ except ldap.TYPE_OR_VALUE_EXISTS:
pass
- except ldap.TYPE_OR_VALUE_EXISTS:
- pass
- except Exception as error:
- logger.critical(error, extra=log_extra_data(request))
- logger_mail.exception(error)
- raise OkupyError("Can't contact LDAP server")
- else:
- email_settings = EmailSettingsForm()
+ except Exception as error:
+ logger.critical(error, extra=log_extra_data(request))
+ logger_mail.exception(error)
+ raise OkupyError("Can't contact LDAP server")
+ else:
+ email_settings = EmailSettingsForm()
- return render(request, 'settings-email.html', {
- 'email_settings': email_settings,
- 'user_info': user_info,
- })
+ return render(request, 'settings-email.html', {
+ 'email_settings': email_settings,
+ 'user_info': user_info,
+ })
@strong_auth_required
@otp_required
def contact_settings(request):
""" Contact details """
- user_info = get_bound_ldapuser(request)
- contact_settings = None
- if request.method == "POST":
- contact_settings = ContactSettingsForm(request.POST)
- if contact_settings.is_valid():
- try:
- gpg_fingerprint = contact_settings.cleaned_data[
- 'gpg_fingerprint']
- im = contact_settings.cleaned_data['im']
- latitude = contact_settings.cleaned_data['latitude']
- location = contact_settings.cleaned_data['location']
- longitude = contact_settings.cleaned_data['longitude']
- phone = contact_settings.cleaned_data['phone']
- website = contact_settings.cleaned_data['website']
-
- if user_info.location != location:
- user_info.location = location
-
- if user_info.phone != phone:
- user_info.phone = phone
-
- if user_info.website != website:
- user_info.website.pop()
- user_info.website.append(website)
-
- if user_info.im != im:
- user_info.im.pop()
- user_info.im.append(im)
-
- if user_info.longitude != longitude:
- user_info.longitude = longitude
-
- if user_info.latitude != latitude:
- user_info.latitude = latitude
-
- if user_info.gpg_fingerprint != gpg_fingerprint:
- user_info.gpg_fingerprint.pop()
- user_info.gpg_fingerprint.append(gpg_fingerprint)
-
+ with get_bound_ldapuser(request) as user_info:
+ contact_settings = None
+ if request.method == "POST":
+ contact_settings = ContactSettingsForm(request.POST)
+ if contact_settings.is_valid():
try:
- user_info.save()
- except IntegrityError:
+ gpg_fingerprint = contact_settings.cleaned_data[
+ 'gpg_fingerprint']
+ im = contact_settings.cleaned_data['im']
+ latitude = contact_settings.cleaned_data['latitude']
+ location = contact_settings.cleaned_data['location']
+ longitude = contact_settings.cleaned_data['longitude']
+ phone = contact_settings.cleaned_data['phone']
+ website = contact_settings.cleaned_data['website']
+
+ if user_info.location != location:
+ user_info.location = location
+
+ if user_info.phone != phone:
+ user_info.phone = phone
+
+ if user_info.website != website:
+ user_info.website.pop()
+ user_info.website.append(website)
+
+ if user_info.im != im:
+ user_info.im.pop()
+ user_info.im.append(im)
+
+ if user_info.longitude != longitude:
+ user_info.longitude = longitude
+
+ if user_info.latitude != latitude:
+ user_info.latitude = latitude
+
+ if user_info.gpg_fingerprint != gpg_fingerprint:
+ user_info.gpg_fingerprint.pop()
+ user_info.gpg_fingerprint.append(gpg_fingerprint)
+
+ try:
+ user_info.save()
+ except IntegrityError:
+ pass
+ except ldap.TYPE_OR_VALUE_EXISTS:
pass
- except ldap.TYPE_OR_VALUE_EXISTS:
- pass
- except Exception as error:
- logger.critical(error, extra=log_extra_data(request))
- logger_mail.exception(error)
- raise OkupyError("Can't contact LDAP server")
- else:
- contact_settings = ContactSettingsForm()
+ except Exception as error:
+ logger.critical(error, extra=log_extra_data(request))
+ logger_mail.exception(error)
+ raise OkupyError("Can't contact LDAP server")
+ else:
+ contact_settings = ContactSettingsForm()
- return render(request, 'settings-contact.html', {
- 'contact_settings': contact_settings,
- 'user_info': user_info,
- })
+ return render(request, 'settings-contact.html', {
+ 'contact_settings': contact_settings,
+ 'user_info': user_info,
+ })
@strong_auth_required
@otp_required
def gentoo_dev_settings(request):
""" Gentoo related information """
- user_info = get_bound_ldapuser(request)
- gentoo_account_settings = None
- if request.method == "POST":
- gentoo_account_settings = GentooAccountSettingsForm(request.POST)
- if gentoo_account_settings.is_valid():
- try:
- devbug = gentoo_account_settings.cleaned_data['developer_bug']
- gentoo_join_date = gentoo_account_settings.cleaned_data[
- 'gentoo_join_date']
- gentoo_mentor = gentoo_account_settings.cleaned_data['mentor']
- ssh_pubkey = gentoo_account_settings.cleaned_data['ssh_key']
-
- if user_info.developer_bug != devbug:
- user_info.developer_bug.append(devbug)
-
- if user_info.gentoo_join_date != gentoo_join_date:
- user_info.gentoo_join_date.append(gentoo_join_date)
-
- if user_info.mentor != gentoo_mentor:
- user_info.mentor.append(gentoo_mentor)
-
- if ssh_pubkey:
- user_info.ssh_key.append(ssh_pubkey)
-
- if user_info.is_retired or user_info.gentoo_retire_date:
- gentoo_retire_date = gentoo_account_settings.cleaned_data[
- 'gentoo_retire_date']
- if user_info.gentoo_retire_date != gentoo_retire_date:
- user_info.gentoo_retire_date.append(
- gentoo_retire_date)
-
+ with get_bound_ldapuser(request) as user_info:
+ gentoo_account_settings = None
+ if request.method == "POST":
+ gentoo_account_settings = GentooAccountSettingsForm(request.POST)
+ if gentoo_account_settings.is_valid():
try:
- user_info.save()
- except IntegrityError:
+ devbug = gentoo_account_settings.cleaned_data['developer_bug']
+ gentoo_join_date = gentoo_account_settings.cleaned_data[
+ 'gentoo_join_date']
+ gentoo_mentor = gentoo_account_settings.cleaned_data['mentor']
+ ssh_pubkey = gentoo_account_settings.cleaned_data['ssh_key']
+
+ if user_info.developer_bug != devbug:
+ user_info.developer_bug.append(devbug)
+
+ if user_info.gentoo_join_date != gentoo_join_date:
+ user_info.gentoo_join_date.append(gentoo_join_date)
+
+ if user_info.mentor != gentoo_mentor:
+ user_info.mentor.append(gentoo_mentor)
+
+ if ssh_pubkey:
+ user_info.ssh_key.append(ssh_pubkey)
+
+ if user_info.is_retired or user_info.gentoo_retire_date:
+ gentoo_retire_date = gentoo_account_settings.cleaned_data[
+ 'gentoo_retire_date']
+ if user_info.gentoo_retire_date != gentoo_retire_date:
+ user_info.gentoo_retire_date.append(
+ gentoo_retire_date)
+
+ try:
+ user_info.save()
+ except IntegrityError:
+ pass
+ except ldap.TYPE_OR_VALUE_EXISTS:
pass
- except ldap.TYPE_OR_VALUE_EXISTS:
- pass
- except Exception as error:
- logger.critical(error, extra=log_extra_data(request))
- logger_mail.exception(error)
- raise OkupyError("Can't contact LDAP server")
- else:
- gentoo_account_settings = GentooAccountSettingsForm()
+ except Exception as error:
+ logger.critical(error, extra=log_extra_data(request))
+ logger_mail.exception(error)
+ raise OkupyError("Can't contact LDAP server")
+ else:
+ gentoo_account_settings = GentooAccountSettingsForm()
- return render(request, 'settings-gentoo.html', {
- 'gentoo_account_settings': gentoo_account_settings,
- 'user_info': user_info,
- })
+ return render(request, 'settings-gentoo.html', {
+ 'gentoo_account_settings': gentoo_account_settings,
+ 'user_info': user_info,
+ })
@strong_auth_required
@@ -624,11 +624,11 @@ def otp_setup(request):
secret = None
conf_form = None
skeys = None
- user = get_bound_ldapuser(request)
if request.method == 'POST':
if 'disable' in request.POST:
- dev.disable(user)
+ with get_bound_ldapuser(request) as user:
+ dev.disable(user)
elif 'confirm' in request.POST and 'otp_secret' in request.session:
secret = request.session['otp_secret']
conf_form = OTPForm(request.POST)
@@ -646,11 +646,12 @@ def otp_setup(request):
messages.error(request, 'Token verification failed.')
conf_form = OTPForm()
else:
- dev.enable(user, secret)
- secret = None
- conf_form = None
- sdev = SOTPDevice.objects.get(user=request.user)
- skeys = sdev.gen_keys(user)
+ with get_bound_ldapuser(request) as user:
+ dev.enable(user, secret)
+ secret = None
+ conf_form = None
+ sdev = SOTPDevice.objects.get(user=request.user)
+ skeys = sdev.gen_keys(user)
messages.info(request, 'The new secret has been set.')
elif 'enable' in request.POST:
secret = dev.gen_secret()
@@ -658,7 +659,8 @@ def otp_setup(request):
conf_form = OTPForm()
elif 'recovery' in request.POST:
sdev = SOTPDevice.objects.get(user=request.user)
- skeys = sdev.gen_keys(user)
+ with get_bound_ldapuser(request) as user:
+ skeys = sdev.gen_keys(user)
messages.info(request, 'Your old recovery keys have been revoked.')
elif 'cancel' in request.POST:
messages.info(request, 'Secret change aborted. Previous settings'
diff --git a/okupy/common/ldap_helpers.py b/okupy/common/ldap_helpers.py
index ff8cd97..5b3e76a 100644
--- a/okupy/common/ldap_helpers.py
+++ b/okupy/common/ldap_helpers.py
@@ -34,24 +34,21 @@ def get_bound_ldapuser(request, password=None):
def set_secondary_password(request, password):
""" Generate a secondary passsword and encrypt it in the session """
- user = get_bound_ldapuser(request, password)
-
- secondary_password = Random.get_random_bytes(48)
- request.session['secondary_password'] = (
- cipher.encrypt(secondary_password))
- # Clean up possible leftover secondary passwords from the LDAP account
- if len(user.password) > 1:
- for hash in list(user.password):
- try:
- if not ldap_md5_crypt.verify(password, hash):
- user.password.remove(hash)
- except ValueError:
- # don't remove unknown hashes
- pass
- # Add a new generated encrypted password to LDAP
- user.password.append(
- ldap_md5_crypt.encrypt(b64encode(secondary_password)))
- user.save()
+ with get_bound_ldapuser(request, password) as user:
+ secondary_password = Random.get_random_bytes(48)
+ request.session['secondary_password'] = cipher.encrypt(secondary_password)
+ # Clean up possible leftover secondary passwords from the LDAP account
+ if len(user.password) > 1:
+ for hash in list(user.password):
+ try:
+ if not ldap_md5_crypt.verify(password, hash):
+ user.password.remove(hash)
+ except ValueError:
+ # don't remove unknown hashes
+ pass
+ # Add a new generated encrypted password to LDAP
+ user.password.append(ldap_md5_crypt.encrypt(b64encode(secondary_password)))
+ user.save()
def remove_secondary_password(request):
@@ -61,15 +58,15 @@ def remove_secondary_password(request):
request.session['secondary_password'], 48))
except KeyError:
return
- user = get_bound_ldapuser(request, password)
- if len(user.password) > 1:
- for hash in list(user.password):
- try:
- if ldap_md5_crypt.verify(password, hash):
- user.password.remove(hash)
- break
- except ValueError:
- # ignore unknown hashes
- pass
- user.save()
+ with get_bound_ldapuser(request, password) as user:
+ if len(user.password) > 1:
+ for hash in list(user.password):
+ try:
+ if ldap_md5_crypt.verify(password, hash):
+ user.password.remove(hash)
+ break
+ except ValueError:
+ # ignore unknown hashes
+ pass
+ user.save()
diff --git a/okupy/tests/unit/test_ldapuser.py b/okupy/tests/unit/test_ldapuser.py
index a160571..410e9f1 100644
--- a/okupy/tests/unit/test_ldapuser.py
+++ b/okupy/tests/unit/test_ldapuser.py
@@ -44,8 +44,8 @@ class LDAPUserUnitTests(TestCase):
request = set_request('/', user=vars.USER_ALICE)
request.session['secondary_password'] = cipher.encrypt(
secondary_password)
- user = get_bound_ldapuser(request)
- self.assertEqual(user.username, vars.USER_ALICE.username)
+ with get_bound_ldapuser(request) as user:
+ self.assertEqual(user.username, vars.USER_ALICE.username)
def test_get_bound_ldapuser_bind_as_is_properly_set_from_request(self):
secondary_password = Random.get_random_bytes(48)
@@ -56,22 +56,22 @@ class LDAPUserUnitTests(TestCase):
request = set_request('/', user=vars.USER_ALICE)
request.session['secondary_password'] = cipher.encrypt(
secondary_password)
- get_bound_ldapuser(request)
- db_alias = 'ldap_%s' % request.session.cache_key
- self.assertEqual(settings.DATABASES[db_alias]['PASSWORD'],
- b64encode(secondary_password))
+ with get_bound_ldapuser(request) as user: # noqa
+ db_alias = 'ldap_%s' % request.session.cache_key
+ self.assertEqual(settings.DATABASES[db_alias]['PASSWORD'],
+ b64encode(secondary_password))
def test_get_bound_ldapuser_bind_as_is_properly_set_from_password(self):
request = set_request('/', user=vars.USER_ALICE)
- get_bound_ldapuser(request, password='ldaptest')
- db_alias = 'ldap_%s' % request.session.cache_key
- self.assertTrue(ldap_md5_crypt.verify(settings.DATABASES[db_alias][
- 'PASSWORD'], ldap_users('alice')[1]['userPassword'][0]))
+ with get_bound_ldapuser(request, password='ldaptest') as user: # noqa
+ db_alias = 'ldap_%s' % request.session.cache_key
+ self.assertTrue(ldap_md5_crypt.verify(settings.DATABASES[db_alias][
+ 'PASSWORD'], ldap_users('alice')[1]['userPassword'][0]))
def test_get_bound_ldapuser_password_set(self):
request = set_request('/', user=vars.USER_ALICE)
- user = get_bound_ldapuser(request, password='ldaptest')
- self.assertEqual(user.username, vars.USER_ALICE.username)
+ with get_bound_ldapuser(request, password='ldaptest') as user:
+ self.assertEqual(user.username, vars.USER_ALICE.username)
def test_get_bound_ldapuser_no_password_available(self):
request = set_request('/', user=vars.USER_ALICE)
@@ -89,3 +89,17 @@ class LDAPUserUnitTests(TestCase):
request = set_request('/', user=vars.USER_ALICE)
self.assertRaises(ldap.INVALID_CREDENTIALS, get_bound_ldapuser,
request, 'test')
+
+ def test_get_bound_ldapuser_context_manager_cleans_up_settings(self):
+ secondary_password = Random.get_random_bytes(48)
+ secondary_password_crypt = ldap_md5_crypt.encrypt(b64encode(
+ secondary_password))
+ self.ldapobject.directory[ldap_users('alice')[0]][
+ 'userPassword'].append(secondary_password_crypt)
+ request = set_request('/', user=vars.USER_ALICE)
+ request.session['secondary_password'] = cipher.encrypt(
+ secondary_password)
+ with get_bound_ldapuser(request) as user: # noqa
+ pass
+ db_alias = 'ldap_%s' % request.session.cache_key
+ self.assertNotIn(db_alias, settings.DATABASES)