Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 84 additions & 3 deletions keepercommander/commands/nested_share_folder/record_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
(create, update, link/unlink, shortcut management, delete).
"""

import json
import logging
from typing import List
from typing import Any, Dict, List, Optional

from ..base import Command, GroupCommand
from ..record_edit import RecordEditMixin, record_fields_description, ParsedFieldValue
from ...enforcement import PasswordComplexityEnforcer, RecordTypeEnforcer
from ...error import CommandError
from ... import nested_share_folder as _nsf, vault
from .helpers import (
Expand Down Expand Up @@ -48,6 +50,7 @@ class NestedShareRecordAddCommand(Command, RecordEditMixin):

def __init__(self):
super().__init__()
RecordEditMixin.__init__(self)

def get_parser(self):
return nested_share_record_add_parser
Expand All @@ -64,12 +67,17 @@ def execute(self, params, **kwargs):
if not record_type:
raise CommandError('nsf-record-add', 'Record type parameter is required.')

RecordTypeEnforcer.enforce(params, record_type, 'nsf-record-add')

self.warnings.clear()
self._password_policy = PasswordComplexityEnforcer.get_policy(params)

notes = kwargs.get('notes')
record_fields, add_attachments = self._parse_fields(kwargs.get('fields', []))
folder_uid = self._resolve_folder(params, kwargs.get('folder_uid'))
self.warnings.clear()

data = self._build_record_data(params, record_type, title, notes, record_fields)
self._check_password_policy(params, data, **kwargs)

if self.warnings:
for w in self.warnings:
Expand Down Expand Up @@ -165,6 +173,13 @@ def _legacy_to_data(record, title, notes=None):
data['notes'] = notes
return data

def _check_password_policy(self, params, data, **kwargs):
pw_failures = PasswordComplexityEnforcer.validate_record(params, data)
for failure in pw_failures:
self.on_warning(failure)
if pw_failures and not kwargs.get('force'):
self.on_warning('Use --force to bypass password policy warnings.')


# ══════════════════════════════════════════════════════════════════════════
# nsf-record-update
Expand All @@ -175,6 +190,7 @@ class NestedShareRecordUpdateCommand(Command, RecordEditMixin):

def __init__(self):
super().__init__()
RecordEditMixin.__init__(self)

def get_parser(self):
return nested_share_record_update_parser
Expand All @@ -190,7 +206,7 @@ def _resolve_field_value(self, parsed):
action_params.clear()
if self.is_generate_value(raw, action_params):
if parsed.type == 'password':
return self.generate_password(action_params)
return self.generate_password(action_params, policy=self._password_policy)
if parsed.type in ('oneTimeCode', 'otp'):
return self.generate_totp_url()
return raw
Expand All @@ -208,8 +224,12 @@ def execute(self, params, **kwargs):
if not record_uids:
raise CommandError('nsf-record-update', 'Record UID is required (use -r or --record)')

self.warnings.clear()
self._password_policy = PasswordComplexityEnforcer.get_policy(params)

record_type = kwargs.get('record_type')
if record_type and record_type not in ('legacy', 'general'):
RecordTypeEnforcer.enforce(params, record_type, 'nsf-record-update')
rt_fields = self.get_record_type_fields(params, record_type)
if not rt_fields:
raise CommandError('nsf-record-update', f'Record type "{record_type}" cannot be found.')
Expand Down Expand Up @@ -245,6 +265,20 @@ def execute(self, params, **kwargs):
ensure_nested_share_record(params, record_uid, 'nsf-record-update',
identifier=identifier)
check_record_edit_permission(params, record_uid, 'nsf-record-update')
merged = self._merge_update_data(
params, record_uid,
title=kwargs.get('title'),
record_type=record_type,
fields=fields or None,
notes=kwargs.get('notes'),
)
self._check_password_policy(params, merged, **kwargs)
if self.warnings:
for w in self.warnings:
logging.warning(w)
if not kwargs.get('force'):
return
self.warnings.clear()
result = _nsf.update_record_v3(
params=params, record_uid=record_uid,
title=kwargs.get('title'), record_type=record_type,
Expand All @@ -253,6 +287,53 @@ def execute(self, params, **kwargs):
check_result(result, 'nsf-record-update')
params.sync_data = True

def _check_password_policy(self, params, data, **kwargs):
pw_failures = PasswordComplexityEnforcer.validate_record(params, data)
for failure in pw_failures:
self.on_warning(failure)
if pw_failures and not kwargs.get('force'):
self.on_warning('Use --force to bypass password policy warnings.')

@staticmethod
def _load_record_data(params, record_uid): # type: (Any, str) -> Optional[Dict]
rec = params.record_cache.get(record_uid) or {}
raw = rec.get('data_unencrypted')
if raw is None:
nsf_data = getattr(params, 'nested_share_record_data', {}).get(record_uid) or {}
raw = nsf_data.get('data_json')
if raw is None:
return None
if isinstance(raw, bytes):
return json.loads(raw.decode('utf-8'))
if isinstance(raw, str):
return json.loads(raw)
if isinstance(raw, dict):
return raw.copy()
return None

@classmethod
def _merge_update_data(cls, params, record_uid, title=None, record_type=None,
fields=None, notes=None): # type: (...) -> Dict
existing = cls._load_record_data(params, record_uid)
data = existing.copy() if existing else {'fields': []}
if title is not None:
data['title'] = title
if record_type is not None:
data['type'] = record_type
if fields is not None:
by_type = {}
for ef in data.get('fields', []):
by_type.setdefault(ef.get('type'), []).append(ef)
for ft, fv in fields.items():
fv = fv if isinstance(fv, list) else [fv]
if ft in by_type and by_type[ft]:
by_type[ft][0]['value'] = fv
else:
data.setdefault('fields', []).append({'type': ft, 'value': fv})
if notes is not None:
data['notes'] = notes
return data


# ══════════════════════════════════════════════════════════════════════════
# nsf-ln
Expand Down
2 changes: 1 addition & 1 deletion keepercommander/commands/record_edit.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ def assign_legacy_fields(self, record, fields):
elif parsed_field.type == 'password':
action_params.clear()
if self.is_generate_value(parsed_field.value, action_params):
record.password = self.generate_password(action_params)
record.password = self.generate_password(action_params, policy=self._password_policy)
elif self.is_base64_value(parsed_field.value, action_params):
if action_params:
record.password = action_params[0]
Expand Down
38 changes: 25 additions & 13 deletions keepercommander/enforcement.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from typing import Tuple, Optional, List, Dict, Any, Set

from . import api, utils, crypto
from .proto import APIRequest_pb2
from .proto import APIRequest_pb2, record_pb2
from .display import bcolors
from .params import KeeperParams
from .error import KeeperApiError, CommandError
Expand Down Expand Up @@ -528,18 +528,30 @@ def get_restricted_record_types(cls, params): # type: (KeeperParams) -> Option

cache = getattr(params, 'record_type_cache', None) or {}
restricted = set() # type: Set[str]
for rt_id in list(policy.get('std') or []) + list(policy.get('ent') or []):
entry = cache.get(rt_id)
if not entry:
continue
try:
schema = json.loads(entry) if isinstance(entry, str) else entry
except (json.JSONDecodeError, TypeError):
continue
if isinstance(schema, dict):
name = schema.get('$id')
if name:
restricted.add(name)
scope_buckets = (
('std', record_pb2.RT_STANDARD),
('ent', record_pb2.RT_ENTERPRISE),
)
for bucket, scope in scope_buckets:
for rt_id in policy.get(bucket) or []:
try:
rt_id = int(rt_id)
except (TypeError, ValueError):
continue
# Role policy stores bare recordTypeId; sync_down keys cache by
# recordTypeId + scope * 1_000_000.
scoped_id = rt_id + scope * 1_000_000
entry = cache.get(scoped_id) or cache.get(rt_id)
if not entry:
continue
try:
schema = json.loads(entry) if isinstance(entry, str) else entry
except (json.JSONDecodeError, TypeError):
continue
if isinstance(schema, dict):
name = schema.get('$id')
if name:
restricted.add(name)
return restricted

@classmethod
Expand Down
155 changes: 155 additions & 0 deletions unit-tests/test_nested_share_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- Key utility/parsing functions
"""

import json
import os
import time
from unittest import TestCase, mock
Expand Down Expand Up @@ -283,6 +284,160 @@ def test_add_record(self, mock_create):
title='New Record', folder_uid=fuid, force=True,
record_type='general', fields=[])

@patch('keepercommander.commands.nested_share_folder.record_commands._nsf.create_record_v3')
def test_add_record_rejects_restricted_record_type(self, mock_create):
from keepercommander.commands.nested_share_folder import NestedShareRecordAddCommand
fuid, fobj = _make_folder()
params = _make_params(
nested_share_folders={fuid: fobj},
record_type_cache={1: json.dumps({'$id': 'login'})},
enforcements={
'jsons': [{'key': 'restrict_record_types', 'value': '{"std": [1], "ent": []}'}],
},
)
cmd = NestedShareRecordAddCommand()
with self.assertRaises(CommandError) as ctx:
cmd.execute(params, title='Blocked', record_type='login', fields=[], force=True)
self.assertIn('restricted', str(ctx.exception).lower())
mock_create.assert_not_called()

@patch('keepercommander.commands.nested_share_folder.record_commands._nsf.create_record_v3')
def test_add_record_rejects_weak_password_without_force(self, mock_create):
from keepercommander.commands.nested_share_folder import NestedShareRecordAddCommand
fuid, fobj = _make_folder()
params = _make_params(
nested_share_folders={fuid: fobj},
enforcements={
'jsons': [{
'key': 'generated_password_complexity',
'value': json.dumps([{
'length': 12,
'lower-use': True, 'lower-min': 1,
'upper-use': True, 'upper-min': 1,
'digit-use': True, 'digit-min': 1,
}]),
}],
},
)
cmd = NestedShareRecordAddCommand()
cmd.execute(params, title='Weak', record_type='general',
fields=['password=abc'], force=False)
mock_create.assert_not_called()

@patch('keepercommander.commands.nested_share_folder.record_commands._nsf.create_record_v3')
def test_add_record_allows_weak_password_with_force(self, mock_create):
from keepercommander.commands.nested_share_folder import NestedShareRecordAddCommand
mock_create.return_value = {
'record_uid': utils.generate_uid(), 'status': 'SUCCESS',
'message': '', 'success': True, 'revision': 1,
}
fuid, fobj = _make_folder()
params = _make_params(
nested_share_folders={fuid: fobj},
enforcements={
'jsons': [{
'key': 'generated_password_complexity',
'value': json.dumps([{
'length': 12,
'lower-use': True, 'lower-min': 1,
'upper-use': True, 'upper-min': 1,
'digit-use': True, 'digit-min': 1,
}]),
}],
},
)
cmd = NestedShareRecordAddCommand()
cmd.execute(params, title='Weak', record_type='general',
fields=['password=abc'], force=True)
mock_create.assert_called_once()

@patch('keepercommander.commands.nested_share_folder.record_commands._nsf.create_record_v3')
def test_add_record_gen_uses_password_policy(self, mock_create):
from keepercommander.commands.nested_share_folder import NestedShareRecordAddCommand
mock_create.return_value = {
'record_uid': utils.generate_uid(), 'status': 'SUCCESS',
'message': '', 'success': True, 'revision': 1,
}
fuid, fobj = _make_folder()
params = _make_params(
nested_share_folders={fuid: fobj},
enforcements={
'jsons': [{
'key': 'generated_password_complexity',
'value': json.dumps([{
'length': 16,
'lower-use': True, 'lower-min': 2,
'upper-use': True, 'upper-min': 2,
'digit-use': True, 'digit-min': 2,
'special-use': True, 'special-min': 1,
'special': '!@#$',
}]),
}],
},
)
cmd = NestedShareRecordAddCommand()
cmd.execute(params, title='Generated', record_type='general',
fields=['password=$GEN'], force=False)
mock_create.assert_called_once()
record_data = mock_create.call_args.kwargs['record_data']
password = next(
v[0] for f in record_data['fields']
if f.get('type') == 'password' for v in [f.get('value', [])] if v
)
self.assertGreaterEqual(len(password), 16)
self.assertGreaterEqual(sum(1 for c in password if c.islower()), 2)
self.assertGreaterEqual(sum(1 for c in password if c.isupper()), 2)
self.assertGreaterEqual(sum(1 for c in password if c.isdigit()), 2)
self.assertGreaterEqual(sum(1 for c in password if c in '!@#$'), 1)

@patch('keepercommander.commands.nested_share_folder.record_commands._nsf.update_record_v3')
@patch('keepercommander.commands.nested_share_folder.helpers.check_record_edit_permission')
def test_update_record_rejects_restricted_record_type(self, mock_perm, mock_update):
from keepercommander.commands.nested_share_folder import NestedShareRecordUpdateCommand
ruid, robj = _make_record()
params = _make_params(
nested_share_records={ruid: robj},
record_cache={ruid: {'revision': 1, 'data_unencrypted': json.dumps({
'type': 'login', 'title': 'Old', 'fields': [],
})}},
record_type_cache={1: json.dumps({'$id': 'login'})},
enforcements={
'jsons': [{'key': 'restrict_record_types', 'value': '{"std": [1], "ent": []}'}],
},
)
cmd = NestedShareRecordUpdateCommand()
with self.assertRaises(CommandError) as ctx:
cmd.execute(params, record_uids=[ruid], record_type='login', fields=[])
self.assertIn('restricted', str(ctx.exception).lower())
mock_update.assert_not_called()

@patch('keepercommander.commands.nested_share_folder.record_commands._nsf.update_record_v3')
@patch('keepercommander.commands.nested_share_folder.helpers.check_record_edit_permission')
def test_update_record_rejects_weak_password_without_force(self, mock_perm, mock_update):
from keepercommander.commands.nested_share_folder import NestedShareRecordUpdateCommand
ruid, robj = _make_record()
params = _make_params(
nested_share_records={ruid: robj},
record_cache={ruid: {'revision': 1, 'data_unencrypted': json.dumps({
'type': 'login', 'title': 'Old',
'fields': [{'type': 'password', 'value': ['ExistingPass123']}],
})}},
enforcements={
'jsons': [{
'key': 'generated_password_complexity',
'value': json.dumps([{
'length': 12,
'lower-use': True, 'lower-min': 1,
'upper-use': True, 'upper-min': 1,
'digit-use': True, 'digit-min': 1,
}]),
}],
},
)
cmd = NestedShareRecordUpdateCommand()
cmd.execute(params, record_uids=[ruid], fields=['password=abc'], force=False)
mock_update.assert_not_called()

@patch('keepercommander.nested_share_folder.folder_record_api.add_record_to_folder_v3')
def test_add_record_to_folder(self, mock_add):
pass
Expand Down
Loading