Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions keepercommander/commands/nested_share_folder/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@
re.IGNORECASE,
)

MIN_SHARE_EXPIRATION_MS = 60_000
"""Minimum share expiration is one minute (milliseconds)."""


# ═══════════════════════════════════════════════════════════════════════════
# Error-handling patterns (eliminates repetitive try/except boilerplate)
Expand Down Expand Up @@ -306,6 +309,18 @@ def walk(fuid):
# Expiration parsing
# ═══════════════════════════════════════════════════════════════════════════

def validate_share_expiration_timestamp(expiration_ms, cmd_name):
"""Reject finite expirations that are less than one minute."""
if expiration_ms is None or expiration_ms == -1:
return
min_allowed = int(datetime.datetime.now(timezone.utc).timestamp() * 1000) + MIN_SHARE_EXPIRATION_MS
if expiration_ms < min_allowed:
raise CommandError(
cmd_name,
'Share expiration must be at least 1 minute.',
)


def parse_expiration(expire_at, expire_in, cmd_name):
"""Parse ``--expire-at`` / ``--expire-in`` into a millisecond timestamp.

Expand All @@ -320,13 +335,15 @@ def parse_expiration(expire_at, expire_in, cmd_name):
if expire_at:
try:
dt = datetime.datetime.fromisoformat(raw.replace('Z', '+00:00'))
return int(dt.timestamp() * 1000)
expiration_ms = int(dt.timestamp() * 1000)
except ValueError:
raise CommandError(
cmd_name,
f'Invalid --expire-at format: {raw!r}. '
f'Use ISO datetime, e.g. 2027-01-01T00:00:00Z or "never"',
)
validate_share_expiration_timestamp(expiration_ms, cmd_name)
return expiration_ms

m = _EXPIRATION_RE.fullmatch(raw)
if not m:
Expand All @@ -336,6 +353,11 @@ def parse_expiration(expire_at, expire_in, cmd_name):
)
amount = int(m.group(1))
unit = m.group(2).lower()
if unit.startswith('mi') and amount < 1:
raise CommandError(
cmd_name,
'Share expiration must be at least 1 minute.',
)
now = datetime.datetime.now(timezone.utc)
delta_map = {
'mi': timedelta(minutes=amount),
Expand All @@ -345,7 +367,9 @@ def parse_expiration(expire_at, expire_in, cmd_name):
'y': timedelta(days=amount * 365),
}
delta = next(v for k, v in delta_map.items() if unit.startswith(k))
return int((now + delta).timestamp() * 1000)
expiration_ms = int((now + delta).timestamp() * 1000)
validate_share_expiration_timestamp(expiration_ms, cmd_name)
return expiration_ms


# ═══════════════════════════════════════════════════════════════════════════
Expand Down
47 changes: 30 additions & 17 deletions keepercommander/commands/nested_share_folder/sharing_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,23 @@ def _dispatch(params, action, record_uid, email, access_role_type, expiration):
params=params, record_uid=record_uid, new_owner_email=email), 'owner')

if action == 'grant':
if NestedShareRecordShareCommand._is_already_shared(
params, record_uid, email):
existing = NestedShareRecordShareCommand._get_direct_user_share(
params, record_uid, email)
if existing:
if _nsf.is_record_share_update_noop(
existing, access_role_type, expiration):
logging.info(
"Record '%s' already shared with '%s' at the requested "
"role and expiration; no change needed.",
record_uid, email)
return ({
'results': [{
'record_uid': record_uid,
'success': True,
'message': 'Already has requested access',
}],
'success': True,
}, 'update')
logging.debug(
"Record '%s' is already shared with user '%s'; switching to update.",
record_uid, email)
Expand All @@ -115,6 +130,17 @@ def _dispatch(params, action, record_uid, email, access_role_type, expiration):
return (_nsf.unshare_record_v3(
params=params, record_uid=record_uid, recipient_email=email), 'revoke')

@staticmethod
def _get_direct_user_share(params, record_uid, email):
"""Return direct AT_USER share metadata for *email*, or None."""
try:
access_result = _nsf.get_record_accesses_v3(params, [record_uid])
return _nsf.find_direct_user_share_access(
access_result, record_uid, email)
except Exception as exc:
logging.debug("Could not fetch record accesses for '%s': %s", record_uid, exc)
return None

@staticmethod
def _is_already_shared(params, record_uid, email):
"""Return True if *email* already has a *direct* non-owner share on *record_uid*.
Expand All @@ -126,21 +152,8 @@ def _is_already_shared(params, record_uid, email):
causes the caller to dispatch ``share_record_v3`` (a fresh direct
grant) which correctly overrides the inherited folder permission.
"""
try:
access_result = _nsf.get_record_accesses_v3(params, [record_uid])
for a in access_result.get('record_accesses', []):
if a.get('record_uid') != record_uid or a.get('owner', False):
continue
if a.get('access_type') and a.get('access_type') != 'AT_USER':
continue
if a.get('inherited'):
continue
if a.get('accessor_name', '').casefold() == email.casefold():
return True
return False
except Exception as exc:
logging.debug("Could not fetch record accesses for '%s': %s", record_uid, exc)
return False
return NestedShareRecordShareCommand._get_direct_user_share(
params, record_uid, email) is not None

@staticmethod
def _log_results(result, action, email):
Expand Down
8 changes: 4 additions & 4 deletions keepercommander/commands/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -1558,15 +1558,15 @@ def execute(self, params, **kwargs):
'record_type': record.record_type,
'title': record.title,
'description': vault_extensions.get_record_description(record),
'record_category': 'Nested' if is_nsf else 'Classic',
'record_category': 'nested' if is_nsf else 'classic',
}
all_results.append(result_item)
else:
logging.info('')
table = []
headers = ['Record UID', 'Type', 'Title', 'Description', 'Record Category']
for record in records:
record_category = 'Nested' if record.record_uid in nsf_records_map else 'Classic'
record_category = 'nested' if record.record_uid in nsf_records_map else 'classic'
row = [record.record_uid, record.record_type, record.title,
vault_extensions.get_record_description(record), record_category]
table.append(row)
Expand Down Expand Up @@ -1655,7 +1655,7 @@ def execute(self, params, **kwargs):
for item in all_results:
if item['type'] == 'record':
row = [item['type'], item['record_uid'], item['title'],
f"Type: {item['record_type']}, Description: {item['description']}, Record Category: {item.get('record_category', 'Classic')}"]
f"Type: {item['record_type']}, Description: {item['description']}, Record Category: {item.get('record_category', 'classic')}"]
elif item['type'] == 'shared_folder':
row = [item['type'], item['shared_folder_uid'], item['name'],
f"Folder Category: Classic, Can Edit: {item['can_edit']}, Can Share: {item['can_share']}"]
Expand Down Expand Up @@ -1832,7 +1832,7 @@ def execute(self, params, **kwargs):
for record in records:
# Determine if record is from Nested Share Folder or Classic
is_nested_share = hasattr(params, 'nested_share_records') and record.record_uid in params.nested_share_records
record_category = 'Nested' if is_nested_share else 'Classic'
record_category = 'nested' if is_nested_share else 'classic'
row = [record.record_uid, record.record_type, record.title,
vault_extensions.get_record_description(record), record.shared, record_category]
table.append(row)
Expand Down
19 changes: 15 additions & 4 deletions keepercommander/commands/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def register_command_info(aliases, command_info):
create_account_parser = argparse.ArgumentParser(prog='create-account', description='Create Keeper Account')
create_account_parser.add_argument('email', help='email')

def get_share_expiration(expire_at, expire_in): # (Optional[str], Optional[str]) -> Optional[int]
def get_share_expiration(expire_at, expire_in, cmd_name='share-record'): # (Optional[str], Optional[str], str) -> Optional[int]
if not expire_at and not expire_in:
return

Expand All @@ -240,11 +240,20 @@ def get_share_expiration(expire_at, expire_in): # (Optional[str], Optional[s
if expire_in == 'never':
return -1
td = parse_timeout(expire_in)
if td < datetime.timedelta(minutes=1):
raise CommandError(
cmd_name,
'Share expiration must be at least 1 minute.',

)
dt = datetime.datetime.now() + td
if dt is None:
raise ValueError(f'Incorrect expiration: {expire_at or expire_in}')

return int(dt.timestamp())
expiration_seconds = int(dt.timestamp())
from .nested_share_folder.helpers import validate_share_expiration_timestamp
validate_share_expiration_timestamp(expiration_seconds * 1000, cmd_name)
return expiration_seconds


class ShareFolderCommand(Command):
Expand Down Expand Up @@ -310,7 +319,8 @@ def get_share_admin_obj_uids(obj_names, obj_type):

share_expiration = None
if action == 'grant':
share_expiration = get_share_expiration(kwargs.get('expire_at'), kwargs.get('expire_in'))
share_expiration = get_share_expiration(
kwargs.get('expire_at'), kwargs.get('expire_in'), cmd_name='share-folder')

rotate_on_expiration = bool(kwargs.get('rotate_on_expiration'))
if rotate_on_expiration:
Expand Down Expand Up @@ -690,7 +700,8 @@ def prep_request(params, kwargs): # type: (KeeperParams, Dict[str, Any]) -> Un

share_expiration = None
if action == 'grant':
share_expiration = get_share_expiration(kwargs.get('expire_at'), kwargs.get('expire_in'))
share_expiration = get_share_expiration(
kwargs.get('expire_at'), kwargs.get('expire_in'), cmd_name='share-record')

rotate_on_expiration = bool(kwargs.get('rotate_on_expiration'))
if rotate_on_expiration:
Expand Down
12 changes: 6 additions & 6 deletions keepercommander/commands/supershell/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ def _get_welcome_screen_content(self) -> str:
[bold {t['primary_bright']}]Folder Icons[/bold {t['primary_bright']}]
[{t['text_dim']}]•[/{t['text_dim']}] Legacy Personal Folder 🔒
[{t['text_dim']}]•[/{t['text_dim']}] Legacy Shared Folder 📦
[{t['text_dim']}]•[/{t['text_dim']}] Drive Shared Folder 👥
[{t['text_dim']}]•[/{t['text_dim']}] Drive NonShared Folder 📁
[{t['text_dim']}]•[/{t['text_dim']}] Nested Shared Folder (Shared) 👥
[{t['text_dim']}]•[/{t['text_dim']}] Nested Shared Folder (NonShared) 📁

[{t['text_dim']}]Press [/{t['text_dim']}][{t['primary']}]?[/{t['primary']}][{t['text_dim']}] for full keyboard shortcuts[/{t['text_dim']}]"""

Expand Down Expand Up @@ -393,8 +393,8 @@ async def on_mount(self):
[bold {t['primary_bright']}]Folder Icons[/bold {t['primary_bright']}]
[{t['text_dim']}]•[/{t['text_dim']}] Legacy Personal Folder 🔒
[{t['text_dim']}]•[/{t['text_dim']}] Legacy Shared Folder 📦
[{t['text_dim']}]•[/{t['text_dim']}] Drive Shared Folder 👥
[{t['text_dim']}]•[/{t['text_dim']}] Drive NonShared Folder 📁
[{t['text_dim']}]•[/{t['text_dim']}] Nested Shared Folder (Shared) 👥
[{t['text_dim']}]•[/{t['text_dim']}] Nested Shared Folder (NonShared) 📁

[{t['text_dim']}]Press [/{t['text_dim']}][{t['primary']}]?[/{t['primary']}][{t['text_dim']}] for full keyboard shortcuts[/{t['text_dim']}]"""
detail_widget.update(help_content)
Expand Down Expand Up @@ -984,8 +984,8 @@ def _get_folder_icon(self, folder_node):
- Legacy Personal Folder (user_folder) → 🔒
- Legacy Shared Folder (shared_folder) → 📦
- Subfolder in Shared (shared_folder_folder) → 📦
- Drive Shared Folder (nested_share_folder, shared) → 👥
- Drive NonShared Folder (nested_share_folder, not shared) → 📁
- Nested Shared Folder (Shared) (nested_share_folder, shared) → 👥
- Nested Shared Folder (NonShared) (nested_share_folder, not shared) → 📁
"""
from ...subfolder import BaseFolderNode
if folder_node is None:
Expand Down
4 changes: 2 additions & 2 deletions keepercommander/commands/supershell/screens/help.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ def compose(self) -> ComposeResult:
[green]Folder Icons:[/green]
🔒 Legacy Personal Folder
📦 Legacy Shared Folder
👥 Drive Shared Folder
📁 Drive NonShared Folder""", classes="help_column")
👥 Nested Shared Folder (Shared)
📁 Nested Shared Folder (NonShared)""", classes="help_column")
yield Static("[dim]Press Esc or q to close[/dim]", id="help_footer")

def action_dismiss(self):
Expand Down
1 change: 1 addition & 0 deletions keepercommander/nested_share_folder/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
'create_record_data_v3', 'record_add_v3', 'record_update_v3',
'create_record_v3', 'update_record_v3', 'create_records_batch_v3',
'get_record_details_v3', 'get_record_accesses_v3',
'find_direct_user_share_access', 'is_record_share_update_noop',
'share_record_v3', 'update_record_share_v3', 'unshare_record_v3',
'batch_update_record_shares_v3', 'batch_create_record_shares_v3',
'batch_unshare_records_v3',
Expand Down
15 changes: 9 additions & 6 deletions keepercommander/nested_share_folder/folder_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,14 +364,15 @@ def grant_folder_access_v3(params, folder_uid, user_uid, role='viewer',
existing = _check_existing_access(params, folder_uid, actual_uid_bytes,
target_role_name, access_type_label)
if existing is not None:
if existing == target_role_name:
if existing == target_role_name and expiration_timestamp is None:
return {'folder_uid': folder_uid, 'user_uid': identifier_label,
'access_type': access_type_label,
'status': 'SUCCESS',
'message': f"{'Team' if as_team else 'User'} already has {role} access",
'success': True, 'action_taken': 'already_had_access'}
result = update_folder_access_v3(params, folder_uid, identifier_label,
role=role, as_team=as_team)
role=role, as_team=as_team,
expiration_timestamp=expiration_timestamp)
result['action_taken'] = 'updated'
return result

Expand All @@ -382,7 +383,7 @@ def grant_folder_access_v3(params, folder_uid, user_uid, role='viewer',
ad.accessRoleType = access_role
ad.permissions.CopyFrom(get_folder_permissions_for_role(access_role))

if expiration_timestamp:
if expiration_timestamp is not None:
ad.tlaProperties.expiration = expiration_timestamp

if share_folder_key:
Expand Down Expand Up @@ -430,9 +431,9 @@ def _check_existing_access(params, folder_uid, uid_bytes, target_role_name,


def update_folder_access_v3(params, folder_uid, user_uid, role=None, hidden=None,
as_team=False):
if role is None and hidden is None:
raise ValueError("At least one field (role or hidden) required")
expiration_timestamp=None, as_team=False):
if role is None and hidden is None and expiration_timestamp is None:
raise ValueError("At least one field (role, hidden, or expiration) required")
resolved = resolve_folder_identifier(params, folder_uid)
if not resolved:
raise ValueError(f"Folder '{folder_uid}' not found")
Expand All @@ -453,6 +454,8 @@ def update_folder_access_v3(params, folder_uid, user_uid, role=None, hidden=None
ad.permissions.CopyFrom(get_folder_permissions_for_role(resolved_role))
if hidden is not None:
ad.hidden = hidden
if expiration_timestamp is not None:
ad.tlaProperties.expiration = expiration_timestamp

response = folder_access_update_v3(params, folder_access_updates=[ad])
result = parse_folder_access_result(response, folder_uid, identifier_label,
Expand Down
Loading
Loading