Unverified Commit 0604dd80 authored by Birte Kristina Friesel's avatar Birte Kristina Friesel
Browse files

WIP: HAFAS support

parent 944688cf
Loading
Loading
Loading
Loading
+191 −9
Original line number Diff line number Diff line
@@ -440,20 +440,15 @@ sub startup {
			my $db       = $opt{db}  // $self->pg->db;
			my $hafas;

			if ( $train_id =~ m{[|]} ) {
				$hafas = 1;
			}

			if ($hafas) {
				return Mojo::Promise->reject(
					'HAFAS checkins are not supported yet, sorry');
			}

			my $user = $self->get_user_status( $uid, $db );
			if ( $user->{checked_in} or $user->{cancelled} ) {
				return Mojo::Promise->reject('You are already checked in');
			}

			if ( $train_id =~ m{[|]} ) {
				return $self->_checkin_hafas_p(%opt);
			}

			my $promise = Mojo::Promise->new;

			$self->iris->get_departures_p(
@@ -515,6 +510,73 @@ sub startup {
		}
	);

	$self->helper(
		'_checkin_hafas_p' => sub {
			my ( $self, %opt ) = @_;

			my $station  = $opt{station};
			my $train_id = $opt{train_id};
			my $uid      = $opt{uid} // $self->current_user->{id};
			my $db       = $opt{db}  // $self->pg->db;
			my $hafas;

			my $promise = Mojo::Promise->new;

			$self->hafas->get_journey_p( trip_id => $train_id )->then(
				sub {
					my ($journey) = @_;
					my $found;
					for my $stop ( $journey->route ) {
						if ( $stop->eva == $station ) {
							$found = $stop;
							last;
						}
					}
					if ( not $found ) {
						$promise->reject(
							"Did not find journey $train_id at $station");
						return;
					}
					for my $stop ( $journey->route ) {
						$self->stations->add_or_update(
							stop => $stop,
							db   => $db,
						);
					}
					eval {
						$self->in_transit->add(
							uid     => $uid,
							db      => $db,
							journey => $journey,
							stop    => $found,
						);
					};
					if ($@) {
						$self->app->log->error(
							"Checkin($uid): INSERT failed: $@");
						$promise->reject( 'INSERT failed: ' . $@ );
						return;
					}
					$self->in_transit->update_data(
						uid  => $uid,
						db   => $db,
						data => { trip_id => $journey->id }
					);

					$promise->resolve($journey);
				}
			)->catch(
				sub {
					my ($err) = @_;
					$promise->reject($err);
					return;
				}
			)->wait;

			return $promise;
		}
	);

	$self->helper(
		'undo' => sub {
			my ( $self, $journey_id, $uid ) = @_;
@@ -638,6 +700,10 @@ sub startup {
				return $promise->resolve( 0, 'race condition' );
			}

			if ( $train_id =~ m{[|]} ) {
				return $self->_checkout_hafas_p(%opt);
			}

			my $now     = DateTime->now( time_zone => 'Europe/Berlin' );
			my $journey = $self->in_transit->get(
				uid       => $uid,
@@ -873,6 +939,122 @@ sub startup {
		}
	);

	$self->helper(
		'_checkout_hafas_p' => sub {
			my ( $self, %opt ) = @_;

			my $station = $opt{station};
			my $force   = $opt{force};
			my $uid     = $opt{uid} // $self->current_user->{id};
			my $db      = $opt{db}  // $self->pg->db;

			my $promise = Mojo::Promise->new;

			my $now     = DateTime->now( time_zone => 'Europe/Berlin' );
			my $journey = $self->in_transit->get(
				uid             => $uid,
				with_data       => 1,
				with_timestamps => 1,
				with_visibility => 1,
				postprocess     => 1,
			);

			# with_visibility needed due to postprocess

			my $found;
			my $has_arrived;
			for my $stop ( @{ $journey->{route_after} } ) {
				if ( $station eq $stop->[0] or $station eq $stop->[1] ) {
					$found = 1;
					$self->in_transit->set_arrival_eva(
						uid         => $uid,
						db          => $db,
						arrival_eva => $stop->[1],
					);
					if ( defined $journey->{checkout_station_id}
						and $journey->{checkout_station_id} != $stop->{eva} )
					{
						$self->in_transit->unset_arrival_data(
							uid => $uid,
							db  => $db
						);
					}
					$self->in_transit->set_arrival_times(
						uid           => $uid,
						db            => $db,
						sched_arrival => $stop->[2]{sched_arr},
						rt_arrival    =>
						  ( $stop->[2]{rt_arr} || $stop->[2]{sched_arr} )
					);
					if (
						$now > ( $stop->[2]{rt_arr} || $stop->[2]{sched_arr} ) )
					{
						$has_arrived = 1;
					}
					last;
				}
			}
			if ( not $found ) {
				return $promise->resolve( 1, 'station not found in route' );
			}

			eval {
				my $tx;
				if ( not $opt{in_transaction} ) {
					$tx = $db->begin;
				}

				if ( $has_arrived or $force ) {
					$journey = $self->in_transit->get(
						uid => $uid,
						db  => $db
					);
					$self->journeys->add_from_in_transit(
						db      => $db,
						journey => $journey
					);
					$self->in_transit->delete(
						uid => $uid,
						db  => $db
					);

					my $cache_ts = $now->clone;
					if ( $journey->{real_departure}
						=~ m{ ^ (?<year> \d{4} ) - (?<month> \d{2} ) }x )
					{
						$cache_ts->set(
							year  => $+{year},
							month => $+{month}
						);
					}
					$self->journey_stats_cache->invalidate(
						ts  => $cache_ts,
						db  => $db,
						uid => $uid
					);
				}

				$tx->commit;
			};

			if ($@) {
				$self->app->log->error("Checkout($uid): $@");
				return $promise->resolve( 1, 'Checkout error: ' . $@ );
			}

			if ( $has_arrived or $force ) {
				if ( not $opt{in_transaction} ) {
					$self->run_hook( $uid, 'checkout' );
				}
				return $promise->resolve( 0, undef );
			}
			if ( not $opt{in_transaction} ) {
				$self->run_hook( $uid, 'update' );
			}
			return $promise->resolve( 1, undef );
		}
	);

	# This helper should only be called directly when also providing a user ID.
	# If you don't have one, use current_user() instead (get_user_data will
	# delegate to it anyways).
+66 −2
Original line number Diff line number Diff line
@@ -37,6 +37,70 @@ sub run {
		my $arr      = $entry->{arr_eva};
		my $train_id = $entry->{train_id};

		if ( $train_id =~ m{[|]} ) {

			$self->app->hafas->get_journey_p( trip_id => $train_id )->then(
				sub {
					my ($journey) = @_;

					my $found_dep;
					my $found_arr;
					for my $stop ( $journey->route ) {
						if ( $stop->eva == $dep ) {
							$found_dep = $stop;
						}
						if ( $arr and $stop->eva == $arr ) {
							$found_arr = $stop;
							last;
						}
					}
					if ( not $found_dep ) {
						return Mojo::Promise->reject(
							"Did not find $dep within journey $train_id");
					}

					if ( $found_dep->{rt_dep} ) {
						$self->app->in_transit->update_departure_hafas(
							uid     => $uid,
							journey => $journey,
							stop    => $found_dep,
							dep_eva => $dep,
							arr_eva => $arr
						);
					}

					if ( $found_arr and $found_arr->{rt_arr} ) {
						$self->app->in_transit->update_arrival_hafas(
							uid     => $uid,
							journey => $journey,
							stop    => $found_arr,
							dep_eva => $dep,
							arr_eva => $arr
						);
					}
				}
			)->catch(
				sub {
					my ($err) = @_;
					$self->app->log->error("work($uid)/journey: $err");
				}
			)->wait;

			if (    $arr
				and $entry->{real_arr_ts}
				and $now->epoch - $entry->{real_arr_ts} > 600 )
			{
				$self->app->checkout_p(
					station => $arr,
					force   => 2,
					dep_eva => $dep,
					arr_eva => $arr,
					uid     => $uid
				)->wait;
			}
			next;
		}

		# Note: IRIS data is not always updated in real-time. Both departure and
		# arrival delays may take several minutes to appear, especially in case
		# of large-scale disturbances. We work around this by continuing to
@@ -183,7 +247,7 @@ sub run {
				)->catch(
					sub {
						my ($error) = @_;
						$self->app->log->error("work($uid)/arrival: $@");
						$self->app->log->error("work($uid)/arrival: $error");
						$errors += 1;
					}
				)->wait;
@@ -194,7 +258,7 @@ sub run {
			$errors += 1;
		}

		eval { }
		eval { };
	}

	my $started_at       = $now;
+7 −2
Original line number Diff line number Diff line
@@ -747,8 +747,13 @@ sub travel_action {
		else {
			my $redir = '/';
			if ( $status->{checked_in} or $status->{cancelled} ) {
				if ( $status->{dep_ds100} ) {
					$redir = '/s/' . $status->{dep_ds100};
				}
				else {
					$redir = '/s/' . $status->{dep_eva} . '?hafas=1';
				}
			}
			$self->render(
				json => {
					success     => 1,
@@ -880,7 +885,7 @@ sub station {
	if ($use_hafas) {
		$promise = $self->hafas->get_departures_p(
			eva        => $station,
			lookbehind => 120,
			lookbehind => 30,
			lookahead  => 30,
		);
	}
+37 −1
Original line number Diff line number Diff line
@@ -98,6 +98,43 @@ sub get_departures_p {
	);
}

sub get_journey_p {
	my ( $self, %opt ) = @_;

	my $promise = Mojo::Promise->new;
	my $now     = DateTime->now( time_zone => 'Europe/Berlin' );

	Travel::Status::DE::HAFAS->new_p(
		journey => {
			id => $opt{trip_id},
		},
		with_polyline => 0,
		cache         => $self->{realtime_cache},
		promise       => 'Mojo::Promise',
		user_agent    => $self->{user_agent}->request_timeout(10),
	)->then(
		sub {
			my ($hafas) = @_;
			my $journey = $hafas->result;

			if ($journey) {
				$promise->resolve($journey);
				return;
			}
			$promise->reject('no journey');
			return;
		}
	)->catch(
		sub {
			my ($err) = @_;
			$promise->reject($err);
			return;
		}
	)->wait;

	return $promise;
}

sub get_route_timestamps_p {
	my ( $self, %opt ) = @_;

@@ -133,7 +170,6 @@ sub get_route_timestamps_p {
					rt_dep    => _epoch( $stop->{rt_dep} ),
					arr_delay => $stop->{arr_delay},
					dep_delay => $stop->{dep_delay},
					eva       => $stop->{eva},
					load      => $stop->{load}
				};
				if (    ( $stop->{arr_cancelled} or not $stop->{sched_arr} )
+162 −21
Original line number Diff line number Diff line
@@ -27,6 +27,12 @@ my %visibility_atoi = (
	private   => 10,
);

sub _epoch {
	my ($dt) = @_;

	return $dt ? $dt->epoch : 0;
}

sub epoch_to_dt {
	my ($epoch) = @_;

@@ -78,11 +84,14 @@ sub add {
	my $uid                = $opt{uid};
	my $db                 = $opt{db} // $self->{pg}->db;
	my $train              = $opt{train};
	my $journey            = $opt{journey};
	my $stop               = $opt{stop};
	my $checkin_station_id = $opt{departure_eva};
	my $route              = $opt{route};

	my $json = JSON->new;

	if ($train) {
		$db->insert(
			'in_transit',
			{
@@ -106,6 +115,50 @@ sub add {
			}
		);
	}
	elsif ( $journey and $stop ) {
		my @route;
		for my $j_stop ( $journey->route ) {
			push(
				@route,
				[
					$j_stop->name,
					$j_stop->eva,
					{
						sched_arr => _epoch( $j_stop->{sched_arr} ),
						sched_dep => _epoch( $j_stop->{sched_dep} ),
						rt_arr    => _epoch( $j_stop->{rt_arr} ),
						rt_dep    => _epoch( $j_stop->{rt_dep} ),
						arr_delay => $j_stop->{arr_delay},
						dep_delay => $j_stop->{dep_delay},
						load      => $j_stop->{load}
					}
				]
			);
		}
		$db->insert(
			'in_transit',
			{
				user_id   => $uid,
				cancelled => $stop->{dep_cancelled}
				? 1
				: 0,
				checkin_station_id => $stop->eva,
				checkin_time => DateTime->now( time_zone => 'Europe/Berlin' ),
				dep_platform => $stop->{platform},
				train_type   => $journey->type,
				train_line   => $journey->line_no,
				train_no     => $journey->number // q{},
				train_id     => $journey->id,
				sched_departure => $stop->{sched_dep},
				real_departure  => $stop->{rt_dep} // $stop->{sched_dep},
				route           => $json->encode( [@route] ),
			}
		);
	}
	else {
		die('neither train nor journey specified');
	}
}

sub add_from_journey {
	my ( $self, %opt ) = @_;
@@ -576,6 +629,33 @@ sub update_departure_cancelled {
	return $rows;
}

sub update_departure_hafas {
	my ( $self, %opt ) = @_;
	my $uid     = $opt{uid};
	my $db      = $opt{db} // $self->{pg}->db;
	my $dep_eva = $opt{dep_eva};
	my $arr_eva = $opt{arr_eva};
	my $journey = $opt{journey};
	my $stop    = $opt{stop};
	my $json    = JSON->new;

	# selecting on user_id and train_no avoids a race condition if a user checks
	# into a new train while we are fetching data for their previous journey. In
	# this case, the new train would receive data from the previous journey.
	$db->update(
		'in_transit',
		{
			real_departure => $stop->{rt_dep},
		},
		{
			user_id             => $uid,
			train_id            => $journey->id,
			checkin_station_id  => $dep_eva,
			checkout_station_id => $arr_eva,
		}
	);
}

sub update_arrival {
	my ( $self, %opt ) = @_;
	my $uid     = $opt{uid};
@@ -618,6 +698,67 @@ sub update_arrival {
	return $rows;
}

sub update_arrival_hafas {
	my ( $self, %opt ) = @_;
	my $uid     = $opt{uid};
	my $db      = $opt{db} // $self->{pg}->db;
	my $dep_eva = $opt{dep_eva};
	my $arr_eva = $opt{arr_eva};
	my $journey = $opt{journey};
	my $stop    = $opt{stop};
	my $json    = JSON->new;

	# TODO use old rt data if available
	my @route;
	for my $j_stop ( $journey->route ) {
		push(
			@route,
			[
				$j_stop->name,
				$j_stop->eva,
				{
					sched_arr => _epoch( $j_stop->{sched_arr} ),
					sched_dep => _epoch( $j_stop->{sched_dep} ),
					rt_arr    => _epoch( $j_stop->{rt_arr} ),
					rt_dep    => _epoch( $j_stop->{rt_dep} ),
					arr_delay => $j_stop->{arr_delay},
					dep_delay => $j_stop->{dep_delay},
					load      => $j_stop->{load}
				}
			]
		);
	}

	my $res_h = $db->select( 'in_transit', ['route'], { user_id => $uid } )
	  ->expand->hash;
	my $old_route = $res_h ? $res_h->{route} : [];

	for my $i ( 0 .. $#route ) {
		if ( $old_route->[$i] and $old_route->[$i][1] == $route[$i][1] ) {
			for my $k (qw(rt_arr rt_dep arr_delay dep_delay)) {
				$route[$i][2]{$k} //= $old_route->[$i][2]{$k};
			}
		}
	}

	# selecting on user_id and train_no avoids a race condition if a user checks
	# into a new train while we are fetching data for their previous journey. In
	# this case, the new train would receive data from the previous journey.
	$db->update(
		'in_transit',
		{
			real_arrival => $stop->{rt_arr},
			route        => $json->encode( [@route] ),
		},
		{
			user_id             => $uid,
			train_id            => $journey->id,
			checkin_station_id  => $dep_eva,
			checkout_station_id => $arr_eva,
		}
	);
}

sub update_data {
	my ( $self, %opt ) = @_;

Loading