Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public override async ValueTask When(CancellationToken token)
transactionOffset: 0xBEEF,
eventStreamId: streamId,
expectedVersion: 1234,
timeStamp: new DateTime(2012, 12, 21),
timeStamp: new DateTime(2012, 12, 21, 0, 0, 0, DateTimeKind.Utc),
flags: PrepareFlags.Data,
eventType: eventTypeId,
data: new byte[] { 1, 2, 3, 4, 5 },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public override async ValueTask When(CancellationToken token)
transactionOffset: 0xBEEF,
eventStreamId: streamId,
expectedVersion: 1234,
timeStamp: new DateTime(2012, 12, 21),
timeStamp: new DateTime(2012, 12, 21, 0, 0, 0, DateTimeKind.Utc),
flags: PrepareFlags.IsCommitted | PrepareFlags.TransactionEnd,
eventType: eventTypeId,
data: new byte[] { 1, 2, 3, 4, 5 },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public override async ValueTask When(CancellationToken token)
transactionOffset: 0xBEEF,
eventStreamId: streamId,
expectedVersion: 1234,
timeStamp: new DateTime(2012, 12, 21),
timeStamp: new DateTime(2012, 12, 21, 0, 0, 0, DateTimeKind.Utc),
flags: PrepareFlags.SingleWrite,
eventType: eventTypeId,
data: new byte[] { 1, 2, 3, 4, 5 },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,11 @@ public Rec(RecType type, int transaction, string streamId, string eventType, Dat
Transaction = transaction;
StreamId = streamId;
EventType = eventType ?? string.Empty;
if (timestamp.HasValue && timestamp.Value.Kind != DateTimeKind.Utc)
{
throw new ArgumentException("Scavenge fixture prepare timestamps must be UTC.", nameof(timestamp));
}

TimeStamp = timestamp ?? DateTime.UtcNow;
Version = version;
EventNumber = eventNumber;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ private IPrepareLogRecord<TStreamId> CreateRecord(long logPosition, int dataSize
{
return LogRecord.Prepare(_recordFactory, logPosition, Guid.NewGuid(), Guid.NewGuid(), 0, 0, _streamId, 1,
PrepareFlags.None, _eventTypeId, new byte[dataSize], Array.Empty<byte>(),
new DateTime(2000, 1, 1, 12, 0, 0));
new DateTime(2000, 1, 1, 12, 0, 0, DateTimeKind.Utc));
}

private async ValueTask<TFChunk> CreateChunk(int numEvents, bool completed, bool scavenged,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ public override async Task SetUp()
var eventTypeId = LogFormatHelper<TLogFormat, TStreamId>.EventTypeId;

var record = LogRecord.Prepare(recordFactory, 15556, _corrId, _eventId, 15556, 0, streamId, 1,
PrepareFlags.None, eventTypeId, new byte[12], new byte[15], new DateTime(2000, 1, 1, 12, 0, 0));
PrepareFlags.None, eventTypeId, new byte[12], new byte[15],
new DateTime(2000, 1, 1, 12, 0, 0, DateTimeKind.Utc));
_chunk = await TFChunkHelper.CreateNewChunk(Filename, 20);
_written = (await _chunk.TryAppend(record, CancellationToken.None)).Success;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public override async Task TestFixtureSetUp()
var streamId = LogFormatHelper<TLogFormat, TStreamId>.StreamId;
var eventTypeId = LogFormatHelper<TLogFormat, TStreamId>.EventTypeId;
_record = LogRecord.Prepare(recordFactory, 0, _corrId, _eventId, 0, 0, streamId, 1,
PrepareFlags.None, eventTypeId, new byte[12], new byte[15], new DateTime(2000, 1, 1, 12, 0, 0));
PrepareFlags.None, eventTypeId, new byte[12], new byte[15],
new DateTime(2000, 1, 1, 12, 0, 0, DateTimeKind.Utc));
_chunk = await TFChunkHelper.CreateNewChunk(Filename);
_result = await _chunk.TryAppend(_record, CancellationToken.None);
await _chunk.Flush(CancellationToken.None);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public override async Task TestFixtureSetUp()
var eventTypeId = LogFormatHelper<TLogFormat, TStreamId>.EventTypeId;

_record = LogRecord.Prepare(recordFactory, 0, _corrId, _eventId, 0, 0, streamId, 1,
PrepareFlags.None, eventTypeId, new byte[12], new byte[15], new DateTime(2000, 1, 1, 12, 0, 0));
PrepareFlags.None, eventTypeId, new byte[12], new byte[15],
new DateTime(2000, 1, 1, 12, 0, 0, DateTimeKind.Utc));
_chunk = await TFChunkHelper.CreateNewChunk(Filename);
_result = await _chunk.TryAppend(_record, CancellationToken.None);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public async Task open_starts_from_the_latest_chaser_checkpoint()
transactionOffset: 0,
eventStreamId: streamId,
expectedVersion: 1234,
timeStamp: new DateTime(2012, 12, 21),
timeStamp: new DateTime(2012, 12, 21, 0, 0, 0, DateTimeKind.Utc),
flags: PrepareFlags.None,
eventType: eventTypeId,
data: new byte[] { 1, 2, 3, 4, 5 },
Expand Down Expand Up @@ -138,7 +138,7 @@ public async Task try_read_returns_record_when_writerchecksum_ahead()
transactionOffset: 0,
eventStreamId: streamId,
expectedVersion: 1234,
timeStamp: new DateTime(2012, 12, 21),
timeStamp: new DateTime(2012, 12, 21, 0, 0, 0, DateTimeKind.Utc),
flags: PrepareFlags.None,
eventType: eventTypeId,
data: new byte[] { 1, 2, 3, 4, 5 },
Expand Down Expand Up @@ -195,7 +195,7 @@ public async Task try_read_returns_record_when_record_bigger_than_internal_buffe
transactionOffset: 0,
eventStreamId: streamId,
expectedVersion: 1234,
timeStamp: new DateTime(2012, 12, 21),
timeStamp: new DateTime(2012, 12, 21, 0, 0, 0, DateTimeKind.Utc),
flags: PrepareFlags.None,
eventType: eventTypeId,
data: new byte[9000],
Expand Down Expand Up @@ -239,7 +239,7 @@ public async Task try_read_returns_record_when_writerchecksum_equal()
transactionOffset: 0,
eventStreamId: streamId,
expectedVersion: 1234,
timeStamp: new DateTime(2012, 12, 21),
timeStamp: new DateTime(2012, 12, 21, 0, 0, 0, DateTimeKind.Utc),
flags: PrepareFlags.None,
eventType: eventTypeId,
data: new byte[] { 1, 2, 3, 4, 5 },
Expand Down Expand Up @@ -274,7 +274,7 @@ public void try_read_returns_false_when_writer_checksum_is_ahead_but_not_enough_
transactionPosition: 0,
eventStreamId: "WorldEnding",
expectedVersion: 1234,
timeStamp: new DateTime(2012, 12, 21),
timeStamp: new DateTime(2012, 12, 21, 0, 0, 0, DateTimeKind.Utc),
flags: PrepareFlags.None,
eventType: "type",
data: new byte[] { 1, 2, 3, 4, 5 },
Expand Down Expand Up @@ -309,7 +309,7 @@ public void try_read_returns_false_when_writer_checksum_is_ahead_but_not_enough_
transactionPosition: 0,
eventStreamId: "WorldEnding",
expectedVersion: 1234,
timeStamp: new DateTime(2012, 12, 21),
timeStamp: new DateTime(2012, 12, 21, 0, 0, 0, DateTimeKind.Utc),
flags: PrepareFlags.None,
eventType: "type",
data: new byte[] { 1, 2, 3, 4, 5 },
Expand Down Expand Up @@ -352,7 +352,7 @@ public void try_read_returns_properly_when_writer_is_written_to_while_chasing()
transactionPosition: 0,
eventStreamId: "WorldEnding",
expectedVersion: 1234,
timeStamp: new DateTime(2012, 12, 21),
timeStamp: new DateTime(2012, 12, 21, 0, 0, 0, DateTimeKind.Utc),
flags: PrepareFlags.None,
eventType: "type",
data: new byte[] { 1, 2, 3, 4, 5 },
Expand All @@ -377,7 +377,7 @@ public void try_read_returns_properly_when_writer_is_written_to_while_chasing()
transactionPosition: 0,
eventStreamId: "WorldEnding",
expectedVersion: 4321,
timeStamp: new DateTime(2012, 12, 21),
timeStamp: new DateTime(2012, 12, 21, 0, 0, 0, DateTimeKind.Utc),
flags: PrepareFlags.None,
eventType: "type",
data: new byte[] { 3, 2, 1 },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private static IPrepareLogRecord<TStreamId> CreateRecord()
transactionOffset: 0,
eventStreamId: streamId,
expectedVersion: -1,
timeStamp: new DateTime(2012, 12, 21),
timeStamp: new DateTime(2012, 12, 21, 0, 0, 0, DateTimeKind.Utc),
flags: PrepareFlags.SingleWrite,
eventType: eventTypeId,
data: new byte[123],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public override async Task TestFixtureSetUp()
var eventTypeId = LogFormatHelper<TLogFormat, TStreamId>.EventTypeId;

_record = LogRecord.Prepare(recordFactory, 0, _corrId, _eventId, 0, 0, streamId, 1,
PrepareFlags.None, eventTypeId, new byte[12], new byte[15], new DateTime(2000, 1, 1, 12, 0, 0));
PrepareFlags.None, eventTypeId, new byte[12], new byte[15],
new DateTime(2000, 1, 1, 12, 0, 0, DateTimeKind.Utc));
_chunk = await TFChunkHelper.CreateNewChunk(Filename);
_result = await _chunk.TryAppend(_record, CancellationToken.None);
await _chunk.Flush(CancellationToken.None);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public override async Task TestFixtureSetUp()
var eventTypeId = LogFormatHelper<TLogFormat, TStreamId>.EventTypeId;

_record = LogRecord.Prepare(recordFactory, 0, _corrId, _eventId, 0, 0, streamId, 1,
PrepareFlags.None, eventTypeId, new byte[12], new byte[15], new DateTime(2000, 1, 1, 12, 0, 0));
PrepareFlags.None, eventTypeId, new byte[12], new byte[15],
new DateTime(2000, 1, 1, 12, 0, 0, DateTimeKind.Utc));
_chunk = await TFChunkHelper.CreateNewChunk(Filename);
_result = await _chunk.TryAppend(_record, CancellationToken.None);
await _chunk.Flush(CancellationToken.None);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public async Task a_record_can_be_written()
transactionOffset: 0,
eventStreamId: streamId,
expectedVersion: 1234,
timeStamp: new DateTime(2012, 12, 21),
timeStamp: new DateTime(2012, 12, 21, 0, 0, 0, DateTimeKind.Utc),
flags: PrepareFlags.None,
eventType: eventTypeId,
data: new byte[] { 1, 2, 3, 4, 5 },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public async Task a_record_can_be_written()
transactionPos: 0,
transactionOffset: 0,
eventStreamId: streamId,
timeStamp: new DateTime(2012, 12, 21),
timeStamp: new DateTime(2012, 12, 21, 0, 0, 0, DateTimeKind.Utc),
flags: PrepareFlags.None,
eventType: eventTypeId,
data: new byte[] { 1, 2, 3, 4, 5 },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public async Task a_record_can_be_written()
transactionOffset: 543,
eventStreamId: streamId,
expectedVersion: 1234,
timeStamp: new DateTime(2012, 12, 21),
timeStamp: new DateTime(2012, 12, 21, 0, 0, 0, DateTimeKind.Utc),
flags: PrepareFlags.SingleWrite,
eventType: eventTypeId,
data: bytes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public async Task a_record_is_not_written_at_first_but_written_on_second_try()
transactionPos: 0,
transactionOffset: 0,
eventStreamId: streamId,
timeStamp: new DateTime(2012, 12, 21),
timeStamp: new DateTime(2012, 12, 21, 0, 0, 0, DateTimeKind.Utc),
flags: PrepareFlags.None,
eventType: eventTypeId,
data: new byte[] { 1, 2, 3, 4, 5 },
Expand All @@ -72,7 +72,7 @@ public async Task a_record_is_not_written_at_first_but_written_on_second_try()
transactionPos: pos,
transactionOffset: 0,
eventStreamId: streamId,
timeStamp: new DateTime(2012, 12, 21),
timeStamp: new DateTime(2012, 12, 21, 0, 0, 0, DateTimeKind.Utc),
flags: PrepareFlags.None,
eventType: eventTypeId,
data: new byte[] { 1, 2, 3, 4, 5 },
Expand All @@ -89,7 +89,7 @@ public async Task a_record_is_not_written_at_first_but_written_on_second_try()
transactionPos: pos,
transactionOffset: 0,
eventStreamId: streamId,
timeStamp: new DateTime(2012, 12, 21),
timeStamp: new DateTime(2012, 12, 21, 0, 0, 0, DateTimeKind.Utc),
flags: PrepareFlags.None,
eventType: eventTypeId,
data: new byte[] { 1, 2, 3, 4, 5 },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ public override async Task TestFixtureSetUp()
var eventTypeId2 = LogFormatHelper<TLogFormat, TStreamId>.EventTypeId2;

_prepare1 = LogRecord.Prepare(recordFactory, 0, _corrId, _eventId, 0, 0, streamId1, 1,
PrepareFlags.None, eventTypeId1, new byte[12], new byte[15], new DateTime(2000, 1, 1, 12, 0, 0));
PrepareFlags.None, eventTypeId1, new byte[12], new byte[15],
new DateTime(2000, 1, 1, 12, 0, 0, DateTimeKind.Utc));
var r1 = await _chunk.TryAppend(_prepare1, CancellationToken.None);
_written1 = r1.Success;
_position1 = r1.OldPosition;

_prepare2 = LogRecord.Prepare(recordFactory, r1.NewPosition, _corrId, _eventId, 0, 0, streamId2, 2,
PrepareFlags.None, eventTypeId2, new byte[12], new byte[15], new DateTime(2000, 1, 1, 12, 0, 0));
PrepareFlags.None, eventTypeId2, new byte[12], new byte[15],
new DateTime(2000, 1, 1, 12, 0, 0, DateTimeKind.Utc));
var r2 = await _chunk.TryAppend(_prepare2, CancellationToken.None);
_written2 = r2.Success;
_position2 = r2.OldPosition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public async Task SetUp()
transactionOffset: 0xBEEF,
eventStreamId: streamId,
expectedVersion: 1234,
timeStamp: new DateTime(2012, 12, 21),
timeStamp: new DateTime(2012, 12, 21, 0, 0, 0, DateTimeKind.Utc),
flags: PrepareFlags.SingleWrite,
eventType: eventTypeId,
data: new byte[] { 1, 2, 3, 4, 5 },
Expand Down Expand Up @@ -86,7 +86,7 @@ public async Task the_data_is_written()
Assert.AreEqual(p.EventId, _eventId);
Assert.AreEqual(p.EventStreamId, streamId);
Assert.AreEqual(p.ExpectedVersion, 1234);
Assert.That(p.TimeStamp, Is.EqualTo(new DateTime(2012, 12, 21)).Within(7).Milliseconds);
Assert.That(p.TimeStamp, Is.EqualTo(new DateTime(2012, 12, 21, 0, 0, 0, DateTimeKind.Utc)).Within(7).Milliseconds);
Assert.AreEqual(p.Flags, PrepareFlags.SingleWrite);
Assert.AreEqual(p.EventType, eventTypeId);
Assert.AreEqual(p.Data.Length, 5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@ public class StreamMetadatas
new StreamMetadata(truncateBefore: EventNumber.DeletedStream);

public static DateTime EffectiveNow { get; } = new DateTime(2022, 1, 5, 00, 00, 00);
public static DateTime Expired { get; } = EffectiveNow - TimeSpan.FromDays(3);
public static DateTime Cutoff { get; } = EffectiveNow - MaxAgeTimeSpan;
public static DateTime Active { get; } = EffectiveNow - TimeSpan.FromDays(1);
private static DateTime EffectiveNowRecordTimestamp { get; } = new DateTime(2022, 1, 5, 00, 00, 00, DateTimeKind.Utc);
public static DateTime Expired { get; } = EffectiveNowRecordTimestamp - TimeSpan.FromDays(3);
public static DateTime Cutoff { get; } = EffectiveNowRecordTimestamp - MaxAgeTimeSpan;
public static DateTime Active { get; } = EffectiveNowRecordTimestamp - TimeSpan.FromDays(1);

public static Rec ScavengePointRec(int transaction, int threshold = 0, DateTime? timeStamp = null) => Rec.Write(
transaction: transaction,
stream: SystemStreams.ScavengePointsStream,
eventType: SystemEventTypes.ScavengePoint,
timestamp: timeStamp ?? EffectiveNow,
timestamp: timeStamp ?? EffectiveNowRecordTimestamp,
data: new ScavengePointPayload
{
Threshold = threshold,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,27 @@ public void should_have_correct_properties()
Assert.Equal(Version, _prepare.Version);
}

[Fact]
public void constructor_should_reject_non_utc_timestamp()
{
Assert.Throws<ArgumentException>(() => new PrepareLogRecord(
LogPosition,
_correlationId,
_eventId,
TransactionPosition,
TransactionOffset,
EventStreamId,
null,
ExpectedVersion,
DateTime.Now,
Flags,
EventType,
null,
_data,
_metadata,
Version));
}

[Fact]
public void parsed_record_should_preserve_utc_timestamp_kind()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void Write(string eventType, ReadOnlyMemory<byte> data)
eventStreamId: _streamName,
eventStreamIdSize: null,
expectedVersion: _eventNumber - 1,
timeStamp: DateTime.Now,
timeStamp: DateTime.UtcNow,
flags: Flags,
eventType: eventType,
eventTypeSize: null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ public PrepareLogRecord(long logPosition,
throw new ArgumentOutOfRangeException("expectedVersion");
}

if (timeStamp.Kind != DateTimeKind.Utc)
{
throw new ArgumentException("Prepare log record timestamps must be UTC.", nameof(timeStamp));
}

Flags = flags;
TransactionPosition = transactionPosition;
TransactionOffset = transactionOffset;
Expand Down
Loading